Topology System

Master the powerful topology system that defines how agents interact, communicate, and collaborate in complex workflows.

Overview

The topology system is the heart of MARSYS's orchestration capabilities. It defines:

  • Agent Relationships: Who can communicate with whom
  • Execution Flow: Sequential, parallel, or mixed patterns
  • Permission Management: Control agent invocation rights
  • Convergence Points: Where parallel branches merge
  • Rules and Constraints: Execution policies and limits

Architecture

The topology system is composed of several interconnected components:

  • Topology: The top-level container holding nodes, edges, and rules
  • Nodes: Agents and system components participating in the workflow
  • Edges: Communication paths between nodes
  • Rules: Execution constraints (timeouts, step limits, etc.)

The TopologyAnalyzer provides graph analysis and validation via TopologyGraph and TopologyValidator. Pre-built patterns (Hub-Spoke, Pipeline, Mesh, Hierarchical) feed into the topology for quick setup of common structures.

Four Ways to Define Multi-Agent Systems

MARSYS provides four different approaches to define how agents interact, from simple to sophisticated. We'll demonstrate each approach using the same example: a Researcher agent that gathers information and passes it to a Writer agent.

Perfect for quick prototyping and simple agent interactions. The topology is automatically created from the allowed_peers configuration:

from marsys.agents import Agent
from marsys.models import ModelConfig
# Configure the model
model_config = ModelConfig(
type="api",
name="anthropic/claude-opus-4.6",
provider="openrouter"
)
# Create agents with allowed_peers
researcher = Agent(
model_config=model_config,
name="Researcher",
goal="Expert at finding and analyzing information",
instruction="You are a research specialist. Find and analyze information thoroughly.",
allowed_peers=["Writer"] # Can invoke Writer
)
writer = Agent(
model_config=model_config,
name="Writer",
goal="Skilled at creating clear, engaging content",
instruction="You are a skilled writer. Create clear, engaging content based on research.",
allowed_peers=[] # Cannot invoke other agents
)
# Run with automatic topology creation
result = await researcher.auto_run(
task="Research AI trends and write a report",
max_steps=20,
verbosity=1
)

Key Features:

  • Topology is auto-generated from allowed_peers
  • No need to explicitly define nodes and edges
  • Great for simple workflows and testing
  • Supports user interaction when "User" is in allowed_peers

Comparison: Which Approach to Use?

MethodBest ForComplexityKey Features
Way 1: allowed_peers + auto_runQuick prototyping, simple flowsSimplestAuto-topology, minimal setup, great for testing
Way 2: String notationClear visual flows, medium complexityModerateEasy to read, supports bidirectional edges
Way 3: Object-basedType-safe production systemsComplexFull control, metadata support, type checking
Way 4: PatternConfigCommon team structuresModeratePre-tested patterns, quick setup for standard workflows

Decision Guide

Start with Way 1 if you're:

  • Prototyping or testing
  • Building simple agent chains
  • Want minimal boilerplate

Use Way 2 if you:

  • Need clear, readable topology definitions
  • Have moderate complexity (5-10 agents)
  • Want to visualize agent relationships easily

Choose Way 3 if you:

  • Building production systems
  • Need type safety and validation
  • Want full control over metadata and edge properties

Select Way 4 if you:

  • Have a standard pattern (hub-spoke, pipeline, etc.)
  • Want battle-tested configurations
  • Need to implement common team structures quickly

Available Patterns

Hub-and-Spoke

Central coordinator with satellite agents. The hub communicates bidirectionally with each spoke agent (Hub <--> Spoke1, Hub <--> Spoke2, Hub <--> Spoke3).

Use Cases: Research teams, customer support, data aggregation

Pipeline

Sequential stages with optional parallelism. Data flows linearly: Stage 1 --> Stage 2 --> Stage 3 --> Stage 4.

Use Cases: Data processing, content creation, ETL workflows

Mesh

Fully connected network where every agent can communicate with every other agent. All agents have bidirectional edges to all other agents.

Use Cases: Collaborative problem solving, consensus building

Hierarchical

Tree-based delegation structure. A Manager delegates to Leads (Lead1, Lead2), who in turn delegate to Workers (Worker1, Worker2 under Lead1; Worker3, Worker4 under Lead2).

Use Cases: Organization simulation, cascading tasks

Advanced Features

Dynamic Topology Mutation

Modify topologies at runtime:

# Start with basic topology
topology = Topology(
nodes=["Coordinator", "Worker1"],
edges=["Coordinator -> Worker1"]
)
# Add nodes dynamically
topology.add_node("Worker2")
topology.add_node(Node("Analyzer", node_type=NodeType.AGENT))
# Add edges
topology.add_edge("Coordinator", "Worker2")
topology.add_edge(Edge("Worker2", "Analyzer", bidirectional=True))
# Add rules
topology.add_rule(TimeoutRule(300))
topology.add_rule("max_steps(100)")
# Remove components
topology.remove_node("Worker1")
topology.remove_edge("Coordinator", "Worker1")

Convergence Points

Define where parallel branches merge:

# Manual convergence point
topology = Topology(
nodes=[
Node("Splitter", node_type=NodeType.AGENT),
Node("Worker1", node_type=NodeType.AGENT),
Node("Worker2", node_type=NodeType.AGENT),
Node("Worker3", node_type=NodeType.AGENT),
Node("Aggregator",
node_type=NodeType.AGENT,
is_convergence_point=True) # Convergence point
],
edges=[
"Splitter -> Worker1",
"Splitter -> Worker2",
"Splitter -> Worker3",
"Worker1 -> Aggregator",
"Worker2 -> Aggregator",
"Worker3 -> Aggregator"
]
)
# Automatic detection
config = ExecutionConfig(
auto_detect_convergence=True, # Auto-detect from topology
dynamic_convergence_enabled=True, # Runtime convergence
convergence_timeout=300.0 # Max wait time
)

Edge Patterns

Special edge behaviors:

from marsys.coordination.topology import EdgePattern
# Alternating conversation
Edge(
source="Negotiator1",
target="Negotiator2",
bidirectional=True,
pattern=EdgePattern.ALTERNATING, # Strict turn-taking
metadata={"max_turns": 5}
)
# Symmetric communication
Edge(
source="Peer1",
target="Peer2",
bidirectional=True,
pattern=EdgePattern.SYMMETRIC, # Equal communication rights
)
# Conditional edge
Edge(
source="Checker",
target="Escalator",
metadata={
"condition": "error_rate > 0.1", # Only if condition met
"priority": "high"
}
)

Rules System

Control execution behavior:

from marsys.coordination.rules import (
Rule, RuleType, RuleResult, RuleContext,
TimeoutRule, MaxAgentsRule, MaxStepsRule,
MemoryLimitRule, ConditionalRule
)
# Built-in rules
rules = [
TimeoutRule(max_duration_seconds=600),
MaxAgentsRule(max_agents=20),
MaxStepsRule(max_steps=100),
MemoryLimitRule(max_memory_mb=2048),
ConditionalRule(
condition=lambda ctx: ctx.error_count < 3,
action="continue"
)
]
# Custom rule
class BusinessHoursRule(Rule):
def __init__(self):
super().__init__(
name="business_hours",
rule_type=RuleType.PRE_EXECUTION,
priority=10
)
async def check(self, context: RuleContext) -> RuleResult:
from datetime import datetime
hour = datetime.now().hour
if 9 <= hour < 17: # Business hours
return RuleResult(
rule_name=self.name,
passed=True,
action="allow"
)
else:
return RuleResult(
rule_name=self.name,
passed=False,
action="defer",
reason="Outside business hours",
metadata={"retry_at": "09:00"}
)
# Use custom rule
topology.add_rule(BusinessHoursRule())

Topology Analysis

The framework provides powerful analysis tools:

from marsys.coordination.topology import TopologyAnalyzer
analyzer = TopologyAnalyzer(topology)
# Find entry points (nodes with no incoming edges)
entry_points = analyzer.get_entry_points()
print(f"Entry points: {entry_points}") # e.g., ["User", "Scheduler"]
# Find convergence points
convergence_points = analyzer.get_convergence_points()
print(f"Convergence: {convergence_points}") # e.g., ["Aggregator"]
# Check if conversation pattern exists
has_conversation = analyzer.has_conversation_pattern()
print(f"Has conversation: {has_conversation}")
# Get agent permissions
permissions = analyzer.get_agent_permissions("Coordinator")
print(f"Coordinator can invoke: {permissions}") # e.g., ["Worker1", "Worker2"]
# Validate topology
is_valid, errors = analyzer.validate()
if not is_valid:
print(f"Topology errors: {errors}")
# Get execution order (topological sort)
order = analyzer.get_execution_order()
print(f"Execution order: {order}")
# Detect cycles
has_cycles = analyzer.has_cycles()
print(f"Has cycles: {has_cycles}")
# Get shortest path
path = analyzer.get_shortest_path("User", "Reporter")
print(f"Shortest path: {path}")

Best Practices

1. Start Simple

Begin with basic patterns and add complexity as needed:

# Start with this
topology = PatternConfig.hub_and_spoke("Coordinator", ["Worker1", "Worker2"])
# Evolve to this
topology.add_node("Analyzer")
topology.add_edge("Worker1", "Analyzer")

2. Use Convergence Points

Always define clear convergence for parallel work:

Node("Aggregator", is_convergence_point=True)

3. Set Appropriate Timeouts

Different timeouts for different scenarios:

rules = [
TimeoutRule(60), # Quick task
TimeoutRule(3600), # Long research
]

4. Validate Before Execution

Always validate topology before running:

analyzer = TopologyAnalyzer(topology)
is_valid, errors = analyzer.validate()
assert is_valid, f"Invalid topology: {errors}"

5. Document Intent

Use metadata to document topology purpose:

topology = Topology(
nodes=[...],
edges=[...],
metadata={
"purpose": "Customer support escalation",
"version": "2.0",
"author": "Team Lead"
}
)

Common Patterns

Research Team

topology = PatternConfig.hub_and_spoke(
hub="LeadResearcher",
spokes=["DataCollector", "FactChecker", "Analyst", "Writer"],
parallel_spokes=True
)

Customer Support

topology = PatternConfig.hierarchical(
tree={
"Dispatcher": ["L1Support"],
"L1Support": ["L2Support"],
"L2Support": ["Engineering", "Management"]
}
)

Data Pipeline

topology = PatternConfig.pipeline(
stages=[
{"name": "extract", "agents": ["Scraper"]},
{"name": "transform", "agents": ["Parser", "Cleaner"]},
{"name": "load", "agents": ["Database"]}
],
parallel_within_stage=True
)

Consensus Building

topology = PatternConfig.mesh(
agents=["Expert1", "Expert2", "Expert3", "Moderator"],
fully_connected=True
)

Dynamic Behavior

Runtime Parallel Invocation

Agents can spawn parallel branches dynamically:

# In agent response
{
"next_action": "parallel_invoke",
"agents": ["Analyst1", "Analyst2", "Analyst3"],
"agent_requests": {
"Analyst1": "Analyze financial data",
"Analyst2": "Analyze market trends",
"Analyst3": "Analyze competition"
}
}

Conditional Routing

Route based on conditions:

# In agent response
{
"next_action": "invoke_agent",
"action_input": "ErrorHandler" if error else "NextStep"
}

Dynamic Agent Discovery

Agents can discover peers at runtime:

# Through context
available_agents = context.get("available_agents", [])
specialist = next((a for a in available_agents if "expert" in a.lower()), None)

Next Steps

Master topology patterns:

Topology Mastered!

You now understand the topology system! Use it to build complex multi-agent workflows with confidence.