Orchestra API
The high-level coordination API that orchestrates multi-agent workflows.
Import
from marsys.coordination import Orchestrafrom marsys.coordination.orchestra import OrchestraResult
Quick Start
One-Line Execution
result = await Orchestra.run(task="Research AI trends and write a report",topology=topology)
With Configuration
from marsys.coordination.config import ExecutionConfig, StatusConfigresult = await Orchestra.run(task="Complex research task",topology=topology,execution_config=ExecutionConfig(convergence_timeout=300.0,status=StatusConfig.from_verbosity(1)),max_steps=50)
Orchestra.run()
Main entry point for one-line execution:
@classmethodasync def run(cls,task: Union[str, Dict[str, Any]],topology: Union[Dict, Topology, PatternConfig],agent_registry: Optional[AgentRegistry] = None,context: Optional[Dict[str, Any]] = None,execution_config: Optional[ExecutionConfig] = None,state_manager: Optional[StateManager] = None,max_steps: int = 100,allow_follow_ups: bool = False,**kwargs) -> OrchestraResult
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| task | Union[str, Dict] | Required | Task description or structured request |
| topology | Union[Dict, Topology, PatternConfig] | Required | Agent interaction topology |
| execution_config | Optional[ExecutionConfig] | None | Execution configuration |
| state_manager | Optional[StateManager] | None | State persistence manager |
| max_steps | int | 100 | Maximum execution steps |
Task Formats
Text Task
task = "Research the latest AI developments"
Structured Task
task = {"request": "Analyze market trends","sectors": ["tech", "finance"]}
Multimodal Task
task = {"content": "What is shown in these images?","images": ["/path/to/image1.png","/path/to/image2.jpg"]}
OrchestraResult
Result object returned by Orchestra:
@dataclassclass OrchestraResult:success: boolfinal_response: Anybranch_results: List[BranchResult]total_steps: inttotal_duration: floatmetadata: Dict[str, Any] = field(default_factory=dict)error: Optional[str] = None
Helper Methods
# Get specific branch result by IDbranch = result.get_branch_by_id("branch_123")# Get all successful branchessuccessful = result.get_successful_branches()# Get final response as formatted texttext = result.get_final_response_as_text()# Check if response is structured datais_structured = result.is_structured_response()
Example Usage
result = await Orchestra.run(task, topology)if result.success:print(f"Success in {result.total_duration:.2f}s")print(f"Response: {result.get_final_response_as_text()}")for branch in result.branch_results:print(f"Branch {branch.branch_id}: {branch.status}")else:print(f"Failed: {result.error}")
Configuration
from marsys.coordination.config import ExecutionConfig, StatusConfigconfig = ExecutionConfig(convergence_timeout=300.0, # Overall timeoutbranch_timeout=600.0, # Per-branch timeoutstep_timeout=120.0, # Per-step timeoutdynamic_convergence_enabled=True,status=StatusConfig.from_verbosity(1) # 0=quiet, 1=normal, 2=verbose)
See Also
Topology concepts for more on defining agent workflows.