Topology API

Complete API reference for the topology system that defines agent communication patterns and workflow structures.

See Also

For conceptual understanding of topologies, see the Topology Concepts Guide.

Topology

Main topology container that holds nodes, edges, and rules.

Constructor

from marsys.coordination.topology import Topology

topology = Topology(
    nodes: List[Union[Node, str]] = None,
    edges: List[Union[Edge, str]] = None,
    rules: List[Union[Rule, str]] = None,
    metadata: Dict[str, Any] = None
)

Parameters

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| nodes | List[Union[Node, str]] | Graph nodes (agents) | [] |
| edges | List[Union[Edge, str]] | Connections between nodes | [] |
| rules | List[Union[Rule, str]] | Execution rules | [] |
| metadata | Dict[str, Any] | Additional metadata | |

Methods

| Method | Returns | Description |
| --- | --- | --- |
| add_node(node) | Node | Add a node to the topology |
| add_edge(edge) | Edge | Add an edge to the topology |
| add_rule(rule) | Rule | Add a rule to the topology |
| remove_node(name) | bool | Remove a node by name |
| remove_edge(source, target) | bool | Remove a specific edge |
| get_node(name) | Optional[Node] | Get a node by name |
| validate() | bool | Validate topology consistency |

Usage Example

from marsys.coordination.topology import Topology, Node, NodeType, Edge

# Coordinator delegates to two workers, and each worker reports back
topology = Topology(
    nodes=[
        Node("Coordinator", node_type=NodeType.AGENT),
        Node("Worker1", node_type=NodeType.AGENT),
        Node("Worker2", node_type=NodeType.AGENT)
    ],
    edges=[
        Edge("Coordinator", "Worker1"),
        Edge("Coordinator", "Worker2"),
        Edge("Worker1", "Coordinator"),
        Edge("Worker2", "Coordinator")
    ]
)
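
As a rough illustration of what a consistency check such as validate() could look for, the sketch below verifies that every edge refers to a declared node. It works on plain name tuples and uses a hypothetical helper, find_dangling_edges, rather than the marsys classes; the checks that Topology.validate() actually performs are not specified in this reference.

```python
from typing import Iterable, List, Tuple


def find_dangling_edges(
    node_names: Iterable[str],
    edges: Iterable[Tuple[str, str]],
) -> List[Tuple[str, str]]:
    """Return edges whose source or target is not a declared node.

    Hypothetical helper for illustration only; not part of marsys.
    """
    known = set(node_names)
    return [(s, t) for s, t in edges if s not in known or t not in known]


nodes = ["Coordinator", "Worker1", "Worker2"]
edges = [
    ("Coordinator", "Worker1"),
    ("Coordinator", "Worker2"),
    ("Worker1", "Coordinator"),
    ("Worker3", "Coordinator"),  # Worker3 was never declared
]

dangling = find_dangling_edges(nodes, edges)
print(dangling)  # [('Worker3', 'Coordinator')]
```

Running a check like this before execution surfaces misspelled node names early, which is the kind of error a topology-level validation step is usually meant to catch.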

Node

Represents an agent or system component in the topology.

Constructor

from marsys.coordination.topology import Node, NodeType

node = Node(
    name: str,
    node_type: NodeType = NodeType.AGENT,
    agent_ref: Optional[Any] = None,
    is_convergence_point: bool = False,
    metadata: Dict[str, Any] = None
)

Parameters

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Unique node identifier | Required |
| node_type | NodeType | Type of node | AGENT |
| agent_ref | Optional[Any] | Reference to agent instance | None |
| is_convergence_point | bool | Marks the node as a convergence point | False |
| metadata | Dict[str, Any] | Additional node data | |

NodeType Enum

class NodeType(Enum):
    USER = "user"      # User interaction node
    AGENT = "agent"    # AI agent node
    SYSTEM = "system"  # System component
    TOOL = "tool"      # Tool node

Usage Example

from marsys.coordination.topology import Node, NodeType

# Create different node types
user_node = Node("User", node_type=NodeType.USER)
agent_node = Node("Assistant", node_type=NodeType.AGENT)
convergence = Node(
    "Aggregator",
    node_type=NodeType.AGENT,
    is_convergence_point=True
)

Edge

Defines connections and communication paths between nodes.

Constructor

from marsys.coordination.topology import Edge, EdgeType, EdgePattern

edge = Edge(
    source: str,
    target: str,
    edge_type: EdgeType = EdgeType.INVOKE,
    bidirectional: bool = False,
    pattern: Optional[EdgePattern] = None,
    metadata: Dict[str, Any] = None
)

Parameters

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| source | str | Source node name | Required |
| target | str | Target node name | Required |
| edge_type | EdgeType | Type of connection | INVOKE |
| bidirectional | bool | Two-way communication | False |
| pattern | Optional[EdgePattern] | Communication pattern | None |
| metadata | Dict[str, Any] | Additional edge data | |

EdgeType Enum

class EdgeType(Enum):
    INVOKE = "invoke"  # Agent invocation
    NOTIFY = "notify"  # Notification only
    QUERY = "query"    # Query/response
    STREAM = "stream"  # Streaming data

EdgePattern Enum

class EdgePattern(Enum):
    ALTERNATING = "alternating"  # Take turns
    SYMMETRIC = "symmetric"      # Equal access

Usage Example

from marsys.coordination.topology import Edge, EdgeType, EdgePattern

# One-way edge
edge1 = Edge("Manager", "Worker")

# Bidirectional conversation
edge2 = Edge(
    "Agent1",
    "Agent2",
    bidirectional=True,
    pattern=EdgePattern.ALTERNATING
)

# Notification edge
edge3 = Edge(
    "Monitor",
    "Logger",
    edge_type=EdgeType.NOTIFY
)
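
To make the effect of the bidirectional flag concrete, here is a minimal sketch that expands edges into directed (source, target) pairs. SimpleEdge and directed_pairs are hypothetical stand-ins written for this example; whether marsys stores a bidirectional edge as one object or as two directed edges is not stated in this reference.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class SimpleEdge:
    """Stand-in for an edge; not the marsys Edge class."""
    source: str
    target: str
    bidirectional: bool = False


def directed_pairs(edges: List[SimpleEdge]) -> List[Tuple[str, str]]:
    """Expand each edge into one or two (source, target) pairs."""
    pairs: List[Tuple[str, str]] = []
    for edge in edges:
        pairs.append((edge.source, edge.target))
        if edge.bidirectional:
            # A two-way edge also permits communication in reverse
            pairs.append((edge.target, edge.source))
    return pairs


pairs = directed_pairs([
    SimpleEdge("Manager", "Worker"),
    SimpleEdge("Agent1", "Agent2", bidirectional=True),
])
print(pairs)
# [('Manager', 'Worker'), ('Agent1', 'Agent2'), ('Agent2', 'Agent1')]
```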

PatternConfig

Pre-defined topology patterns for common use cases.

hub_and_spoke()

from marsys.coordination.topology.patterns import PatternConfig

topology = PatternConfig.hub_and_spoke(
    hub: str,
    spokes: List[str],
    parallel_spokes: bool = False,
    bidirectional: bool = True
) -> Topology

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| hub | str | Central coordinator name | Required |
| spokes | List[str] | Spoke agent names | Required |
| parallel_spokes | bool | Execute spokes in parallel | False |
| bidirectional | bool | Two-way communication | True |
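
The connectivity implied by a hub-and-spoke layout can be sketched in a few lines of plain Python. The helper hub_and_spoke_edges is hypothetical and only mirrors the hub, spokes, and bidirectional parameters; it leaves out parallel_spokes and does not claim to reproduce what PatternConfig.hub_and_spoke() builds internally.

```python
from typing import List, Tuple


def hub_and_spoke_edges(
    hub: str,
    spokes: List[str],
    bidirectional: bool = True,
) -> List[Tuple[str, str]]:
    """Connect the hub to every spoke, optionally with return edges.

    Hypothetical helper for illustration; not part of marsys.
    """
    edges: List[Tuple[str, str]] = []
    for spoke in spokes:
        edges.append((hub, spoke))
        if bidirectional:
            edges.append((spoke, hub))
    return edges


edges = hub_and_spoke_edges("Coordinator", ["Worker1", "Worker2"])
print(len(edges))  # 4
```

With two spokes and bidirectional=True, the result contains the same four connections as the manually built Coordinator/Worker topology in the Topology usage example.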

pipeline()

topology = PatternConfig.pipeline(
    stages: List[Dict[str, Any]],
    parallel_within_stage: bool = False
) -> Topology
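
One plausible reading of a pipeline is that each stage feeds the next. The sketch below assumes stage dictionaries with "name" and "agents" keys (the shape used in Pattern Examples) and assumes every agent in a stage connects to every agent in the following stage. Both the helper pipeline_edges and that wiring rule are illustrative assumptions, not the documented behavior of PatternConfig.pipeline().

```python
from itertools import product
from typing import Any, Dict, List, Tuple


def pipeline_edges(stages: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """Link every agent in a stage to every agent in the next stage.

    Hypothetical helper for illustration; not part of marsys.
    """
    edges: List[Tuple[str, str]] = []
    # Pair each stage with its successor: (stage0, stage1), (stage1, stage2), ...
    for current, following in zip(stages, stages[1:]):
        edges.extend(product(current["agents"], following["agents"]))
    return edges


stages = [
    {"name": "extract", "agents": ["Extractor"]},
    {"name": "transform", "agents": ["Transformer1", "Transformer2"]},
    {"name": "load", "agents": ["Loader"]},
]
edges = pipeline_edges(stages)
print(edges)
```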

mesh()

topology = PatternConfig.mesh(
    agents: List[str],
    fully_connected: bool = True
) -> Topology
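
In a fully connected mesh, every agent can reach every other agent, which gives n * (n - 1) directed edges for n agents. The sketch below enumerates them with a hypothetical helper, full_mesh_edges; it covers only the fully connected case, since the behavior of fully_connected=False is not described in this reference.

```python
from itertools import permutations
from typing import List, Tuple


def full_mesh_edges(agents: List[str]) -> List[Tuple[str, str]]:
    """Return a directed edge for every ordered pair of distinct agents.

    Hypothetical helper for illustration; not part of marsys.
    """
    return list(permutations(agents, 2))


edges = full_mesh_edges(["A", "B", "C"])
print(len(edges))  # 6
```

The quadratic growth in edges is worth keeping in mind: ten agents already produce 90 directed connections.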

hierarchical()

topology = PatternConfig.hierarchical(
    tree: Dict[str, List[str]]
) -> Topology
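
The tree argument maps each parent to its direct children. As a sketch of how such a mapping translates into a graph, the hypothetical helpers below derive parent-to-child edges and locate the root. Whether PatternConfig.hierarchical() also adds child-to-parent edges is not stated here, so the sketch only produces the downward direction.

```python
from typing import Dict, List, Tuple


def tree_edges(tree: Dict[str, List[str]]) -> List[Tuple[str, str]]:
    """Return one (parent, child) edge per entry in the tree mapping."""
    return [(parent, child) for parent, children in tree.items() for child in children]


def find_roots(tree: Dict[str, List[str]]) -> List[str]:
    """Return parents that never appear as anyone's child."""
    all_children = {child for children in tree.values() for child in children}
    return [parent for parent in tree if parent not in all_children]


tree = {
    "CEO": ["VP1", "VP2"],
    "VP1": ["Manager1", "Manager2"],
    "VP2": ["Manager3"],
    "Manager1": ["Worker1", "Worker2"],
}
print(find_roots(tree))       # ['CEO']
print(len(tree_edges(tree)))  # 7
```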

Pattern Examples

from marsys.coordination.topology.patterns import PatternConfig

# Hub and spoke pattern
topology = PatternConfig.hub_and_spoke(
    hub="Coordinator",
    spokes=["Worker1", "Worker2", "Worker3"],
    parallel_spokes=True
)

# Pipeline pattern
topology = PatternConfig.pipeline(
    stages=[
        {"name": "extract", "agents": ["Extractor"]},
        {"name": "transform", "agents": ["Transformer1", "Transformer2"]},
        {"name": "load", "agents": ["Loader"]}
    ],
    parallel_within_stage=True
)

# Hierarchical pattern
topology = PatternConfig.hierarchical(
    tree={
        "CEO": ["VP1", "VP2"],
        "VP1": ["Manager1", "Manager2"],
        "VP2": ["Manager3"],
        "Manager1": ["Worker1", "Worker2"]
    }
)

TopologyAnalyzer

Analyzes and validates topology structures.

Constructor & Methods

from marsys.coordination.topology.analyzer import TopologyAnalyzer

analyzer = TopologyAnalyzer(topology: Topology)

| Method | Returns | Description |
| --- | --- | --- |
| create_graph() | TopologyGraph | Create graph representation |
| validate() | Tuple[bool, List[str]] | Validate topology |
| find_cycles() | List[List[str]] | Detect cycles |
| get_execution_order() | List[str] | Topological sort |

Usage Example

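from marsys.coordination.topology.analyzer import TopologyAnalyzer

# Assumes `topology` is an existing Topology instance
analyzer = TopologyAnalyzer(topology)
graph = analyzer.create_graph()

# Validate topology
is_valid, errors = analyzer.validate()
if not is_valid:
    print(f"Topology errors: {errors}")

# Get execution order
order = analyzer.get_execution_order()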
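
The method table describes get_execution_order() as a topological sort. For readers unfamiliar with the technique, here is a self-contained sketch of Kahn's algorithm on plain node names. The function execution_order is hypothetical, and returning None on a cycle is a choice made for this example; how TopologyAnalyzer handles cyclic topologies is not specified in this reference.

```python
from collections import deque
from typing import Dict, List, Optional, Tuple


def execution_order(
    nodes: List[str],
    edges: List[Tuple[str, str]],
) -> Optional[List[str]]:
    """Topologically sort nodes with Kahn's algorithm.

    Returns None when the graph contains a cycle, because no valid
    ordering exists in that case.
    """
    indegree: Dict[str, int] = {name: 0 for name in nodes}
    successors: Dict[str, List[str]] = {name: [] for name in nodes}
    for source, target in edges:
        successors[source].append(target)
        indegree[target] += 1

    # Start with nodes that have no incoming edges
    ready = deque(name for name in nodes if indegree[name] == 0)
    order: List[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for nxt in successors[current]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Nodes left unprocessed are part of, or downstream of, a cycle
    return order if len(order) == len(nodes) else None


print(execution_order(["A", "B", "C"], [("A", "B"), ("B", "C"), ("A", "C")]))
# ['A', 'B', 'C']
print(execution_order(["A", "B"], [("A", "B"), ("B", "A")]))
# None
```

Note that bidirectional edges form a two-node cycle by definition, so a strict topological order only exists for topologies whose edges are one-way.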

String Notation

For quick prototyping, a topology can be written as a dictionary of agent names, flow strings, and rule strings instead of Node, Edge, and Rule objects.

topology = {
    "agents": ["Agent1", "Agent2", "Agent3"],
    "flows": [
        "Agent1 -> Agent2",   # One-way
        "Agent2 <-> Agent3",  # Bidirectional
        "Agent1 => Agent3"    # Strong connection
    ],
    "rules": [
        "timeout(300)",
        "max_agents(10)"
    ]
}
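
To show how flow strings like these can be turned into edges, the sketch below parses the three arrow forms with a regular expression. The function parse_flow is hypothetical and is not the parser marsys uses. It keeps the arrow symbol in each result because the exact semantics of "=>" beyond "strong connection" are not defined in this reference.

```python
import re
from typing import List, Tuple

# "<->" is listed first so it is tried before the shorter "->"
_FLOW = re.compile(r"^\s*(\w+)\s*(<->|->|=>)\s*(\w+)\s*$")


def parse_flow(flow: str) -> List[Tuple[str, str, str]]:
    """Parse one flow string into (source, target, arrow) triples.

    A "<->" flow yields two triples, one per direction.
    Hypothetical helper for illustration; not part of marsys.
    """
    match = _FLOW.match(flow)
    if match is None:
        raise ValueError(f"Unrecognized flow: {flow!r}")
    source, arrow, target = match.groups()
    if arrow == "<->":
        return [(source, target, arrow), (target, source, arrow)]
    return [(source, target, arrow)]


print(parse_flow("Agent1 -> Agent2"))
# [('Agent1', 'Agent2', '->')]
print(parse_flow("Agent2 <-> Agent3"))
# [('Agent2', 'Agent3', '<->'), ('Agent3', 'Agent2', '<->')]
```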

Common Patterns

Sequential Chain

topology = {
    "agents": ["A", "B", "C"],
    "flows": ["A -> B", "B -> C"]
}

Parallel Execution

from marsys.coordination.topology.patterns import PatternConfig

topology = PatternConfig.hub_and_spoke(
    hub="Coordinator",
    spokes=["Worker1", "Worker2"],
    parallel_spokes=True
)

Conversation Loop

topology = {
    "agents": ["Agent1", "Agent2"],
    "flows": ["Agent1 <-> Agent2"],
    "rules": ["max_turns(5)"]
}
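
Rule strings such as "max_turns(5)" and "timeout(300)" follow a name(value) shape. A minimal sketch of splitting one into its parts is shown below; parse_rule is a hypothetical helper that assumes a single non-negative integer argument, which covers the rule strings shown on this page but may not cover every rule marsys accepts.

```python
import re
from typing import Tuple

_RULE = re.compile(r"^\s*(\w+)\(\s*(\d+)\s*\)\s*$")


def parse_rule(rule: str) -> Tuple[str, int]:
    """Split a rule string like "max_turns(5)" into (name, value).

    Hypothetical helper for illustration; not part of marsys.
    """
    match = _RULE.match(rule)
    if match is None:
        raise ValueError(f"Unrecognized rule: {rule!r}")
    return match.group(1), int(match.group(2))


print(parse_rule("max_turns(5)"))  # ('max_turns', 5)
print(parse_rule("timeout(300)"))  # ('timeout', 300)
```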

Pro Tip

Start with a pre-defined pattern from PatternConfig and customize it as needed. This gives you a correctly wired structure to build on and reduces the chance of missing or misdirected edges.