Communication API
Complete API reference for the user interaction and communication system in multi-agent workflows.
See Also
For communication patterns, see the Communication Concepts guide.
CommunicationManager
Central manager for user communication across different channels.
Import
from marsys.coordination.communication import CommunicationManager
from marsys.coordination.config import CommunicationConfig
Constructor
CommunicationManager(config: Optional[CommunicationConfig] = None)
Key Methods
register_channel
def register_channel(channel: CommunicationChannel) -> None
Register a communication channel.
request_user_input
async def request_user_input(
    prompt: str,
    session_id: str,
    channel_id: Optional[str] = None,
    timeout: Optional[float] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> str
send_message
async def send_message(
    message: str,
    session_id: str,
    channel_id: Optional[str] = None,
    message_type: str = "info"
) -> None
Example
# Initialize manager
config = CommunicationConfig(
    use_rich_formatting=True,
    theme_name="modern"
)
manager = CommunicationManager(config)

# Request user input
response = await manager.request_user_input(
    prompt="What would you like to analyze?",
    session_id="session_123"
)

# Send message
await manager.send_message(
    message="Analysis complete!",
    session_id="session_123",
    message_type="success"
)
UserNodeHandler
Handles execution when control reaches a User node in the topology.
Import
from marsys.coordination.communication import UserNodeHandler
handle_user_node
async def handle_user_node(
    branch: ExecutionBranch,
    incoming_message: Any,
    context: Dict[str, Any]
) -> StepResult
Example
handler = UserNodeHandler(communication_manager)

# Handle user interaction
result = await handler.handle_user_node(
    branch=current_branch,
    incoming_message="Please approve the analysis results",
    context={"session_id": "session_123"}
)

# Process result
if result.success:
    user_response = result.data["response"]
    next_agent = result.data.get("next_agent")
Communication Channels
TerminalChannel
Basic terminal I/O channel.
from marsys.coordination.communication.channels import TerminalChannel

channel = TerminalChannel()

# Send message
await channel.send("Enter your choice:")

# Receive input
response = await channel.receive(timeout=30.0)
EnhancedTerminalChannel
Rich terminal with advanced formatting, colors, and progress indicators.
from marsys.coordination.communication.channels import EnhancedTerminalChannel

channel = EnhancedTerminalChannel(
    theme_name="modern",
    use_rich=True
)

# Send formatted message
await channel.send({
    "content": "## Analysis Results",
    "format": "markdown",
    "style": "success"
})
PrefixedCLIChannel
CLI channel with agent name prefixes.
from marsys.coordination.communication.channels import PrefixedCLIChannel

channel = PrefixedCLIChannel(
    prefix_width=25,
    show_timestamps=True
)

# Send with prefix
await channel.send(
    message="Processing data...",
    metadata={"agent_name": "DataProcessor"}
)
# Output: [DataProcessor] Processing data...
CommunicationMode
Enumeration of communication patterns.
| Mode | Description | Use Case |
|---|---|---|
| SYNC | Synchronous blocking | Terminal input |
| ASYNC_PUBSUB | Event-driven async | Web interfaces |
| ASYNC_QUEUE | Queue-based async | Message systems |
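To make the ASYNC_QUEUE row more concrete, the sketch below passes a prompt and a reply through two asyncio.Queue objects. It is only an illustration of the queue-based pattern and assumes nothing about how the library implements its modes; `Mode`, `queue_round_trip`, and the simulated user are hypothetical names introduced here.

```python
import asyncio
from enum import Enum


class Mode(Enum):
    """Hypothetical stand-in mirroring the table; not the library's enum."""
    SYNC = "sync"
    ASYNC_PUBSUB = "async_pubsub"
    ASYNC_QUEUE = "async_queue"


async def queue_round_trip(prompt: str, reply: str) -> str:
    """Send a prompt on one queue and wait for the reply on another."""
    outbound = asyncio.Queue()  # prompts travelling to the user
    inbound = asyncio.Queue()   # replies travelling back

    async def fake_user() -> None:
        # Simulated user: reads the prompt, then answers with a fixed reply.
        await outbound.get()
        await inbound.put(reply)

    user_task = asyncio.create_task(fake_user())
    await outbound.put(prompt)
    # The caller is not blocked on terminal input; it awaits the queue,
    # bounded by a timeout so it cannot wait forever.
    answer = await asyncio.wait_for(inbound.get(), timeout=1.0)
    await user_task
    return answer
```

In contrast to SYNC, the requesting side here only awaits a queue, so other coroutines can keep running while the user is thinking.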
User Interaction Patterns
Simple User Input
# In topology
topology = {
    "agents": ["Agent1", "User", "Agent2"],
    "flows": [
        "Agent1 -> User",
        "User -> Agent2"
    ]
}

# Agent1 response triggers user interaction
response = {
    "next_action": "invoke_agent",
    "action_input": "User",
    "message": "Please review the results"
}

# User sees message and provides input
# System routes response to Agent2
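The routing implied by the flows list can be sketched as a small lookup table. This is a minimal illustration, assuming each flow is a string of the form "Source -> Target"; `build_routes` is a hypothetical helper, and the library's own topology handling may differ.

```python
from typing import Dict, List


def build_routes(flows: List[str]) -> Dict[str, List[str]]:
    """Turn "A -> B" strings into a mapping from source node to targets."""
    routes: Dict[str, List[str]] = {}
    for flow in flows:
        parts = [part.strip() for part in flow.split("->")]
        if len(parts) != 2 or not all(parts):
            raise ValueError(f"Malformed flow: {flow!r}")
        source, target = parts
        routes.setdefault(source, []).append(target)
    return routes


topology = {
    "agents": ["Agent1", "User", "Agent2"],
    "flows": ["Agent1 -> User", "User -> Agent2"],
}

routes = build_routes(topology["flows"])
# After the user answers, the response is routed to the User node's targets.
next_after_user = routes["User"]
```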
Error Recovery
# Agent triggers error recovery
response = {
    "next_action": "error_recovery",
    "error_details": {
        "type": "api_quota_exceeded",
        "message": "API quota exceeded"
    },
    "suggested_action": "retry_with_different_model"
}

# Routes to User node for intervention
# User can:
# - Retry with same settings
# - Change model
# - Skip operation
# - Abort workflow
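One way to turn the user's reply into one of the four recovery choices listed above is a small lookup with a safe default. The option keys, the action names, and `resolve_recovery_choice` are assumptions made for this sketch, not identifiers from the library.

```python
# Hypothetical menu: the numbers and action names are illustrative only.
RECOVERY_OPTIONS = {
    "1": "retry",
    "2": "change_model",
    "3": "skip",
    "4": "abort",
}


def resolve_recovery_choice(raw: str, default: str = "abort") -> str:
    """Map free-form user input to a recovery action name."""
    key = raw.strip().lower()
    if key in RECOVERY_OPTIONS:           # user typed the menu number
        return RECOVERY_OPTIONS[key]
    if key in RECOVERY_OPTIONS.values():  # user typed the action name
        return key
    return default                        # unrecognized input: safest action
```

Defaulting to the most conservative action is a design choice; a real workflow might re-prompt instead.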
Approval Workflow
# Configure approval interaction
interaction = UserInteraction(
    id="approval_123",
    prompt="Approve deployment to production?",
    options=["approve", "reject", "review"],
    metadata={
        "changes": ["Update API", "Database migration"],
        "risk_level": "medium"
    }
)

# Request approval
response = await manager.request_structured_input(interaction)

if response == "approve":
    # Continue with deployment
    pass
elif response == "review":
    # Show detailed changes
    pass
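The sketch below shows how the fields of such an interaction could be rendered into a prompt and how a reply could be checked against the allowed options. `ApprovalRequest` is a hypothetical stand-in that only mirrors the field names used in the example; it is not the library's UserInteraction class.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ApprovalRequest:
    """Hypothetical stand-in with the same fields as the example."""
    id: str
    prompt: str
    options: List[str]
    metadata: Dict[str, Any] = field(default_factory=dict)

    def render(self) -> str:
        """Build the text shown to the user, including metadata if present."""
        lines = [self.prompt]
        for change in self.metadata.get("changes", []):
            lines.append(f"  * {change}")
        if "risk_level" in self.metadata:
            lines.append(f"Risk level: {self.metadata['risk_level']}")
        lines.append("Options: " + " / ".join(self.options))
        return "\n".join(lines)

    def validate(self, response: str) -> str:
        """Normalize a reply and reject anything outside the options."""
        choice = response.strip().lower()
        if choice not in self.options:
            raise ValueError(f"Expected one of {self.options}, got {response!r}")
        return choice
```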
EventBus
Event bus for communication events.
Events
- user.input.requested - Input requested
- user.input.received - Input received
- user.message.sent - Message sent
- channel.connected - Channel connected
- channel.disconnected - Channel disconnected
Example
from marsys.coordination.communication import EventBus

bus = EventBus()

# Subscribe to events
def on_input(data):
    print(f"User input: {data['response']}")

bus.subscribe("user.input.received", on_input)

# Publish event
await bus.publish(
    "user.input.received",
    {"response": "yes", "session_id": "123"}
)
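For readers who want to see the subscribe/publish pattern in isolation, here is a minimal in-memory sketch. `MiniEventBus` is a hypothetical class written for this page; it only mimics the two calls shown above and says nothing about how the library's EventBus is implemented.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List


class MiniEventBus:
    """Tiny publish/subscribe bus accepting sync or async callbacks."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[Dict[str, Any]], Any]]] = {}

    def subscribe(self, event: str, callback: Callable[[Dict[str, Any]], Any]) -> None:
        self._subscribers.setdefault(event, []).append(callback)

    async def publish(self, event: str, data: Dict[str, Any]) -> int:
        """Call every subscriber of `event`; return how many were called."""
        delivered = 0
        for callback in list(self._subscribers.get(event, [])):
            result = callback(data)
            if inspect.isawaitable(result):  # support async callbacks too
                await result
            delivered += 1
        return delivered
```

Returning the delivery count is a convenience of this sketch that makes it easy to notice events nobody listens to.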
Custom Channels
import asyncio
import json
from typing import Dict, Optional

from marsys.coordination.communication import CommunicationChannel

# Note: `websocket` below stands for an async WebSocket client module;
# the example does not specify which library provides it.

class WebSocketChannel(CommunicationChannel):
    """WebSocket-based communication channel."""

    def __init__(self, ws_url: str):
        super().__init__(channel_id="websocket")
        self.ws_url = ws_url
        self.ws = None

    async def connect(self):
        self.ws = await websocket.connect(self.ws_url)

    async def send(self, message: str, metadata: Optional[Dict] = None):
        await self.ws.send(json.dumps({
            "message": message,
            "metadata": metadata
        }))

    async def receive(self, timeout: Optional[float] = None):
        try:
            data = await asyncio.wait_for(
                self.ws.receive(),
                timeout=timeout
            )
            return json.loads(data)["response"]
        except asyncio.TimeoutError:
            # No reply arrived within the timeout
            return None

    def is_available(self) -> bool:
        # Explicit None check so the method always returns a bool
        return self.ws is not None and not self.ws.closed

    async def close(self):
        if self.ws:
            await self.ws.close()
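When testing workflows, a channel that needs no network or terminal can be handy. The sketch below follows the same method names as the WebSocket example (send, receive, is_available, close) but is a standalone, hypothetical class: it does not subclass CommunicationChannel, so it runs without the library installed, and adapting it to the real base class is left as an assumption.

```python
import asyncio
from collections import deque
from typing import Any, Deque, Dict, Iterable, List, Optional


class InMemoryChannel:
    """Scripted channel for tests: records sends, replays canned replies."""

    def __init__(self, scripted_replies: Iterable[str]) -> None:
        self.sent: List[Dict[str, Any]] = []
        self._replies: Deque[str] = deque(scripted_replies)
        self._open = True

    async def send(self, message: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        self.sent.append({"message": message, "metadata": metadata or {}})

    async def receive(self, timeout: Optional[float] = None) -> Optional[str]:
        # An exhausted script models a timeout: None is returned,
        # and the timeout argument itself is ignored by this sketch.
        return self._replies.popleft() if self._replies else None

    def is_available(self) -> bool:
        return self._open

    async def close(self) -> None:
        self._open = False


async def demo() -> List[Optional[str]]:
    channel = InMemoryChannel(["yes"])
    await channel.send("Proceed?", {"agent_name": "Planner"})
    first = await channel.receive(timeout=1.0)
    second = await channel.receive(timeout=1.0)  # script exhausted
    await channel.close()
    return [first, second]
```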
Best Practices
Do
- Set appropriate timeouts for user input
- Provide clear prompts and options
- Handle timeouts gracefully
- Store interaction history
Don't
- Block indefinitely on user input
- Mix communication channels in the same session
- Ignore channel availability
- Send sensitive data unencrypted
Pro Tip
Use EnhancedTerminalChannel for a better user experience with colors, formatting, and progress indicators. It automatically falls back to the basic terminal if Rich is unavailable.
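A fallback of this kind can be detected with the standard library alone. The sketch below only illustrates the idea; `has_package` and `pick_terminal_backend` are hypothetical helpers, and the library's actual detection logic is not documented here.

```python
import importlib.util


def has_package(name: str) -> bool:
    """Return True if a top-level package can be found without importing it."""
    return importlib.util.find_spec(name) is not None


def pick_terminal_backend() -> str:
    """Choose "rich" when the Rich package is installed, else "basic"."""
    return "rich" if has_package("rich") else "basic"
```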
Timeout Handling
Always set reasonable timeouts for user input to prevent indefinite blocking. Consider offering a retry option if a timeout occurs.
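The timeout-plus-retry advice can be sketched with asyncio.wait_for. `ask_with_retry` is a hypothetical helper; `ask` stands for any coroutine function that requests input, for example a call wrapping request_user_input, which is an assumption about how you would wire it up.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def ask_with_retry(
    ask: Callable[[], Awaitable[str]],
    timeout: float,
    retries: int = 1,
) -> Optional[str]:
    """Await `ask()` with a timeout, retrying a limited number of times.

    Returns the answer, or None if every attempt timed out.
    """
    for _attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(ask(), timeout=timeout)
        except asyncio.TimeoutError:
            # The timed-out attempt is cancelled by wait_for; try again.
            continue
    return None
```

Returning None rather than raising leaves the caller free to decide whether to abort the workflow or fall back to a default answer.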