Communication API
Complete API reference for the user interaction and communication system in multi-agent workflows.
The Communication API provides bi-directional user interaction capabilities, supporting synchronous terminal interactions, asynchronous web interfaces, and event-driven communication patterns.
Core Classes
CommunicationManager
Central manager for user communication across different channels.
Import
from marsys.coordination.communication import CommunicationManagerfrom marsys.coordination.config import CommunicationConfig
Constructor
CommunicationManager(config: Optional[CommunicationConfig] = None)
Key Methods
register_channel
def register_channel(channel: CommunicationChannel) -> None
Register a communication channel.
request_user_input
async def request_user_input(prompt: str,session_id: str,channel_id: Optional[str] = None,timeout: Optional[float] = None,metadata: Optional[Dict[str, Any]] = None) -> str
Request input from user.
| Parameter | Type | Description | Default |
|---|---|---|---|
| prompt | str | Prompt to display | Required |
| session_id | str | Session identifier | Required |
| channel_id | str | Target channel | Auto-select |
| timeout | float | Input timeout | From config |
| metadata | Dict | Additional metadata | None |
send_message
async def send_message(message: str,session_id: str,channel_id: Optional[str] = None,message_type: str = "info") -> None
Send message to user.
subscribe
def subscribe(topic: str,callback: Callable[[Any], None]) -> str
Subscribe to communication events.
Example
# Initialize managerconfig = CommunicationConfig(use_rich_formatting=True,theme_name="modern")manager = CommunicationManager(config)# Request user inputresponse = await manager.request_user_input(prompt="What would you like to analyze?",session_id="session_123")# Send messageawait manager.send_message(message="Analysis complete!",session_id="session_123",message_type="success")
UserNodeHandler
Handles execution when control reaches a User node in topology.
Import
from marsys.coordination.communication import UserNodeHandler
Constructor
UserNodeHandler(communication_manager: CommunicationManager,event_bus: Optional[EventBus] = None)
handle_user_node
async def handle_user_node(branch: ExecutionBranch,incoming_message: Any,context: Dict[str, Any]) -> StepResult
Handle User node execution.
| Parameter | Type | Description | Default |
|---|---|---|---|
| branch | ExecutionBranch | Current execution branch | Required |
| incoming_message | Any | Message from calling agent | Required |
| context | Dict | Execution context | Required |
Returns: StepResult with user response and routing decision.
Example
handler = UserNodeHandler(communication_manager)# Handle user interactionresult = await handler.handle_user_node(branch=current_branch,incoming_message="Please approve the analysis results",context={"session_id": "session_123"})# Process resultif result.success:user_response = result.data["response"]next_agent = result.data.get("next_agent")
CommunicationChannel (Abstract)
Base class for communication channels.
Import
from marsys.coordination.communication import CommunicationChannel
Abstract Methods
| Method | Description | Returns |
|---|---|---|
| send(message, metadata) | Send message to user | None |
| receive(timeout) | Receive user input | str |
| is_available() | Check if channel available | bool |
| close() | Close channel | None |
TerminalChannel
Basic terminal I/O channel.
Import
from marsys.coordination.communication.channels import TerminalChannel
Constructor
TerminalChannel(channel_id: str = "terminal",use_colors: bool = True)
Example
channel = TerminalChannel()# Send messageawait channel.send("Enter your choice:")# Receive inputresponse = await channel.receive(timeout=30.0)
EnhancedTerminalChannel
Rich terminal with advanced formatting.
Import
from marsys.coordination.communication.channels import EnhancedTerminalChannel
Constructor
EnhancedTerminalChannel(channel_id: str = "terminal",use_rich: bool = True,theme_name: str = "modern",prefix_width: int = 20,show_timestamps: bool = True)
Features:
- Rich text formatting
- Color themes
- Progress indicators
- Tables and panels
- Markdown rendering
Example
channel = EnhancedTerminalChannel(theme_name="modern",use_rich=True)# Send formatted messageawait channel.send({"content": "## Analysis Results","format": "markdown","style": "success"})
PrefixedCLIChannel
CLI channel with agent name prefixes.
Import
from marsys.coordination.communication.channels import PrefixedCLIChannel
Constructor
PrefixedCLIChannel(channel_id: str = "cli",prefix_width: int = 20,show_timestamps: bool = False,prefix_alignment: str = "left")
Example
channel = PrefixedCLIChannel(prefix_width=25,show_timestamps=True)# Send with prefixawait channel.send(message="Processing data...",metadata={"agent_name": "DataProcessor"})# Output: [DataProcessor] Processing data...
Communication Modes
CommunicationMode
Enumeration of communication patterns.
Import
from marsys.coordination.communication import CommunicationMode
Values
| Mode | Description | Use Case |
|---|---|---|
| SYNC | Synchronous blocking | Terminal input |
| ASYNC_PUBSUB | Event-driven async | Web interfaces |
| ASYNC_QUEUE | Queue-based async | Message systems |
Mode Selection
# Sync mode (terminal)manager = CommunicationManager(config=CommunicationConfig(mode=CommunicationMode.SYNC))# Async pub/sub (web)manager = CommunicationManager(config=CommunicationConfig(mode=CommunicationMode.ASYNC_PUBSUB))
User Interaction Patterns
Simple User Input
# In topologytopology = {"agents": ["Agent1", "User", "Agent2"],"flows": ["Agent1 -> User","User -> Agent2"]}# Agent1 response triggers user interactionresponse = {"next_action": "invoke_agent","action_input": "User","message": "Please review the results"}# User sees message and provides input# System routes response to Agent2
Error Recovery
# Agent triggers error recoveryresponse = {"next_action": "error_recovery","error_details": {"type": "api_quota_exceeded","message": "OpenAI API quota exceeded"},"suggested_action": "retry_with_different_model"}# Routes to User node for intervention# User can:# - Retry with same settings# - Change model# - Skip operation# - Abort workflow
Approval Workflow
# Configure approval interactioninteraction = UserInteraction(id="approval_123",prompt="Approve deployment to production?",options=["approve", "reject", "review"],metadata={"changes": ["Update API", "Database migration"],"risk_level": "medium"})# Request approvalresponse = await manager.request_structured_input(interaction)if response == "approve":# Continue with deploymentpasselif response == "review":# Show detailed changespass
Event System
EventBus
Event bus for communication events.
Import
from marsys.coordination.event_bus import EventBus
Methods
emit
async def emit(event: Any) -> None
Emit an event object to subscribers (async).
emit_nowait
def emit_nowait(event: Any) -> None
Emit an event from synchronous contexts (fire-and-forget).
subscribe
def subscribe(event_type: str, listener: Callable) -> None
Subscribe to events.
unsubscribe
def unsubscribe(event_type: str, listener: Callable) -> None
Unsubscribe from events.
Event Types
Event types are the class names of the emitted event objects (e.g., "PlanCreatedEvent").
Example
from dataclasses import dataclass@dataclassclass CustomEvent:session_id: strpayload: strbus = EventBus()async def on_custom_event(event: CustomEvent):print(event.payload)bus.subscribe("CustomEvent", on_custom_event)# Emit eventsawait bus.emit(CustomEvent(session_id="123", payload="hello"))bus.emit_nowait(CustomEvent(session_id="123", payload="background"))
WebChannel Status Events
WebChannel can deliver status and planning events to web clients via WebSocket push or REST polling.
Methods
push_status_event
async def push_status_event(event_data: Dict[str, Any]) -> None
Push a serialized status event to WebSocket clients and the polling queue.
subscribe_status_events
def subscribe_status_events(callback: Callable[[Dict[str, Any]], None]) -> None
Subscribe to status event callbacks.
get_status_events
async def get_status_events(limit: int = 100, timeout: float = 0.0) -> List[Dict[str, Any]]
Poll for status events (useful for REST APIs).
StatusWebChannel Bridge
This forwards StatusManager events (including planning events) into WebChannel.
from marsys.coordination.communication.channels import WebChannelfrom marsys.coordination.status.channels import StatusWebChannelweb_channel = WebChannel()status_web = StatusWebChannel(web_channel)status_manager.add_channel(status_web)
Custom Channels
Creating Custom Channel
from marsys.coordination.communication import CommunicationChannelclass WebSocketChannel(CommunicationChannel):"""WebSocket-based communication channel."""def __init__(self, ws_url: str):super().__init__(channel_id="websocket")self.ws_url = ws_urlself.ws = Noneasync def connect(self):self.ws = await websocket.connect(self.ws_url)async def send(self, message: str, metadata: Dict = None):await self.ws.send(json.dumps({"message": message,"metadata": metadata}))async def receive(self, timeout: float = None):try:data = await asyncio.wait_for(self.ws.receive(),timeout=timeout)return json.loads(data)["response"]except asyncio.TimeoutError:return Nonedef is_available(self) -> bool:return self.ws and not self.ws.closedasync def close(self):if self.ws:await self.ws.close()
Themes and Formatting
Available Themes
# Modern theme (default)config = CommunicationConfig(theme_name="modern")# Classic terminalconfig = CommunicationConfig(theme_name="classic")# Minimalconfig = CommunicationConfig(theme_name="minimal")# Custom themeconfig = CommunicationConfig(theme_name="custom",custom_theme={"primary": "#007ACC","success": "#4CAF50","error": "#F44336","warning": "#FF9800"})
Message Formatting
# Rich formattingawait channel.send({"content": "# Results\n- Item 1\n- Item 2","format": "markdown","style": "panel","title": "Analysis"})# Table displayawait channel.send({"content": [["Metric", "Value"],["Accuracy", "95%"],["Speed", "1.2s"]],"format": "table"})
Best Practices
Do
- Set appropriate timeouts for user input
- Provide clear prompts and options
- Handle timeout gracefully
- Store interaction history
- Use structured interactions for complex inputs
Don't
- Block indefinitely on user input
- Mix communication channels in same session
- Ignore channel availability
- Send sensitive data unencrypted
- Assume user always provides valid input
Related Documentation
- Configuration API - Communication configuration
- Execution API - Integration with execution system
Pro Tip
Use EnhancedTerminalChannel for better user experience with colors, formatting, and progress indicators. It automatically falls back to basic terminal if Rich is unavailable.
Timeout Handling
Always set reasonable timeouts for user input to prevent indefinite blocking. Consider offering a retry option if timeout occurs.