Topology System
Master the powerful topology system that defines how agents interact, communicate, and collaborate in complex workflows.
Overview
The topology system is the heart of MARSYS's orchestration capabilities. It defines:
- Agent Relationships: Who can communicate with whom
- Execution Flow: Sequential, parallel, or mixed patterns
- Permission Management: Control agent invocation rights
- Convergence Points: Where parallel branches merge
- Rules and Constraints: Execution policies and limits
Architecture
The topology system is composed of several interconnected components:
- Topology: The top-level container holding nodes, edges, and rules
- Nodes: Agents and system components participating in the workflow
- Edges: Communication paths between nodes
- Rules: Execution constraints (timeouts, step limits, etc.)
The TopologyAnalyzer provides graph analysis and validation via TopologyGraph and TopologyValidator. Pre-built patterns (Hub-Spoke, Pipeline, Mesh, Hierarchical) feed into the topology for quick setup of common structures.
Four Ways to Define Multi-Agent Systems
MARSYS provides four different approaches to define how agents interact, from simple to sophisticated. We'll demonstrate each approach using the same example: a Researcher agent that gathers information and passes it to a Writer agent.
Perfect for quick prototyping and simple agent interactions. The topology is automatically created from the allowed_peers configuration:
from marsys.agents import Agentfrom marsys.models import ModelConfig# Configure the modelmodel_config = ModelConfig(type="api",name="anthropic/claude-opus-4.6",provider="openrouter")# Create agents with allowed_peersresearcher = Agent(model_config=model_config,name="Researcher",goal="Expert at finding and analyzing information",instruction="You are a research specialist. Find and analyze information thoroughly.",allowed_peers=["Writer"] # Can invoke Writer)writer = Agent(model_config=model_config,name="Writer",goal="Skilled at creating clear, engaging content",instruction="You are a skilled writer. Create clear, engaging content based on research.",allowed_peers=[] # Cannot invoke other agents)# Run with automatic topology creationresult = await researcher.auto_run(task="Research AI trends and write a report",max_steps=20,verbosity=1)
Key Features:
- Topology is auto-generated from
allowed_peers - No need to explicitly define nodes and edges
- Great for simple workflows and testing
- Supports user interaction when "User" is in
allowed_peers
Comparison: Which Approach to Use?
| Method | Best For | Complexity | Key Features |
|---|---|---|---|
| Way 1: allowed_peers + auto_run | Quick prototyping, simple flows | Simplest | Auto-topology, minimal setup, great for testing |
| Way 2: String notation | Clear visual flows, medium complexity | Moderate | Easy to read, supports bidirectional edges |
| Way 3: Object-based | Type-safe production systems | Complex | Full control, metadata support, type checking |
| Way 4: PatternConfig | Common team structures | Moderate | Pre-tested patterns, quick setup for standard workflows |
Decision Guide
Start with Way 1 if you're:
- Prototyping or testing
- Building simple agent chains
- Want minimal boilerplate
Use Way 2 if you:
- Need clear, readable topology definitions
- Have moderate complexity (5-10 agents)
- Want to visualize agent relationships easily
Choose Way 3 if you:
- Building production systems
- Need type safety and validation
- Want full control over metadata and edge properties
Select Way 4 if you:
- Have a standard pattern (hub-spoke, pipeline, etc.)
- Want battle-tested configurations
- Need to implement common team structures quickly
Available Patterns
Hub-and-Spoke
Central coordinator with satellite agents. The hub communicates bidirectionally with each spoke agent (Hub <--> Spoke1, Hub <--> Spoke2, Hub <--> Spoke3).
Use Cases: Research teams, customer support, data aggregation
Pipeline
Sequential stages with optional parallelism. Data flows linearly: Stage 1 --> Stage 2 --> Stage 3 --> Stage 4.
Use Cases: Data processing, content creation, ETL workflows
Mesh
Fully connected network where every agent can communicate with every other agent. All agents have bidirectional edges to all other agents.
Use Cases: Collaborative problem solving, consensus building
Hierarchical
Tree-based delegation structure. A Manager delegates to Leads (Lead1, Lead2), who in turn delegate to Workers (Worker1, Worker2 under Lead1; Worker3, Worker4 under Lead2).
Use Cases: Organization simulation, cascading tasks
Advanced Features
Dynamic Topology Mutation
Modify topologies at runtime:
# Start with basic topologytopology = Topology(nodes=["Coordinator", "Worker1"],edges=["Coordinator -> Worker1"])# Add nodes dynamicallytopology.add_node("Worker2")topology.add_node(Node("Analyzer", node_type=NodeType.AGENT))# Add edgestopology.add_edge("Coordinator", "Worker2")topology.add_edge(Edge("Worker2", "Analyzer", bidirectional=True))# Add rulestopology.add_rule(TimeoutRule(300))topology.add_rule("max_steps(100)")# Remove componentstopology.remove_node("Worker1")topology.remove_edge("Coordinator", "Worker1")
Convergence Points
Define where parallel branches merge:
# Manual convergence pointtopology = Topology(nodes=[Node("Splitter", node_type=NodeType.AGENT),Node("Worker1", node_type=NodeType.AGENT),Node("Worker2", node_type=NodeType.AGENT),Node("Worker3", node_type=NodeType.AGENT),Node("Aggregator",node_type=NodeType.AGENT,is_convergence_point=True) # Convergence point],edges=["Splitter -> Worker1","Splitter -> Worker2","Splitter -> Worker3","Worker1 -> Aggregator","Worker2 -> Aggregator","Worker3 -> Aggregator"])# Automatic detectionconfig = ExecutionConfig(auto_detect_convergence=True, # Auto-detect from topologydynamic_convergence_enabled=True, # Runtime convergenceconvergence_timeout=300.0 # Max wait time)
Edge Patterns
Special edge behaviors:
from marsys.coordination.topology import EdgePattern# Alternating conversationEdge(source="Negotiator1",target="Negotiator2",bidirectional=True,pattern=EdgePattern.ALTERNATING, # Strict turn-takingmetadata={"max_turns": 5})# Symmetric communicationEdge(source="Peer1",target="Peer2",bidirectional=True,pattern=EdgePattern.SYMMETRIC, # Equal communication rights)# Conditional edgeEdge(source="Checker",target="Escalator",metadata={"condition": "error_rate > 0.1", # Only if condition met"priority": "high"})
Rules System
Control execution behavior:
from marsys.coordination.rules import (Rule, RuleType, RuleResult, RuleContext,TimeoutRule, MaxAgentsRule, MaxStepsRule,MemoryLimitRule, ConditionalRule)# Built-in rulesrules = [TimeoutRule(max_duration_seconds=600),MaxAgentsRule(max_agents=20),MaxStepsRule(max_steps=100),MemoryLimitRule(max_memory_mb=2048),ConditionalRule(condition=lambda ctx: ctx.error_count < 3,action="continue")]# Custom ruleclass BusinessHoursRule(Rule):def __init__(self):super().__init__(name="business_hours",rule_type=RuleType.PRE_EXECUTION,priority=10)async def check(self, context: RuleContext) -> RuleResult:from datetime import datetimehour = datetime.now().hourif 9 <= hour < 17: # Business hoursreturn RuleResult(rule_name=self.name,passed=True,action="allow")else:return RuleResult(rule_name=self.name,passed=False,action="defer",reason="Outside business hours",metadata={"retry_at": "09:00"})# Use custom ruletopology.add_rule(BusinessHoursRule())
Topology Analysis
The framework provides powerful analysis tools:
from marsys.coordination.topology import TopologyAnalyzeranalyzer = TopologyAnalyzer(topology)# Find entry points (nodes with no incoming edges)entry_points = analyzer.get_entry_points()print(f"Entry points: {entry_points}") # e.g., ["User", "Scheduler"]# Find convergence pointsconvergence_points = analyzer.get_convergence_points()print(f"Convergence: {convergence_points}") # e.g., ["Aggregator"]# Check if conversation pattern existshas_conversation = analyzer.has_conversation_pattern()print(f"Has conversation: {has_conversation}")# Get agent permissionspermissions = analyzer.get_agent_permissions("Coordinator")print(f"Coordinator can invoke: {permissions}") # e.g., ["Worker1", "Worker2"]# Validate topologyis_valid, errors = analyzer.validate()if not is_valid:print(f"Topology errors: {errors}")# Get execution order (topological sort)order = analyzer.get_execution_order()print(f"Execution order: {order}")# Detect cycleshas_cycles = analyzer.has_cycles()print(f"Has cycles: {has_cycles}")# Get shortest pathpath = analyzer.get_shortest_path("User", "Reporter")print(f"Shortest path: {path}")
Best Practices
1. Start Simple
Begin with basic patterns and add complexity as needed:
# Start with thistopology = PatternConfig.hub_and_spoke("Coordinator", ["Worker1", "Worker2"])# Evolve to thistopology.add_node("Analyzer")topology.add_edge("Worker1", "Analyzer")
2. Use Convergence Points
Always define clear convergence for parallel work:
Node("Aggregator", is_convergence_point=True)
3. Set Appropriate Timeouts
Different timeouts for different scenarios:
rules = [TimeoutRule(60), # Quick taskTimeoutRule(3600), # Long research]
4. Validate Before Execution
Always validate topology before running:
analyzer = TopologyAnalyzer(topology)is_valid, errors = analyzer.validate()assert is_valid, f"Invalid topology: {errors}"
5. Document Intent
Use metadata to document topology purpose:
topology = Topology(nodes=[...],edges=[...],metadata={"purpose": "Customer support escalation","version": "2.0","author": "Team Lead"})
Common Patterns
Research Team
topology = PatternConfig.hub_and_spoke(hub="LeadResearcher",spokes=["DataCollector", "FactChecker", "Analyst", "Writer"],parallel_spokes=True)
Customer Support
topology = PatternConfig.hierarchical(tree={"Dispatcher": ["L1Support"],"L1Support": ["L2Support"],"L2Support": ["Engineering", "Management"]})
Data Pipeline
topology = PatternConfig.pipeline(stages=[{"name": "extract", "agents": ["Scraper"]},{"name": "transform", "agents": ["Parser", "Cleaner"]},{"name": "load", "agents": ["Database"]}],parallel_within_stage=True)
Consensus Building
topology = PatternConfig.mesh(agents=["Expert1", "Expert2", "Expert3", "Moderator"],fully_connected=True)
Dynamic Behavior
Runtime Parallel Invocation
Agents can spawn parallel branches dynamically:
# In agent response{"next_action": "parallel_invoke","agents": ["Analyst1", "Analyst2", "Analyst3"],"agent_requests": {"Analyst1": "Analyze financial data","Analyst2": "Analyze market trends","Analyst3": "Analyze competition"}}
Conditional Routing
Route based on conditions:
# In agent response{"next_action": "invoke_agent","action_input": "ErrorHandler" if error else "NextStep"}
Dynamic Agent Discovery
Agents can discover peers at runtime:
# Through contextavailable_agents = context.get("available_agents", [])specialist = next((a for a in available_agents if "expert" in a.lower()), None)
Next Steps
Master topology patterns:
See Examples
Real-world topology implementations
API Reference
Detailed topology API documentation
Agent Development
Build agents that work with topologies
Configuration
Configure execution behavior
Topology Mastered!
You now understand the topology system! Use it to build complex multi-agent workflows with confidence.