Communication API

Complete API reference for the user interaction and communication system in multi-agent workflows.

See Also

For communication patterns, see the Communication Concepts guide.

CommunicationManager

Central manager for user communication across different channels.

Import

from marsys.coordination.communication import CommunicationManager
from marsys.coordination.config import CommunicationConfig

Constructor

CommunicationManager(
config: Optional[CommunicationConfig] = None
)

Key Methods

register_channel

def register_channel(channel: CommunicationChannel) -> None

Register a communication channel.

request_user_input

async def request_user_input(
prompt: str,
session_id: str,
channel_id: Optional[str] = None,
timeout: Optional[float] = None,
metadata: Optional[Dict[str, Any]] = None
) -> str

send_message

async def send_message(
message: str,
session_id: str,
channel_id: Optional[str] = None,
message_type: str = "info"
) -> None

Example

# Initialize manager
config = CommunicationConfig(
use_rich_formatting=True,
theme_name="modern"
)
manager = CommunicationManager(config)
# Request user input
response = await manager.request_user_input(
prompt="What would you like to analyze?",
session_id="session_123"
)
# Send message
await manager.send_message(
message="Analysis complete!",
session_id="session_123",
message_type="success"
)

UserNodeHandler

Handles execution when control reaches a User node in the topology.

Import

from marsys.coordination.communication import UserNodeHandler

handle_user_node

async def handle_user_node(
branch: ExecutionBranch,
incoming_message: Any,
context: Dict[str, Any]
) -> StepResult

Example

handler = UserNodeHandler(communication_manager)
# Handle user interaction
result = await handler.handle_user_node(
branch=current_branch,
incoming_message="Please approve the analysis results",
context={"session_id": "session_123"}
)
# Process result
if result.success:
    # The user's reply is stored under "response"; "next_agent" is optional,
    # so it is read with .get() and may be None.
    user_response = result.data["response"]
    next_agent = result.data.get("next_agent")

Communication Channels

TerminalChannel

Basic terminal I/O channel.

from marsys.coordination.communication.channels import TerminalChannel
channel = TerminalChannel()
# Send message
await channel.send("Enter your choice:")
# Receive input
response = await channel.receive(timeout=30.0)

EnhancedTerminalChannel

Rich terminal with advanced formatting, colors, and progress indicators.

from marsys.coordination.communication.channels import EnhancedTerminalChannel
channel = EnhancedTerminalChannel(
theme_name="modern",
use_rich=True
)
# Send formatted message
await channel.send({
"content": "## Analysis Results",
"format": "markdown",
"style": "success"
})

PrefixedCLIChannel

CLI channel with agent name prefixes.

from marsys.coordination.communication.channels import PrefixedCLIChannel
channel = PrefixedCLIChannel(
prefix_width=25,
show_timestamps=True
)
# Send with prefix
await channel.send(
message="Processing data...",
metadata={"agent_name": "DataProcessor"}
)
# Output: [DataProcessor] Processing data...

CommunicationMode

Enumeration of communication patterns.

| Mode | Description | Use Case |
|------|-------------|----------|
| SYNC | Synchronous blocking | Terminal input |
| ASYNC_PUBSUB | Event-driven async | Web interfaces |
| ASYNC_QUEUE | Queue-based async | Message systems |

User Interaction Patterns

Simple User Input

# In topology
topology = {
"agents": ["Agent1", "User", "Agent2"],
"flows": [
"Agent1 -> User",
"User -> Agent2"
]
}
# Agent1 response triggers user interaction
response = {
"next_action": "invoke_agent",
"action_input": "User",
"message": "Please review the results"
}
# User sees message and provides input
# System routes response to Agent2

Error Recovery

# Agent triggers error recovery
response = {
"next_action": "error_recovery",
"error_details": {
"type": "api_quota_exceeded",
"message": "API quota exceeded"
},
"suggested_action": "retry_with_different_model"
}
# Routes to User node for intervention
# User can:
# - Retry with same settings
# - Change model
# - Skip operation
# - Abort workflow

Approval Workflow

# Configure approval interaction
interaction = UserInteraction(
id="approval_123",
prompt="Approve deployment to production?",
options=["approve", "reject", "review"],
metadata={
"changes": ["Update API", "Database migration"],
"risk_level": "medium"
}
)
# Request approval
response = await manager.request_structured_input(interaction)
if response == "approve":
    # Continue with deployment
    pass
elif response == "review":
    # Show detailed changes
    pass

EventBus

Event bus for communication events.

Events

  • user.input.requested - Input requested
  • user.input.received - Input received
  • user.message.sent - Message sent
  • channel.connected - Channel connected
  • channel.disconnected - Channel disconnected

Example

from marsys.coordination.communication import EventBus
bus = EventBus()
# Subscribe to events
def on_input(data):
    """Print the user's response taken from the event payload.

    Args:
        data: Event payload; must contain a ``"response"`` key.
    """
    print(f"User input: {data['response']}")
bus.subscribe("user.input.received", on_input)
# Publish event
await bus.publish(
"user.input.received",
{"response": "yes", "session_id": "123"}
)

Custom Channels

import asyncio
import json
from typing import Any, Dict, Optional

from marsys.coordination.communication import CommunicationChannel
class WebSocketChannel(CommunicationChannel):
    """WebSocket-based communication channel.

    Outgoing messages are serialized as JSON objects of the form
    ``{"message": ..., "metadata": ...}``; incoming payloads are parsed as
    JSON and the value under the ``"response"`` key is returned.

    NOTE: ``websocket`` is a placeholder for an async WebSocket client
    library of your choice. It is expected to provide ``connect(url)``
    returning a connection object with ``send()``, ``receive()``,
    ``close()`` and a ``closed`` attribute.
    """

    def __init__(self, ws_url: str):
        """Store the target URL; the connection itself is opened by connect()."""
        super().__init__(channel_id="websocket")
        self.ws_url = ws_url
        self.ws = None  # populated by connect()

    async def connect(self):
        """Open the WebSocket connection to ``ws_url``."""
        self.ws = await websocket.connect(self.ws_url)

    async def send(self, message: str, metadata: Optional[Dict[str, Any]] = None):
        """Send ``message`` (and optional ``metadata``) as a JSON document."""
        await self.ws.send(json.dumps({
            "message": message,
            "metadata": metadata
        }))

    async def receive(self, timeout: Optional[float] = None):
        """Wait for the next payload and return its ``"response"`` field.

        Args:
            timeout: Maximum number of seconds to wait; ``None`` waits
                indefinitely.

        Returns:
            The ``"response"`` value of the received JSON payload, or
            ``None`` if nothing arrived before the timeout expired.
        """
        # Only the wait itself can time out, so keep the try body minimal.
        try:
            data = await asyncio.wait_for(
                self.ws.receive(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return None
        return json.loads(data)["response"]

    def is_available(self) -> bool:
        """Return True only when a connection exists and is not closed."""
        # Explicit None check so the result is always a bool; the previous
        # `self.ws and ...` form returned None before connect() was called.
        return self.ws is not None and not self.ws.closed

    async def close(self):
        """Close the connection if one was opened."""
        if self.ws:
            await self.ws.close()

Best Practices

Do

  • Set appropriate timeouts for user input
  • Provide clear prompts and options
  • Handle timeouts gracefully
  • Store interaction history

Don't

  • Block indefinitely on user input
  • Mix communication channels in the same session
  • Ignore channel availability
  • Send sensitive data unencrypted

Pro Tip

Use EnhancedTerminalChannel for a better user experience with colors, formatting, and progress indicators. It automatically falls back to the basic terminal if Rich is unavailable.

Timeout Handling

Always set reasonable timeouts for user input to prevent indefinite blocking. Consider offering a retry option if a timeout occurs.