Communication
The communication system in MARSYS enables agents to coordinate, collaborate, and interact with users through structured messaging and event-driven patterns.
Overview
MARSYS communication provides:
- User Interaction: Human-in-the-loop capabilities with multiple channels
- Agent Coordination: Inter-agent messaging and task delegation
- Event System: Pub/sub for decoupled communication
- Status Updates: Real-time progress and status reporting
- Error Routing: Intelligent error handling and recovery
Architecture
The communication system consists of several interconnected components:
- Communication Layer: CommunicationManager (Central Hub), UserNodeHandler (User Interaction), StatusManager (Status Updates), EventBus (Event System)
- Channels: TerminalChannel (CLI Interface), EnhancedTerminal (Rich Terminal), PrefixedCLI (Agent Prefixes), WebChannel (Web Interface)
- Message Flow: User to UserNodeHandler to Agents, Agent-to-Agent communication, Agents to StatusManager back to User
Core Components
CommunicationManager
Central hub for all communication:
from marsys.coordination.communication import CommunicationManagerfrom marsys.coordination.config import CommunicationConfig# Configure communicationcomm_config = CommunicationConfig(use_rich_formatting=True,theme_name="modern",prefix_width=20,show_timestamps=True,enable_history=True,use_colors=True)# Initialize managercomm_manager = CommunicationManager(config=comm_config)# Register channelscomm_manager.register_channel("terminal", TerminalChannel())comm_manager.register_channel("web", WebChannel())# CommunicationManager is used with Orchestra for user interaction# Direct send/receive is handled through channels internally# Use with Orchestraresult = await Orchestra.run(task="Help me with a task",topology=topology,execution_config=ExecutionConfig(user_interaction="terminal" # Auto-creates CommunicationManager))
UserNodeHandler
Manages User node execution in topology:
from marsys.coordination.communication import UserNodeHandlerclass UserNodeHandler:"""Handles User node interactions in workflow."""async def handle_user_node(self,branch: ExecutionBranch,incoming_message: Any,context: Dict[str, Any]) -> StepResult:"""Process User node interaction."""# Format message for userformatted = self._format_for_user(incoming_message)# Get user responseuser_response = await self._get_user_input(prompt=formatted,context=context)# Process responsereturn StepResult(success=True,result=user_response,metadata={"source": "user", "timestamp": datetime.now()})async def _get_user_input(self,prompt: str,context: Dict[str, Any]) -> str:"""Get input from user with proper formatting."""# Show context if availableif context.get("show_context"):await self._display_context(context)# Get input based on modeif self.mode == "sync":return await self._sync_input(prompt)elif self.mode == "async":return await self._async_input(prompt)
StatusManager
Real-time status updates:
from marsys.coordination.communication import StatusManagerfrom marsys.coordination.config import StatusConfigclass StatusManager:"""Manages status updates and progress reporting."""def __init__(self, config: StatusConfig):self.config = configself.aggregator = MessageAggregator(window_ms=config.aggregation_window_ms)async def update_status(self,agent_name: str,status: str,progress: Optional[float] = None,metadata: Optional[Dict] = None):"""Send status update."""if not self.config.enabled:return# Create status messagemessage = StatusMessage(agent=agent_name,status=status,progress=progress,timestamp=datetime.now(),metadata=metadata or {})# Aggregate if configuredif self.config.aggregate_parallel:self.aggregator.add(message)if self.aggregator.should_flush():await self._flush_aggregated()else:await self._send_immediate(message)def format_status(self, message: StatusMessage) -> str:"""Format status message for display."""if self.config.show_agent_prefixes:prefix = f"[{message.agent:>{self.config.prefix_width}}]"else:prefix = ""if message.progress is not None:progress_bar = self._create_progress_bar(message.progress)return f"{prefix} {message.status} {progress_bar}"else:return f"{prefix} {message.status}"
Communication Patterns
User Interaction Pattern
Enable human-in-the-loop workflows:
# Topology with User nodetopology = {"agents": ["User", "Assistant", "Reviewer"],"flows": ["User -> Assistant", # User provides input"Assistant -> Reviewer", # Assistant processes"Reviewer -> User" # User reviews result]}# Execution with user interactionresult = await Orchestra.run(task="Help me write a report",topology=topology,execution_config=ExecutionConfig(user_interaction="terminal",user_first=True, # Start with userinitial_user_msg="Welcome! What report would you like to write?"))
Event-Driven Communication
Decoupled pub/sub messaging:
from dataclasses import dataclassfrom marsys.coordination.event_bus import EventBus@dataclassclass CustomEvent:session_id: strpayload: str# Usageevent_bus = EventBus()# Subscribe to events by class nameasync def on_custom_event(event: CustomEvent):print(f"Event payload: {event.payload}")event_bus.subscribe("CustomEvent", on_custom_event)# Emit events (async)await event_bus.emit(CustomEvent(session_id="123", payload="hello"))# Emit from sync contexts (fire-and-forget)event_bus.emit_nowait(CustomEvent(session_id="123", payload="background"))
Web Status Streaming
Forward status and planning events to web clients via WebChannel:
from marsys.coordination.communication.channels import WebChannelfrom marsys.coordination.status.channels import StatusWebChannelweb_channel = WebChannel()status_web = StatusWebChannel(web_channel)status_manager.add_channel(status_web)
Web clients can receive updates via WebSocket or poll WebChannel.get_status_events().
Status Aggregation Pattern
Aggregate parallel status updates:
class MessageAggregator:"""Aggregate status messages for cleaner output."""def __init__(self, window_ms: int = 500):self.window_ms = window_msself.messages: List[StatusMessage] = []self.last_flush = time.time() * 1000def add(self, message: StatusMessage):"""Add message to aggregation window."""self.messages.append(message)def should_flush(self) -> bool:"""Check if window expired."""now = time.time() * 1000return (now - self.last_flush) >= self.window_msdef flush(self) -> List[StatusMessage]:"""Get and clear aggregated messages."""messages = self.messages.copy()self.messages.clear()self.last_flush = time.time() * 1000return messages# Usage in parallel executionaggregator = MessageAggregator(window_ms=500)# Multiple agents updating in parallelfor agent in parallel_agents:message = StatusMessage(agent=agent.name, status="Processing...")aggregator.add(message)if aggregator.should_flush():# Display all updates togethermessages = aggregator.flush()display_aggregated(messages)
Communication Channels
TerminalChannel
Basic terminal I/O:
class TerminalChannel(Channel):"""Basic terminal communication channel."""async def send(self, message: str, **kwargs):"""Send message to terminal."""print(message)async def receive(self, prompt: str = "", **kwargs) -> str:"""Receive input from terminal."""return input(prompt)def format_message(self, message: str, metadata: Dict) -> str:"""Format message for terminal display."""if metadata.get("error"):return f"ERROR: {message}"elif metadata.get("warning"):return f"WARNING: {message}"else:return message
EnhancedTerminalChannel
Rich terminal with colors and formatting:
from rich.console import Consolefrom rich.prompt import Promptfrom rich.progress import Progressclass EnhancedTerminalChannel(Channel):"""Enhanced terminal with Rich formatting."""def __init__(self):self.console = Console()self.progress = Progress()async def send(self, message: str, **kwargs):"""Send formatted message."""style = kwargs.get("style", "default")if kwargs.get("is_error"):self.console.print(f"[red]✗[/red] {message}")elif kwargs.get("is_success"):self.console.print(f"[green]✓[/green] {message}")else:self.console.print(message, style=style)async def receive(self, prompt: str = "", **kwargs) -> str:"""Get input with enhanced prompt."""choices = kwargs.get("choices")if choices:return Prompt.ask(prompt, choices=choices)else:return Prompt.ask(prompt)def display_progress(self, task_id: str, total: int):"""Show progress bar."""return self.progress.add_task(task_id, total=total)
PrefixedCLIChannel
Agent-prefixed output:
class PrefixedCLIChannel(Channel):"""Terminal channel with agent prefixes."""def __init__(self, prefix_width: int = 20):self.prefix_width = prefix_widthself.colors = {"Coordinator": "blue","Worker": "green","User": "yellow","Error": "red"}def format_with_prefix(self,agent_name: str,message: str) -> str:"""Format message with agent prefix."""# Create colored prefixcolor = self.colors.get(agent_name, "white")prefix = f"[{agent_name:>{self.prefix_width}}]"if self.use_colors:from colorama import Fore, Stylecolor_code = getattr(Fore, color.upper(), Fore.WHITE)prefix = f"{color_code}{prefix}{Style.RESET_ALL}"# Split multi-line messageslines = message.split('\n')formatted_lines = []for i, line in enumerate(lines):if i == 0:formatted_lines.append(f"{prefix} {line}")else:# Indent continuation linesindent = " " * (self.prefix_width + 3)formatted_lines.append(f"{indent}{line}")return '\n'.join(formatted_lines)
Configuration
CommunicationConfig
from marsys.coordination.config import CommunicationConfigconfig = CommunicationConfig(# Formattinguse_rich_formatting=True,theme_name="modern", # modern, classic, minimalprefix_width=20,prefix_alignment="right", # left, center, right# Displayshow_timestamps=True,timestamp_format="%H:%M:%S",use_colors=True,color_depth="truecolor", # truecolor, 256, 16, none# Historyenable_history=True,history_size=1000,persist_history=False,history_file=".marsys_history",# Inputenable_tab_completion=True,input_timeout=None, # Seconds or None# Channelschannels=["terminal"], # Active channelsdefault_channel="terminal",# Error handlingfallback_on_error=True,use_enhanced_terminal=True)
Advanced Patterns
Multi-Channel Broadcasting
Send to multiple channels simultaneously:
class MultiChannelManager:"""Broadcast to multiple channels."""def __init__(self):self.channels: Dict[str, Channel] = {}def register(self, name: str, channel: Channel):"""Register communication channel."""self.channels[name] = channelasync def broadcast(self,message: str,channels: Optional[List[str]] = None,**kwargs):"""Broadcast message to channels."""target_channels = channels or list(self.channels.keys())tasks = []for channel_name in target_channels:if channel_name in self.channels:channel = self.channels[channel_name]task = channel.send(message, **kwargs)tasks.append(task)await asyncio.gather(*tasks, return_exceptions=True)# Usagemanager = MultiChannelManager()manager.register("terminal", TerminalChannel())manager.register("web", WebChannel())manager.register("log", LogChannel())# Broadcast to allawait manager.broadcast("System starting...")# Broadcast to specific channelsawait manager.broadcast("Error occurred",channels=["terminal", "log"],is_error=True)
Interactive Prompts
Get structured input from users:
class InteractivePrompt:"""Interactive user prompts."""async def choice(self,question: str,options: List[str],default: Optional[str] = None) -> str:"""Present multiple choice."""print(f"\n{question}")for i, option in enumerate(options, 1):print(f" {i}. {option}")while True:response = input(f"Choice [1-{len(options)}]: ")try:index = int(response) - 1if 0 <= index < len(options):return options[index]except (ValueError, IndexError):passprint("Invalid choice. Please try again.")async def confirm(self,question: str,default: bool = False) -> bool:"""Get yes/no confirmation."""default_str = "Y/n" if default else "y/N"response = input(f"{question} [{default_str}]: ").lower()if not response:return defaultreturn response in ['y', 'yes', 'true', '1']async def multi_input(self,prompt: str,min_items: int = 1,max_items: Optional[int] = None) -> List[str]:"""Get multiple inputs."""print(prompt)print("Enter items (empty line to finish):")items = []while True:item = input(f" [{len(items) + 1}] ")if not item:if len(items) >= min_items:breakprint(f"Need at least {min_items} items")else:items.append(item)if max_items and len(items) >= max_items:breakreturn items
Error Recovery Communication
Route errors to User node for recovery:
class ErrorRoutingHandler:"""Route errors to User for recovery."""async def handle_error(self,error: Exception,context: Dict[str, Any],topology: Topology) -> Optional[str]:"""Route error to User node if available."""# Check if User node exists in topologyif not topology.has_node("User"):return None# Format error for usererror_msg = self._format_error(error, context)# Create error recovery messagerecovery_prompt = f"""An error occurred during execution:{error_msg}How would you like to proceed?1. Retry the operation2. Skip and continue3. Provide alternative input4. Abort executionYour choice: """# Get user decisionuser_response = await self._get_user_input(recovery_prompt)# Process decisionreturn self._process_recovery_decision(user_response, context)def _format_error(self, error: Exception, context: Dict) -> str:"""Format error for user display."""agent = context.get("agent", "Unknown")operation = context.get("operation", "Unknown")return f"""Agent: {agent}Operation: {operation}Error Type: {type(error).__name__}Details: {str(error)}"""
Next Steps
User Node Guide
Complete guide to User node integration
Event System
Event-driven communication patterns
Status Management
Status updates and progress tracking
Messages
Message types and patterns
Communication System Ready!
You now understand the communication system in MARSYS. Effective communication enables seamless coordination between agents and intuitive user interaction.