Execution API
Complete API reference for the execution system that manages branch execution, step processing, and dynamic parallelism.
See Also
For conceptual overview of execution flow, see the Topology Concepts guide.
BranchExecutor
Manages the execution of different branch patterns in the workflow.
Import
from marsys.coordination.execution import BranchExecutor
Constructor
BranchExecutor(
    validation_processor: ValidationProcessor,
    router: Router,
    step_executor: StepExecutor,
    context_manager: ContextManager,
    config: ExecutionConfig
)
execute_branch
async def execute_branch(
    branch: ExecutionBranch,
    initial_request: Any,
    context: Dict[str, Any],
    resume_with_results: Optional[Dict] = None
) -> BranchResult
Parameters
| Parameter | Type | Description |
|---|---|---|
| branch | ExecutionBranch | Branch to execute |
| initial_request | Any | Initial task/prompt |
| context | Dict[str, Any] | Execution context |
| resume_with_results | Optional[Dict] | Child branch results for resumption |
Example
executor = BranchExecutor(
    validation_processor=validator,
    router=router,
    step_executor=step_exec,
    context_manager=ctx_mgr,
    config=exec_config
)

result = await executor.execute_branch(
    branch=main_branch,
    initial_request="Analyze this data",
    context={"session_id": "123"}
)
ExecutionBranch
Represents a branch of execution in the workflow.
Import
from marsys.coordination.branches.types import ExecutionBranch, BranchType, BranchStatus
Attributes
| Attribute | Type | Description |
|---|---|---|
| branch_id | str | Unique branch identifier |
| branch_type | BranchType | Type of branch |
| status | BranchStatus | Current status |
| parent_branch_id | Optional[str] | Parent branch if nested |
| agent_sequence | List[str] | Agents to execute |
| current_step | int | Current execution step |
| memory | Dict[str, List] | Per-agent memory |
BranchType Enum
class BranchType(Enum):
    SIMPLE = "simple"                      # Sequential execution
    CONVERSATION = "conversation"          # Bidirectional dialogue
    NESTED = "nested"                      # Has child branches
    USER_INTERACTION = "user_interaction"  # Human-in-the-loop
BranchStatus Enum
class BranchStatus(Enum):
    PENDING = "pending"      # Not started
    RUNNING = "running"      # Currently executing
    WAITING = "waiting"      # Waiting for child branches
    COMPLETED = "completed"  # Successfully finished
    FAILED = "failed"        # Terminated with error
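The sketch below shows one way to work with these status values, for instance when polling a branch or reading a status back from a log. It uses a local copy of the enum so it runs without marsys installed. The `is_terminal` helper and the `TERMINAL_STATUSES` set are hypothetical names introduced for illustration, and treating COMPLETED and FAILED as the only final states is an assumption based on the comments in the enum.

```python
from enum import Enum


class BranchStatus(Enum):
    # Local copy of the documented enum so the sketch is self-contained.
    PENDING = "pending"
    RUNNING = "running"
    WAITING = "waiting"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical helper set: statuses after which a branch is assumed not to run again.
TERMINAL_STATUSES = frozenset({BranchStatus.COMPLETED, BranchStatus.FAILED})


def is_terminal(status: BranchStatus) -> bool:
    """Return True once a branch has finished, successfully or not."""
    return status in TERMINAL_STATUSES


# Enum members can be looked up from their string values,
# for example when a status was stored as plain text.
status = BranchStatus("waiting")
print(status.name, is_terminal(status))
```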
StepExecutor
Executes individual steps within a branch.
Import
from marsys.coordination.execution import StepExecutor
execute_step
async def execute_step(
    agent_name: str,
    request: Any,
    context: Dict[str, Any],
    branch: ExecutionBranch,
    memory: List[Message]
) -> StepResult
StepResult
@dataclass
class StepResult:
    success: bool
    agent_name: str
    response: Any
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0
    memory_used: List[Message] = field(default_factory=list)
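As a rough illustration of how the fields fit together, the following sketch builds two results and turns each into a log line. The dataclass is a local mirror of the one documented above, with `Message` replaced by `Any` so the code runs on its own; `summarize` is a hypothetical helper, not part of marsys.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StepResult:
    # Local mirror of the documented fields; Message is replaced by Any here.
    success: bool
    agent_name: str
    response: Any
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0
    memory_used: List[Any] = field(default_factory=list)


def summarize(step: StepResult) -> str:
    """Hypothetical helper: one-line summary of a step for logging."""
    if step.success:
        return f"{step.agent_name}: ok in {step.execution_time:.2f}s"
    return f"{step.agent_name}: failed ({step.error or 'unknown error'})"


ok = StepResult(success=True, agent_name="Agent1", response="done", execution_time=1.5)
bad = StepResult(success=False, agent_name="Agent2", response=None, error="timeout")

print(summarize(ok))
print(summarize(bad))
```

Because `metadata` and `memory_used` use `default_factory`, each instance gets its own dict and list instead of sharing one mutable default.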
DynamicBranchSpawner
Handles runtime creation of parallel branches.
Import
from marsys.coordination.execution import DynamicBranchSpawner
handle_parallel_invocation
async def handle_parallel_invocation(
    agents: List[str],
    requests: Dict[str, Any],
    parent_branch: ExecutionBranch,
    context: Dict[str, Any]
) -> List[asyncio.Task]
Example
import asyncio

spawner = DynamicBranchSpawner(
    branch_executor=branch_exec,
    context_manager=ctx_mgr,
    config=exec_config
)

# Spawn parallel branches
tasks = await spawner.handle_parallel_invocation(
    agents=["Worker1", "Worker2", "Worker3"],
    requests={
        "Worker1": "Task 1",
        "Worker2": "Task 2",
        "Worker3": "Task 3"
    },
    parent_branch=main_branch,
    context={"parallel": True}
)

# Wait for completion
results = await asyncio.gather(*tasks)
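A plain `asyncio.gather(*tasks)` raises as soon as one task fails, which hides the results of the others. The sketch below shows one possible way to collect every outcome using `return_exceptions=True`. It uses only the standard library: `fake_branch` is a stand-in coroutine for a spawned branch and `run_all` is a hypothetical helper, so neither reflects how marsys itself schedules branches.

```python
import asyncio
from typing import Any, Dict, List


async def fake_branch(agent: str, request: str) -> str:
    """Stand-in for a spawned branch; not part of marsys."""
    await asyncio.sleep(0)
    if agent == "Worker2":
        raise RuntimeError("simulated failure")
    return f"{agent} finished: {request}"


async def run_all(requests: Dict[str, str]) -> Dict[str, Any]:
    agents: List[str] = list(requests)
    tasks = [asyncio.create_task(fake_branch(a, requests[a])) for a in agents]
    # return_exceptions=True returns exceptions as values, so one failure
    # does not discard the results of the other tasks.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    # gather preserves the order of its arguments, so zip pairs them correctly.
    return dict(zip(agents, outcomes))


results = asyncio.run(
    run_all({"Worker1": "Task 1", "Worker2": "Task 2", "Worker3": "Task 3"})
)
failed = [name for name, out in results.items() if isinstance(out, BaseException)]
print("failed:", failed)
```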
BranchResult
Result of branch execution.
Attributes
| Attribute | Type | Description |
|---|---|---|
| branch_id | str | Branch identifier |
| status | BranchStatus | Final status |
| final_agent | Optional[str] | Last agent executed |
| final_response | Any | Final output |
| steps_executed | int | Number of steps |
| execution_time | float | Total time in seconds |
| error | Optional[str] | Error if failed |
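A caller will usually branch on `status` before reading `final_response`. The following is a minimal sketch of that pattern, using local stand-ins that mirror the attributes in the table above; the real classes may carry more fields or different defaults. The `unwrap` helper and the sample branch IDs are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class BranchStatus(Enum):
    # Local copy of the documented enum.
    PENDING = "pending"
    RUNNING = "running"
    WAITING = "waiting"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class BranchResult:
    # Local mirror of the attributes table; not the library's class.
    branch_id: str
    status: BranchStatus
    final_agent: Optional[str]
    final_response: Any
    steps_executed: int
    execution_time: float
    error: Optional[str] = None


def unwrap(result: BranchResult) -> Any:
    """Hypothetical helper: return the output, or raise if the branch did not complete."""
    if result.status is BranchStatus.COMPLETED:
        return result.final_response
    raise RuntimeError(
        f"branch {result.branch_id} ended as {result.status.value}: {result.error}"
    )


done = BranchResult("b-1", BranchStatus.COMPLETED, "Agent3", "report", 3, 2.4)
broken = BranchResult("b-2", BranchStatus.FAILED, "Agent2", None, 2, 0.9, error="timeout")

print(unwrap(done))
try:
    unwrap(broken)
except RuntimeError as exc:
    print(exc)
```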
Execution Flow
Sequential Execution
# Simple sequential branch
branch = ExecutionBranch(
    branch_type=BranchType.SIMPLE,
    agent_sequence=["Agent1", "Agent2", "Agent3"]
)

result = await executor.execute_branch(
    branch=branch,
    initial_request="Process this",
    context={}
)
Conversation Execution
# Bidirectional conversation
branch = ExecutionBranch(
    branch_type=BranchType.CONVERSATION,
    agent_sequence=["Agent1", "Agent2"],
    metadata={"max_turns": 5}
)

result = await executor.execute_branch(
    branch=branch,
    initial_request="Let's discuss",
    context={}
)
Parallel Execution
# Response that triggers parallel execution
response = {
    "next_action": "parallel_invoke",
    "agents": ["Worker1", "Worker2", "Worker3"],
    "agent_requests": {
        "Worker1": "Analyze segment A",
        "Worker2": "Analyze segment B",
        "Worker3": "Analyze segment C"
    }
}

# DynamicBranchSpawner handles this automatically
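When building such a response by hand, it can help to check that the `agents` list and the `agent_requests` mapping agree before handing it to the system. The sketch below is one way to do that. `check_parallel_response` is a hypothetical helper, and whether marsys performs an equivalent check internally is not stated in this reference.

```python
from typing import Any, Dict, List


def check_parallel_response(response: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the shape looks consistent."""
    problems: List[str] = []
    if response.get("next_action") != "parallel_invoke":
        problems.append("next_action is not 'parallel_invoke'")
    agents = response.get("agents", [])
    requests = response.get("agent_requests", {})
    # Every listed agent needs a request, and every request needs a listed agent.
    for name in agents:
        if name not in requests:
            problems.append(f"no request for agent {name}")
    for name in requests:
        if name not in agents:
            problems.append(f"request for unlisted agent {name}")
    return problems


response = {
    "next_action": "parallel_invoke",
    "agents": ["Worker1", "Worker2", "Worker3"],
    "agent_requests": {
        "Worker1": "Analyze segment A",
        "Worker2": "Analyze segment B",
        "Worker3": "Analyze segment C",
    },
}

print(check_parallel_response(response))
```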
Best Practices
Do
- Set appropriate timeouts for branches
- Handle errors gracefully
- Use branch metadata for tracking
- Monitor memory usage in long conversations
Don't
- Create loops without an iteration limit (for example, a max_turns cap)
- Skip error handling
- Ignore memory management
- Forget to set convergence timeouts
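The timeout advice above can be sketched with the standard library alone. The example wraps a coroutine in `asyncio.wait_for` so that a stalled branch is cancelled instead of blocking forever. `slow_branch` and `run_with_timeout` are hypothetical stand-ins; whether `ExecutionConfig` exposes its own timeout settings is not covered in this reference, so treat this as a caller-side fallback under that assumption.

```python
import asyncio
from typing import Any, Awaitable, Optional


async def slow_branch() -> str:
    """Stand-in for a branch that takes too long; not part of marsys."""
    await asyncio.sleep(10)
    return "done"


async def run_with_timeout(work: Awaitable[Any], seconds: float) -> Optional[Any]:
    """Hypothetical helper: return the result, or None if the time limit is hit."""
    try:
        # wait_for cancels the wrapped awaitable when the timeout expires.
        return await asyncio.wait_for(work, timeout=seconds)
    except asyncio.TimeoutError:
        return None


outcome = asyncio.run(run_with_timeout(slow_branch(), 0.01))
print("timed out" if outcome is None else outcome)
```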
Pro Tip
Use BranchType.CONVERSATION for agent dialogues and BranchType.NESTED for branches that have child branches, such as those created by dynamic parallelism. The executor then handles the coordination automatically.