Communication API

Complete API reference for the user interaction and communication system in multi-agent workflows.

The Communication API provides bi-directional user interaction capabilities, supporting synchronous terminal interactions, asynchronous web interfaces, and event-driven communication patterns.

Core Classes

CommunicationManager

Central manager for user communication across different channels.

Import

from marsys.coordination.communication import CommunicationManager
from marsys.coordination.config import CommunicationConfig

Constructor

CommunicationManager(
    config: Optional[CommunicationConfig] = None
)

Key Methods

register_channel
def register_channel(channel: CommunicationChannel) -> None

Register a communication channel.

request_user_input
async def request_user_input(
    prompt: str,
    session_id: str,
    channel_id: Optional[str] = None,
    timeout: Optional[float] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> str

Request input from user.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| prompt | str | Prompt to display | Required |
| session_id | str | Session identifier | Required |
| channel_id | str | Target channel | Auto-select |
| timeout | float | Input timeout | From config |
| metadata | Dict | Additional metadata | None |

send_message
async def send_message(
    message: str,
    session_id: str,
    channel_id: Optional[str] = None,
    message_type: str = "info"
) -> None

Send message to user.

subscribe
def subscribe(
    topic: str,
    callback: Callable[[Any], None]
) -> str

Subscribe to communication events.

Example

# Initialize manager
config = CommunicationConfig(
    use_rich_formatting=True,
    theme_name="modern"
)
manager = CommunicationManager(config)

# Request user input
response = await manager.request_user_input(
    prompt="What would you like to analyze?",
    session_id="session_123"
)

# Send message
await manager.send_message(
    message="Analysis complete!",
    session_id="session_123",
    message_type="success"
)

UserNodeHandler

Handles execution when control reaches a User node in the topology.

Import

from marsys.coordination.communication import UserNodeHandler

Constructor

UserNodeHandler(
    communication_manager: CommunicationManager,
    event_bus: Optional[EventBus] = None
)

handle_user_node

async def handle_user_node(
    branch: ExecutionBranch,
    incoming_message: Any,
    context: Dict[str, Any]
) -> StepResult

Handle User node execution.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| branch | ExecutionBranch | Current execution branch | Required |
| incoming_message | Any | Message from calling agent | Required |
| context | Dict | Execution context | Required |

Returns: StepResult with user response and routing decision.

Example

handler = UserNodeHandler(communication_manager)

# Handle user interaction
result = await handler.handle_user_node(
    branch=current_branch,
    incoming_message="Please approve the analysis results",
    context={"session_id": "session_123"}
)

# Process result
if result.success:
    user_response = result.data["response"]
    next_agent = result.data.get("next_agent")

CommunicationChannel (Abstract)

Base class for communication channels.

Import

from marsys.coordination.communication import CommunicationChannel

Abstract Methods

| Method | Description | Returns |
| --- | --- | --- |
| send(message, metadata) | Send message to user | None |
| receive(timeout) | Receive user input | str |
| is_available() | Check if channel available | bool |
| close() | Close channel | None |
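
The four abstract methods can be exercised without a real terminal or socket. The sketch below is a minimal in-memory channel, useful mainly in tests. `ChannelSketch` is a stand-in for `CommunicationChannel` so the example runs on its own; `InMemoryChannel`, `queue_reply`, and `demo` are hypothetical names, and returning `None` on timeout is an assumption rather than documented behavior.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple


class ChannelSketch(ABC):
    """Stand-in for CommunicationChannel; the real base class may differ."""

    def __init__(self, channel_id: str) -> None:
        self.channel_id = channel_id

    @abstractmethod
    async def send(self, message: Any, metadata: Optional[Dict[str, Any]] = None) -> None: ...

    @abstractmethod
    async def receive(self, timeout: Optional[float] = None) -> Optional[str]: ...

    @abstractmethod
    def is_available(self) -> bool: ...

    @abstractmethod
    async def close(self) -> None: ...


class InMemoryChannel(ChannelSketch):
    """Hypothetical test channel: records sent messages, replays queued replies."""

    def __init__(self, channel_id: str = "memory") -> None:
        super().__init__(channel_id)
        self.sent: List[Tuple[Any, Optional[Dict[str, Any]]]] = []
        self._replies: asyncio.Queue = asyncio.Queue()
        self._open = True

    def queue_reply(self, text: str) -> None:
        # Pre-load the answer the simulated user will give
        self._replies.put_nowait(text)

    async def send(self, message: Any, metadata: Optional[Dict[str, Any]] = None) -> None:
        self.sent.append((message, metadata))

    async def receive(self, timeout: Optional[float] = None) -> Optional[str]:
        try:
            return await asyncio.wait_for(self._replies.get(), timeout)
        except asyncio.TimeoutError:
            return None  # assumption: a timeout yields None instead of raising

    def is_available(self) -> bool:
        return self._open

    async def close(self) -> None:
        self._open = False


async def demo():
    channel = InMemoryChannel()
    channel.queue_reply("yes")
    await channel.send("Continue?")
    first = await channel.receive(timeout=0.1)    # queued reply
    second = await channel.receive(timeout=0.01)  # nothing queued, times out
    await channel.close()
    return channel.sent, first, second, channel.is_available()
```

Run it with `asyncio.run(demo())`. A real channel would subclass `CommunicationChannel` directly and be passed to `register_channel`.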

TerminalChannel

Basic terminal I/O channel.

Import

from marsys.coordination.communication.channels import TerminalChannel

Constructor

TerminalChannel(
    channel_id: str = "terminal",
    use_colors: bool = True
)

Example

channel = TerminalChannel()
# Send message
await channel.send("Enter your choice:")
# Receive input
response = await channel.receive(timeout=30.0)

EnhancedTerminalChannel

Rich terminal with advanced formatting.

Import

from marsys.coordination.communication.channels import EnhancedTerminalChannel

Constructor

EnhancedTerminalChannel(
    channel_id: str = "terminal",
    use_rich: bool = True,
    theme_name: str = "modern",
    prefix_width: int = 20,
    show_timestamps: bool = True
)

Features:

  • Rich text formatting
  • Color themes
  • Progress indicators
  • Tables and panels
  • Markdown rendering

Example

channel = EnhancedTerminalChannel(
    theme_name="modern",
    use_rich=True
)

# Send formatted message
await channel.send({
    "content": "## Analysis Results",
    "format": "markdown",
    "style": "success"
})

PrefixedCLIChannel

CLI channel with agent name prefixes.

Import

from marsys.coordination.communication.channels import PrefixedCLIChannel

Constructor

PrefixedCLIChannel(
    channel_id: str = "cli",
    prefix_width: int = 20,
    show_timestamps: bool = False,
    prefix_alignment: str = "left"
)

Example

channel = PrefixedCLIChannel(
    prefix_width=25,
    show_timestamps=True
)

# Send with prefix
await channel.send(
    message="Processing data...",
    metadata={"agent_name": "DataProcessor"}
)
# Output: [DataProcessor] Processing data...
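
To illustrate how `prefix_width` and `prefix_alignment` might interact, here is a small standalone sketch. `format_prefixed` is a hypothetical helper, not part of the library, and the padding rule (pad the bracketed agent name to `prefix_width`, then add one space) is an assumption; the real channel's layout may differ.

```python
def format_prefixed(
    agent_name: str,
    message: str,
    prefix_width: int = 20,
    prefix_alignment: str = "left",
) -> str:
    """Pad "[agent_name]" to prefix_width so messages line up in a column."""
    tag = f"[{agent_name}]"
    if prefix_alignment == "right":
        padded = tag.rjust(prefix_width)
    else:
        padded = tag.ljust(prefix_width)
    return f"{padded} {message}"
```

With a fixed width, messages from agents with short and long names start in the same column, which keeps interleaved multi-agent output readable.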

Communication Modes

CommunicationMode

Enumeration of communication patterns.

Import

from marsys.coordination.communication import CommunicationMode

Values

| Mode | Description | Use Case |
| --- | --- | --- |
| SYNC | Synchronous blocking | Terminal input |
| ASYNC_PUBSUB | Event-driven async | Web interfaces |
| ASYNC_QUEUE | Queue-based async | Message systems |

Mode Selection

# Sync mode (terminal)
manager = CommunicationManager(
    config=CommunicationConfig(
        mode=CommunicationMode.SYNC
    )
)

# Async pub/sub (web)
manager = CommunicationManager(
    config=CommunicationConfig(
        mode=CommunicationMode.ASYNC_PUBSUB
    )
)
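
The difference between the modes is easier to see in code. In an event-driven setup the requester does not block on a terminal read; it parks on a future that some other handler (for example a web endpoint) resolves later. The sketch below shows that general pattern with plain `asyncio`. `PendingInputs`, `request`, and `resolve` are hypothetical names, and this is not a description of how marsys implements `ASYNC_PUBSUB` internally.

```python
import asyncio
from typing import Dict


class PendingInputs:
    """Tracks input requests that are waiting for an out-of-band answer."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    async def request(self, request_id: str, timeout: float) -> str:
        # Park on a future until resolve() is called or the timeout expires
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(request_id, None)

    def resolve(self, request_id: str, answer: str) -> bool:
        # Called by whatever receives the user's answer (e.g. a web handler)
        future = self._pending.get(request_id)
        if future is None or future.done():
            return False
        future.set_result(answer)
        return True


async def demo():
    pending = PendingInputs()
    task = asyncio.create_task(pending.request("req-1", timeout=1.0))
    await asyncio.sleep(0)  # let the request register itself
    delivered = pending.resolve("req-1", "approve")
    answer = await task
    late = pending.resolve("req-1", "too late")  # request already finished
    return answer, delivered, late
```

The workflow coroutine stays suspended without blocking the event loop, so other agents can keep running while the user decides.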

User Interaction Patterns

Simple User Input

# In topology
topology = {
    "agents": ["Agent1", "User", "Agent2"],
    "flows": [
        "Agent1 -> User",
        "User -> Agent2"
    ]
}

# Agent1 response triggers user interaction
response = {
    "next_action": "invoke_agent",
    "action_input": "User",
    "message": "Please review the results"
}

# User sees message and provides input
# System routes response to Agent2

Error Recovery

# Agent triggers error recovery
response = {
    "next_action": "error_recovery",
    "error_details": {
        "type": "api_quota_exceeded",
        "message": "OpenAI API quota exceeded"
    },
    "suggested_action": "retry_with_different_model"
}
# Routes to User node for intervention
# User can:
# - Retry with same settings
# - Change model
# - Skip operation
# - Abort workflow
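
One way to turn the user's choice into something the workflow can act on is a small lookup with validation. The sketch below is illustrative only: `RECOVERY_ACTIONS`, `resolve_recovery`, and the shape of the returned dict are hypothetical, and only the four options and the `error_details` structure come from the example above.

```python
from typing import Any, Dict

# The four options offered to the user, keyed by a normalized identifier
RECOVERY_ACTIONS = {
    "retry": "Retry with same settings",
    "change_model": "Change model",
    "skip": "Skip operation",
    "abort": "Abort workflow",
}


def resolve_recovery(choice: str, response: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the user's choice and pair it with the reported error type."""
    key = choice.strip().lower().replace(" ", "_")
    if key not in RECOVERY_ACTIONS:
        raise ValueError(f"Unknown recovery choice: {choice!r}")
    return {
        "action": key,
        "error_type": response["error_details"]["type"],
        "continue_workflow": key != "abort",
    }
```

Rejecting unknown choices early keeps a typo from silently aborting or retrying an expensive operation.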

Approval Workflow

# Configure approval interaction
interaction = UserInteraction(
    id="approval_123",
    prompt="Approve deployment to production?",
    options=["approve", "reject", "review"],
    metadata={
        "changes": ["Update API", "Database migration"],
        "risk_level": "medium"
    }
)

# Request approval
response = await manager.request_structured_input(interaction)
if response == "approve":
    # Continue with deployment
    pass
elif response == "review":
    # Show detailed changes
    pass
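
The branch above assumes the reply is exactly one of the listed options. A defensive variant re-prompts until the answer matches, as in this standalone sketch. `choose`, `ask`, and `max_attempts` are hypothetical names; in practice `ask` would wrap whatever call requests input from the user.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Sequence


async def choose(
    ask: Callable[[], Awaitable[str]],
    options: Sequence[str],
    max_attempts: int = 3,
) -> Optional[str]:
    """Ask until the normalized answer is one of `options`, or give up."""
    for _ in range(max_attempts):
        answer = (await ask()).strip().lower()
        if answer in options:
            return answer
    return None  # caller decides what an unanswered approval means


async def demo():
    scripted: List[str] = ["maybe", " Approve "]  # simulated user replies

    async def ask() -> str:
        return scripted.pop(0)

    accepted = await choose(ask, ["approve", "reject", "review"])

    async def always_invalid() -> str:
        return "later"

    gave_up = await choose(always_invalid, ["approve", "reject"], max_attempts=2)
    return accepted, gave_up
```

Treating `None` as "not approved" is the safer default for actions such as a production deployment.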

Event System

EventBus

Event bus for communication events.

Import

from marsys.coordination.event_bus import EventBus

Methods

emit
async def emit(event: Any) -> None

Emit an event object to subscribers (async).

emit_nowait
def emit_nowait(event: Any) -> None

Emit an event from synchronous contexts (fire-and-forget).

subscribe
def subscribe(event_type: str, listener: Callable) -> None

Subscribe to events.

unsubscribe
def unsubscribe(event_type: str, listener: Callable) -> None

Unsubscribe from events.

Event Types

Event types are the class names of the emitted event objects (e.g., "PlanCreatedEvent").
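
To make the class-name routing concrete, the following sketch dispatches on `type(event).__name__`. `MiniEventBus` is an illustrative stand-in and not the marsys `EventBus` implementation; the `session_id` field on `PlanCreatedEvent` is an assumption made for the example.

```python
import asyncio
import inspect
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, DefaultDict, List


class MiniEventBus:
    """Illustrative bus that routes events by their class name."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: str, listener: Callable) -> None:
        self._listeners[event_type].append(listener)

    def unsubscribe(self, event_type: str, listener: Callable) -> None:
        if listener in self._listeners[event_type]:
            self._listeners[event_type].remove(listener)

    async def emit(self, event: Any) -> None:
        # The routing key is the class name of the emitted object
        for listener in list(self._listeners[type(event).__name__]):
            result = listener(event)
            if inspect.isawaitable(result):
                await result  # support both sync and async listeners


@dataclass
class PlanCreatedEvent:
    session_id: str  # assumed field, for illustration only


async def demo() -> List[str]:
    seen: List[str] = []
    bus = MiniEventBus()

    def on_plan(event: PlanCreatedEvent) -> None:
        seen.append(event.session_id)

    bus.subscribe("PlanCreatedEvent", on_plan)
    await bus.emit(PlanCreatedEvent(session_id="s1"))   # delivered
    bus.unsubscribe("PlanCreatedEvent", on_plan)
    await bus.emit(PlanCreatedEvent(session_id="s2"))   # no listener left
    return seen
```

Because the key is a plain string, a typo in the subscribed name fails silently; the listener is simply never called.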

Example

from dataclasses import dataclass

from marsys.coordination.event_bus import EventBus

@dataclass
class CustomEvent:
    session_id: str
    payload: str

bus = EventBus()

async def on_custom_event(event: CustomEvent):
    print(event.payload)

# The event type is the class name of the event object
bus.subscribe("CustomEvent", on_custom_event)

# Emit events
await bus.emit(CustomEvent(session_id="123", payload="hello"))
bus.emit_nowait(CustomEvent(session_id="123", payload="background"))

WebChannel Status Events

WebChannel can deliver status and planning events to web clients via WebSocket push or REST polling.

Methods

push_status_event
async def push_status_event(event_data: Dict[str, Any]) -> None

Push a serialized status event to WebSocket clients and the polling queue.

subscribe_status_events
def subscribe_status_events(callback: Callable[[Dict[str, Any]], None]) -> None

Subscribe to status event callbacks.

get_status_events
async def get_status_events(limit: int = 100, timeout: float = 0.0) -> List[Dict[str, Any]]

Poll for status events (useful for REST APIs).
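
The `limit` and `timeout` parameters suggest batch-style polling. The sketch below shows one plausible reading with a standalone queue: return up to `limit` buffered events, and wait up to `timeout` seconds only when nothing is buffered. `StatusQueueSketch` is a hypothetical stand-in, and the real `WebChannel` may treat these parameters differently.

```python
import asyncio
from typing import Any, Dict, List


class StatusQueueSketch:
    """Stand-in for the polling side of a status channel."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def push_status_event(self, event_data: Dict[str, Any]) -> None:
        await self._queue.put(event_data)

    async def get_status_events(
        self, limit: int = 100, timeout: float = 0.0
    ) -> List[Dict[str, Any]]:
        events: List[Dict[str, Any]] = []
        # Wait for a first event only if nothing is buffered and waiting is allowed
        if self._queue.empty() and timeout > 0:
            try:
                events.append(await asyncio.wait_for(self._queue.get(), timeout))
            except asyncio.TimeoutError:
                return events
        # Drain whatever else is already buffered, up to the limit
        while len(events) < limit and not self._queue.empty():
            events.append(self._queue.get_nowait())
        return events


async def demo():
    channel = StatusQueueSketch()
    for step in range(3):
        await channel.push_status_event({"step": step})
    first = await channel.get_status_events(limit=2)
    rest = await channel.get_status_events()
    empty = await channel.get_status_events(timeout=0.01)
    return first, rest, empty
```

A REST endpoint would typically call this with a small positive `timeout` (long polling) so that clients do not have to hammer the server with empty requests.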

StatusWebChannel Bridge

StatusWebChannel forwards StatusManager events (including planning events) into a WebChannel.

from marsys.coordination.communication.channels import WebChannel
from marsys.coordination.status.channels import StatusWebChannel

web_channel = WebChannel()
status_web = StatusWebChannel(web_channel)

# status_manager is an existing StatusManager instance
status_manager.add_channel(status_web)

Custom Channels

Creating Custom Channel

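import asyncio
import json
from typing import Any, Dict, Optional

from marsys.coordination.communication import CommunicationChannel

# NOTE: `websocket` stands for whichever async WebSocket client library you use.
# connect(), send(), receive(), close(), and .closed must match that library's API.

class WebSocketChannel(CommunicationChannel):
    """WebSocket-based communication channel."""

    def __init__(self, ws_url: str):
        super().__init__(channel_id="websocket")
        self.ws_url = ws_url
        self.ws = None

    async def connect(self) -> None:
        self.ws = await websocket.connect(self.ws_url)

    async def send(self, message: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        await self.ws.send(json.dumps({
            "message": message,
            "metadata": metadata
        }))

    async def receive(self, timeout: Optional[float] = None) -> Optional[str]:
        try:
            data = await asyncio.wait_for(
                self.ws.receive(),
                timeout=timeout
            )
            return json.loads(data)["response"]
        except asyncio.TimeoutError:
            # No reply arrived within the timeout
            return None

    def is_available(self) -> bool:
        # Return a real bool even when no connection has been opened yet
        return self.ws is not None and not self.ws.closed

    async def close(self) -> None:
        if self.ws:
            await self.ws.close()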

Themes and Formatting

Available Themes

# Modern theme (default)
config = CommunicationConfig(theme_name="modern")
# Classic terminal
config = CommunicationConfig(theme_name="classic")
# Minimal
config = CommunicationConfig(theme_name="minimal")
# Custom theme
config = CommunicationConfig(
    theme_name="custom",
    custom_theme={
        "primary": "#007ACC",
        "success": "#4CAF50",
        "error": "#F44336",
        "warning": "#FF9800"
    }
)

Message Formatting

# Rich formatting
await channel.send({
    "content": "# Results\n- Item 1\n- Item 2",
    "format": "markdown",
    "style": "panel",
    "title": "Analysis"
})

# Table display
await channel.send({
    "content": [
        ["Metric", "Value"],
        ["Accuracy", "95%"],
        ["Speed", "1.2s"]
    ],
    "format": "table"
})
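
When rich rendering is not available, a `"table"` payload like the one above can still be shown as aligned plain text. The sketch below is one way to do that; `render_plain_table` is a hypothetical helper, and treating the first row as the header is an assumption based on the example payload.

```python
from typing import List, Sequence


def render_plain_table(rows: Sequence[Sequence[str]]) -> str:
    """Render rows as left-aligned columns with a rule under the header row."""
    # Column width = widest cell in that column
    widths = [max(len(str(cell)) for cell in column) for column in zip(*rows)]
    lines: List[str] = []
    for index, row in enumerate(rows):
        cells = (str(cell).ljust(width) for cell, width in zip(row, widths))
        lines.append("  ".join(cells).rstrip())
        if index == 0:
            lines.append("  ".join("-" * width for width in widths))
    return "\n".join(lines)


table = render_plain_table([
    ["Metric", "Value"],
    ["Accuracy", "95%"],
    ["Speed", "1.2s"],
])
```

Printing `table` gives a header row, a dashed rule, and one line per data row, with the second column starting at the same position on every line.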

Best Practices

Do

  • Set appropriate timeouts for user input
  • Provide clear prompts and options
  • Handle timeouts gracefully
  • Store interaction history
  • Use structured interactions for complex inputs

Don't

  • Block indefinitely on user input
  • Mix communication channels in the same session
  • Ignore channel availability
  • Send sensitive data unencrypted
  • Assume user always provides valid input

Pro Tip

Use EnhancedTerminalChannel for a better user experience with colors, formatting, and progress indicators. It automatically falls back to basic terminal output if Rich is unavailable.

Timeout Handling

Always set reasonable timeouts for user input to prevent indefinite blocking. Consider offering a retry option if a timeout occurs.
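
A bounded retry can be sketched with `asyncio.wait_for`, as below. `ask_with_retry` and its parameters are hypothetical; `ask` stands for any coroutine that requests input. Whether the library's own timeout raises or returns a value is not specified here, so the sketch enforces the timeout itself.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def ask_with_retry(
    ask: Callable[[], Awaitable[str]],
    timeout: float,
    attempts: int = 2,
) -> Optional[str]:
    """Give the user up to `attempts` chances, each limited to `timeout` seconds."""
    for _ in range(attempts):
        try:
            return await asyncio.wait_for(ask(), timeout=timeout)
        except asyncio.TimeoutError:
            continue  # the timed-out attempt is cancelled; ask again
    return None  # every attempt timed out


async def demo():
    calls = {"count": 0}

    async def slow_then_fast() -> str:
        # Simulated user: ignores the first prompt, answers the second
        calls["count"] += 1
        if calls["count"] == 1:
            await asyncio.sleep(1.0)
        return "yes"

    answer = await ask_with_retry(slow_then_fast, timeout=0.05, attempts=2)
    return answer, calls["count"]
```

Returning `None` after the last attempt lets the caller choose a default, skip the step, or abort, instead of hanging the workflow.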