Topology System

Master the powerful topology system that defines how agents interact, communicate, and collaborate in complex workflows.

Overview

The topology system is the heart of MARSYS's orchestration capabilities. It defines:

  • Agent Relationships: Who can communicate with whom
  • Execution Flow: Sequential, parallel, or mixed patterns
  • Permission Management: Control agent invocation rights
  • Convergence Points: Where parallel branches merge
  • Rules and Constraints: Execution policies and limits

Architecture

The topology system is composed of several interconnected components:

  • Topology: The top-level container holding nodes, edges, and rules
  • Nodes: Agents and system components participating in the workflow
  • Edges: Communication paths between nodes
  • Rules: Execution constraints (timeouts, step limits, etc.)
  • TopologyAnalyzer: Analysis tools for validation and inspection

Pre-built patterns (Hub-Spoke, Pipeline, Mesh, Hierarchical) feed into the topology, while the analyzer provides graph analysis and validation capabilities.

Four Ways to Define Multi-Agent Systems

MARSYS provides four different approaches to define how agents interact, from simple to sophisticated. Each example below demonstrates the same workflow: a Researcher agent that gathers information and passes it to a Writer agent.

Method 1: Peer-Based (allowed_peers + auto_run)

Perfect for quick prototyping and simple agent interactions. The topology is created automatically from the allowed_peers configuration:

from marsys.agents import Agent
from marsys.models import ModelConfig

# Configure the model
model_config = ModelConfig(
    type="api",
    name="anthropic/claude-opus-4.6",
    provider="openrouter"
)

# Create agents with allowed_peers
researcher = Agent(
    model_config=model_config,
    name="Researcher",
    goal="Expert at finding and analyzing information",
    instruction="You are a research specialist. Find and analyze information thoroughly.",
    allowed_peers=["Writer"]  # Can invoke Writer
)

writer = Agent(
    model_config=model_config,
    name="Writer",
    goal="Skilled at creating clear, engaging content",
    instruction="You are a skilled writer. Create clear, engaging content based on research.",
    allowed_peers=[]  # Cannot invoke other agents
)

# Run with automatic topology creation
result = await researcher.auto_run(
    task="Research AI trends and write a report",
    max_steps=20,
    verbosity=1
)

Key Features:

  • Topology is auto-generated from allowed_peers
  • No need to explicitly define nodes and edges
  • Great for simple workflows and testing
  • Supports user interaction when "User" is in allowed_peers
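Conceptually, the auto-generated topology is just a directed graph derived from each agent's allowed_peers list. The sketch below is plain Python for illustration (not MARSYS internals) and shows the mapping:

```python
# Illustrative only: how an edge list falls out of per-agent
# allowed_peers declarations. derive_edges is a hypothetical helper.
allowed_peers = {
    "Researcher": ["Writer"],  # Researcher can invoke Writer
    "Writer": [],              # Writer cannot invoke anyone
}

def derive_edges(peers: dict[str, list[str]]) -> list[tuple[str, str]]:
    """Turn allowed_peers declarations into directed (source, target) edges."""
    return [(src, dst) for src, targets in peers.items() for dst in targets]

edges = derive_edges(allowed_peers)
# edges == [("Researcher", "Writer")]
```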

Comparison: Which Approach to Use?

  • allowed_peers + auto_run: best for quick prototyping and simple flows. Complexity: simplest. Auto-topology, minimal setup, great for testing.
  • String notation: best for clear visual flows of medium complexity. Complexity: moderate. Easy to read, supports bidirectional edges.
  • Object-based: best for type-safe production systems. Complexity: complex. Full control, metadata support, type checking.
  • PatternConfig: best for common team structures. Complexity: moderate. Pre-tested patterns, quick setup for standard workflows.

Decision Guide

  • Start with Peer-Based if you're prototyping or testing, building simple agent chains, or want minimal boilerplate
  • Use String Notation if you need clear, readable topology definitions or have moderate complexity (5-10 agents)
  • Choose Object-Based if you're building production systems, need type safety and validation, or want full control over metadata and edge properties
  • Select PatternConfig if you have a standard pattern (hub-spoke, pipeline, etc.) or want battle-tested configurations
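To make the string-notation trade-off concrete: each entry names a directed edge, and a bidirectional pair can be written with a double-headed arrow. The parser below is a hypothetical, framework-free illustration of what such strings express, not the MARSYS implementation, and it assumes "->" and "<->" as the only operators:

```python
def parse_edge(spec: str) -> list[tuple[str, str]]:
    """Parse one edge spec like "A -> B" or "A <-> B" into directed pairs."""
    if "<->" in spec:  # bidirectional: expand to one edge each way
        a, b = (s.strip() for s in spec.split("<->"))
        return [(a, b), (b, a)]
    a, b = (s.strip() for s in spec.split("->"))
    return [(a, b)]

edges = [e for spec in ["Researcher -> Writer", "Writer <-> Reviewer"]
         for e in parse_edge(spec)]
# edges == [("Researcher", "Writer"), ("Writer", "Reviewer"), ("Reviewer", "Writer")]
```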

Available Patterns

Hub-and-Spoke

Central coordinator with satellite agents. The hub communicates bidirectionally with each spoke.

Use Cases: Research teams, customer support, data aggregation

topology = PatternConfig.hub_and_spoke(
    hub="LeadResearcher",
    spokes=["DataCollector", "FactChecker", "Analyst", "Writer"],
    parallel_spokes=True
)

Pipeline

Sequential stages with optional parallelism within each stage. Data flows from Stage 1 through to the final stage.

Use Cases: Data processing, content creation, ETL workflows

topology = PatternConfig.pipeline(
    stages=[
        {"name": "extract", "agents": ["Scraper"]},
        {"name": "transform", "agents": ["Parser", "Cleaner"]},
        {"name": "load", "agents": ["Database"]}
    ],
    parallel_within_stage=True
)

Mesh

Fully connected network where every agent can communicate with every other agent.

Use Cases: Collaborative problem solving, consensus building

topology = PatternConfig.mesh(
    agents=["Expert1", "Expert2", "Expert3", "Moderator"],
    fully_connected=True
)

Hierarchical

Tree-based delegation where a manager delegates to leads, who in turn delegate to workers.

Use Cases: Organization simulation, cascading tasks

topology = PatternConfig.hierarchical(
    tree={
        "Dispatcher": ["L1Support"],
        "L1Support": ["L2Support"],
        "L2Support": ["Engineering", "Management"]
    }
)

Advanced Features

Dynamic Topology Mutation

Modify topologies at runtime:

from marsys.coordination.topology import Topology, Node, NodeType, Edge
from marsys.coordination.rules import TimeoutRule

# Start with a basic topology
topology = Topology(
    nodes=["Coordinator", "Worker1"],
    edges=["Coordinator -> Worker1"]
)

# Add nodes dynamically
topology.add_node("Worker2")
topology.add_node(Node("Analyzer", node_type=NodeType.AGENT))

# Add edges
topology.add_edge("Coordinator", "Worker2")
topology.add_edge(Edge("Worker2", "Analyzer", bidirectional=True))

# Add rules
topology.add_rule(TimeoutRule(300))
topology.add_rule("max_steps(100)")

# Remove components
topology.remove_node("Worker1")
topology.remove_edge("Coordinator", "Worker1")

Convergence Points

Define where parallel branches merge:

# Manual convergence point
topology = Topology(
    nodes=[
        Node("Splitter", node_type=NodeType.AGENT),
        Node("Worker1", node_type=NodeType.AGENT),
        Node("Worker2", node_type=NodeType.AGENT),
        Node("Worker3", node_type=NodeType.AGENT),
        Node("Aggregator",
             node_type=NodeType.AGENT,
             is_convergence_point=True)  # Convergence point
    ],
    edges=[
        "Splitter -> Worker1",
        "Splitter -> Worker2",
        "Splitter -> Worker3",
        "Worker1 -> Aggregator",
        "Worker2 -> Aggregator",
        "Worker3 -> Aggregator"
    ]
)

# Automatic detection
config = ExecutionConfig(
    auto_detect_convergence=True,      # Auto-detect from topology
    dynamic_convergence_enabled=True,  # Runtime convergence
    convergence_timeout=300.0          # Max wait time
)

Edge Patterns

Special edge behaviors:

from marsys.coordination.topology import Edge, EdgePattern

# Alternating conversation
Edge(
    source="Negotiator1",
    target="Negotiator2",
    bidirectional=True,
    pattern=EdgePattern.ALTERNATING,  # Strict turn-taking
    metadata={"max_turns": 5}
)

# Symmetric communication
Edge(
    source="Peer1",
    target="Peer2",
    bidirectional=True,
    pattern=EdgePattern.SYMMETRIC,  # Equal communication rights
)

# Conditional edge
Edge(
    source="Checker",
    target="Escalator",
    metadata={
        "condition": "error_rate > 0.1",  # Only traversed if condition is met
        "priority": "high"
    }
)

Rules System

Control execution behavior:

from marsys.coordination.rules import (
    Rule, RuleType, RuleResult, RuleContext,
    TimeoutRule, MaxAgentsRule, MaxStepsRule,
    MemoryLimitRule, ConditionalRule
)

# Built-in rules
rules = [
    TimeoutRule(max_duration_seconds=600),
    MaxAgentsRule(max_agents=20),
    MaxStepsRule(max_steps=100),
    MemoryLimitRule(max_memory_mb=2048),
    ConditionalRule(
        condition=lambda ctx: ctx.error_count < 3,
        action="continue"
    )
]

# Custom rule
class BusinessHoursRule(Rule):
    def __init__(self):
        super().__init__(
            name="business_hours",
            rule_type=RuleType.PRE_EXECUTION,
            priority=10
        )

    async def check(self, context: RuleContext) -> RuleResult:
        from datetime import datetime
        hour = datetime.now().hour
        if 9 <= hour < 17:  # Business hours
            return RuleResult(
                rule_name=self.name,
                passed=True,
                action="allow"
            )
        else:
            return RuleResult(
                rule_name=self.name,
                passed=False,
                action="defer",
                reason="Outside business hours",
                metadata={"retry_at": "09:00"}
            )

# Use the custom rule
topology.add_rule(BusinessHoursRule())

Topology Analysis

The framework provides powerful analysis tools:

from marsys.coordination.topology import TopologyAnalyzer

analyzer = TopologyAnalyzer(topology)

# Find entry points (nodes with no incoming edges)
entry_points = analyzer.get_entry_points()
print(f"Entry points: {entry_points}")  # e.g., ["User", "Scheduler"]

# Find convergence points
convergence_points = analyzer.get_convergence_points()
print(f"Convergence: {convergence_points}")  # e.g., ["Aggregator"]

# Check whether a conversation pattern exists
has_conversation = analyzer.has_conversation_pattern()
print(f"Has conversation: {has_conversation}")

# Get agent permissions
permissions = analyzer.get_agent_permissions("Coordinator")
print(f"Coordinator can invoke: {permissions}")  # e.g., ["Worker1", "Worker2"]

# Validate the topology
is_valid, errors = analyzer.validate()
if not is_valid:
    print(f"Topology errors: {errors}")

# Get execution order (topological sort)
order = analyzer.get_execution_order()
print(f"Execution order: {order}")

# Detect cycles
has_cycles = analyzer.has_cycles()
print(f"Has cycles: {has_cycles}")

# Get the shortest path
path = analyzer.get_shortest_path("User", "Reporter")
print(f"Shortest path: {path}")
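The cycle check can be understood as a depth-first search over the edge list: reaching a node that is still on the current DFS path means a back edge, i.e. a cycle. A framework-free sketch of the idea (not MARSYS's implementation):

```python
# Illustrative only: three-color DFS cycle detection on an edge list.
def has_cycles(edges: list[tuple[str, str]]) -> bool:
    """Return True if the directed graph given by edges contains a cycle."""
    graph: dict[str, list[str]] = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current path / finished
    color = {n: WHITE for pair in edges for n in pair}

    def visit(node: str) -> bool:
        color[node] = GRAY
        for nxt in graph.get(node, []):
            if color[nxt] == GRAY:  # back edge: cycle found
                return True
            if color[nxt] == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in list(color))
```

With `[("A", "B"), ("B", "A")]` this returns True; with a simple chain `[("A", "B"), ("B", "C")]` it returns False.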

Best Practices

1. Start Simple

Begin with basic patterns and add complexity as needed:

# Start with this
topology = PatternConfig.hub_and_spoke("Coordinator", ["Worker1", "Worker2"])
# Evolve to this
topology.add_node("Analyzer")
topology.add_edge("Worker1", "Analyzer")

2. Use Convergence Points

Always define clear convergence for parallel work:

Node("Aggregator", is_convergence_point=True)

3. Set Appropriate Timeouts

Different timeouts for different scenarios:

rules = [
    TimeoutRule(60),    # Quick task
    TimeoutRule(3600),  # Long research
]

4. Validate Before Execution

Always validate topology before running:

analyzer = TopologyAnalyzer(topology)
is_valid, errors = analyzer.validate()
assert is_valid, f"Invalid topology: {errors}"

5. Document Intent

Use metadata to document topology purpose:

topology = Topology(
    nodes=[...],
    edges=[...],
    metadata={
        "purpose": "Customer support escalation",
        "version": "2.0",
        "author": "Team Lead"
    }
)

Avoid Cycles Without Exit

Ensure your topology has clear termination conditions. Cycles are allowed but should have exit conditions to prevent infinite loops.
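One simple way to guarantee an exit is to pair a semantic stop condition with a hard turn limit, as in this framework-free sketch (review_loop, approve, and revise are hypothetical names, not MARSYS APIs):

```python
# Illustrative only: a reviewer/reviser cycle that always terminates.
def review_loop(draft: str, approve, revise, max_turns: int = 5) -> str:
    for _ in range(max_turns):  # hard exit condition: turn limit
        if approve(draft):      # semantic exit condition: approval
            return draft
        draft = revise(draft)
    return draft                # give up after max_turns

final = review_loop("v0",
                    approve=lambda d: d.endswith("3"),
                    revise=lambda d: f"v{int(d[1:]) + 1}")
# final == "v3": revised three times, then approved
```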

Common Patterns

Research Team

topology = PatternConfig.hub_and_spoke(
    hub="LeadResearcher",
    spokes=["DataCollector", "FactChecker", "Analyst", "Writer"],
    parallel_spokes=True
)

Customer Support

topology = PatternConfig.hierarchical(
    tree={
        "Dispatcher": ["L1Support"],
        "L1Support": ["L2Support"],
        "L2Support": ["Engineering", "Management"]
    }
)

Data Pipeline

topology = PatternConfig.pipeline(
    stages=[
        {"name": "extract", "agents": ["Scraper"]},
        {"name": "transform", "agents": ["Parser", "Cleaner"]},
        {"name": "load", "agents": ["Database"]}
    ],
    parallel_within_stage=True
)

Consensus Building

topology = PatternConfig.mesh(
    agents=["Expert1", "Expert2", "Expert3", "Moderator"],
    fully_connected=True
)

Dynamic Behavior

Runtime Parallel Invocation

Agents can spawn parallel branches dynamically:

# In agent response
{
    "next_action": "parallel_invoke",
    "agents": ["Analyst1", "Analyst2", "Analyst3"],
    "agent_requests": {
        "Analyst1": "Analyze financial data",
        "Analyst2": "Analyze market trends",
        "Analyst3": "Analyze competition"
    }
}

Conditional Routing

Route based on conditions:

# In agent response
{
    "next_action": "invoke_agent",
    "action_input": "ErrorHandler" if error else "NextStep"
}

Dynamic Agent Discovery

Agents can discover peers at runtime:

# Through context
available_agents = context.get("available_agents", [])
specialist = next((a for a in available_agents if "expert" in a.lower()), None)

Next Steps

Topology Mastered!

You now understand the topology system! Use it to build complex multi-agent workflows with confidence.