Topology
Topologies define how agents communicate and coordinate in MARSYS - they are graph structures that specify workflow patterns and execution flow.
See Also
For detailed class signatures and method parameters, see the Topology API Reference.
Overview
A topology in MARSYS is a directed graph that defines:
- Nodes: Agents or system components that participate in the workflow
- Edges: Communication paths and relationships between nodes
- Rules: Execution constraints like timeouts and convergence conditions
Defining Topologies
MARSYS offers three ways to define topologies, from simple to complex:
1. Dictionary Format (Simplest)
The quickest way to define a topology using Python dictionaries:
# Simple dictionary topologytopology = {"agents": ["Coordinator", "Worker1", "Worker2"],"flows": ["Coordinator -> Worker1","Coordinator -> Worker2","Worker1 -> Coordinator","Worker2 -> Coordinator"]}# Use with Orchestraresult = await Orchestra.run(task="Analyze this data",topology=topology)
2. Pattern-Based (Recommended)
Use pre-defined patterns for common workflow structures:
from marsys.coordination.topology.patterns import PatternConfig# Hub and spoke pattern - coordinator delegates to workerstopology = PatternConfig.hub_and_spoke(hub="Coordinator",spokes=["Researcher", "Analyst", "Writer"],parallel_spokes=True, # Execute workers in parallelbidirectional=True # Workers can respond back)# Pipeline pattern - sequential processing stagestopology = PatternConfig.pipeline(stages=[{"name": "extract", "agents": ["DataExtractor"]},{"name": "transform", "agents": ["Transformer1", "Transformer2"]},{"name": "load", "agents": ["DataLoader"]}],parallel_within_stage=True)# Hierarchical pattern - tree structuretopology = PatternConfig.hierarchical(tree={"Director": ["Manager1", "Manager2"],"Manager1": ["Worker1", "Worker2"],"Manager2": ["Worker3"]})
3. Object-Based (Most Control)
Full control with typed objects for complex scenarios:
from marsys.coordination.topology import Topology, Node, Edge, NodeType, EdgeTypetopology = Topology(nodes=[Node("User", node_type=NodeType.USER),Node("Coordinator", node_type=NodeType.AGENT),Node("Worker1", node_type=NodeType.AGENT),Node("Worker2", node_type=NodeType.AGENT),Node("Aggregator",node_type=NodeType.AGENT,is_convergence_point=True)],edges=[Edge("User", "Coordinator"),Edge("Coordinator", "Worker1"),Edge("Coordinator", "Worker2"),Edge("Worker1", "Aggregator"),Edge("Worker2", "Aggregator"),Edge("Aggregator", "User")])
Node Types
Nodes represent participants in the workflow:
from marsys.coordination.topology import Node, NodeType# User node - for human-in-the-loop interactionsuser = Node("User", node_type=NodeType.USER)# Agent node - AI agent that processes tasksagent = Node("Assistant", node_type=NodeType.AGENT)# System node - internal system componentsystem = Node("Logger", node_type=NodeType.SYSTEM)# Tool node - represents a tool/functiontool = Node("SearchTool", node_type=NodeType.TOOL)# Convergence point - where parallel branches mergeaggregator = Node("Aggregator",node_type=NodeType.AGENT,is_convergence_point=True)
Edge Types
Edges define how nodes communicate:
from marsys.coordination.topology import Edge, EdgeType, EdgePattern# Standard invocation - one agent calls anotherinvoke = Edge("Manager", "Worker", edge_type=EdgeType.INVOKE)# Notification - fire and forgetnotify = Edge("Worker", "Logger", edge_type=EdgeType.NOTIFY)# Query/Response - request and wait for responsequery = Edge("Agent", "Database", edge_type=EdgeType.QUERY)# Streaming - continuous data flowstream = Edge("Producer", "Consumer", edge_type=EdgeType.STREAM)# Bidirectional conversationconversation = Edge("Agent1", "Agent2",bidirectional=True,pattern=EdgePattern.ALTERNATING # Take turns)
Common Patterns
Hub and Spoke
A central coordinator delegates tasks to specialized workers:
# Best for: Task delegation, parallel processingtopology = PatternConfig.hub_and_spoke(hub="Coordinator",spokes=["Researcher", "Analyst", "Writer"],parallel_spokes=True)# Flow:# 1. Coordinator receives task# 2. Delegates to Researcher, Analyst, Writer in parallel# 3. Workers return results to Coordinator# 4. Coordinator aggregates and produces final output
Pipeline
Sequential processing through multiple stages:
# Best for: ETL workflows, sequential transformationstopology = PatternConfig.pipeline(stages=[{"name": "extract", "agents": ["Extractor"]},{"name": "transform", "agents": ["Cleaner", "Enricher"]},{"name": "analyze", "agents": ["Analyzer"]},{"name": "report", "agents": ["Reporter"]}],parallel_within_stage=True)# Flow:# 1. Extractor pulls data# 2. Cleaner and Enricher process in parallel# 3. Analyzer receives transformed data# 4. Reporter generates final output
Mesh
Fully connected network where any agent can communicate with any other:
# Best for: Collaborative problem-solving, debate scenariostopology = PatternConfig.mesh(agents=["Expert1", "Expert2", "Expert3"],fully_connected=True)# All agents can communicate with each other
Choosing the Right Pattern
Hub and Spoke: When you need central coordination and parallel workers.
Pipeline: When tasks must flow through sequential stages.
Mesh: When agents need to collaborate freely.
Hierarchical: When you have organizational/reporting structures.
User Interaction
Include human-in-the-loop by adding User nodes:
# Topology with user interactiontopology = {"agents": ["User", "Assistant", "Reviewer"],"flows": ["User -> Assistant", # User provides task"Assistant -> Reviewer", # Assistant produces draft"Reviewer -> User" # User reviews result]}# Execute with user interaction enabledresult = await Orchestra.run(task="Help me write a report",topology=topology,execution_config=ExecutionConfig(user_interaction="terminal", # Enable terminal inputuser_first=True # Start with user input))
Dynamic Topology Modification
Topologies can be modified programmatically:
from marsys.coordination.topology import Topology, Node, Edge# Create base topologytopology = Topology()# Add nodes dynamicallytopology.add_node(Node("Coordinator"))topology.add_node(Node("Worker1"))# Add edgestopology.add_edge(Edge("Coordinator", "Worker1"))# Add more workers based on workloadfor i in range(2, 5):worker = Node(f"Worker{i}")topology.add_node(worker)topology.add_edge(Edge("Coordinator", f"Worker{i}"))topology.add_edge(Edge(f"Worker{i}", "Coordinator"))# Validate before executionif topology.validate():result = await Orchestra.run(task, topology)
Best Practices
- Start with patterns: Use PatternConfig for common cases before building custom topologies
- Define convergence points: Mark where parallel branches should merge to avoid orphaned results
- Use meaningful names: Node names should reflect their purpose (e.g., "Researcher" not "Agent1")
- Validate before running: Call topology.validate() to catch configuration errors early
- Consider error paths: Include edges for error handling and recovery flows
Avoid Cycles Without Exit
Ensure your topology has clear termination conditions. Cycles are allowed but should have exit conditions to prevent infinite loops.