Topology API
Complete API reference for the topology system that defines agent communication patterns and workflow structures.
See Also
For conceptual understanding of topologies, see the Topology Concepts Guide.
Topology
Main topology container that holds nodes, edges, and rules.
Constructor
    from marsys.coordination.topology import Topology

    topology = Topology(
        nodes: List[Union[Node, str]] = None,
        edges: List[Union[Edge, str]] = None,
        rules: List[Union[Rule, str]] = None,
        metadata: Dict[str, Any] = None
    )
Parameters
| Parameter | Type | Description | Default |
|---|---|---|---|
| nodes | List[Union[Node, str]] | Graph nodes (agents) | [] |
| edges | List[Union[Edge, str]] | Connections between nodes | [] |
| rules | List[Union[Rule, str]] | Execution rules | [] |
| metadata | Dict[str, Any] | Additional metadata | None |
Methods
| Method | Returns | Description |
|---|---|---|
| add_node(node) | Node | Add a node to the topology |
| add_edge(edge) | Edge | Add an edge to the topology |
| add_rule(rule) | Rule | Add a rule to the topology |
| remove_node(name) | bool | Remove a node by name |
| remove_edge(source, target) | bool | Remove a specific edge |
| get_node(name) | Optional[Node] | Get a node by name |
| validate() | bool | Validate topology consistency |
Usage Example
    from marsys.coordination.topology import Topology, Node, NodeType, Edge

    topology = Topology(
        nodes=[
            Node("Coordinator", node_type=NodeType.AGENT),
            Node("Worker1", node_type=NodeType.AGENT),
            Node("Worker2", node_type=NodeType.AGENT)
        ],
        edges=[
            Edge("Coordinator", "Worker1"),
            Edge("Coordinator", "Worker2"),
            Edge("Worker1", "Coordinator"),
            Edge("Worker2", "Coordinator")
        ]
    )
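The `validate()` method above is described only as checking "topology consistency". As a rough illustration of one check that such validation could plausibly include, the sketch below verifies that every edge endpoint names a declared node. The function `check_edges_reference_nodes` is hypothetical, operates on plain strings and tuples, and is not the marsys implementation.

```python
from typing import Iterable, List, Tuple


def check_edges_reference_nodes(
    node_names: Iterable[str],
    edges: Iterable[Tuple[str, str]],
) -> List[str]:
    """Return one message per edge endpoint that is not a declared node.

    Hypothetical stand-in for a consistency check; not part of marsys.
    """
    known = set(node_names)
    errors = []
    for source, target in edges:
        for role, name in (("source", source), ("target", target)):
            if name not in known:
                errors.append(
                    f"Edge {source} -> {target}: unknown {role} '{name}'"
                )
    return errors


nodes = ["Coordinator", "Worker1", "Worker2"]
edges = [("Coordinator", "Worker1"), ("Coordinator", "Worker3")]

# "Worker3" was never declared, so exactly one problem is reported.
problems = check_edges_reference_nodes(nodes, edges)
print(problems)
```

Returning a list of messages rather than a bare boolean makes it easier to report every problem at once; whether the real `validate()` performs this particular check is an assumption.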
Node
Represents an agent or system component in the topology.
Constructor
    from marsys.coordination.topology import Node, NodeType

    node = Node(
        name: str,
        node_type: NodeType = NodeType.AGENT,
        agent_ref: Optional[Any] = None,
        is_convergence_point: bool = False,
        metadata: Dict[str, Any] = None
    )
Parameters
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Unique node identifier | Required |
| node_type | NodeType | Type of node | AGENT |
| agent_ref | Optional[Any] | Reference to agent instance | None |
| is_convergence_point | bool | Marks the node as a convergence point | False |
| metadata | Dict[str, Any] | Additional node data | None |
NodeType Enum
    class NodeType(Enum):
        USER = "user"      # User interaction node
        AGENT = "agent"    # AI agent node
        SYSTEM = "system"  # System component
        TOOL = "tool"      # Tool node
Usage Example
    from marsys.coordination.topology import Node, NodeType

    # Create different node types
    user_node = Node("User", node_type=NodeType.USER)
    agent_node = Node("Assistant", node_type=NodeType.AGENT)
    convergence = Node(
        "Aggregator",
        node_type=NodeType.AGENT,
        is_convergence_point=True
    )
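Because `NodeType` is shown as a standard `Enum` with string values, a member can be looked up from its value, which is convenient when node types come from a configuration file. The sketch below uses a local copy of the enum so that it runs without marsys; the helper `parse_node_type` is hypothetical.

```python
from enum import Enum


class NodeType(Enum):
    """Local copy of the enum shown above, so the sketch runs without marsys."""
    USER = "user"
    AGENT = "agent"
    SYSTEM = "system"
    TOOL = "tool"


def parse_node_type(raw: str) -> NodeType:
    """Hypothetical helper: map a config string to a NodeType member.

    Enum lookup by value raises ValueError for unknown strings.
    """
    return NodeType(raw.strip().lower())


kind = parse_node_type(" Agent ")
print(kind)
```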
Edge
Defines connections and communication paths between nodes.
Constructor
    from marsys.coordination.topology import Edge, EdgeType, EdgePattern

    edge = Edge(
        source: str,
        target: str,
        edge_type: EdgeType = EdgeType.INVOKE,
        bidirectional: bool = False,
        pattern: Optional[EdgePattern] = None,
        metadata: Dict[str, Any] = None
    )
Parameters
| Parameter | Type | Description | Default |
|---|---|---|---|
| source | str | Source node name | Required |
| target | str | Target node name | Required |
| edge_type | EdgeType | Type of connection | INVOKE |
| bidirectional | bool | Two-way communication | False |
| pattern | Optional[EdgePattern] | Communication pattern | None |
| metadata | Dict[str, Any] | Additional edge data | None |
EdgeType Enum
    class EdgeType(Enum):
        INVOKE = "invoke"  # Agent invocation
        NOTIFY = "notify"  # Notification only
        QUERY = "query"    # Query/response
        STREAM = "stream"  # Streaming data
EdgePattern Enum
    class EdgePattern(Enum):
        ALTERNATING = "alternating"  # Take turns
        SYMMETRIC = "symmetric"      # Equal access
Usage Example
    from marsys.coordination.topology import Edge, EdgeType, EdgePattern

    # One-way edge
    edge1 = Edge("Manager", "Worker")

    # Bidirectional conversation
    edge2 = Edge(
        "Agent1",
        "Agent2",
        bidirectional=True,
        pattern=EdgePattern.ALTERNATING
    )

    # Notification edge
    edge3 = Edge(
        "Monitor",
        "Logger",
        edge_type=EdgeType.NOTIFY
    )
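One way to read the `bidirectional` flag is that a single edge stands for two directed connections. The sketch below illustrates that reading with a hypothetical `EdgeSpec` dataclass standing in for `Edge`; how marsys represents bidirectional edges internally is not stated here, so treat the expansion as an assumption.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class EdgeSpec:
    """Hypothetical stand-in for Edge; only the fields this sketch needs."""
    source: str
    target: str
    bidirectional: bool = False


def directed_pairs(edge: EdgeSpec) -> List[Tuple[str, str]]:
    """Expand an edge into the directed (source, target) pairs it implies."""
    pairs = [(edge.source, edge.target)]
    if edge.bidirectional:
        # Assumed meaning of "two-way": the reverse direction is also allowed.
        pairs.append((edge.target, edge.source))
    return pairs


one_way = directed_pairs(EdgeSpec("Manager", "Worker"))
two_way = directed_pairs(EdgeSpec("Agent1", "Agent2", bidirectional=True))
print(one_way, two_way)
```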
PatternConfig
Pre-defined topology patterns for common use cases.
hub_and_spoke()
    from marsys.coordination.topology.patterns import PatternConfig

    topology = PatternConfig.hub_and_spoke(
        hub: str,
        spokes: List[str],
        parallel_spokes: bool = False,
        bidirectional: bool = True
    ) -> Topology
| Parameter | Type | Description | Default |
|---|---|---|---|
| hub | str | Central coordinator name | Required |
| spokes | List[str] | Spoke agent names | Required |
| parallel_spokes | bool | Execute spokes in parallel | False |
| bidirectional | bool | Two-way communication | True |
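To make the shape of this pattern concrete, the sketch below lists the directed connections a hub-and-spoke layout implies, matching the Coordinator/Worker edges in the `Topology` usage example. The function `hub_and_spoke_pairs` is hypothetical and works on plain strings; it does not model `parallel_spokes` and is not how `PatternConfig` is implemented.

```python
from typing import List, Tuple


def hub_and_spoke_pairs(
    hub: str,
    spokes: List[str],
    bidirectional: bool = True,
) -> List[Tuple[str, str]]:
    """List the (source, target) pairs implied by a hub-and-spoke layout."""
    pairs = []
    for spoke in spokes:
        pairs.append((hub, spoke))
        if bidirectional:
            # Assumed meaning: each spoke can also reach back to the hub.
            pairs.append((spoke, hub))
    return pairs


pairs = hub_and_spoke_pairs("Coordinator", ["Worker1", "Worker2"])
print(pairs)
```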
pipeline()
    topology = PatternConfig.pipeline(
        stages: List[Dict[str, Any]],
        parallel_within_stage: bool = False
    ) -> Topology
mesh()
    topology = PatternConfig.mesh(
        agents: List[str],
        fully_connected: bool = True
    ) -> Topology
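In graph terms, a fully connected mesh means every agent can reach every other agent, which gives n * (n - 1) directed connections for n agents. The sketch below enumerates them with `itertools.permutations`; the function `mesh_pairs` is hypothetical, and the behavior of `fully_connected=False` is not described here, so it is not modeled.

```python
from itertools import permutations
from typing import List, Tuple


def mesh_pairs(agents: List[str]) -> List[Tuple[str, str]]:
    """List every ordered (source, target) pair of distinct agents."""
    return list(permutations(agents, 2))


pairs = mesh_pairs(["A", "B", "C"])
print(len(pairs))
```

The quadratic growth is worth keeping in mind: ten agents already imply ninety directed connections.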
hierarchical()
    topology = PatternConfig.hierarchical(
        tree: Dict[str, List[str]]
    ) -> Topology
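The `tree` argument maps each parent name to a list of child names. As a minimal sketch of how such a mapping can be read as a graph, the code below flattens it into parent-to-child pairs and finds the roots (parents that are nobody's child). Both helper functions are hypothetical and are not part of `PatternConfig`.

```python
from typing import Dict, List, Tuple


def tree_pairs(tree: Dict[str, List[str]]) -> List[Tuple[str, str]]:
    """Flatten a parent -> children mapping into (parent, child) pairs."""
    return [
        (parent, child)
        for parent, children in tree.items()
        for child in children
    ]


def find_roots(tree: Dict[str, List[str]]) -> List[str]:
    """Return parents that never appear as a child of another node."""
    all_children = {child for children in tree.values() for child in children}
    return [parent for parent in tree if parent not in all_children]


tree = {
    "CEO": ["VP1", "VP2"],
    "VP1": ["Manager1", "Manager2"],
    "VP2": ["Manager3"],
    "Manager1": ["Worker1", "Worker2"],
}
pairs = tree_pairs(tree)
roots = find_roots(tree)
print(roots, len(pairs))
```

A mapping with more than one root, or with none, may indicate a malformed hierarchy; whether marsys rejects such input is not stated here.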
Pattern Examples
    from marsys.coordination.topology.patterns import PatternConfig

    # Hub and spoke pattern
    topology = PatternConfig.hub_and_spoke(
        hub="Coordinator",
        spokes=["Worker1", "Worker2", "Worker3"],
        parallel_spokes=True
    )

    # Pipeline pattern
    topology = PatternConfig.pipeline(
        stages=[
            {"name": "extract", "agents": ["Extractor"]},
            {"name": "transform", "agents": ["Transformer1", "Transformer2"]},
            {"name": "load", "agents": ["Loader"]}
        ],
        parallel_within_stage=True
    )

    # Hierarchical pattern
    topology = PatternConfig.hierarchical(
        tree={
            "CEO": ["VP1", "VP2"],
            "VP1": ["Manager1", "Manager2"],
            "VP2": ["Manager3"],
            "Manager1": ["Worker1", "Worker2"]
        }
    )
TopologyAnalyzer
Analyzes and validates topology structures.
Constructor & Methods
    from marsys.coordination.topology.analyzer import TopologyAnalyzer

    analyzer = TopologyAnalyzer(topology: Topology)
| Method | Returns | Description |
|---|---|---|
| create_graph() | TopologyGraph | Create graph representation |
| validate() | Tuple[bool, List[str]] | Validate topology |
| find_cycles() | List[List[str]] | Detect cycles |
| get_execution_order() | List[str] | Topological sort |
Usage Example
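    from marsys.coordination.topology.analyzer import TopologyAnalyzer

    # `topology` is an existing Topology instance
    analyzer = TopologyAnalyzer(topology)
    graph = analyzer.create_graph()

    # Validate topology
    is_valid, errors = analyzer.validate()
    if not is_valid:
        print(f"Topology errors: {errors}")

    # Get execution order
    order = analyzer.get_execution_order()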
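The methods table describes `get_execution_order()` as a topological sort and `find_cycles()` as cycle detection. The two are related: a topological order exists only when the graph has no cycles. The sketch below shows the general technique with the standard-library `graphlib` module (Python 3.9+) on plain node names; the function `execution_order` is hypothetical and does not reflect how `TopologyAnalyzer` is implemented or how it treats cyclic topologies.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Iterable, List, Optional, Tuple


def execution_order(
    nodes: Iterable[str],
    edges: Iterable[Tuple[str, str]],
) -> Optional[List[str]]:
    """Return a topological order of the nodes, or None if a cycle exists."""
    sorter = TopologicalSorter()
    for name in nodes:
        sorter.add(name)  # register nodes that have no edges at all
    for source, target in edges:
        sorter.add(target, source)  # source must come before target
    try:
        return list(sorter.static_order())
    except CycleError:
        return None


chain = execution_order(["A", "B", "C"], [("A", "B"), ("B", "C")])
loop = execution_order(["A", "B"], [("A", "B"), ("B", "A")])
print(chain, loop)
```

Note that a bidirectional edge forms a two-node cycle under this definition, so topologies such as the Coordinator/Worker example cannot be ordered by a plain topological sort.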
String Notation
Define a topology with string-based notation for quick prototyping; the definition is converted into a topology.
    topology = {
        "agents": ["Agent1", "Agent2", "Agent3"],
        "flows": [
            "Agent1 -> Agent2",   # One-way
            "Agent2 <-> Agent3",  # Bidirectional
            "Agent1 => Agent3"    # Strong connection
        ],
        "rules": [
            "timeout(300)",
            "max_agents(10)"
        ]
    }
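To show how flow strings of this shape can be broken down, the sketch below splits each one into a source, an operator, and a target with a regular expression. The function `parse_flow` is hypothetical; the grammar actually accepted by marsys may be broader (for example, names containing characters other than letters, digits, and underscores), so treat this as an illustration of the notation only.

```python
import re
from typing import Tuple

# One name, one of the three operators shown above, then another name.
_FLOW = re.compile(r"^\s*(\w+)\s*(<->|->|=>)\s*(\w+)\s*$")


def parse_flow(flow: str) -> Tuple[str, str, str]:
    """Split a flow string into (source, operator, target)."""
    match = _FLOW.match(flow)
    if match is None:
        raise ValueError(f"Unrecognized flow: {flow!r}")
    source, operator, target = match.groups()
    return source, operator, target


flows = ["Agent1 -> Agent2", "Agent2 <-> Agent3", "Agent1 => Agent3"]
parsed = [parse_flow(flow) for flow in flows]
print(parsed)
```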
Common Patterns
Sequential Chain
    topology = {
        "agents": ["A", "B", "C"],
        "flows": ["A -> B", "B -> C"]
    }
Parallel Execution
    topology = PatternConfig.hub_and_spoke(
        hub="Coordinator",
        spokes=["Worker1", "Worker2"],
        parallel_spokes=True
    )
Conversation Loop
    topology = {
        "agents": ["Agent1", "Agent2"],
        "flows": ["Agent1 <-> Agent2"],
        "rules": ["max_turns(5)"]
    }
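The rule strings used in this reference (`timeout(300)`, `max_agents(10)`, `max_turns(5)`) all follow a `name(integer)` shape. The sketch below parses that shape into a name and an integer value; `parse_rule` is a hypothetical helper, and marsys may accept other rule forms that this pattern does not cover.

```python
import re
from typing import Tuple

# A rule name followed by a single non-negative integer in parentheses.
_RULE = re.compile(r"^\s*(\w+)\(\s*(\d+)\s*\)\s*$")


def parse_rule(rule: str) -> Tuple[str, int]:
    """Split a rule string such as 'max_turns(5)' into (name, value)."""
    match = _RULE.match(rule)
    if match is None:
        raise ValueError(f"Unrecognized rule: {rule!r}")
    name, value = match.groups()
    return name, int(value)


rules = [parse_rule(text) for text in ["timeout(300)", "max_turns(5)"]]
print(rules)
```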
Pro Tip
Start with a pre-defined pattern from PatternConfig and customize it as needed. Starting from a known pattern helps keep the structure valid and reduces errors.