Execution API Reference
Complete API reference for the execution system, which runs multi-agent workflows at runtime. It covers branch management, step execution, and dynamic parallel invocation.
BranchExecutor
Manages the execution of different branch patterns in the workflow.
Import
from marsys.coordination.execution import BranchExecutor
Constructor
    BranchExecutor(
        validation_processor: ValidationProcessor,
        router: Router,
        step_executor: StepExecutor,
        context_manager: ContextManager,
        config: ExecutionConfig
    )
execute_branch
    async def execute_branch(
        branch: ExecutionBranch,
        initial_request: Any,
        context: Dict[str, Any],
        resume_with_results: Optional[Dict] = None
    ) -> BranchResult
Parameters
| Parameter | Type | Description | Default |
|---|---|---|---|
| branch | ExecutionBranch | Branch to execute | Required |
| initial_request | Any | Initial task/prompt | Required |
| context | Dict[str, Any] | Execution context | Required |
| resume_with_results | Optional[Dict] | Child branch results | None |
Returns: BranchResult with execution outcome
Example
    executor = BranchExecutor(
        validation_processor=validator,
        router=router,
        step_executor=step_exec,
        context_manager=ctx_mgr,
        config=exec_config
    )

    result = await executor.execute_branch(
        branch=main_branch,
        initial_request="Analyze this data",
        context={"session_id": "123"}
    )
ExecutionBranch
Represents a branch of execution in the workflow.
Import
from marsys.coordination.branches.types import ExecutionBranch, BranchType, BranchStatus
Attributes
| Attribute | Type | Description |
|---|---|---|
| branch_id | str | Unique branch identifier |
| branch_type | BranchType | Type of branch |
| status | BranchStatus | Current status |
| parent_branch_id | Optional[str] | Parent branch if nested |
| agent_sequence | List[str] | Agents to execute |
| current_step | int | Current execution step |
| memory | Dict[str, List] | Per-agent memory |
| metadata | Dict[str, Any] | Branch metadata |
BranchType Enum
    class BranchType(Enum):
        SIMPLE = "simple"                      # Sequential execution
        CONVERSATION = "conversation"          # Bidirectional dialogue
        NESTED = "nested"                      # Has child branches
        USER_INTERACTION = "user_interaction"  # Human-in-the-loop
BranchStatus Enum
    class BranchStatus(Enum):
        PENDING = "pending"      # Not started
        RUNNING = "running"      # Currently executing
        WAITING = "waiting"      # Waiting for child branches
        COMPLETED = "completed"  # Successfully finished
        FAILED = "failed"        # Terminated with error
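As an illustration of how these states might be consumed, the sketch below re-declares `BranchStatus` locally so that it runs without marsys installed. The `is_terminal` helper and the `TERMINAL_STATES` set are hypothetical names introduced here, and the choice of terminal states is an assumption based on the comments above, not a documented guarantee.

```python
from enum import Enum


class BranchStatus(Enum):
    """Local stand-in mirroring the enum documented above."""
    PENDING = "pending"
    RUNNING = "running"
    WAITING = "waiting"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumption: COMPLETED and FAILED are the only states a branch never leaves.
TERMINAL_STATES = frozenset({BranchStatus.COMPLETED, BranchStatus.FAILED})


def is_terminal(status: BranchStatus) -> bool:
    """Return True when no further execution is expected (hypothetical helper)."""
    return status in TERMINAL_STATES
```

A polling loop or monitoring hook could use such a check to decide when to stop watching a branch.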
Example
    from marsys.coordination.branches.types import ExecutionBranch, BranchType

    branch = ExecutionBranch(
        branch_id="main_001",
        branch_type=BranchType.SIMPLE,
        agent_sequence=["Coordinator", "Worker"],
        metadata={"priority": "high"}
    )
StepExecutor
Executes individual steps within a branch.
Import
from marsys.coordination.execution import StepExecutor
Constructor
    StepExecutor(
        config: Optional[ExecutionConfig] = None,
        tool_executor: Optional[ToolExecutor] = None,
        user_node_handler: Optional[UserNodeHandler] = None,
        event_bus: Optional[EventBus] = None,
        system_prompt_builder: Optional[SystemPromptBuilder] = None,
        max_retries: int = 5,
        retry_delay: float = 1.0
    )
Parameters
| Parameter | Type | Description | Default |
|---|---|---|---|
| config | Optional[ExecutionConfig] | Execution configuration | None |
| tool_executor | Optional[ToolExecutor] | Tool execution handler | None |
| user_node_handler | Optional[UserNodeHandler] | User interaction handler | None |
| event_bus | Optional[EventBus] | Event bus for status updates | None |
| system_prompt_builder | Optional[SystemPromptBuilder] | Builds agent system prompts using response format | None |
| max_retries | int | Maximum retry attempts | 5 |
| retry_delay | float | Delay between retries (seconds) | 1.0 |
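To make the interplay of `max_retries` and `retry_delay` concrete, here is a minimal retry loop. It is a sketch under stated assumptions, not the StepExecutor implementation: `run_with_retries` is a hypothetical helper, `max_retries` is assumed to count retries after the first attempt, and the delay is assumed to be fixed rather than exponential.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_retries(
    step: Callable[[], Awaitable[Any]],
    max_retries: int = 5,
    retry_delay: float = 1.0,
) -> Any:
    """Call `step` until it succeeds, allowing up to `max_retries` retries."""
    attempt = 0
    while True:
        try:
            return await step()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            # Assumption: a fixed pause between attempts, no backoff.
            await asyncio.sleep(retry_delay)
```

With the defaults shown in the table, a step that always fails would be attempted six times under this interpretation (one initial call plus five retries).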
execute_step
    async def execute_step(
        agent: Union[BaseAgent, str],
        request: Any,
        memory: List[Dict[str, Any]],
        context: Dict[str, Any]
    ) -> StepResult
StepResult
    @dataclass
    class StepResult:
        """Result of executing a single step within a branch."""

        # Core execution fields (set by StepExecutor)
        agent_name: str
        success: bool
        response: Any = None

        # Execution metadata (set by StepExecutor)
        step_id: Optional[str] = None
        memory_updates: List[Dict[str, Any]] = field(default_factory=list)
        tool_calls: List[Dict[str, Any]] = field(default_factory=list)
        tool_results: List[Dict[str, Any]] = field(default_factory=list)
        error: Optional[str] = None
        requires_retry: bool = False
        context_selection: Optional[Dict[str, Any]] = None
        metadata: Dict[str, Any] = field(default_factory=dict)

        # Routing decision fields (set by BranchExecutor ONLY)
        action_type: Optional[str] = None
        parsed_response: Optional[Dict[str, Any]] = None
        next_agent: Optional[str] = None
        should_end_branch: bool = False
        waiting_for_children: bool = False
        child_branch_ids: List[str] = field(default_factory=list)
        convergence_target: Optional[str] = None
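The flags on `StepResult` can be read together to decide what happens next. The sketch below uses a trimmed local stand-in for the dataclass (only the fields it reads) so it runs on its own. `describe_outcome` and its labels are hypothetical, and the precedence order of the checks is an assumption rather than the library's routing logic.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class StepResult:
    """Trimmed stand-in with only the fields this sketch reads."""
    agent_name: str
    success: bool
    response: Any = None
    error: Optional[str] = None
    requires_retry: bool = False
    next_agent: Optional[str] = None
    should_end_branch: bool = False
    waiting_for_children: bool = False
    child_branch_ids: List[str] = field(default_factory=list)


def describe_outcome(result: StepResult) -> str:
    """Map a result to a coarse label; the precedence order is an assumption."""
    if not result.success:
        return "retry" if result.requires_retry else "failed"
    if result.waiting_for_children:
        return "waiting"      # parent pauses until child branches converge
    if result.should_end_branch:
        return "finished"
    if result.next_agent is not None:
        return "continue"     # hand off to the next agent
    return "undecided"        # no routing decision has been attached yet
```

Keeping the failure checks first means a failed step is never mistaken for a routing decision, which seems the safer default for a sketch like this.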
Example
    from marsys.coordination.formats import SystemPromptBuilder

    step_executor = StepExecutor(
        config=exec_config,
        system_prompt_builder=SystemPromptBuilder(response_format="json")
    )

    result = await step_executor.execute_step(
        agent="Analyzer",
        request="Analyze sales data",
        memory=conversation_memory,
        context={"session": "123"}
    )
DynamicBranchSpawner
Handles runtime creation of parallel branches.
Import
from marsys.coordination.execution import DynamicBranchSpawner
Constructor
    DynamicBranchSpawner(
        branch_executor: BranchExecutor,
        context_manager: ContextManager,
        config: ExecutionConfig
    )
handle_parallel_invocation
    async def handle_parallel_invocation(
        agents: List[str],
        requests: Dict[str, Any],
        parent_branch: ExecutionBranch,
        context: Dict[str, Any]
    ) -> List[asyncio.Task]
| Parameter | Type | Description |
|---|---|---|
| agents | List[str] | Agents to invoke in parallel |
| requests | Dict[str, Any] | Per-agent requests |
| parent_branch | ExecutionBranch | Parent branch |
| context | Dict[str, Any] | Execution context |
Returns: List of async tasks for parallel execution
Example
    import asyncio

    spawner = DynamicBranchSpawner(
        branch_executor=branch_exec,
        context_manager=ctx_mgr,
        config=exec_config
    )

    # Spawn parallel branches
    tasks = await spawner.handle_parallel_invocation(
        agents=["Worker1", "Worker2", "Worker3"],
        requests={
            "Worker1": "Task 1",
            "Worker2": "Task 2",
            "Worker3": "Task 3"
        },
        parent_branch=main_branch,
        context={"parallel": True}
    )

    # Wait for completion
    results = await asyncio.gather(*tasks)
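A plain `asyncio.gather(*tasks)` raises as soon as one task fails. If partial results are useful, one option is to gather with `return_exceptions=True` and separate successes from failures afterwards. The sketch below is self-contained and uses stand-in workers. `gather_by_agent` is a hypothetical helper, and mapping tasks to agent names assumes the caller knows which task belongs to which agent (for example, that the returned list follows the order of `agents`, which this page does not state).

```python
import asyncio
from typing import Any, Dict, Tuple


async def gather_by_agent(
    tasks: Dict[str, "asyncio.Task[Any]"],
) -> Tuple[Dict[str, Any], Dict[str, BaseException]]:
    """Await all tasks and split outcomes into successes and failures."""
    names = list(tasks)
    # return_exceptions=True keeps one failure from hiding the other results.
    outcomes = await asyncio.gather(*tasks.values(), return_exceptions=True)
    ok: Dict[str, Any] = {}
    failed: Dict[str, BaseException] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            failed[name] = outcome
        else:
            ok[name] = outcome
    return ok, failed


async def demo() -> Tuple[Dict[str, Any], Dict[str, BaseException]]:
    """Stand-in workers: Worker2 fails, the others succeed."""
    async def worker(name: str) -> str:
        if name == "Worker2":
            raise RuntimeError("boom")
        return f"{name} done"

    tasks = {
        name: asyncio.create_task(worker(name))
        for name in ("Worker1", "Worker2", "Worker3")
    }
    return await gather_by_agent(tasks)
```

Running `asyncio.run(demo())` yields the two successful results in the first dictionary and the `RuntimeError` for `Worker2` in the second.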
BranchResult
Result of branch execution.
Import
from marsys.coordination.branches.types import BranchResult
Attributes
| Attribute | Type | Description |
|---|---|---|
| branch_id | str | Branch identifier |
| status | BranchStatus | Final status |
| final_agent | Optional[str] | Last agent executed |
| final_response | Any | Final output |
| steps_executed | int | Number of steps |
| execution_time | float | Total time in seconds |
| metadata | Dict[str, Any] | Result metadata |
| error | Optional[str] | Error if failed |
Example
    from marsys.coordination.branches.types import BranchStatus

    if result.status == BranchStatus.COMPLETED:
        print(f"Success: {result.final_response}")
        print(f"Executed {result.steps_executed} steps")
        print(f"Time: {result.execution_time:.2f}s")
    else:
        print(f"Failed: {result.error}")
ToolExecutor
Executes tool calls within agent steps.
Import
from marsys.coordination.execution import ToolExecutor
execute_tool_calls
    async def execute_tool_calls(
        tool_calls: List[ToolCall],
        agent_name: str,
        context: Dict[str, Any]
    ) -> List[ToolResult]
| Parameter | Type | Description |
|---|---|---|
| tool_calls | List[ToolCall] | Tools to execute |
| agent_name | str | Agent making calls |
| context | Dict[str, Any] | Execution context |
Agent Invocation Note
execute_tool_calls is for tools only. If a name matches a peer agent, it must be invoked through agent action JSON (next_action="invoke_agent"), not through tool_calls.
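One way to respect this rule in calling code is to separate requested names before anything reaches `execute_tool_calls`. The helper below is a hypothetical sketch: `split_tool_and_agent_names` is not part of the documented API, and it assumes the caller already has the set of peer agent names.

```python
from typing import Iterable, List, Set, Tuple


def split_tool_and_agent_names(
    requested: Iterable[str], peer_agents: Set[str]
) -> Tuple[List[str], List[str]]:
    """Partition names into (tools, agents), preserving the request order."""
    tools: List[str] = []
    agents: List[str] = []
    for name in requested:
        # A name that matches a peer agent must not be sent as a tool call.
        (agents if name in peer_agents else tools).append(name)
    return tools, agents
```

Names in the second list would then be routed through the agent action JSON rather than `tool_calls`.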
Example
    tool_executor = ToolExecutor(
        agent_registry=AgentRegistry,
        config=exec_config
    )

    results = await tool_executor.execute_tool_calls(
        tool_calls=[
            ToolCall(name="search", arguments={"query": "AI news"}),
            ToolCall(name="summarize", arguments={"text": "..."})
        ],
        agent_name="Researcher",
        context={}
    )
Execution Flow
Sequential Execution
    # Simple sequential branch
    branch = ExecutionBranch(
        branch_type=BranchType.SIMPLE,
        agent_sequence=["Agent1", "Agent2", "Agent3"]
    )

    result = await executor.execute_branch(
        branch=branch,
        initial_request="Process this",
        context={}
    )
Conversation Execution
    # Bidirectional conversation
    branch = ExecutionBranch(
        branch_type=BranchType.CONVERSATION,
        agent_sequence=["Agent1", "Agent2"],
        metadata={"max_turns": 5}
    )

    result = await executor.execute_branch(
        branch=branch,
        initial_request="Let's discuss",
        context={}
    )
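The `max_turns` value in the metadata above suggests a cap on how long two agents may alternate. How the executor enforces it is not described here, so the following is only a sketch of the general idea with plain callables standing in for agents. `run_conversation` and the `(message, done)` reply shape are assumptions made for illustration.

```python
from typing import Callable, Dict, List, Tuple

Reply = Tuple[str, bool]  # (message, done flag), a hypothetical reply shape


def run_conversation(
    agents: Dict[str, Callable[[str], Reply]],
    order: List[str],
    opening: str,
    max_turns: int,
) -> List[Tuple[str, str]]:
    """Alternate between agents until one signals done or the cap is hit."""
    if not order:
        raise ValueError("order must name at least one agent")
    transcript: List[Tuple[str, str]] = []
    message = opening
    for turn in range(max_turns):          # hard upper bound on turns
        name = order[turn % len(order)]    # round-robin speaker selection
        message, done = agents[name](message)
        transcript.append((name, message))
        if done:
            break
    return transcript
```

Because the loop is bounded by `max_turns`, two agents that never signal completion still terminate after the configured number of turns.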
Nested Execution
    # Branch with child branches
    parent_branch = ExecutionBranch(
        branch_type=BranchType.NESTED,
        agent_sequence=["Coordinator"]
    )

    # Coordinator spawns child branches dynamically
    result = await executor.execute_branch(
        branch=parent_branch,
        initial_request="Coordinate tasks",
        context={"allow_parallel": True}
    )
Parallel Execution
Creating Parallel Branches
    # Response that triggers parallel execution
    response = {
        "next_action": "parallel_invoke",
        "agents": ["Worker1", "Worker2", "Worker3"],
        "agent_requests": {
            "Worker1": "Analyze segment A",
            "Worker2": "Analyze segment B",
            "Worker3": "Analyze segment C"
        }
    }

    # DynamicBranchSpawner handles this automatically
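Since the `agents` list and the `agent_requests` mapping must agree, a small consistency check can catch malformed responses early. This is a sketch based only on the payload shape shown above. `validate_parallel_invoke` is a hypothetical helper, and whether the library performs similar validation itself is not stated here.

```python
from typing import Any, Dict, List


def validate_parallel_invoke(response: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the payload is consistent."""
    problems: List[str] = []
    if response.get("next_action") != "parallel_invoke":
        problems.append("next_action is not 'parallel_invoke'")
    agents = response.get("agents") or []
    requests = response.get("agent_requests") or {}
    if len(set(agents)) != len(agents):
        problems.append("duplicate agent names")
    for name in agents:
        if name not in requests:
            problems.append(f"no request for agent {name!r}")
    for name in requests:
        if name not in agents:
            problems.append(f"request for unlisted agent {name!r}")
    return problems
```

Returning a list of problems instead of raising on the first one makes it easier to report every inconsistency back to the agent in a single retry message.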
Convergence Points
    # Wait for all parallel branches
    results = await spawner.wait_for_convergence(
        child_branch_ids=["branch_1", "branch_2", "branch_3"],
        timeout=300.0  # 5 minutes
    )

    # Resume parent with aggregated results
    parent_result = await executor.execute_branch(
        branch=parent_branch,
        initial_request=None,
        context=context,
        resume_with_results=results
    )
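The idea of a convergence timeout can be illustrated with plain asyncio. The sketch below is not how `wait_for_convergence` is implemented: `converge` is a hypothetical helper, and cancelling branches that miss the deadline (rather than failing the whole convergence) is an assumption chosen for the example.

```python
import asyncio
from typing import Any, Dict


async def converge(
    tasks: Dict[str, "asyncio.Task[Any]"], timeout: float
) -> Dict[str, Any]:
    """Collect results of tasks that finish within `timeout`; cancel the rest."""
    if not tasks:
        return {}  # asyncio.wait rejects an empty collection
    done, pending = await asyncio.wait(list(tasks.values()), timeout=timeout)
    for task in pending:
        task.cancel()  # assumption: late branches are abandoned
    # Let the cancellations settle so no task is left dangling.
    await asyncio.gather(*pending, return_exceptions=True)
    results: Dict[str, Any] = {}
    for branch_id, task in tasks.items():
        if task in done and not task.cancelled() and task.exception() is None:
            results[branch_id] = task.result()
    return results
```

Only branches that completed successfully before the deadline appear in the returned mapping, so the caller can compare its keys against the expected branch IDs to see which ones were lost.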
Best Practices
Do
- Set appropriate timeouts for branches
- Handle errors gracefully
- Use branch metadata for tracking
- Monitor memory usage in long conversations
- Implement convergence points for parallel execution
Don't
- Create infinite loops without limits
- Skip error handling
- Ignore memory management
- Forget to set convergence timeouts
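For the memory-management points above, one simple policy is to cap the number of messages kept per agent. The sketch assumes the per-agent memory is a mapping from agent name to a list of message dictionaries, as the `memory` attribute type suggests. `trim_memory` is a hypothetical helper, and keeping only the most recent messages is just one possible strategy.

```python
from typing import Any, Dict, List

Memory = Dict[str, List[Dict[str, Any]]]


def trim_memory(memory: Memory, max_messages: int) -> Memory:
    """Return a copy that keeps only the newest `max_messages` per agent."""
    if max_messages < 1:
        raise ValueError("max_messages must be at least 1")
    # Slicing from the end keeps the most recent entries and copies the list.
    return {agent: messages[-max_messages:] for agent, messages in memory.items()}
```

The function returns new lists instead of mutating its input, so the original memory remains available if the full history is still needed elsewhere.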
Pro Tip
Use BranchType.CONVERSATION for agent dialogues and BranchType.NESTED when a coordinator spawns child branches at runtime. Both run through the same execute_branch call, so the calling code stays the same.