Execution API

Complete API reference for the execution system that manages branch execution, step processing, and dynamic parallelism.

See Also

For a conceptual overview of the execution flow, see the Topology Concepts guide.

BranchExecutor

Manages the execution of different branch patterns in the workflow.

Import

from marsys.coordination.execution import BranchExecutor

Constructor

BranchExecutor(
    validation_processor: ValidationProcessor,
    router: Router,
    step_executor: StepExecutor,
    context_manager: ContextManager,
    config: ExecutionConfig
)

execute_branch

async def execute_branch(
    branch: ExecutionBranch,
    initial_request: Any,
    context: Dict[str, Any],
    resume_with_results: Optional[Dict] = None
) -> BranchResult

Parameters

Parameter            Type             Description
branch               ExecutionBranch  Branch to execute
initial_request      Any              Initial task/prompt
context              Dict[str, Any]   Execution context
resume_with_results  Optional[Dict]   Child branch results for resumption

Example

executor = BranchExecutor(
    validation_processor=validator,
    router=router,
    step_executor=step_exec,
    context_manager=ctx_mgr,
    config=exec_config
)

result = await executor.execute_branch(
    branch=main_branch,
    initial_request="Analyze this data",
    context={"session_id": "123"}
)

ExecutionBranch

Represents a branch of execution in the workflow.

Import

from marsys.coordination.branches.types import ExecutionBranch, BranchType, BranchStatus

Attributes

Attribute         Type             Description
branch_id         str              Unique branch identifier
branch_type       BranchType       Type of branch
status            BranchStatus     Current status
parent_branch_id  Optional[str]    Parent branch if nested
agent_sequence    List[str]        Agents to execute
current_step      int              Current execution step
memory            Dict[str, List]  Per-agent memory
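
A short sketch of reading these attributes on a live branch; the branch object is assumed to come from the executor or spawner shown elsewhere on this page, and only the attributes documented above are used.

# Inspect a branch produced elsewhere (e.g. by DynamicBranchSpawner)
if branch.status == BranchStatus.RUNNING:
    print(f"Branch {branch.branch_id} ({branch.branch_type.value}) is on "
          f"step {branch.current_step} of {len(branch.agent_sequence)}")

# Per-agent memory is keyed by agent name
for agent_name, messages in branch.memory.items():
    print(f"{agent_name}: {len(messages)} messages")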

BranchType Enum

class BranchType(Enum):
    SIMPLE = "simple"                      # Sequential execution
    CONVERSATION = "conversation"          # Bidirectional dialogue
    NESTED = "nested"                      # Has child branches
    USER_INTERACTION = "user_interaction"  # Human-in-the-loop

BranchStatus Enum

class BranchStatus(Enum):
    PENDING = "pending"      # Not started
    RUNNING = "running"      # Currently executing
    WAITING = "waiting"      # Waiting for child branches
    COMPLETED = "completed"  # Successfully finished
    FAILED = "failed"        # Terminated with error

StepExecutor

Executes individual steps within a branch.

Import

from marsys.coordination.execution import StepExecutor

execute_step

async def execute_step(
    agent_name: str,
    request: Any,
    context: Dict[str, Any],
    branch: ExecutionBranch,
    memory: List[Message]
) -> StepResult

StepResult

@dataclass
class StepResult:
    success: bool
    agent_name: str
    response: Any
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0
    memory_used: List[Message] = field(default_factory=list)
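
A hedged sketch of a single step call and of consuming the resulting StepResult; step_exec, branch, and context are assumed to be set up as in the BranchExecutor example above.

step_result = await step_exec.execute_step(
    agent_name="Agent1",
    request="Summarize the findings",
    context={"session_id": "123"},
    branch=branch,
    memory=branch.memory.get("Agent1", [])
)

if step_result.success:
    print(f"{step_result.agent_name} responded in {step_result.execution_time:.2f}s")
    print(step_result.response)
else:
    print(f"Step failed: {step_result.error}")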

DynamicBranchSpawner

Handles runtime creation of parallel branches.

Import

from marsys.coordination.execution import DynamicBranchSpawner

handle_parallel_invocation

async def handle_parallel_invocation(
    agents: List[str],
    requests: Dict[str, Any],
    parent_branch: ExecutionBranch,
    context: Dict[str, Any]
) -> List[asyncio.Task]

Example

spawner = DynamicBranchSpawner(
    branch_executor=branch_exec,
    context_manager=ctx_mgr,
    config=exec_config
)

# Spawn parallel branches
tasks = await spawner.handle_parallel_invocation(
    agents=["Worker1", "Worker2", "Worker3"],
    requests={
        "Worker1": "Task 1",
        "Worker2": "Task 2",
        "Worker3": "Task 3"
    },
    parent_branch=main_branch,
    context={"parallel": True}
)

# Wait for completion
results = await asyncio.gather(*tasks)

BranchResult

Result of branch execution.

Attributes

Attribute        Type           Description
branch_id        str            Branch identifier
status           BranchStatus   Final status
final_agent      Optional[str]  Last agent executed
final_response   Any            Final output
steps_executed   int            Number of steps
execution_time   float          Total time in seconds
error            Optional[str]  Error if failed
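
A short sketch of consuming the BranchResult returned by execute_branch; only the attributes listed above are used.

result = await executor.execute_branch(
    branch=main_branch,
    initial_request="Analyze this data",
    context={"session_id": "123"}
)

if result.status == BranchStatus.COMPLETED:
    print(f"{result.final_agent} finished {result.steps_executed} steps "
          f"in {result.execution_time:.2f}s")
    print(result.final_response)
else:
    print(f"Branch {result.branch_id} failed: {result.error}")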

Execution Flow

Sequential Execution

# Simple sequential branch
branch = ExecutionBranch(
    branch_type=BranchType.SIMPLE,
    agent_sequence=["Agent1", "Agent2", "Agent3"]
)

result = await executor.execute_branch(
    branch=branch,
    initial_request="Process this",
    context={}
)

Conversation Execution

# Bidirectional conversation
branch = ExecutionBranch(
branch_type=BranchType.CONVERSATION,
agent_sequence=["Agent1", "Agent2"],
metadata={"max_turns": 5}
)
result = await executor.execute_branch(
branch=branch,
initial_request="Let's discuss",
context={}
)

Parallel Execution

# Response that triggers parallel execution
response = {
"next_action": "parallel_invoke",
"agents": ["Worker1", "Worker2", "Worker3"],
"agent_requests": {
"Worker1": "Analyze segment A",
"Worker2": "Analyze segment B",
"Worker3": "Analyze segment C"
}
}
# DynamicBranchSpawner handles this automatically
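
The spawner normally drives this flow for you. If you resume a waiting parent branch yourself (for example in tests), a hedged sketch using the resume_with_results parameter documented above might look like the following; keying the results dict by agent name is an assumption for illustration, not a confirmed format.

# tasks come from handle_parallel_invocation (see the spawner example above)
child_results = await asyncio.gather(*tasks)

# Resume the parent that was WAITING on its children.
# The exact shape of resume_with_results is an assumption here.
resumed = await executor.execute_branch(
    branch=parent_branch,
    initial_request="Analyze this data",
    context={"session_id": "123"},
    resume_with_results={
        name: result
        for name, result in zip(["Worker1", "Worker2", "Worker3"], child_results)
    }
)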

Best Practices

Do

  • Set appropriate timeouts for branches (see the sketch after this list)
  • Handle errors gracefully
  • Use branch metadata for tracking
  • Monitor memory usage in long conversations
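
A minimal sketch of the first two points above, wrapping a branch run in an overall timeout and catching failures; the 300-second limit is an arbitrary example, not a framework default.

import asyncio

try:
    result = await asyncio.wait_for(
        executor.execute_branch(
            branch=main_branch,
            initial_request="Analyze this data",
            context={"session_id": "123"}
        ),
        timeout=300  # arbitrary wall-clock limit for the whole branch
    )
    if result.status == BranchStatus.FAILED:
        print(f"Branch failed: {result.error}")
except asyncio.TimeoutError:
    print("Branch did not complete within the timeout")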

Don't

  • Create infinite loops without limits
  • Skip error handling
  • Ignore memory management
  • Forget to set convergence timeouts

Pro Tip

Use BranchType.CONVERSATION for agent dialogues and BranchType.NESTED for dynamic parallelism. The executor handles the complexity automatically.