Topology System
Master the powerful topology system that defines how agents interact, communicate, and collaborate in complex workflows.
Overview
The topology system is the heart of MARSYS's orchestration capabilities. It defines:
- Agent Relationships: Who can communicate with whom
- Execution Flow: Sequential, parallel, or mixed patterns
- Permission Management: Control agent invocation rights
- Convergence Points: Where parallel branches merge
- Rules and Constraints: Execution policies and limits
Architecture
The topology system is composed of several interconnected components:
- Topology: The top-level container holding nodes, edges, and rules
- Nodes: Agents and system components participating in the workflow
- Edges: Communication paths between nodes
- Rules: Execution constraints (timeouts, step limits, etc.)
- TopologyAnalyzer: Analysis tools for validation and inspection
Pre-built patterns (Hub-Spoke, Pipeline, Mesh, Hierarchical) feed into the topology, while the analyzer provides graph analysis and validation capabilities.
Four Ways to Define Multi-Agent Systems
MARSYS provides four different approaches to define how agents interact, from simple to sophisticated. Each example below demonstrates the same workflow: a Researcher agent that gathers information and passes it to a Writer agent.
Perfect for quick prototyping and simple agent interactions. The topology is automatically created from the allowed_peers configuration:
from marsys.agents import Agentfrom marsys.models import ModelConfig# Configure the modelmodel_config = ModelConfig(type="api",name="anthropic/claude-opus-4.6",provider="openrouter")# Create agents with allowed_peersresearcher = Agent(model_config=model_config,name="Researcher",goal="Expert at finding and analyzing information",instruction="You are a research specialist. Find and analyze information thoroughly.",allowed_peers=["Writer"] # Can invoke Writer)writer = Agent(model_config=model_config,name="Writer",goal="Skilled at creating clear, engaging content",instruction="You are a skilled writer. Create clear, engaging content based on research.",allowed_peers=[] # Cannot invoke other agents)# Run with automatic topology creationresult = await researcher.auto_run(task="Research AI trends and write a report",max_steps=20,verbosity=1)
Key Features:
- Topology is auto-generated from
allowed_peers - No need to explicitly define nodes and edges
- Great for simple workflows and testing
- Supports user interaction when "User" is in
allowed_peers
Comparison: Which Approach to Use?
| Method | Best For | Complexity | Key Features |
|---|---|---|---|
| allowed_peers + auto_run | Quick prototyping, simple flows | Simplest | Auto-topology, minimal setup, great for testing |
| String notation | Clear visual flows, medium complexity | Moderate | Easy to read, supports bidirectional edges |
| Object-based | Type-safe production systems | Complex | Full control, metadata support, type checking |
| PatternConfig | Common team structures | Moderate | Pre-tested patterns, quick setup for standard workflows |
Decision Guide
- Start with Peer-Based if you're prototyping or testing, building simple agent chains, or want minimal boilerplate
- Use String Notation if you need clear, readable topology definitions or have moderate complexity (5-10 agents)
- Choose Object-Based if you're building production systems, need type safety and validation, or want full control over metadata and edge properties
- Select PatternConfig if you have a standard pattern (hub-spoke, pipeline, etc.) or want battle-tested configurations
Available Patterns
Hub-and-Spoke
Central coordinator with satellite agents. The hub communicates bidirectionally with each spoke.
Use Cases: Research teams, customer support, data aggregation
topology = PatternConfig.hub_and_spoke(hub="LeadResearcher",spokes=["DataCollector", "FactChecker", "Analyst", "Writer"],parallel_spokes=True)
Pipeline
Sequential stages with optional parallelism within each stage. Data flows from Stage 1 through to the final stage.
Use Cases: Data processing, content creation, ETL workflows
topology = PatternConfig.pipeline(stages=[{"name": "extract", "agents": ["Scraper"]},{"name": "transform", "agents": ["Parser", "Cleaner"]},{"name": "load", "agents": ["Database"]}],parallel_within_stage=True)
Mesh
Fully connected network where every agent can communicate with every other agent.
Use Cases: Collaborative problem solving, consensus building
topology = PatternConfig.mesh(agents=["Expert1", "Expert2", "Expert3", "Moderator"],fully_connected=True)
Hierarchical
Tree-based delegation where a manager delegates to leads, who in turn delegate to workers.
Use Cases: Organization simulation, cascading tasks
topology = PatternConfig.hierarchical(tree={"Dispatcher": ["L1Support"],"L1Support": ["L2Support"],"L2Support": ["Engineering", "Management"]})
Advanced Features
Dynamic Topology Mutation
Modify topologies at runtime:
# Start with basic topologytopology = Topology(nodes=["Coordinator", "Worker1"],edges=["Coordinator -> Worker1"])# Add nodes dynamicallytopology.add_node("Worker2")topology.add_node(Node("Analyzer", node_type=NodeType.AGENT))# Add edgestopology.add_edge("Coordinator", "Worker2")topology.add_edge(Edge("Worker2", "Analyzer", bidirectional=True))# Add rulestopology.add_rule(TimeoutRule(300))topology.add_rule("max_steps(100)")# Remove componentstopology.remove_node("Worker1")topology.remove_edge("Coordinator", "Worker1")
Convergence Points
Define where parallel branches merge:
# Manual convergence pointtopology = Topology(nodes=[Node("Splitter", node_type=NodeType.AGENT),Node("Worker1", node_type=NodeType.AGENT),Node("Worker2", node_type=NodeType.AGENT),Node("Worker3", node_type=NodeType.AGENT),Node("Aggregator",node_type=NodeType.AGENT,is_convergence_point=True) # Convergence point],edges=["Splitter -> Worker1","Splitter -> Worker2","Splitter -> Worker3","Worker1 -> Aggregator","Worker2 -> Aggregator","Worker3 -> Aggregator"])# Automatic detectionconfig = ExecutionConfig(auto_detect_convergence=True, # Auto-detect from topologydynamic_convergence_enabled=True, # Runtime convergenceconvergence_timeout=300.0 # Max wait time)
Edge Patterns
Special edge behaviors:
from marsys.coordination.topology import EdgePattern# Alternating conversationEdge(source="Negotiator1",target="Negotiator2",bidirectional=True,pattern=EdgePattern.ALTERNATING, # Strict turn-takingmetadata={"max_turns": 5})# Symmetric communicationEdge(source="Peer1",target="Peer2",bidirectional=True,pattern=EdgePattern.SYMMETRIC, # Equal communication rights)# Conditional edgeEdge(source="Checker",target="Escalator",metadata={"condition": "error_rate > 0.1", # Only if condition met"priority": "high"})
Rules System
Control execution behavior:
from marsys.coordination.rules import (Rule, RuleType, RuleResult, RuleContext,TimeoutRule, MaxAgentsRule, MaxStepsRule,MemoryLimitRule, ConditionalRule)# Built-in rulesrules = [TimeoutRule(max_duration_seconds=600),MaxAgentsRule(max_agents=20),MaxStepsRule(max_steps=100),MemoryLimitRule(max_memory_mb=2048),ConditionalRule(condition=lambda ctx: ctx.error_count < 3,action="continue")]# Custom ruleclass BusinessHoursRule(Rule):def __init__(self):super().__init__(name="business_hours",rule_type=RuleType.PRE_EXECUTION,priority=10)async def check(self, context: RuleContext) -> RuleResult:from datetime import datetimehour = datetime.now().hourif 9 <= hour < 17: # Business hoursreturn RuleResult(rule_name=self.name,passed=True,action="allow")else:return RuleResult(rule_name=self.name,passed=False,action="defer",reason="Outside business hours",metadata={"retry_at": "09:00"})# Use custom ruletopology.add_rule(BusinessHoursRule())
Topology Analysis
The framework provides powerful analysis tools:
from marsys.coordination.topology import TopologyAnalyzeranalyzer = TopologyAnalyzer(topology)# Find entry points (nodes with no incoming edges)entry_points = analyzer.get_entry_points()print(f"Entry points: {entry_points}") # e.g., ["User", "Scheduler"]# Find convergence pointsconvergence_points = analyzer.get_convergence_points()print(f"Convergence: {convergence_points}") # e.g., ["Aggregator"]# Check if conversation pattern existshas_conversation = analyzer.has_conversation_pattern()print(f"Has conversation: {has_conversation}")# Get agent permissionspermissions = analyzer.get_agent_permissions("Coordinator")print(f"Coordinator can invoke: {permissions}") # e.g., ["Worker1", "Worker2"]# Validate topologyis_valid, errors = analyzer.validate()if not is_valid:print(f"Topology errors: {errors}")# Get execution order (topological sort)order = analyzer.get_execution_order()print(f"Execution order: {order}")# Detect cycleshas_cycles = analyzer.has_cycles()print(f"Has cycles: {has_cycles}")# Get shortest pathpath = analyzer.get_shortest_path("User", "Reporter")print(f"Shortest path: {path}")
Best Practices
1. Start Simple
Begin with basic patterns and add complexity as needed:
# Start with thistopology = PatternConfig.hub_and_spoke("Coordinator", ["Worker1", "Worker2"])# Evolve to thistopology.add_node("Analyzer")topology.add_edge("Worker1", "Analyzer")
2. Use Convergence Points
Always define clear convergence for parallel work:
Node("Aggregator", is_convergence_point=True)
3. Set Appropriate Timeouts
Different timeouts for different scenarios:
rules = [TimeoutRule(60), # Quick taskTimeoutRule(3600), # Long research]
4. Validate Before Execution
Always validate topology before running:
analyzer = TopologyAnalyzer(topology)is_valid, errors = analyzer.validate()assert is_valid, f"Invalid topology: {errors}"
5. Document Intent
Use metadata to document topology purpose:
topology = Topology(nodes=[...],edges=[...],metadata={"purpose": "Customer support escalation","version": "2.0","author": "Team Lead"})
Avoid Cycles Without Exit
Ensure your topology has clear termination conditions. Cycles are allowed but should have exit conditions to prevent infinite loops.
Common Patterns
Research Team
topology = PatternConfig.hub_and_spoke(hub="LeadResearcher",spokes=["DataCollector", "FactChecker", "Analyst", "Writer"],parallel_spokes=True)
Customer Support
topology = PatternConfig.hierarchical(tree={"Dispatcher": ["L1Support"],"L1Support": ["L2Support"],"L2Support": ["Engineering", "Management"]})
Data Pipeline
topology = PatternConfig.pipeline(stages=[{"name": "extract", "agents": ["Scraper"]},{"name": "transform", "agents": ["Parser", "Cleaner"]},{"name": "load", "agents": ["Database"]}],parallel_within_stage=True)
Consensus Building
topology = PatternConfig.mesh(agents=["Expert1", "Expert2", "Expert3", "Moderator"],fully_connected=True)
Dynamic Behavior
Runtime Parallel Invocation
Agents can spawn parallel branches dynamically:
# In agent response{"next_action": "parallel_invoke","agents": ["Analyst1", "Analyst2", "Analyst3"],"agent_requests": {"Analyst1": "Analyze financial data","Analyst2": "Analyze market trends","Analyst3": "Analyze competition"}}
Conditional Routing
Route based on conditions:
# In agent response{"next_action": "invoke_agent","action_input": "ErrorHandler" if error else "NextStep"}
Dynamic Agent Discovery
Agents can discover peers at runtime:
# Through contextavailable_agents = context.get("available_agents", [])specialist = next((a for a in available_agents if "expert" in a.lower()), None)
Next Steps
Agent Development
Build agents that work with topologies
Guides
Hands-on tutorials and walkthroughs
Registry
Agent discovery and lifecycle management
Topology Mastered!
You now understand the topology system! Use it to build complex multi-agent workflows with confidence.