Validation API
Complete API reference for the response validation and routing system that processes agent responses and determines execution flow.
Overview
The Validation API provides centralized response processing and routing decisions, handling multiple response formats and ensuring all actions comply with topology permissions.
Core Classes
ValidationProcessor
Central hub for all response parsing in the coordination system.
Import
from marsys.coordination.validation import ValidationProcessor
Constructor
ValidationProcessor(topology_graph: TopologyGraph,response_format: str = "json")
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| topology_graph | TopologyGraph | Required | The topology graph for permission validation |
| response_format | str | "json" | Response format name (e.g., "json") |
process_response
async def process_response(raw_response: Any,agent: BaseAgent,branch: ExecutionBranch,exec_state: ExecutionState) -> ValidationResult
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| raw_response | Any | Required | Agent response to validate |
| agent | BaseAgent | Required | The responding agent instance |
| branch | ExecutionBranch | Required | Current execution branch |
| exec_state | ExecutionState | Required | Current execution state |
Returns: ValidationResult with parsed action and validation status.
Example
processor = ValidationProcessor(topology_graph)result = await processor.process_response(raw_response={"next_action": "invoke_agent", "action_input": "Analyzer"},agent=coordinator_agent,branch=current_branch,exec_state=execution_state)if result.is_valid:print(f"Action: {result.action_type}")print(f"Next agents: {result.next_agents}")
ValidationResult
Result of response validation.
Import
from marsys.coordination.validation import ValidationResult
Attributes
| Parameter | Type | Default | Description |
|---|---|---|---|
| is_valid | bool | - | Whether validation succeeded |
| action_type | ActionType | - | Type of action to execute |
| parsed_response | Dict[str, Any] | - | Parsed response data |
| error_message | str | - | Error description if invalid |
| retry_suggestion | str | - | Suggestion for retry |
| invocations | List[AgentInvocation] | - | Agent invocation details |
| tool_calls | List[Dict] | - | Tool call specifications |
Properties
| Parameter | Type | Default | Description |
|---|---|---|---|
| next_agents | List[str] | - | Agent names to invoke |
Example
if result.is_valid:if result.action_type == ActionType.INVOKE_AGENT:next_agent = result.next_agents[0]print(f"Invoking: {next_agent}")elif result.action_type == ActionType.PARALLEL_INVOKE:print(f"Parallel invoke: {result.next_agents}")elif result.action_type == ActionType.FINAL_RESPONSE:print(f"Final: {result.parsed_response['content']}")
ActionType
Enumeration of supported action types.
Import
from marsys.coordination.validation import ActionType
Values
| Value | Description | Response Format |
|---|---|---|
| INVOKE_AGENT | Sequential agent invocation | {"next_action": "invoke_agent", "action_input": "Agent"} |
| PARALLEL_INVOKE | Parallel agent execution | {"next_action": "parallel_invoke", "agents": [...], "agent_requests": {...}} |
| CALL_TOOL | Tool execution | {"next_action": "call_tool", "tool_calls": [...]} |
| FINAL_RESPONSE | Complete execution | {"next_action": "final_response", "content": "..."} |
| END_CONVERSATION | End conversation branch | {"next_action": "end_conversation"} |
| WAIT_AND_AGGREGATE | Wait for parallel results | {"next_action": "wait_and_aggregate"} |
| ERROR_RECOVERY | Route to user for recovery | {"next_action": "error_recovery", "error_details": {...}} |
| TERMINAL_ERROR | Display terminal error | {"next_action": "terminal_error", "error": "..."} |
Router
Converts validation results into execution decisions.
Import
from marsys.coordination.routing import Router
Constructor
Router(topology_graph: TopologyGraph)
route
async def route(validation_result: ValidationResult,current_branch: ExecutionBranch,routing_context: RoutingContext) -> RoutingDecision
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| validation_result | ValidationResult | Required | Result from validation |
| current_branch | ExecutionBranch | Required | Current execution branch |
| routing_context | RoutingContext | Required | Additional routing context |
Returns: RoutingDecision with next steps and branch specifications.
Example
router = Router(topology_graph)decision = await router.route(validation_result=validation_result,current_branch=current_branch,routing_context=RoutingContext(metadata={"retry_count": 0},error_info=None))# Process routing decisionfor step in decision.next_steps:if step.step_type == StepType.AGENT_INVOCATION:await invoke_agent(step.target)
RoutingDecision
Decision about next execution steps.
Import
from marsys.coordination.routing import RoutingDecision
Attributes
| Parameter | Type | Default | Description |
|---|---|---|---|
| next_steps | List[ExecutionStep] | - | Steps to execute |
| should_continue | bool | - | Whether to continue execution |
| branch_specs | List[BranchSpec] | - | Specifications for new branches |
| metadata | Dict[str, Any] | - | Additional metadata |
Example
decision = RoutingDecision(next_steps=[ExecutionStep(step_type=StepType.AGENT_INVOCATION,target="Analyzer",data={"request": "Analyze data"})],should_continue=True,branch_specs=[],metadata={"step_count": 5})
RoutingContext
Context information for routing decisions.
Import
from marsys.coordination.routing import RoutingContext
Attributes
| Parameter | Type | Default | Description |
|---|---|---|---|
| metadata | Dict[str, Any] | - | General metadata |
| error_info | Optional[Dict] | - | Error information if present |
| retry_count | int | - | Number of retry attempts |
| steering_enabled | bool | - | Whether steering is enabled |
ExecutionStep
Individual step to execute.
Import
from marsys.coordination.routing import ExecutionStep, StepType
Attributes
| Parameter | Type | Default | Description |
|---|---|---|---|
| step_type | StepType | - | Type of step |
| target | str | - | Target agent or tool |
| data | Dict[str, Any] | - | Step data |
| metadata | Dict[str, Any] | - | Step metadata |
StepType Enum
class StepType(Enum):AGENT_INVOCATION = "agent_invocation"TOOL_EXECUTION = "tool_execution"PARALLEL_SPAWN = "parallel_spawn"WAIT_FOR_CONVERGENCE = "wait_for_convergence"FINAL_RESPONSE = "final_response"ERROR_RECOVERY = "error_recovery"
Response Formats
Standard JSON Response
# Sequential invocation{"thought": "I need to analyze this data","next_action": "invoke_agent","action_input": "DataAnalyzer"}# With request data{"next_action": "invoke_agent","action_input": "DataAnalyzer","request": "Analyze sales data for Q4"}
Parallel Invocation
{"thought": "These can run in parallel","next_action": "parallel_invoke","agents": ["Worker1", "Worker2", "Worker3"],"agent_requests": {"Worker1": "Process segment A","Worker2": "Process segment B","Worker3": "Process segment C"}}
Tool Calls
{"next_action": "call_tool","tool_calls": [{"id": "call_123","type": "function","function": {"name": "search","arguments": "{\"query\": \"AI trends\"}"}}]}
Final Response
# Text response{"next_action": "final_response","content": "Here is the analysis result..."}# Structured response{"next_action": "final_response","content": {"title": "Analysis Report","sections": [...],"conclusion": "..."}}
Error Recovery
{"next_action": "error_recovery","error_details": {"type": "api_quota_exceeded","message": "OpenAI API quota exceeded","provider": "openai"},"suggested_action": "retry"}
Response Format System
MARSYS uses a pluggable response format architecture that separates system prompt building from response parsing.
Architecture Overview
The format system consists of:
- BaseResponseFormat -- Abstract base class defining the format interface
- SystemPromptBuilder -- Builds system prompts using the configured format
- ResponseProcessor -- Base class for response parsing
- Format Registry -- Registry for available formats
from marsys.coordination.formats import (SystemPromptBuilder,BaseResponseFormat,JSONResponseFormat,AgentContext,CoordinationContext,)
SystemPromptBuilder
Builds system prompts for agents using the configured response format.
Import
from marsys.coordination.formats import SystemPromptBuilder
Constructor
SystemPromptBuilder(response_format: str = "json")
build
def build(agent_context: AgentContext,coordination_context: CoordinationContext,environmental: Optional[dict] = None) -> str
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_context | AgentContext | Required | Agent-specific context |
| coordination_context | CoordinationContext | Required | Topology context |
| environmental | Optional[dict] | None | Environmental data (date, etc.) |
Example
builder = SystemPromptBuilder(response_format="json")system_prompt = builder.build(agent_context=AgentContext(name="Coordinator",goal="Coordinate tasks",instruction="You coordinate worker agents..."),coordination_context=CoordinationContext(next_agents=["Worker1", "Worker2"],can_return_final_response=True))
AgentContext
Context derived from the agent for prompt building.
Import
from marsys.coordination.formats import AgentContext
Attributes
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | - | Agent name |
| goal | str | - | Agent goal description |
| instruction | str | - | Agent behavior instructions |
| tools | Optional[Dict] | - | Available tools |
| tools_schema | Optional[List[Dict]] | - | Tool schemas for prompt |
| input_schema | Optional[Dict] | - | Expected input format |
| output_schema | Optional[Dict] | - | Expected output format |
| memory_retention | str | - | Memory retention policy |
CoordinationContext
Context from the coordination system for prompt building.
Import
from marsys.coordination.formats import CoordinationContext
Attributes
| Parameter | Type | Default | Description |
|---|---|---|---|
| next_agents | List[str] | - | Agents this agent can invoke |
| can_return_final_response | bool | - | Whether agent can return final response |
Format Registry
Functions for managing available response formats.
Import
from marsys.coordination.formats import (register_format,get_format,list_formats,set_default_format,is_format_registered)
Functions
| Function | Description | Returns |
|---|---|---|
| register_format(name, format_class) | Register a new format | None |
| get_format(name) | Get format instance | BaseResponseFormat |
| list_formats() | List registered formats | List[str] |
| set_default_format(name) | Set default format | None |
| is_format_registered(name) | Check if format exists | bool |
Example
# List available formatsformats = list_formats() # ["json"]# Get specific formatjson_format = get_format("json")# Register custom formatregister_format("xml", XMLResponseFormat)
BaseResponseFormat
Abstract base class for implementing response formats.
Import
from marsys.coordination.formats import BaseResponseFormat
Abstract Methods
| Method | Description |
|---|---|
| get_format_name() | Return format name (e.g., "json") |
| build_format_instructions(actions, descriptions) | Build format-specific instructions |
| build_action_descriptions(actions, context) | Build action descriptions |
| get_examples(actions, context) | Generate format-specific examples |
| get_parallel_invocation_examples(context) | Examples for parallel invocation |
| create_processor() | Create response processor for this format |
Built-in Format: JSONResponseFormat -- Default JSON format with next_action/action_input structure.
Response Processors
Built-in Processors
# Structured JSON Processorclass StructuredJSONProcessor(ResponseProcessor):"""Handles JSON responses with next_action structure."""def can_process(self, response: Any) -> bool:return isinstance(response, dict) and "next_action" in responsedef priority(self) -> int:return 80 # Below error and tool processors# Tool Call Processorclass ToolCallProcessor(ResponseProcessor):"""Handles native tool call responses."""def can_process(self, response: Any) -> bool:return hasattr(response, 'tool_calls')def priority(self) -> int:return 90# Error Message Processorclass ErrorMessageProcessor(ResponseProcessor):"""Handles error Messages from agents."""def can_process(self, response: Any) -> bool:return isinstance(response, Message) and response.role == "error"def priority(self) -> int:return 100 # Highest priority
Custom Processor
from marsys.coordination.formats import ResponseProcessorclass CustomFormatProcessor(ResponseProcessor):"""Process custom response format."""def can_process(self, response: Any) -> bool:return isinstance(response, dict) and "custom_action" in responsedef process(self, response: Any) -> Optional[Dict[str, Any]]:return {"next_action": self._map_action(response["custom_action"]),"content": response.get("data")}def priority(self) -> int:return 75 # Between JSON and tool processors# Register processorvalidation_processor.register_processor(CustomFormatProcessor())
Validation Flow
Complete Validation Process
# 1. Receive agent responseresponse = await agent.run(prompt)# 2. Process responsevalidation_result = await validation_processor.process_response(raw_response=response,agent=agent,branch=current_branch,exec_state=execution_state)# 3. Route based on validationif validation_result.is_valid:routing_decision = await router.route(validation_result=validation_result,current_branch=current_branch,routing_context=context)# 4. Execute next stepsfor step in routing_decision.next_steps:await execute_step(step)else:# Handle validation errorlogger.error(f"Validation failed: {validation_result.error_message}")if validation_result.retry_suggestion:# Apply steering for retryawait apply_steering(validation_result.retry_suggestion)
Error Handling
Validation Errors
if not result.is_valid:error_type = result.error_messageif "not allowed" in error_type:# Permission denied - agent not in topologylogger.error(f"Permission denied: {error_type}")elif "format" in error_type:# Invalid response formatlogger.error(f"Format error: {error_type}")# Use retry suggestionif result.retry_suggestion:steering = f"Please retry with: {result.retry_suggestion}"elif "missing" in error_type:# Missing required fieldslogger.error(f"Missing fields: {error_type}")
Error Recovery
# Agent can trigger error recoveryresponse = {"next_action": "error_recovery","error_details": {"type": "rate_limit","message": "API rate limit exceeded","retry_after": 60},"suggested_action": "wait_and_retry"}# Routes to User node for intervention
Best Practices
Do
- Validate all agent responses through ValidationProcessor
- Use structured response formats for clarity
- Include error recovery actions in critical workflows
- Check topology permissions before invocation
- Provide retry suggestions for recoverable errors
Don't
- Parse responses manually outside ValidationProcessor
- Skip validation for "trusted" agents
- Ignore validation errors
- Mix response formats within single agent
- Hard-code routing logic outside Router
Related Documentation
- Execution API -- Execution system using validation
- Topology API -- Topology permissions
- Router Patterns -- Routing patterns
- Error Handling -- Error recovery
Pro Tip
The ValidationProcessor supports multiple response formats simultaneously. Processors are evaluated by priority, allowing fallback from structured to unstructured formats.
Important
All response parsing MUST go through ValidationProcessor to ensure consistency and topology compliance.