Validation API
Complete API reference for the response validation and routing system that processes agent responses and determines execution flow.
Overview
The Validation API provides centralized response processing and routing decisions, handling multiple response formats and ensuring all actions comply with topology permissions.
ValidationProcessor
Central hub for all response parsing in the coordination system.
Import
    from marsys.coordination.validation import ValidationProcessor
Constructor
    ValidationProcessor(topology_graph: TopologyGraph)
process_response
    async def process_response(
        raw_response: Any,
        agent: BaseAgent,
        branch: ExecutionBranch,
        exec_state: ExecutionState,
    ) -> ValidationResult
Parameters
| Parameter | Type | Description |
|---|---|---|
| raw_response | Any | Agent response to validate |
| agent | BaseAgent | The responding agent instance |
| branch | ExecutionBranch | Current execution branch |
| exec_state | ExecutionState | Current execution state |
Example
    # topology_graph, coordinator_agent, current_branch and execution_state
    # are assumed to exist already in the calling code.
    processor = ValidationProcessor(topology_graph)

    result = await processor.process_response(
        raw_response={"next_action": "invoke_agent", "action_input": "Analyzer"},
        agent=coordinator_agent,
        branch=current_branch,
        exec_state=execution_state,
    )

    if result.is_valid:
        print(f"Action: {result.action_type}")
        print(f"Next agents: {result.next_agents}")
ValidationResult
Result of response validation.
Attributes
| Attribute | Type | Description |
|---|---|---|
| is_valid | bool | Whether validation succeeded |
| action_type | ActionType | Type of action to execute |
| parsed_response | Dict[str, Any] | Parsed response data |
| error_message | str | Error description if invalid |
| retry_suggestion | str | Suggestion for retry |
| invocations | List[AgentInvocation] | Agent invocation details |
| tool_calls | List[Dict] | Tool call specifications |
Example
    if result.is_valid:
        if result.action_type == ActionType.INVOKE_AGENT:
            next_agent = result.next_agents[0]
            print(f"Invoking: {next_agent}")
        elif result.action_type == ActionType.PARALLEL_INVOKE:
            print(f"Parallel invoke: {result.next_agents}")
        elif result.action_type == ActionType.FINAL_RESPONSE:
            print(f"Final: {result.parsed_response['content']}")
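For orientation, the attribute table above can be mirrored by a plain dataclass. This is a stand-in sketch, not the class shipped in marsys: the name `ValidationResultSketch`, the defaults, the `Optional` annotations, and the simplified field types are all assumptions, and `describe` is a hypothetical helper.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ValidationResultSketch:
    """Hypothetical mirror of the documented attributes; defaults are assumed."""
    is_valid: bool
    action_type: Optional[str] = None  # ActionType in the documented API
    parsed_response: Optional[Dict[str, Any]] = None
    error_message: Optional[str] = None
    retry_suggestion: Optional[str] = None
    invocations: List[Any] = field(default_factory=list)  # AgentInvocation in the documented API
    tool_calls: List[Dict[str, Any]] = field(default_factory=list)


def describe(result: ValidationResultSketch) -> str:
    """Summarize a result the way a caller might log it."""
    if result.is_valid:
        return f"ok: {result.action_type}"
    hint = f" (hint: {result.retry_suggestion})" if result.retry_suggestion else ""
    return f"invalid: {result.error_message}{hint}"
```

Checking `is_valid` first matters because the remaining fields may be unset on a failed validation in this sketch.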
ActionType
Enumeration of supported action types.
| Value | Description | Response Format |
|---|---|---|
| INVOKE_AGENT | Sequential agent invocation | {"next_action": "invoke_agent", "action_input": "Agent"} |
| PARALLEL_INVOKE | Parallel agent execution | {"next_action": "parallel_invoke", "agents": [...]} |
| CALL_TOOL | Tool execution | {"next_action": "call_tool", "tool_calls": [...]} |
| FINAL_RESPONSE | Complete execution | {"next_action": "final_response", "content": "..."} |
| END_CONVERSATION | End conversation branch | {"next_action": "end_conversation"} |
| ERROR_RECOVERY | Route to user for recovery | {"next_action": "error_recovery", ...} |
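To make the table concrete, the sketch below maps the `next_action` string of a response onto an enum member. The enum here is a stand-in defined for illustration: the assumption that member values equal the wire strings, and the `classify` helper itself, are not taken from the marsys source.

```python
from enum import Enum
from typing import Any, Dict, Optional


class ActionType(Enum):
    """Stand-in enum; member values are assumed to equal the wire strings."""
    INVOKE_AGENT = "invoke_agent"
    PARALLEL_INVOKE = "parallel_invoke"
    CALL_TOOL = "call_tool"
    FINAL_RESPONSE = "final_response"
    END_CONVERSATION = "end_conversation"
    ERROR_RECOVERY = "error_recovery"


def classify(response: Dict[str, Any]) -> Optional[ActionType]:
    """Return the ActionType named by `next_action`, or None if unknown."""
    raw = response.get("next_action")
    try:
        return ActionType(raw)  # lookup by value
    except ValueError:
        # Missing or unrecognized action names are reported as None.
        return None
```

Returning `None` for unknown actions leaves the decision about retries or error recovery to the caller.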
Router
Converts validation results into execution decisions.
Import
    from marsys.coordination.routing import Router
route
    async def route(
        validation_result: ValidationResult,
        current_branch: ExecutionBranch,
        routing_context: RoutingContext,
    ) -> RoutingDecision
Example
    router = Router(topology_graph)

    decision = await router.route(
        validation_result=validation_result,
        current_branch=current_branch,
        routing_context=RoutingContext(
            metadata={"retry_count": 0},
            error_info=None,
        ),
    )

    # Process routing decision
    for step in decision.next_steps:
        if step.step_type == StepType.AGENT_INVOCATION:
            await invoke_agent(step.target)
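The idea of turning a validated action into execution steps can be pictured with a small, self-contained toy. Only `StepType.AGENT_INVOCATION`, `step_type` and `target` appear in the example above; the `COMPLETE` member, the `Step` class, and `plan_steps` are hypothetical names, and the real Router may decide quite differently.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class StepType(Enum):
    AGENT_INVOCATION = "agent_invocation"  # named in the example above
    COMPLETE = "complete"                  # hypothetical member


@dataclass
class Step:
    """Hypothetical step record with the two fields the example reads."""
    step_type: StepType
    target: str = ""


def plan_steps(next_action: str, targets: List[str]) -> List[Step]:
    """Hypothetical mapping from a validated action to execution steps."""
    if next_action in ("invoke_agent", "parallel_invoke"):
        # One invocation step per target agent.
        return [Step(StepType.AGENT_INVOCATION, t) for t in targets]
    if next_action == "final_response":
        return [Step(StepType.COMPLETE)]
    raise ValueError(f"no routing rule for {next_action!r}")
```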
Response Formats
Standard JSON Response
    # Sequential invocation
    {
        "thought": "I need to analyze this data",
        "next_action": "invoke_agent",
        "action_input": "DataAnalyzer"
    }

    # With request data
    {
        "next_action": "invoke_agent",
        "action_input": "DataAnalyzer",
        "request": "Analyze sales data for Q4"
    }
Parallel Invocation
    {
        "thought": "These can run in parallel",
        "next_action": "parallel_invoke",
        "agents": ["Worker1", "Worker2", "Worker3"],
        "agent_requests": {
            "Worker1": "Process segment A",
            "Worker2": "Process segment B",
            "Worker3": "Process segment C"
        }
    }
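A consumer of this format has to line up each name in `agents` with its entry in `agent_requests`. The helper below is a minimal sketch of that pairing; `pair_requests` is a hypothetical name, and treating a missing request as an error is an assumption rather than documented behavior.

```python
from typing import Any, Dict, List, Tuple


def pair_requests(response: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Pair each agent in `agents` with its entry in `agent_requests`."""
    agents = response.get("agents", [])
    requests = response.get("agent_requests", {})
    missing = [name for name in agents if name not in requests]
    if missing:
        # Assumption: a missing request is treated as an error in this sketch.
        raise KeyError(f"no request for: {', '.join(missing)}")
    # Preserve the order given in `agents`.
    return [(name, requests[name]) for name in agents]


response = {
    "next_action": "parallel_invoke",
    "agents": ["Worker1", "Worker2"],
    "agent_requests": {
        "Worker1": "Process segment A",
        "Worker2": "Process segment B",
    },
}
pairs = pair_requests(response)
```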
Tool Calls
    {
        "next_action": "call_tool",
        "tool_calls": [
            {
                "id": "call_123",
                "type": "function",
                "function": {
                    "name": "search",
                    "arguments": "{\"query\": \"AI trends\"}"
                }
            }
        ]
    }
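Note that `arguments` in the format above is a JSON-encoded string, not a nested object, so it needs a second decoding pass before the tool can be called. A minimal sketch using only the standard library follows; `decode_tool_calls` is a hypothetical helper, not part of the documented API.

```python
import json
from typing import Any, Dict, List, Tuple


def decode_tool_calls(response: Dict[str, Any]) -> List[Tuple[str, Dict[str, Any]]]:
    """Return (function name, decoded arguments) for each tool call."""
    decoded = []
    for call in response.get("tool_calls", []):
        function = call["function"]
        # `arguments` arrives as a JSON string, so decode it into a dict.
        decoded.append((function["name"], json.loads(function["arguments"])))
    return decoded


response = {
    "next_action": "call_tool",
    "tool_calls": [
        {
            "id": "call_123",
            "type": "function",
            "function": {"name": "search", "arguments": "{\"query\": \"AI trends\"}"},
        }
    ],
}
calls = decode_tool_calls(response)
```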
Final Response
    # Text response
    {
        "next_action": "final_response",
        "content": "Here is the analysis result..."
    }

    # Structured response
    {
        "next_action": "final_response",
        "content": {
            "title": "Analysis Report",
            "sections": [...],
            "conclusion": "..."
        }
    }
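Because `content` may be either a string or a structured object, code that displays a final response has to branch on its type. The sketch below shows one way to do that; `render_final` is a hypothetical helper, and serializing structured content as indented JSON is an assumption made for display purposes only.

```python
import json
from typing import Any, Dict


def render_final(response: Dict[str, Any]) -> str:
    """Return displayable text for a final_response payload."""
    content = response["content"]
    if isinstance(content, str):
        return content  # text responses pass through unchanged
    # Structured responses are serialized; sort_keys keeps the output stable.
    return json.dumps(content, indent=2, sort_keys=True)
```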
Validation Flow
    # 1. Receive agent response
    response = await agent.run(prompt)

    # 2. Process response
    validation_result = await validation_processor.process_response(
        raw_response=response,
        agent=agent,
        branch=current_branch,
        exec_state=execution_state,
    )

    # 3. Route based on validation
    if validation_result.is_valid:
        routing_decision = await router.route(
            validation_result=validation_result,
            current_branch=current_branch,
            routing_context=context,
        )

        # 4. Execute next steps
        for step in routing_decision.next_steps:
            await execute_step(step)
    else:
        # Handle validation error
        logger.error(f"Validation failed: {validation_result.error_message}")
        if validation_result.retry_suggestion:
            # Apply steering for retry
            await apply_steering(validation_result.retry_suggestion)
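The validate-then-retry shape of this flow can be tried out without the framework. The toy below replaces the agent with a list of canned replies and the processor with a tiny validator; `validate`, `run_with_retries`, the hint strings, and the limit of three attempts are all assumptions made for illustration.

```python
import asyncio
import json
from typing import Any, Dict, List, Optional, Tuple


def validate(raw: str) -> Tuple[bool, Optional[Dict[str, Any]], str]:
    """Toy validator: valid means a JSON object with a `next_action` key."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return False, None, "Respond with a single JSON object."
    if not isinstance(data, dict) or "next_action" not in data:
        return False, None, "Include a `next_action` field."
    return True, data, ""


async def run_with_retries(replies: List[str], max_attempts: int = 3) -> Optional[Dict[str, Any]]:
    """Feed canned replies through the validator until one passes."""
    hints: List[str] = []
    for raw in replies[:max_attempts]:
        await asyncio.sleep(0)  # stands in for awaiting the agent
        ok, data, hint = validate(raw)
        if ok:
            return data
        hints.append(hint)  # a real loop would steer the next prompt with this
    return None  # every attempt failed validation


result = asyncio.run(
    run_with_retries(["not json", '{"thought": "hm"}', '{"next_action": "end_conversation"}'])
)
```

Here the first two replies fail validation and the third passes, so `result` holds the parsed third reply.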
Best Practices
Do
- Validate all agent responses through ValidationProcessor
- Use structured response formats for clarity
- Include error recovery actions in critical workflows
- Check topology permissions before invocation
Don't
- Parse responses manually outside ValidationProcessor
- Skip validation for "trusted" agents
- Ignore validation errors
- Hard-code routing logic outside Router
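The topology permission check recommended above can be pictured as a lookup in an adjacency map. This sketch uses a plain dictionary rather than the `TopologyGraph` class, whose interface is not shown in this reference; the agent names, `ALLOWED`, and `disallowed_targets` are hypothetical.

```python
from typing import Dict, List, Set

# Hypothetical topology: each agent maps to the agents it may invoke.
ALLOWED: Dict[str, Set[str]] = {
    "Coordinator": {"Analyzer", "Writer"},
    "Analyzer": {"Coordinator"},
}


def disallowed_targets(caller: str, targets: List[str]) -> List[str]:
    """Return the targets that `caller` has no edge to."""
    permitted = ALLOWED.get(caller, set())  # unknown callers may invoke nobody
    return [t for t in targets if t not in permitted]
```

An empty return value means every requested invocation is permitted; anything else can be reported back to the agent as a validation error.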
Pro Tip
The ValidationProcessor supports multiple response formats simultaneously. Processors are evaluated by priority, allowing fallback from structured to unstructured formats.
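Priority-ordered fallback of this kind can be sketched as a list of parsers tried from highest to lowest priority, where the first one to return a result wins. The parser functions, the priority numbers, and the rule that bare text becomes a final response are assumptions for illustration, not the processors that ValidationProcessor actually registers.

```python
import json
from typing import Any, Callable, Dict, List, Optional, Tuple

Parser = Callable[[Any], Optional[Dict[str, Any]]]


def parse_dict(raw: Any) -> Optional[Dict[str, Any]]:
    """Accept an already-structured response."""
    return raw if isinstance(raw, dict) and "next_action" in raw else None


def parse_json_text(raw: Any) -> Optional[Dict[str, Any]]:
    """Accept a string that holds a JSON-encoded structured response."""
    if not isinstance(raw, str):
        return None
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    return parse_dict(data)


def parse_plain_text(raw: Any) -> Optional[Dict[str, Any]]:
    """Assumption: bare text falls back to a final response."""
    if isinstance(raw, str) and raw.strip():
        return {"next_action": "final_response", "content": raw.strip()}
    return None


# (priority, parser) pairs; a higher priority is tried first.
PARSERS: List[Tuple[int, Parser]] = [
    (10, parse_plain_text),
    (100, parse_dict),
    (80, parse_json_text),
]


def parse(raw: Any) -> Optional[Dict[str, Any]]:
    """Try each parser in descending priority; the first match wins."""
    for _, parser in sorted(PARSERS, key=lambda item: item[0], reverse=True):
        result = parser(raw)
        if result is not None:
            return result
    return None
```

Sorting at call time keeps registration order irrelevant, which is why the list above is deliberately out of order.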
Important
All response parsing MUST go through ValidationProcessor to ensure consistency and topology compliance.