Topology

Topologies define how agents communicate and coordinate in MARSYS. They are graph structures that specify workflow patterns and execution flow.

See Also

For detailed class signatures and method parameters, see the Topology API Reference.

Overview

A topology in MARSYS is a directed graph that defines:

  • Nodes: Agents or system components that participate in the workflow
  • Edges: Communication paths and relationships between nodes
  • Rules: Execution constraints like timeouts and convergence conditions

Defining Topologies

MARSYS offers three ways to define topologies, from simple to complex:

1. Dictionary Format (Simplest)

The quickest way to define a topology is with plain Python dictionaries:

# Simple dictionary topology
topology = {
    "agents": ["Coordinator", "Worker1", "Worker2"],
    "flows": [
        "Coordinator -> Worker1",
        "Coordinator -> Worker2",
        "Worker1 -> Coordinator",
        "Worker2 -> Coordinator"
    ]
}

# Use with Orchestra
result = await Orchestra.run(
    task="Analyze this data",
    topology=topology
)

2. Pattern-Based (Recommended)

Use pre-defined patterns for common workflow structures:

from marsys.coordination.topology.patterns import PatternConfig
# Hub and spoke pattern - coordinator delegates to workers
topology = PatternConfig.hub_and_spoke(
    hub="Coordinator",
    spokes=["Researcher", "Analyst", "Writer"],
    parallel_spokes=True,  # Execute workers in parallel
    bidirectional=True     # Workers can respond back
)

# Pipeline pattern - sequential processing stages
topology = PatternConfig.pipeline(
    stages=[
        {"name": "extract", "agents": ["DataExtractor"]},
        {"name": "transform", "agents": ["Transformer1", "Transformer2"]},
        {"name": "load", "agents": ["DataLoader"]}
    ],
    parallel_within_stage=True
)

# Hierarchical pattern - tree structure
topology = PatternConfig.hierarchical(
    tree={
        "Director": ["Manager1", "Manager2"],
        "Manager1": ["Worker1", "Worker2"],
        "Manager2": ["Worker3"]
    }
)

3. Object-Based (Most Control)

Full control with typed objects for complex scenarios:

from marsys.coordination.topology import Topology, Node, Edge, NodeType, EdgeType
topology = Topology(
    nodes=[
        Node("User", node_type=NodeType.USER),
        Node("Coordinator", node_type=NodeType.AGENT),
        Node("Worker1", node_type=NodeType.AGENT),
        Node("Worker2", node_type=NodeType.AGENT),
        Node("Aggregator",
             node_type=NodeType.AGENT,
             is_convergence_point=True)
    ],
    edges=[
        Edge("User", "Coordinator"),
        Edge("Coordinator", "Worker1"),
        Edge("Coordinator", "Worker2"),
        Edge("Worker1", "Aggregator"),
        Edge("Worker2", "Aggregator"),
        Edge("Aggregator", "User")
    ]
)

Node Types

Nodes represent participants in the workflow:

from marsys.coordination.topology import Node, NodeType
# User node - for human-in-the-loop interactions
user = Node("User", node_type=NodeType.USER)
# Agent node - AI agent that processes tasks
agent = Node("Assistant", node_type=NodeType.AGENT)
# System node - internal system component
system = Node("Logger", node_type=NodeType.SYSTEM)
# Tool node - represents a tool/function
tool = Node("SearchTool", node_type=NodeType.TOOL)
# Convergence point - where parallel branches merge
aggregator = Node(
    "Aggregator",
    node_type=NodeType.AGENT,
    is_convergence_point=True
)

Edge Types

Edges define how nodes communicate:

from marsys.coordination.topology import Edge, EdgeType, EdgePattern
# Standard invocation - one agent calls another
invoke = Edge("Manager", "Worker", edge_type=EdgeType.INVOKE)
# Notification - fire and forget
notify = Edge("Worker", "Logger", edge_type=EdgeType.NOTIFY)
# Query/Response - request and wait for response
query = Edge("Agent", "Database", edge_type=EdgeType.QUERY)
# Streaming - continuous data flow
stream = Edge("Producer", "Consumer", edge_type=EdgeType.STREAM)
# Bidirectional conversation
conversation = Edge(
    "Agent1", "Agent2",
    bidirectional=True,
    pattern=EdgePattern.ALTERNATING  # Take turns
)

Common Patterns

Hub and Spoke

A central coordinator delegates tasks to specialized workers:

# Best for: Task delegation, parallel processing
topology = PatternConfig.hub_and_spoke(
    hub="Coordinator",
    spokes=["Researcher", "Analyst", "Writer"],
    parallel_spokes=True
)

# Flow:
# 1. Coordinator receives task
# 2. Delegates to Researcher, Analyst, Writer in parallel
# 3. Workers return results to Coordinator
# 4. Coordinator aggregates and produces final output
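
A pattern topology is passed to Orchestra.run the same way as a dictionary topology. A minimal sketch, assuming agents named Coordinator, Researcher, Analyst, and Writer are already registered in your setup:

# Run the hub-and-spoke workflow built above
result = await Orchestra.run(
    task="Research, analyze, and write up a short market summary",
    topology=topology
)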

Pipeline

Sequential processing through multiple stages:

# Best for: ETL workflows, sequential transformations
topology = PatternConfig.pipeline(
    stages=[
        {"name": "extract", "agents": ["Extractor"]},
        {"name": "transform", "agents": ["Cleaner", "Enricher"]},
        {"name": "analyze", "agents": ["Analyzer"]},
        {"name": "report", "agents": ["Reporter"]}
    ],
    parallel_within_stage=True
)

# Flow:
# 1. Extractor pulls data
# 2. Cleaner and Enricher process in parallel
# 3. Analyzer receives transformed data
# 4. Reporter generates final output

Mesh

Fully connected network where any agent can communicate with any other:

# Best for: Collaborative problem-solving, debate scenarios
topology = PatternConfig.mesh(
    agents=["Expert1", "Expert2", "Expert3"],
    fully_connected=True
)

# All agents can communicate with each other

Choosing the Right Pattern

Hub and Spoke: When you need central coordination and parallel workers.
Pipeline: When tasks must flow through sequential stages.
Mesh: When agents need to collaborate freely.
Hierarchical: When you have organizational/reporting structures (see the sketch below).
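
The hierarchical case is the only one of these without its own subsection above. A brief sketch reusing PatternConfig.hierarchical (introduced under the pattern-based definitions earlier), with placeholder agent names:

# Best for: organizational/reporting structures
topology = PatternConfig.hierarchical(
    tree={
        "Director": ["EngManager", "ResearchManager"],
        "EngManager": ["Developer1", "Developer2"],
        "ResearchManager": ["Researcher1"]
    }
)
# Each key is a parent node; each list holds its direct reports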

User Interaction

Include a human in the loop by adding User nodes:

# Topology with user interaction
topology = {
    "agents": ["User", "Assistant", "Reviewer"],
    "flows": [
        "User -> Assistant",      # User provides task
        "Assistant -> Reviewer",  # Assistant produces draft
        "Reviewer -> User"        # User reviews result
    ]
}

# Execute with user interaction enabled
result = await Orchestra.run(
    task="Help me write a report",
    topology=topology,
    execution_config=ExecutionConfig(
        user_interaction="terminal",  # Enable terminal input
        user_first=True               # Start with user input
    )
)

Dynamic Topology Modification

Topologies can be modified programmatically:

from marsys.coordination.topology import Topology, Node, Edge
# Create base topology
topology = Topology()
# Add nodes dynamically
topology.add_node(Node("Coordinator"))
topology.add_node(Node("Worker1"))
# Add edges
topology.add_edge(Edge("Coordinator", "Worker1"))
# Add more workers based on workload
for i in range(2, 5):
    worker = Node(f"Worker{i}")
    topology.add_node(worker)
    topology.add_edge(Edge("Coordinator", f"Worker{i}"))
    topology.add_edge(Edge(f"Worker{i}", "Coordinator"))

# Validate before execution
if topology.validate():
    result = await Orchestra.run(task, topology)

Best Practices

  • Start with patterns: Use PatternConfig for common cases before building custom topologies
  • Define convergence points: Mark where parallel branches should merge to avoid orphaned results
  • Use meaningful names: Node names should reflect their purpose (e.g., "Researcher" not "Agent1")
  • Validate before running: Call topology.validate() to catch configuration errors early
  • Consider error paths: Include edges for error handling and recovery flows, as sketched below
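
As an illustration of the last practice, an error path can be modeled as ordinary flows routed to a dedicated recovery agent. The ErrorHandler agent and its flows here are hypothetical names used for the sketch, not part of the MARSYS API:

# Illustrative only: route worker failures to a recovery agent
topology = {
    "agents": ["Coordinator", "Worker", "ErrorHandler"],
    "flows": [
        "Coordinator -> Worker",        # Normal delegation
        "Worker -> Coordinator",        # Normal result
        "Worker -> ErrorHandler",       # Recovery path on failure
        "ErrorHandler -> Coordinator"   # Return the recovered result
    ]
}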

Avoid Cycles Without Exit

Ensure your topology has clear termination conditions. Cycles are allowed but should have exit conditions to prevent infinite loops.
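
For example, a draft/review loop is a cycle between two agents; giving the reviewer an outgoing edge back to the user provides the exit. A sketch in dictionary format (how and when the Reviewer takes the exit edge is decided by your agent logic, not by the topology itself):

# Writer and Reviewer form a cycle; "Reviewer -> User" is the exit path
topology = {
    "agents": ["User", "Writer", "Reviewer"],
    "flows": [
        "User -> Writer",
        "Writer -> Reviewer",
        "Reviewer -> Writer",  # Revision loop (the cycle)
        "Reviewer -> User"     # Exit once the draft is approved
    ]
}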