Validation API

Complete API reference for the response validation and routing system that processes agent responses and determines execution flow.

Overview

The Validation API provides centralized response processing and routing decisions, handling multiple response formats and ensuring all actions comply with topology permissions.

Core Classes

ValidationProcessor

Central hub for all response parsing in the coordination system.

Import

from marsys.coordination.validation import ValidationProcessor

Constructor

ValidationProcessor(
topology_graph: TopologyGraph,
response_format: str = "json"
)

Constructor Parameters

Parameter
topology_graph
Type
TopologyGraph
Default
Required
Description
The topology graph for permission validation
Parameter
response_format
Type
str
Default
"json"
Description
Response format name (e.g., "json")

process_response

async def process_response(
raw_response: Any,
agent: BaseAgent,
branch: ExecutionBranch,
exec_state: ExecutionState
) -> ValidationResult

Parameters

Parameter
raw_response
Type
Any
Default
Required
Description
Agent response to validate
Parameter
agent
Type
BaseAgent
Default
Required
Description
The responding agent instance
Parameter
branch
Type
ExecutionBranch
Default
Required
Description
Current execution branch
Parameter
exec_state
Type
ExecutionState
Default
Required
Description
Current execution state

Returns: ValidationResult with parsed action and validation status.

Example

processor = ValidationProcessor(topology_graph)
result = await processor.process_response(
raw_response={"next_action": "invoke_agent", "action_input": "Analyzer"},
agent=coordinator_agent,
branch=current_branch,
exec_state=execution_state
)
if result.is_valid:
print(f"Action: {result.action_type}")
print(f"Next agents: {result.next_agents}")

ValidationResult

Result of response validation.

Import

from marsys.coordination.validation import ValidationResult

Attributes

Parameter
is_valid
Type
bool
Default
-
Description
Whether validation succeeded
Parameter
action_type
Type
ActionType
Default
-
Description
Type of action to execute
Parameter
parsed_response
Type
Dict[str, Any]
Default
-
Description
Parsed response data
Parameter
error_message
Type
str
Default
-
Description
Error description if invalid
Parameter
retry_suggestion
Type
str
Default
-
Description
Suggestion for retry
Parameter
invocations
Type
List[AgentInvocation]
Default
-
Description
Agent invocation details
Parameter
tool_calls
Type
List[Dict]
Default
-
Description
Tool call specifications

Properties

Parameter
next_agents
Type
List[str]
Default
-
Description
Agent names to invoke

Example

if result.is_valid:
if result.action_type == ActionType.INVOKE_AGENT:
next_agent = result.next_agents[0]
print(f"Invoking: {next_agent}")
elif result.action_type == ActionType.PARALLEL_INVOKE:
print(f"Parallel invoke: {result.next_agents}")
elif result.action_type == ActionType.FINAL_RESPONSE:
print(f"Final: {result.parsed_response['content']}")

ActionType

Enumeration of supported action types.

Import

from marsys.coordination.validation import ActionType

Values

ValueDescriptionResponse Format
INVOKE_AGENTSequential agent invocation{"next_action": "invoke_agent", "action_input": "Agent"}
PARALLEL_INVOKEParallel agent execution{"next_action": "parallel_invoke", "agents": [...], "agent_requests": {...}}
CALL_TOOLTool execution{"next_action": "call_tool", "tool_calls": [...]}
FINAL_RESPONSEComplete execution{"next_action": "final_response", "content": "..."}
END_CONVERSATIONEnd conversation branch{"next_action": "end_conversation"}
WAIT_AND_AGGREGATEWait for parallel results{"next_action": "wait_and_aggregate"}
ERROR_RECOVERYRoute to user for recovery{"next_action": "error_recovery", "error_details": {...}}
TERMINAL_ERRORDisplay terminal error{"next_action": "terminal_error", "error": "..."}

Router

Converts validation results into execution decisions.

Import

from marsys.coordination.routing import Router

Constructor

Router(topology_graph: TopologyGraph)

route

async def route(
validation_result: ValidationResult,
current_branch: ExecutionBranch,
routing_context: RoutingContext
) -> RoutingDecision

Parameters

Parameter
validation_result
Type
ValidationResult
Default
Required
Description
Result from validation
Parameter
current_branch
Type
ExecutionBranch
Default
Required
Description
Current execution branch
Parameter
routing_context
Type
RoutingContext
Default
Required
Description
Additional routing context

Returns: RoutingDecision with next steps and branch specifications.

Example

router = Router(topology_graph)
decision = await router.route(
validation_result=validation_result,
current_branch=current_branch,
routing_context=RoutingContext(
metadata={"retry_count": 0},
error_info=None
)
)
# Process routing decision
for step in decision.next_steps:
if step.step_type == StepType.AGENT_INVOCATION:
await invoke_agent(step.target)

RoutingDecision

Decision about next execution steps.

Import

from marsys.coordination.routing import RoutingDecision

Attributes

Parameter
next_steps
Type
List[ExecutionStep]
Default
-
Description
Steps to execute
Parameter
should_continue
Type
bool
Default
-
Description
Whether to continue execution
Parameter
branch_specs
Type
List[BranchSpec]
Default
-
Description
Specifications for new branches
Parameter
metadata
Type
Dict[str, Any]
Default
-
Description
Additional metadata

Example

decision = RoutingDecision(
next_steps=[
ExecutionStep(
step_type=StepType.AGENT_INVOCATION,
target="Analyzer",
data={"request": "Analyze data"}
)
],
should_continue=True,
branch_specs=[],
metadata={"step_count": 5}
)

RoutingContext

Context information for routing decisions.

Import

from marsys.coordination.routing import RoutingContext

Attributes

Parameter
metadata
Type
Dict[str, Any]
Default
-
Description
General metadata
Parameter
error_info
Type
Optional[Dict]
Default
-
Description
Error information if present
Parameter
retry_count
Type
int
Default
-
Description
Number of retry attempts
Parameter
steering_enabled
Type
bool
Default
-
Description
Whether steering is enabled

ExecutionStep

Individual step to execute.

Import

from marsys.coordination.routing import ExecutionStep, StepType

Attributes

Parameter
step_type
Type
StepType
Default
-
Description
Type of step
Parameter
target
Type
str
Default
-
Description
Target agent or tool
Parameter
data
Type
Dict[str, Any]
Default
-
Description
Step data
Parameter
metadata
Type
Dict[str, Any]
Default
-
Description
Step metadata

StepType Enum

class StepType(Enum):
AGENT_INVOCATION = "agent_invocation"
TOOL_EXECUTION = "tool_execution"
PARALLEL_SPAWN = "parallel_spawn"
WAIT_FOR_CONVERGENCE = "wait_for_convergence"
FINAL_RESPONSE = "final_response"
ERROR_RECOVERY = "error_recovery"

Response Formats

Standard JSON Response

# Sequential invocation
{
"thought": "I need to analyze this data",
"next_action": "invoke_agent",
"action_input": "DataAnalyzer"
}
# With request data
{
"next_action": "invoke_agent",
"action_input": "DataAnalyzer",
"request": "Analyze sales data for Q4"
}

Parallel Invocation

{
"thought": "These can run in parallel",
"next_action": "parallel_invoke",
"agents": ["Worker1", "Worker2", "Worker3"],
"agent_requests": {
"Worker1": "Process segment A",
"Worker2": "Process segment B",
"Worker3": "Process segment C"
}
}

Tool Calls

{
"next_action": "call_tool",
"tool_calls": [
{
"id": "call_123",
"type": "function",
"function": {
"name": "search",
"arguments": "{\"query\": \"AI trends\"}"
}
}
]
}

Final Response

# Text response
{
"next_action": "final_response",
"content": "Here is the analysis result..."
}
# Structured response
{
"next_action": "final_response",
"content": {
"title": "Analysis Report",
"sections": [...],
"conclusion": "..."
}
}

Error Recovery

{
"next_action": "error_recovery",
"error_details": {
"type": "api_quota_exceeded",
"message": "OpenAI API quota exceeded",
"provider": "openai"
},
"suggested_action": "retry"
}

Response Format System

MARSYS uses a pluggable response format architecture that separates system prompt building from response parsing.

Architecture Overview

The format system consists of:

  • BaseResponseFormat -- Abstract base class defining the format interface
  • SystemPromptBuilder -- Builds system prompts using the configured format
  • ResponseProcessor -- Base class for response parsing
  • Format Registry -- Registry for available formats
from marsys.coordination.formats import (
SystemPromptBuilder,
BaseResponseFormat,
JSONResponseFormat,
AgentContext,
CoordinationContext,
)

SystemPromptBuilder

Builds system prompts for agents using the configured response format.

Import

from marsys.coordination.formats import SystemPromptBuilder

Constructor

SystemPromptBuilder(response_format: str = "json")

build

def build(
agent_context: AgentContext,
coordination_context: CoordinationContext,
environmental: Optional[dict] = None
) -> str

Parameters

Parameter
agent_context
Type
AgentContext
Default
Required
Description
Agent-specific context
Parameter
coordination_context
Type
CoordinationContext
Default
Required
Description
Topology context
Parameter
environmental
Type
Optional[dict]
Default
None
Description
Environmental data (date, etc.)

Example

builder = SystemPromptBuilder(response_format="json")
system_prompt = builder.build(
agent_context=AgentContext(
name="Coordinator",
goal="Coordinate tasks",
instruction="You coordinate worker agents..."
),
coordination_context=CoordinationContext(
next_agents=["Worker1", "Worker2"],
can_return_final_response=True
)
)

AgentContext

Context derived from the agent for prompt building.

Import

from marsys.coordination.formats import AgentContext

Attributes

Parameter
name
Type
str
Default
-
Description
Agent name
Parameter
goal
Type
str
Default
-
Description
Agent goal description
Parameter
instruction
Type
str
Default
-
Description
Agent behavior instructions
Parameter
tools
Type
Optional[Dict]
Default
-
Description
Available tools
Parameter
tools_schema
Type
Optional[List[Dict]]
Default
-
Description
Tool schemas for prompt
Parameter
input_schema
Type
Optional[Dict]
Default
-
Description
Expected input format
Parameter
output_schema
Type
Optional[Dict]
Default
-
Description
Expected output format
Parameter
memory_retention
Type
str
Default
-
Description
Memory retention policy

CoordinationContext

Context from the coordination system for prompt building.

Import

from marsys.coordination.formats import CoordinationContext

Attributes

Parameter
next_agents
Type
List[str]
Default
-
Description
Agents this agent can invoke
Parameter
can_return_final_response
Type
bool
Default
-
Description
Whether agent can return final response

Format Registry

Functions for managing available response formats.

Import

from marsys.coordination.formats import (
register_format,
get_format,
list_formats,
set_default_format,
is_format_registered
)

Functions

FunctionDescriptionReturns
register_format(name, format_class)Register a new formatNone
get_format(name)Get format instanceBaseResponseFormat
list_formats()List registered formatsList[str]
set_default_format(name)Set default formatNone
is_format_registered(name)Check if format existsbool

Example

# List available formats
formats = list_formats() # ["json"]
# Get specific format
json_format = get_format("json")
# Register custom format
register_format("xml", XMLResponseFormat)

BaseResponseFormat

Abstract base class for implementing response formats.

Import

from marsys.coordination.formats import BaseResponseFormat

Abstract Methods

MethodDescription
get_format_name()Return format name (e.g., "json")
build_format_instructions(actions, descriptions)Build format-specific instructions
build_action_descriptions(actions, context)Build action descriptions
get_examples(actions, context)Generate format-specific examples
get_parallel_invocation_examples(context)Examples for parallel invocation
create_processor()Create response processor for this format

Built-in Format: JSONResponseFormat -- Default JSON format with next_action/action_input structure.

Response Processors

Built-in Processors

# Structured JSON Processor
class StructuredJSONProcessor(ResponseProcessor):
"""Handles JSON responses with next_action structure."""
def can_process(self, response: Any) -> bool:
return isinstance(response, dict) and "next_action" in response
def priority(self) -> int:
return 80 # Below error and tool processors
# Tool Call Processor
class ToolCallProcessor(ResponseProcessor):
"""Handles native tool call responses."""
def can_process(self, response: Any) -> bool:
return hasattr(response, 'tool_calls')
def priority(self) -> int:
return 90
# Error Message Processor
class ErrorMessageProcessor(ResponseProcessor):
"""Handles error Messages from agents."""
def can_process(self, response: Any) -> bool:
return isinstance(response, Message) and response.role == "error"
def priority(self) -> int:
return 100 # Highest priority

Custom Processor

from marsys.coordination.formats import ResponseProcessor
class CustomFormatProcessor(ResponseProcessor):
"""Process custom response format."""
def can_process(self, response: Any) -> bool:
return isinstance(response, dict) and "custom_action" in response
def process(self, response: Any) -> Optional[Dict[str, Any]]:
return {
"next_action": self._map_action(response["custom_action"]),
"content": response.get("data")
}
def priority(self) -> int:
return 75 # Between JSON and tool processors
# Register processor
validation_processor.register_processor(CustomFormatProcessor())

Validation Flow

Complete Validation Process

# 1. Receive agent response
response = await agent.run(prompt)
# 2. Process response
validation_result = await validation_processor.process_response(
raw_response=response,
agent=agent,
branch=current_branch,
exec_state=execution_state
)
# 3. Route based on validation
if validation_result.is_valid:
routing_decision = await router.route(
validation_result=validation_result,
current_branch=current_branch,
routing_context=context
)
# 4. Execute next steps
for step in routing_decision.next_steps:
await execute_step(step)
else:
# Handle validation error
logger.error(f"Validation failed: {validation_result.error_message}")
if validation_result.retry_suggestion:
# Apply steering for retry
await apply_steering(validation_result.retry_suggestion)

Error Handling

Validation Errors

if not result.is_valid:
error_type = result.error_message
if "not allowed" in error_type:
# Permission denied - agent not in topology
logger.error(f"Permission denied: {error_type}")
elif "format" in error_type:
# Invalid response format
logger.error(f"Format error: {error_type}")
# Use retry suggestion
if result.retry_suggestion:
steering = f"Please retry with: {result.retry_suggestion}"
elif "missing" in error_type:
# Missing required fields
logger.error(f"Missing fields: {error_type}")

Error Recovery

# Agent can trigger error recovery
response = {
"next_action": "error_recovery",
"error_details": {
"type": "rate_limit",
"message": "API rate limit exceeded",
"retry_after": 60
},
"suggested_action": "wait_and_retry"
}
# Routes to User node for intervention

Best Practices

Do

  • Validate all agent responses through ValidationProcessor
  • Use structured response formats for clarity
  • Include error recovery actions in critical workflows
  • Check topology permissions before invocation
  • Provide retry suggestions for recoverable errors

Don't

  • Parse responses manually outside ValidationProcessor
  • Skip validation for "trusted" agents
  • Ignore validation errors
  • Mix response formats within single agent
  • Hard-code routing logic outside Router

Pro Tip

The ValidationProcessor supports multiple response formats simultaneously. Processors are evaluated by priority, allowing fallback from structured to unstructured formats.

Important

All response parsing MUST go through ValidationProcessor to ensure consistency and topology compliance.