Execution API Reference

Complete API reference for the execution system that manages branch execution, step processing, and dynamic parallelism.

The Execution API handles the runtime execution of multi-agent workflows, including branch management, step execution, and dynamic parallel invocation.

BranchExecutor

Manages the execution of different branch patterns in the workflow.

Import

from marsys.coordination.execution import BranchExecutor

Constructor

BranchExecutor(
validation_processor: ValidationProcessor,
router: Router,
step_executor: StepExecutor,
context_manager: ContextManager,
config: ExecutionConfig
)

execute_branch

async def execute_branch(
branch: ExecutionBranch,
initial_request: Any,
context: Dict[str, Any],
resume_with_results: Optional[Dict] = None
) -> BranchResult

Parameters

| Parameter | Type | Description | Default |
|---|---|---|---|
| branch | ExecutionBranch | Branch to execute | Required |
| initial_request | Any | Initial task/prompt | Required |
| context | Dict[str, Any] | Execution context | Required |
| resume_with_results | Optional[Dict] | Child branch results | None |

Returns: BranchResult with execution outcome

Example

executor = BranchExecutor(
validation_processor=validator,
router=router,
step_executor=step_exec,
context_manager=ctx_mgr,
config=exec_config
)
result = await executor.execute_branch(
branch=main_branch,
initial_request="Analyze this data",
context={"session_id": "123"}
)

ExecutionBranch

Represents a branch of execution in the workflow.

Import

from marsys.coordination.branches.types import ExecutionBranch, BranchType, BranchStatus

Attributes

| Attribute | Type | Description |
|---|---|---|
| branch_id | str | Unique branch identifier |
| branch_type | BranchType | Type of branch |
| status | BranchStatus | Current status |
| parent_branch_id | Optional[str] | Parent branch if nested |
| agent_sequence | List[str] | Agents to execute |
| current_step | int | Current execution step |
| memory | Dict[str, List] | Per-agent memory |
| metadata | Dict[str, Any] | Branch metadata |

BranchType Enum

class BranchType(Enum):
SIMPLE = "simple" # Sequential execution
CONVERSATION = "conversation" # Bidirectional dialogue
NESTED = "nested" # Has child branches
USER_INTERACTION = "user_interaction" # Human-in-the-loop

BranchStatus Enum

class BranchStatus(Enum):
PENDING = "pending" # Not started
RUNNING = "running" # Currently executing
WAITING = "waiting" # Waiting for child branches
COMPLETED = "completed" # Successfully finished
FAILED = "failed" # Terminated with error

Example

from marsys.coordination.branches.types import ExecutionBranch, BranchType
branch = ExecutionBranch(
branch_id="main_001",
branch_type=BranchType.SIMPLE,
agent_sequence=["Coordinator", "Worker"],
metadata={"priority": "high"}
)

StepExecutor

Executes individual steps within a branch.

Import

from marsys.coordination.execution import StepExecutor

Constructor

StepExecutor(
config: Optional[ExecutionConfig] = None,
tool_executor: Optional[ToolExecutor] = None,
user_node_handler: Optional[UserNodeHandler] = None,
event_bus: Optional[EventBus] = None,
system_prompt_builder: Optional[SystemPromptBuilder] = None,
max_retries: int = 5,
retry_delay: float = 1.0
)

Parameters

| Parameter | Type | Description | Default |
|---|---|---|---|
| config | Optional[ExecutionConfig] | Execution configuration | None |
| tool_executor | Optional[ToolExecutor] | Tool execution handler | None |
| user_node_handler | Optional[UserNodeHandler] | User interaction handler | None |
| event_bus | Optional[EventBus] | Event bus for status updates | None |
| system_prompt_builder | Optional[SystemPromptBuilder] | Builds agent system prompts using response format | None |
| max_retries | int | Maximum retry attempts | 5 |
| retry_delay | float | Delay between retries (seconds) | 1.0 |

execute_step

async def execute_step(
agent: Union[BaseAgent, str],
request: Any,
memory: List[Dict[str, Any]],
context: Dict[str, Any]
) -> StepResult

StepResult

@dataclass
class StepResult:
"""Result of executing a single step within a branch."""
# Core execution fields (set by StepExecutor)
agent_name: str
success: bool
response: Any = None
# Execution metadata (set by StepExecutor)
step_id: Optional[str] = None
memory_updates: List[Dict[str, Any]] = field(default_factory=list)
tool_calls: List[Dict[str, Any]] = field(default_factory=list)
tool_results: List[Dict[str, Any]] = field(default_factory=list)
error: Optional[str] = None
requires_retry: bool = False
context_selection: Optional[Dict[str, Any]] = None
metadata: Dict[str, Any] = field(default_factory=dict)
# Routing decision fields (set by BranchExecutor ONLY)
action_type: Optional[str] = None
parsed_response: Optional[Dict[str, Any]] = None
next_agent: Optional[str] = None
should_end_branch: bool = False
waiting_for_children: bool = False
child_branch_ids: List[str] = field(default_factory=list)
convergence_target: Optional[str] = None

Example

from marsys.coordination.formats import SystemPromptBuilder
step_executor = StepExecutor(
config=exec_config,
system_prompt_builder=SystemPromptBuilder(response_format="json")
)
result = await step_executor.execute_step(
agent="Analyzer",
request="Analyze sales data",
memory=conversation_memory,
context={"session": "123"}
)

DynamicBranchSpawner

Handles runtime creation of parallel branches.

Import

from marsys.coordination.execution import DynamicBranchSpawner

Constructor

DynamicBranchSpawner(
branch_executor: BranchExecutor,
context_manager: ContextManager,
config: ExecutionConfig
)

handle_parallel_invocation

async def handle_parallel_invocation(
agents: List[str],
requests: Dict[str, Any],
parent_branch: ExecutionBranch,
context: Dict[str, Any]
) -> List[asyncio.Task]

Parameters

| Parameter | Type | Description |
|---|---|---|
| agents | List[str] | Agents to invoke in parallel |
| requests | Dict[str, Any] | Per-agent requests |
| parent_branch | ExecutionBranch | Parent branch |
| context | Dict[str, Any] | Execution context |

Returns: List of async tasks for parallel execution

Example

spawner = DynamicBranchSpawner(
branch_executor=branch_exec,
context_manager=ctx_mgr,
config=exec_config
)
# Spawn parallel branches
tasks = await spawner.handle_parallel_invocation(
agents=["Worker1", "Worker2", "Worker3"],
requests={
"Worker1": "Task 1",
"Worker2": "Task 2",
"Worker3": "Task 3"
},
parent_branch=main_branch,
context={"parallel": True}
)
# Wait for completion
results = await asyncio.gather(*tasks)

BranchResult

Result of branch execution.

Import

from marsys.coordination.branches.types import BranchResult

Attributes

| Attribute | Type | Description |
|---|---|---|
| branch_id | str | Branch identifier |
| status | BranchStatus | Final status |
| final_agent | Optional[str] | Last agent executed |
| final_response | Any | Final output |
| steps_executed | int | Number of steps |
| execution_time | float | Total time in seconds |
| metadata | Dict[str, Any] | Result metadata |
| error | Optional[str] | Error if failed |

Example

if result.status == BranchStatus.COMPLETED:
print(f"Success: {result.final_response}")
print(f"Executed {result.steps_executed} steps")
print(f"Time: {result.execution_time:.2f}s")
else:
print(f"Failed: {result.error}")

ToolExecutor

Executes tool calls within agent steps.

Import

from marsys.coordination.execution import ToolExecutor

execute_tool_calls

async def execute_tool_calls(
tool_calls: List[ToolCall],
agent_name: str,
context: Dict[str, Any]
) -> List[ToolResult]

Parameters

| Parameter | Type | Description |
|---|---|---|
| tool_calls | List[ToolCall] | Tools to execute |
| agent_name | str | Agent making calls |
| context | Dict[str, Any] | Execution context |

Agent Invocation Note

execute_tool_calls is for tools only. If the name in a tool call matches a peer agent rather than a tool, that agent must be invoked through the agent action JSON (next_action="invoke_agent"), not through tool_calls.

Example

tool_executor = ToolExecutor(
agent_registry=AgentRegistry,
config=exec_config
)
results = await tool_executor.execute_tool_calls(
tool_calls=[
ToolCall(name="search", arguments={"query": "AI news"}),
ToolCall(name="summarize", arguments={"text": "..."})
],
agent_name="Researcher",
context={}
)

Execution Flow

Sequential Execution

# Simple sequential branch
branch = ExecutionBranch(
branch_type=BranchType.SIMPLE,
agent_sequence=["Agent1", "Agent2", "Agent3"]
)
result = await executor.execute_branch(
branch=branch,
initial_request="Process this",
context={}
)

Conversation Execution

# Bidirectional conversation
branch = ExecutionBranch(
branch_type=BranchType.CONVERSATION,
agent_sequence=["Agent1", "Agent2"],
metadata={"max_turns": 5}
)
result = await executor.execute_branch(
branch=branch,
initial_request="Let's discuss",
context={}
)

Nested Execution

# Branch with child branches
parent_branch = ExecutionBranch(
branch_type=BranchType.NESTED,
agent_sequence=["Coordinator"]
)
# Coordinator spawns child branches dynamically
result = await executor.execute_branch(
branch=parent_branch,
initial_request="Coordinate tasks",
context={"allow_parallel": True}
)

Parallel Execution

Creating Parallel Branches

# Response that triggers parallel execution
response = {
"next_action": "parallel_invoke",
"agents": ["Worker1", "Worker2", "Worker3"],
"agent_requests": {
"Worker1": "Analyze segment A",
"Worker2": "Analyze segment B",
"Worker3": "Analyze segment C"
}
}
# DynamicBranchSpawner handles this automatically

Convergence Points

# Wait for all parallel branches
results = await spawner.wait_for_convergence(
child_branch_ids=["branch_1", "branch_2", "branch_3"],
timeout=300.0 # 5 minutes
)
# Resume parent with aggregated results
parent_result = await executor.execute_branch(
branch=parent_branch,
initial_request=None,
context=context,
resume_with_results=results
)

Best Practices

Do

  • Set appropriate timeouts for branches
  • Handle errors gracefully
  • Use branch metadata for tracking
  • Monitor memory usage in long conversations
  • Implement convergence points for parallel execution

Don't

  • Create infinite loops without limits
  • Skip error handling
  • Ignore memory management
  • Forget to set convergence timeouts

Pro Tip

Use BranchType.CONVERSATION for agent dialogues and BranchType.NESTED for dynamic parallelism. The executor handles the complexity automatically.