Error Handling
MARSYS provides a comprehensive error handling system that ensures robust operation, graceful degradation, and intelligent recovery in multi-agent workflows.
Overview
The error handling system provides:
- Hierarchical Exceptions: Granular error categorization with rich context
- Intelligent Recovery: Automatic retry strategies and fallback mechanisms
- Error Routing: Route errors to User nodes for human intervention
- Provider-Specific Handling: Tailored strategies for different AI providers
- Comprehensive Logging: Detailed error tracking and monitoring
Exception Hierarchy
Base Exception
All MARSYS exceptions inherit from `AgentFrameworkError`:

```python
from marsys.agents.exceptions import AgentFrameworkError, AgentError

class AgentFrameworkError(Exception):
    """Base exception for all MARSYS framework errors."""
    pass

class AgentError(AgentFrameworkError):
    """Exception for agent-specific errors."""
    pass

# The actual exception classes are simpler than previously documented.
# Complex error metadata is handled at the coordination layer, not in exceptions.
```
Error Categories
The framework uses specific exception types for different error scenarios:
```python
from marsys.agents.exceptions import (
    AgentFrameworkError,
    AgentError,
    ToolExecutionError,
    ToolCallError,
    ModelError,
)

# Agent-specific errors
class AgentError(AgentFrameworkError):
    """Agent execution and initialization errors."""
    pass

# Tool execution errors
class ToolExecutionError(AgentFrameworkError):
    """Tool function execution failures."""
    pass

class ToolCallError(AgentFrameworkError):
    """Tool call format errors (malformed arguments, missing required fields)."""
    pass

class ActionValidationError(ValidationError):
    """Invalid agent actions."""
    pass

# Configuration errors
class ConfigurationError(MarsysError):
    """Configuration problems."""
    pass

class AgentConfigurationError(ConfigurationError):
    """Agent setup errors."""
    pass

class TopologyError(ConfigurationError):
    """Topology definition errors."""
    pass

# Execution errors
class ExecutionError(MarsysError):
    """Runtime execution failures."""
    pass

class AgentExecutionError(ExecutionError):
    """Agent execution failures."""
    pass

class TimeoutError(ExecutionError):
    """Operation timeout."""
    pass

# Permission errors
class PermissionError(MarsysError):
    """Access control violations."""
    pass

class AgentPermissionError(PermissionError):
    """Agent invocation denied."""
    pass

# API errors
class APIError(MarsysError):
    """External API failures."""
    pass

class RateLimitError(APIError):
    """API rate limit exceeded."""
    recoverable = True
```
Error Handling Patterns
Try-Catch with Context
```python
async def execute_agent_task(agent, task, context):
    """Execute task with comprehensive error handling."""
    try:
        result = await agent.run(task, context)
        return result

    except ValidationError as e:
        # Handle validation errors
        logger.warning(f"Validation error: {e.message}")
        if e.suggestion:
            logger.info(f"Suggestion: {e.suggestion}")
        # Try with corrected input
        corrected_task = correct_validation_issues(task, e)
        return await agent.run(corrected_task, context)

    except RateLimitError as e:
        # Handle rate limits with backoff
        wait_time = e.context.get("retry_after", 60)
        logger.info(f"Rate limited. Waiting {wait_time}s...")
        await asyncio.sleep(wait_time)
        return await execute_agent_task(agent, task, context)

    except AgentPermissionError as e:
        # Route to user for permission
        if context.get("user_recovery"):
            return await route_to_user_for_permission(e, context)
        raise

    except TimeoutError as e:
        # Handle timeout with retry or cancellation
        if e.recoverable and context.get("retry_count", 0) < 3:
            context["retry_count"] = context.get("retry_count", 0) + 1
            return await execute_agent_task(agent, task, context)
        raise

    except ToolCallError as e:
        # Tool call format errors - need steering, not blind retries.
        # The framework automatically sets error context for the agent.
        logger.warning(f"Tool call format error: {e}")
        # Let the steering system guide the agent to fix the tool call
        raise

    except MarsysError as e:
        # Log framework errors
        logger.error(f"Framework error: {e.to_dict()}")
        raise

    except Exception as e:
        # Wrap unexpected errors
        wrapped = ExecutionError(
            f"Unexpected error: {str(e)}",
            context={"original_error": str(e), "type": type(e).__name__},
        )
        logger.error(f"Unexpected error: {wrapped.to_dict()}")
        raise wrapped
```
Agent Name Used as Tool Call
When an agent emits a tool_calls entry whose name matches a peer agent, MARSYS returns a targeted error instead of a generic "tool not found" message.
Current behavior:
- The executor checks next-hop peer agents from topology context.
- If the name is a peer agent, the response explains that peer agents are not tools.
- The response includes the correct JSON pattern for agent invocation.
Use this form for peer-agent handoff:
{"thought": "Need specialized processing from another agent","next_action": "invoke_agent","action_input": [{"agent_name": "Analyzer","request": "Analyze the uploaded dataset and report anomalies"}]}
Do not place peer-agent names inside tool_calls.
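The executor check described above can be sketched as a small routing function. This is an illustrative sketch, not the actual MARSYS executor code; the function name, parameters, and message wording are assumptions based on the behavior documented here.

```python
from typing import Optional, Set

def check_tool_call(name: str, peer_agents: Set[str], tools: Set[str]) -> Optional[str]:
    """Return a corrective message when a tool call targets a peer agent,
    a generic message for unknown tools, or None for a valid tool call."""
    if name in tools:
        return None  # valid tool call, no error
    if name in peer_agents:
        # Targeted error: explain the correct invoke_agent pattern
        return (
            f"'{name}' is a peer agent, not a tool. Use next_action "
            f"'invoke_agent' with action_input "
            f"[{{'agent_name': '{name}', 'request': ...}}] instead."
        )
    return f"Tool '{name}' not found."
```

The key design point is that the peer-agent check runs before the generic "tool not found" fallback, so the model receives actionable steering rather than an unhelpful error.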
Automatic Retry in API Adapters
Built-in Retry Logic
MARSYS automatically retries server-side API errors with exponential backoff at the adapter level. No manual retry logic needed for API calls!
How It Works
All API adapters (`APIProviderAdapter` and `AsyncBaseAPIAdapter`) automatically retry:
Retryable Status Codes:
- 500 - Internal Server Error
- 502 - Bad Gateway
- 503 - Service Unavailable
- 504 - Gateway Timeout
- 529 - Overloaded (Anthropic)
- 408 - Request Timeout (OpenRouter)
- 429 - Rate Limit (respects `retry-after` header)
Configuration: Max retries: 3 (total 4 attempts), Base delay: 1 second, Exponential backoff: 1s, 2s, 4s
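The backoff schedule above (3 retries, 1 second base delay, doubling each attempt) can be sketched as a one-liner; the helper below is illustrative only and not part of the MARSYS API.

```python
from typing import List

def backoff_delays(max_retries: int = 3, base_delay: float = 1.0) -> List[float]:
    """Delay before each retry attempt under pure exponential backoff."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]

print(backoff_delays())  # [1.0, 2.0, 4.0]
```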
```
# Example log output during retry:
# 2025-11-05 00:58:46 - WARNING - Server error 503 from gpt-5.3-codex. Retry 1/3 after 1.0s
# 2025-11-05 00:58:47 - WARNING - Server error 503 from gpt-5.3-codex. Retry 2/3 after 2.0s
# 2025-11-05 00:58:49 - INFO - Request successful after 2 retries
```
Provider-Specific Behavior
```
# Retryable errors
- 408: Request Timeout -> retry after 5s
- 429: Rate Limit -> respect X-RateLimit-Reset header
- 502: Bad Gateway (provider error) -> retry after 5s
- 503: Service Unavailable -> retry after 10s
- 500+: Server errors -> retry after 10s

# Non-retryable errors (fail immediately)
- 400: Bad Request (malformed request)
- 401: Unauthorized (invalid API key)
- 402: Insufficient Credits
- 403: Forbidden (moderation flagged)
```
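The retryable/non-retryable split above can be expressed as a small predicate. This helper is a sketch of the policy for illustration, not a MARSYS function; the status-code sets are taken from the lists in this section.

```python
# Status codes from the provider policy documented above
RETRYABLE = {408, 429, 500, 502, 503, 504, 529}
NON_RETRYABLE = {400, 401, 402, 403}

def is_retryable(status_code: int) -> bool:
    """Server-side and throttling errors are retried; client errors fail fast."""
    if status_code in NON_RETRYABLE:
        return False
    return status_code in RETRYABLE or status_code >= 500
```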
Payload Too Large Recovery
MARSYS has a dedicated recovery path for oversized request payloads (commonly image-heavy traces).
Classification rules:
- HTTP 413 is classified provider-agnostically as `request_too_large`
- Message-text override also classifies `request_too_large` when payload hints appear, including: `request_too_large`, `payload too large`, `request exceeds the maximum`, `request body is too large`
Execution flow:
1. Provider adapter / `BaseAPIModel` raises `ModelAPIError` with classification `request_too_large`
2. `Agent._run()` re-raises this specific error (instead of converting it to an error Message)
3. `Agent.run_step()` catches it, triggers `memory.compact_for_payload_error(...)`, re-prepares messages, and retries
4. Retry is bounded to a hard cap of 2 attempts (`_MAX_PAYLOAD_RETRIES = 2`)
5. If compaction does not reduce the payload enough (or retries are exhausted), the agent returns an error Message
6. Coordination treats `request_too_large` as terminal (`terminal_error`) to avoid endless retry loops
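The classification rules can be sketched as a pure function over the status code and error message. This is an illustrative sketch, assuming the hint list documented in this section; the function name is hypothetical and not part of the MARSYS API.

```python
from typing import Optional

# Message-text hints from the classification rules above
PAYLOAD_HINTS = (
    "request_too_large",
    "payload too large",
    "request exceeds the maximum",
    "request body is too large",
)

def classify_payload_error(status_code: int, message: str) -> Optional[str]:
    """Return 'request_too_large' when the error matches the documented rules."""
    if status_code == 413:
        return "request_too_large"  # provider-agnostic status-based rule
    lowered = message.lower()
    if any(hint in lowered for hint in PAYLOAD_HINTS):
        return "request_too_large"  # message-text override
    return None
```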
Payload Compaction Notes
The payload compaction path is separate from adapter-level transient retries (5xx/429). Recovery compaction success is measured by payload-byte reduction, not just token estimates.
When to Use Manual Retry
The built-in adapter retry handles transient API errors. Use manual retry for:
- Application-level errors (business logic failures)
- Multi-step workflows (coordinating multiple agents)
- Custom retry policies (non-standard backoff)
Manual Retry with Exponential Backoff
For application-level retry needs:
```python
class RetryHandler:
    """Intelligent retry with exponential backoff."""

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with retry logic."""
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)

            except RateLimitError as e:
                # Use API-provided retry delay if available
                delay = e.context.get("retry_after", self._calculate_delay(attempt))
                logger.info(f"Rate limited. Retry {attempt + 1}/{self.max_retries} in {delay}s")
                await asyncio.sleep(delay)
                last_exception = e

            except TimeoutError as e:
                if not e.recoverable:
                    raise
                delay = self._calculate_delay(attempt)
                logger.warning(f"Timeout. Retry {attempt + 1}/{self.max_retries} in {delay}s")
                await asyncio.sleep(delay)
                last_exception = e

            except APIError as e:
                # Note: API adapters already retry server errors automatically.
                # This manual retry is for application-level API error handling.
                if e.context.get("status_code") in [500, 502, 503, 504]:
                    delay = self._calculate_delay(attempt)
                    logger.warning(
                        f"Server error (after adapter retries). "
                        f"Retry {attempt + 1}/{self.max_retries} in {delay}s"
                    )
                    await asyncio.sleep(delay)
                    last_exception = e
                else:
                    raise

            except Exception:
                # Don't retry unexpected errors
                raise

        # Max retries exceeded
        raise ExecutionError(
            f"Max retries ({self.max_retries}) exceeded",
            context={"last_error": str(last_exception)},
            suggestion="Consider increasing timeout or using a different approach",
        )

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate exponential backoff delay."""
        delay = self.base_delay * (self.exponential_base ** attempt)
        # Add jitter
        delay = delay * (0.5 + random.random())
        return min(delay, self.max_delay)
```
Circuit Breaker Pattern
```python
class CircuitBreaker:
    """Prevent cascading failures with circuit breaker."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: type = APIError,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection."""
        # Check circuit state
        if self.state == "open":
            if self._should_attempt_reset():
                self.state = "half-open"
            else:
                raise ExecutionError(
                    "Circuit breaker is open",
                    context={"failure_count": self.failure_count},
                    suggestion=f"Wait {self.recovery_timeout}s for recovery",
                )

        # Execute function
        try:
            result = await func(*args, **kwargs)
            # Success - reset circuit
            if self.state == "half-open":
                self._reset()
            return result
        except self.expected_exception:
            self._record_failure()
            if self.failure_count >= self.failure_threshold:
                self._trip()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if we should try to reset circuit."""
        return (
            self.last_failure_time
            and (datetime.now() - self.last_failure_time).total_seconds() >= self.recovery_timeout
        )

    def _record_failure(self):
        """Record a failure."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

    def _reset(self):
        """Reset circuit to closed state."""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"

    def _trip(self):
        """Trip circuit to open state."""
        self.state = "open"
        logger.warning(f"Circuit breaker tripped after {self.failure_count} failures")
```
Error Configuration
ErrorHandlingConfig
```python
from marsys.coordination.config import ErrorHandlingConfig

config = ErrorHandlingConfig(
    # Classification
    use_error_classification=True,
    classify_as_critical=["PermissionError", "ConfigurationError"],
    classify_as_recoverable=["RateLimitError", "TimeoutError"],

    # Notifications
    notify_on_critical_errors=True,
    notification_channels=["terminal", "log"],

    # Recovery
    enable_error_routing=True,  # Route to User node
    error_recovery_timeout=300.0,

    # Retry settings
    auto_retry_on_rate_limits=True,
    max_rate_limit_retries=3,
    base_retry_delay=1.0,
    max_retry_delay=60.0,

    # Circuit breaker
    enable_circuit_breaker=True,
    circuit_breaker_threshold=5,
    circuit_breaker_timeout=60.0,

    # Provider-specific
    provider_settings={
        "openai": {
            "max_retries": 3,
            "base_retry_delay": 60,
            "rate_limit_strategy": "exponential_backoff",
            "insufficient_quota_action": "fallback",
        },
        "anthropic": {
            "max_retries": 2,
            "base_retry_delay": 30,
            "rate_limit_strategy": "fixed_delay",
            "insufficient_quota_action": "raise",
        },
        "google": {
            "max_retries": 3,
            "base_retry_delay": 45,
            "rate_limit_strategy": "exponential_backoff",
            "insufficient_quota_action": "queue",
        },
    },
)
```
Error Recovery Strategies
User-Driven Recovery
Route errors to User node for human intervention:
```python
class UserRecoveryHandler:
    """Handle error recovery through user interaction."""

    async def handle_error_with_user(
        self,
        error: MarsysError,
        context: Dict[str, Any],
        topology: Topology,
    ) -> Optional[Any]:
        """Route error to User node for recovery."""
        if not topology.has_node("User"):
            return None

        # Format error for user
        error_message = self._format_error_for_user(error)

        # Create recovery options
        options = self._get_recovery_options(error)

        # Ask user for decision
        user_prompt = f"""{error_message}

How would you like to proceed?
{self._format_options(options)}"""

        # Get user response
        response = await self._get_user_input(user_prompt, context)

        # Execute recovery based on user choice
        return await self._execute_recovery(response, error, context)

    def _get_recovery_options(self, error: MarsysError) -> List[str]:
        """Get recovery options based on error type."""
        options = ["Abort execution"]
        if error.recoverable:
            options.insert(0, "Retry operation")
        if isinstance(error, ValidationError):
            options.insert(0, "Provide corrected input")
        if isinstance(error, PermissionError):
            options.insert(0, "Grant permission")
        if isinstance(error, RateLimitError):
            options.insert(0, "Wait and retry")
        return options
```
Fallback Strategies
```python
class FallbackHandler:
    """Implement fallback strategies for errors."""

    def __init__(self, fallback_map: Dict[str, List[str]]):
        self.fallback_map = fallback_map  # Primary -> [fallbacks]

    async def execute_with_fallback(
        self,
        primary_agent: str,
        task: str,
        context: Dict[str, Any],
    ) -> Any:
        """Execute with fallback agents on failure."""
        agents_to_try = [primary_agent] + self.fallback_map.get(primary_agent, [])

        for agent_name in agents_to_try:
            try:
                agent = AgentRegistry.get(agent_name)
                if not agent:
                    continue
                result = await agent.run(task, context)
                if agent_name != primary_agent:
                    logger.info(f"Fallback to {agent_name} succeeded")
                return result
            except MarsysError as e:
                logger.warning(f"Agent {agent_name} failed: {e.message}")
                if agent_name == agents_to_try[-1]:
                    # No more fallbacks
                    raise ExecutionError(
                        f"All agents failed: {', '.join(agents_to_try)}",
                        context={"last_error": str(e)},
                        suggestion="Consider revising the task or adding more fallback agents",
                    )
                continue
```
Best Practices
1. Use Specific Exceptions
```python
# GOOD - Specific exception with context
raise MessageFormatError(
    "Invalid JSON in model response",
    context={
        "response": response[:200],
        "expected": "JSON object with 'next_action' field",
    },
    suggestion="Ensure model is prompted to return valid JSON",
    recoverable=True,
)

# BAD - Generic exception
raise Exception("Invalid response")
```
2. Provide Recovery Information
```python
# GOOD - Actionable error
raise RateLimitError(
    "OpenAI API rate limit exceeded",
    context={
        "limit": "10000 tokens/min",
        "used": "10500 tokens",
        "retry_after": 60,
    },
    suggestion="Wait 60 seconds or use a different API key",
    recoverable=True,
)

# BAD - No recovery info
raise APIError("Rate limit hit")
```
3. Chain Exceptions
```python
# GOOD - Preserve original context
try:
    result = await risky_operation()
except ValueError as e:
    raise ValidationError(
        f"Operation failed: {str(e)}",
        context={"original_error": str(e), "traceback": traceback.format_exc()},
    ) from e

# BAD - Lost context
try:
    result = await risky_operation()
except:
    raise ValidationError("Operation failed")
```
4. Log at Appropriate Levels
```python
# GOOD - Appropriate logging
try:
    result = await operation()
except ValidationError as e:
    logger.warning(f"Validation failed: {e.message}")  # Warning for recoverable
except PermissionError as e:
    logger.error(f"Permission denied: {e.message}")    # Error for critical
except Exception as e:
    logger.exception(f"Unexpected error: {e}")         # Full traceback for unexpected
```
Don't Swallow Errors
Always either handle an error appropriately or re-raise it. Silent failures make debugging extremely difficult in multi-agent workflows.
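The handle-or-re-raise rule can be illustrated with a minimal wrapper: recover explicitly when you can, otherwise propagate with the traceback intact. This sketch uses only the standard library; the function name is hypothetical.

```python
import logging

logger = logging.getLogger("marsys.demo")

def run_step(func, *args):
    """Either handle an error visibly or re-raise it; never swallow it."""
    try:
        return func(*args)
    except ValueError as e:
        # Handled: a logged, explicit fallback the caller can detect
        logger.warning("Recoverable input problem: %s", e)
        return None
    except Exception:
        # Not handled here: log with full traceback, then propagate
        logger.exception("Unexpected failure; propagating")
        raise
```

A bare `except: pass` at this point would hide the failure from both the coordinator and the logs, which is exactly what makes multi-agent bugs hard to trace.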
Error Handling Ready!
You now understand MARSYS error handling. Robust error management ensures your multi-agent workflows are resilient and maintainable.