Orchestra API

Orchestra is the high-level coordination API for running multi-agent workflows.

Import

from marsys.coordination import Orchestra
from marsys.coordination.orchestra import OrchestraResult

Quick Start

One-Line Execution

result = await Orchestra.run(
    task="Research AI trends and write a report",
    topology=topology
)

With Configuration

from marsys.coordination.config import ExecutionConfig, StatusConfig

result = await Orchestra.run(
    task="Complex research task",
    topology=topology,
    execution_config=ExecutionConfig(
        convergence_timeout=300.0,
        status=StatusConfig.from_verbosity(1)
    ),
    max_steps=50
)

Orchestra.run()

Main entry point for one-line execution:

@classmethod
async def run(
    cls,
    task: Union[str, Dict[str, Any]],
    topology: Union[Dict, Topology, PatternConfig],
    agent_registry: Optional[AgentRegistry] = None,
    context: Optional[Dict[str, Any]] = None,
    execution_config: Optional[ExecutionConfig] = None,
    state_manager: Optional[StateManager] = None,
    max_steps: int = 100,
    allow_follow_ups: bool = False,
    **kwargs
) -> OrchestraResult

Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| task | Union[str, Dict] | Required | Task description or structured request |
| topology | Union[Dict, Topology, PatternConfig] | Required | Agent interaction topology |
| execution_config | Optional[ExecutionConfig] | None | Execution configuration |
| state_manager | Optional[StateManager] | None | State persistence manager |
| max_steps | int | 100 | Maximum execution steps |

Task Formats

Text Task

task = "Research the latest AI developments"

Structured Task

task = {
    "request": "Analyze market trends",
    "sectors": ["tech", "finance"]
}

Multimodal Task

task = {
    "content": "What is shown in these images?",
    "images": [
        "/path/to/image1.png",
        "/path/to/image2.jpg"
    ]
}
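
The three task shapes above can be told apart in calling code before they are passed to Orchestra.run(). The sketch below uses a hypothetical helper, describe_task, which is not part of marsys; it assumes that a non-empty "images" key marks a multimodal task, as in the example above, and treats any other dict as structured.

```python
from typing import Any, Dict, Union

Task = Union[str, Dict[str, Any]]


def describe_task(task: Task) -> str:
    """Classify a task by its shape. Hypothetical helper, not a marsys API."""
    if isinstance(task, str):
        return "text"
    if isinstance(task, dict):
        # Assumption: a non-empty "images" key marks a multimodal task.
        if task.get("images"):
            return "multimodal"
        return "structured"
    raise TypeError(f"Unsupported task type: {type(task).__name__}")
```

A check like this can be useful for logging or for rejecting malformed input early; how Orchestra itself interprets each shape is not covered here.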

OrchestraResult

Result object returned by Orchestra:

@dataclass
class OrchestraResult:
    success: bool
    final_response: Any
    branch_results: List[BranchResult]
    total_steps: int
    total_duration: float
    metadata: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None

Helper Methods

# Get specific branch result by ID
branch = result.get_branch_by_id("branch_123")
# Get all successful branches
successful = result.get_successful_branches()
# Get final response as formatted text
text = result.get_final_response_as_text()
# Check if response is structured data
is_structured = result.is_structured_response()
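
To make the intent of these helpers concrete, here is a minimal stand-in written from their names alone. It is a sketch under stated assumptions, not the marsys implementation: the classes OrchestraResultSketch and BranchResultSketch are hypothetical, the success field on a branch is assumed, and "structured" is assumed to mean a dict or list.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class BranchResultSketch:
    # Stand-in for BranchResult; only branch_id and status appear on this page.
    branch_id: str
    status: str
    success: bool = True  # assumed field, used only for the filter below


@dataclass
class OrchestraResultSketch:
    success: bool
    final_response: Any
    branch_results: List[BranchResultSketch] = field(default_factory=list)

    def get_branch_by_id(self, branch_id: str) -> Optional[BranchResultSketch]:
        # Linear scan; returning None for an unknown ID is an assumption.
        return next((b for b in self.branch_results if b.branch_id == branch_id), None)

    def get_successful_branches(self) -> List[BranchResultSketch]:
        return [b for b in self.branch_results if b.success]

    def is_structured_response(self) -> bool:
        # Assumption: structured means a dict or list rather than plain text.
        return isinstance(self.final_response, (dict, list))
```

The real methods may differ in return types and edge-case behavior, so check the library source before relying on details such as the None return.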

Example Usage

result = await Orchestra.run(task, topology)

if result.success:
    print(f"Success in {result.total_duration:.2f}s")
    print(f"Response: {result.get_final_response_as_text()}")
    for branch in result.branch_results:
        print(f"Branch {branch.branch_id}: {branch.status}")
else:
    print(f"Failed: {result.error}")
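
The same fields can be collected into log lines instead of printed directly. The sketch below uses a hypothetical helper, summarize_result, and a SimpleNamespace stand-in so that it runs without marsys installed; the status value "completed" is a placeholder, since the possible status values are not listed on this page.

```python
from types import SimpleNamespace
from typing import Any, List


def summarize_result(result: Any) -> List[str]:
    """Build log lines from an OrchestraResult-like object (hypothetical helper)."""
    if not result.success:
        return [f"Failed: {result.error}"]
    lines = [f"Success: {result.total_steps} steps in {result.total_duration:.2f}s"]
    for branch in result.branch_results:
        lines.append(f"  branch {branch.branch_id}: {branch.status}")
    return lines


# Stand-in object for demonstration; a real call would pass the value
# returned by Orchestra.run().
demo = SimpleNamespace(
    success=True,
    total_steps=12,
    total_duration=3.5,
    error=None,
    branch_results=[SimpleNamespace(branch_id="branch_123", status="completed")],
)
print("\n".join(summarize_result(demo)))
```

Because the helper only reads attributes, it works with any object that exposes the fields of the dataclass shown earlier.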

Configuration

from marsys.coordination.config import ExecutionConfig, StatusConfig

config = ExecutionConfig(
    convergence_timeout=300.0,              # Overall timeout
    branch_timeout=600.0,                   # Per-branch timeout
    step_timeout=120.0,                     # Per-step timeout
    dynamic_convergence_enabled=True,
    status=StatusConfig.from_verbosity(1)   # 0=quiet, 1=normal, 2=verbose
)
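
As a generic illustration of what a per-step timeout means, the sketch below bounds a single awaited operation with asyncio.wait_for from the standard library. It does not show how marsys enforces step_timeout internally; the helper run_step and the convention of returning None on timeout are assumptions made for the example.

```python
import asyncio
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")


async def run_step(step: Awaitable[T], step_timeout: float) -> Optional[T]:
    """Await one step, returning None if it exceeds step_timeout seconds."""
    try:
        return await asyncio.wait_for(step, timeout=step_timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the step before raising, so nothing is left running.
        return None


async def demo() -> None:
    # The first step finishes well inside its budget; the second does not.
    fast = await run_step(asyncio.sleep(0.01, result="done"), step_timeout=1.0)
    slow = await run_step(asyncio.sleep(1.0, result="late"), step_timeout=0.05)
    print(fast, slow)


asyncio.run(demo())
```

Branch-level and overall limits could be layered the same way by wrapping a sequence of steps in an outer wait_for, though the library may apply its own rules for how the three values interact.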

See Also

See the Topology concepts page for more on defining agent workflows.