Registry
The AgentRegistry is a central service discovery and lifecycle management system that enables dynamic agent registration, discovery, and communication in MARSYS workflows.
Overview
The registry system provides:
- Service Discovery: Find agents by name, capability, or type
- Lifecycle Management: Automatic registration and cleanup
- Thread-Safe Operations: Concurrent access with locking
- Weak References: Automatic garbage collection
- Dynamic Communication: Runtime agent discovery
Architecture
The registry system is composed of three parts:
- AgentRegistry: The central singleton registry that stores all active agents
- Weak References: Ensures automatic cleanup when agents are garbage-collected
- Thread Lock: Provides safe concurrent access from multiple threads
Agent lifecycle flows from creation (auto-registered) to active use in the registry, and finally garbage collection triggers automatic removal.
Discovery supports lookup by name, by type, by capability, or listing all agents.
Core Registry
AgentRegistry Class
from marsys.agents.registry import AgentRegistryfrom marsys.agents import Agentfrom marsys.models import ModelConfig# The registry is a singleton - no instantiation needed# It's automatically used by all agents# Create agents - they auto-registeragent1 = Agent(name="data_processor",model_config=ModelConfig(type="api",provider="openrouter",name="anthropic/claude-opus-4.6",max_tokens=12000),goal="Processes and analyzes data",instruction="Analyze structured inputs and return concise findings.")agent2 = Agent(name="report_writer",model_config=config,goal="Creates detailed reports",instruction="Turn analyzed data into clear, actionable reports.")# Check registrationprint(AgentRegistry.all()) # ['data_processor', 'report_writer']# Get specific agentprocessor = AgentRegistry.get("data_processor")if processor:result = await processor.run("Analyze this data: ...")
Automatic Registration
from marsys.agents import Agentfrom marsys.agents.registry import AgentRegistryagent = Agent(model_config=config,name="assistant",goal="Help users with general tasks",instruction="Respond clearly and safely.")# Auto-registration happens in BaseAgent.__init__assert AgentRegistry.get("assistant") is agent
Registry Operations
# Get agent by nameagent = AgentRegistry.get("assistant")# Check if agent existsif AgentRegistry.get("assistant") is not None:print("Agent is available")# List all agentsall_agents = AgentRegistry.all()print(f"Active agents: {all_agents}")# List all agents including poolsall_with_pools = AgentRegistry.all_with_pools()print(f"All registrations: {all_with_pools}")# Check if name is a poolif AgentRegistry.is_pool("BrowserPool"):pool = AgentRegistry.get_pool("BrowserPool")# Count active agentscount = len(AgentRegistry.all())print(f"Total agents: {count}")# Unregister agent (identity-safe)AgentRegistry.unregister_if_same("assistant", agent_instance)
Identity-Safe Unregistration
To prevent race conditions during concurrent execution, use identity-safe unregistration methods:
from marsys.agents.registry import AgentRegistry# Create agentagent = Agent(name="worker",model_config=config,goal="Process assigned tasks",instruction="Execute assigned work and return concise results.")# Later: cleanup and unregister (identity-safe)await agent.cleanup() # Close resourcesAgentRegistry.unregister_if_same("worker", agent) # Only unregister if same instance# Or using convenience methodAgentRegistry.unregister_instance(agent) # Uses agent.name attribute
Why Identity-Safe?
In concurrent workflows, an agent's __del__ destructor might fire after a new agent with the same name is registered. Identity-safe methods use Python's is operator to verify the registry entry points to the exact same instance before unregistering.
# Problem scenario (solved by identity-safe unregistration):# 1. Task 1 creates Agent "Coordinator" (instance A)# 2. Task 1 completes, A is queued for garbage collection# 3. Task 2 creates Agent "Coordinator" (instance B, registers successfully)# 4. Task 1's instance A gets garbage collected, __del__ fires# 5. OLD: A.__del__ calls unregister("Coordinator") -> removes B!# 6. NEW: A.__del__ calls unregister_if_same("Coordinator", A) -> sees B, skips
Methods:
unregister_if_same(name, instance): Unregister only if registry entry matches instanceunregister_instance(instance): Convenience method usinginstance.nameunregister(name): Legacy method (not identity-safe, avoid in concurrent contexts)
Framework Integration
Orchestra automatically uses identity-safe unregistration when auto_cleanup_agents=True (default), preventing race conditions in multi-task workflows.
Discovery Patterns
Service Discovery
class CapabilityRegistry:"""Extended registry with capability tracking."""_capabilities: Dict[str, Set[str]] = {}@classmethoddef register_capability(cls, agent_name: str, capability: str):"""Register agent capability."""if capability not in cls._capabilities:cls._capabilities[capability] = set()cls._capabilities[capability].add(agent_name)@classmethoddef find_by_capability(cls, capability: str) -> List[str]:"""Find agents with specific capability."""return list(cls._capabilities.get(capability, []))@classmethoddef find_best_match(cls, capabilities: List[str]) -> Optional[str]:"""Find agent with most matching capabilities."""scores = {}for cap in capabilities:for agent in cls._capabilities.get(cap, []):scores[agent] = scores.get(agent, 0) + 1if scores:return max(scores, key=scores.get)return None# UsageCapabilityRegistry.register_capability("translator_1", "translation")CapabilityRegistry.register_capability("translator_1", "localization")CapabilityRegistry.register_capability("writer_1", "content_creation")# Find specialiststranslators = CapabilityRegistry.find_by_capability("translation")best_match = CapabilityRegistry.find_best_match(["translation", "localization"])
Dynamic Agent Selection
class SmartCoordinator(Agent):"""Coordinator that dynamically selects agents."""async def delegate_task(self, task: str, task_type: str, context):"""Delegate task to appropriate agent."""# Find suitable agentscandidates = self._find_suitable_agents(task_type)if not candidates:return Message(role="error",content=f"No agents available for {task_type}",name=self.name)# Select best agent (could use various strategies)selected = self._select_best_agent(candidates, task)# Invoke selected agentreturn await self.invoke_agent(selected, task)def _find_suitable_agents(self, task_type: str) -> List[str]:"""Find agents suitable for task type."""type_mapping = {"analysis": ["data_analyst", "researcher"],"writing": ["writer", "editor", "reporter"],"browsing": ["browser_agent", "scraper"]}agent_names = type_mapping.get(task_type, [])return [name for name in agent_namesif AgentRegistry.get(name) is not None]def _select_best_agent(self, candidates: List[str], task: str) -> str:"""Select best agent from candidates."""# Could implement various strategies:# - Round-robin# - Load balancing# - Capability matching# - Performance historyreturn candidates[0] # Simple: first available
Advanced Registry
Load Balancing
class LoadBalancedRegistry:"""Registry with load balancing capabilities."""_invocation_counts: Dict[str, int] = {}_active_tasks: Dict[str, int] = {}@classmethoddef get_least_loaded(cls, agent_type: str = None) -> Optional[str]:"""Get least loaded agent."""candidates = list(AgentRegistry.all().keys())if agent_type:# Filter by typecandidates = [name for name in candidatesif name.startswith(agent_type)]if not candidates:return None# Find least loadedreturn min(candidates,key=lambda n: cls._active_tasks.get(n, 0))@classmethoddef start_task(cls, agent_name: str):"""Mark task start."""cls._active_tasks[agent_name] = cls._active_tasks.get(agent_name, 0) + 1cls._invocation_counts[agent_name] = cls._invocation_counts.get(agent_name, 0) + 1@classmethoddef end_task(cls, agent_name: str):"""Mark task end."""if agent_name in cls._active_tasks:cls._active_tasks[agent_name] = max(0, cls._active_tasks[agent_name] - 1)@classmethoddef get_statistics(cls) -> Dict[str, Any]:"""Get load balancing statistics."""return {"total_invocations": sum(cls._invocation_counts.values()),"active_tasks": dict(cls._active_tasks),"invocation_counts": dict(cls._invocation_counts)}
Health Monitoring
class HealthMonitor:"""Monitor agent health and availability."""_health_status: Dict[str, Dict[str, Any]] = {}_last_check: Dict[str, datetime] = {}@classmethodasync def check_agent_health(cls, agent_name: str) -> bool:"""Check if agent is healthy."""agent = AgentRegistry.get(agent_name)if not agent:return Falsetry:# Simple ping teststart = time.time()response = await agent.run("ping", timeout=5.0)latency = time.time() - startcls._health_status[agent_name] = {"healthy": True,"latency": latency,"last_check": datetime.now()}return Trueexcept Exception as e:cls._health_status[agent_name] = {"healthy": False,"error": str(e),"last_check": datetime.now()}return False@classmethodasync def check_all_agents(cls) -> Dict[str, bool]:"""Health check all registered agents."""results = {}for agent_name in list(AgentRegistry.all().keys()):results[agent_name] = await cls.check_agent_health(agent_name)return results@classmethoddef get_healthy_agents(cls) -> List[str]:"""Get list of healthy agents."""return [name for name, status in cls._health_status.items()if status.get("healthy", False)]
Registry Persistence
class PersistentRegistry:"""Registry with state persistence."""@classmethoddef save_state(cls, filepath: str):"""Save registry state to file."""state = {"timestamp": datetime.now().isoformat(),"agents": {}}for agent_name in list(AgentRegistry.all().keys()):agent = AgentRegistry.get(agent_name)if agent:state["agents"][agent_name] = {"type": type(agent).__name__,"description": getattr(agent, 'description', ''),"model": getattr(agent.model, 'name', 'unknown')}with open(filepath, 'w') as f:json.dump(state, f, indent=2)@classmethoddef load_state(cls, filepath: str) -> Dict:"""Load registry state from file."""with open(filepath, 'r') as f:return json.load(f)@classmethodasync def restore_agents(cls, state: Dict, model_configs: Dict[str, ModelConfig]):"""Restore agents from saved state."""for agent_name, info in state.get("agents", {}).items():agent_type = info["type"]model_name = info.get("model", "anthropic/claude-opus-4.6")# Get appropriate configconfig = model_configs.get(model_name)if not config:continue# Recreate agent based on typeif agent_type == "Agent":Agent(name=agent_name,model_config=config,goal="Restored agent goal",instruction="Restored agent instruction.")elif agent_type == "BrowserAgent":await BrowserAgent.create_safe(name=agent_name,model_config=config,headless=True)# Add other agent types as needed
Best Practices
1. Unique Naming
# GOOD - Descriptive, unique namesagent1 = Agent(name="financial_analyst_v2",model_config=config,goal="Analyze financial performance",instruction="Review financial data and report key findings.")agent2 = Agent(name="report_generator_q4_2024",model_config=config,goal="Generate Q4 reports",instruction="Create clear and complete quarterly reports.")# BAD - Generic, collision-prone namesagent1 = Agent(name="agent",model_config=config,goal="Generic goal",instruction="Generic instruction.")agent2 = Agent(name="helper",model_config=config,goal="Generic goal",instruction="Generic instruction.")
2. Existence Checks
# GOOD - Always check before invokingasync def safe_delegate(agent_name: str, task: str):if AgentRegistry.get(agent_name) is None:logger.warning(f"Agent {agent_name} not found")# Fallback logicreturn await use_fallback_agent(task)agent = AgentRegistry.get(agent_name)return await agent.run(task)# BAD - Assuming agent existsasync def unsafe_delegate(agent_name: str, task: str):agent = AgentRegistry.get(agent_name)return await agent.run(task) # Will fail if agent is None
3. Resource Management
# GOOD - Use auto-cleanup in Orchestraresult = await Orchestra.run(task="Process batch",topology=topology,execution_config=ExecutionConfig(auto_cleanup_agents=True # Default - cleans up after run))# All topology agents automatically cleaned up and unregistered# GOOD - Manual cleanup for standalone agentsasync def process_batch(items):temp_agent = Agent(name=f"batch_processor_${uuid.uuid4().hex[:8]}",model_config=config,goal="Process batch items",instruction="Process each item and return concise structured output.")results = []for item in items:result = await temp_agent.run(f"Process: {item}")results.append(result)# Manual cleanupawait temp_agent.cleanup()AgentRegistry.unregister_instance(temp_agent)return results# BAD - Creating agents without cleanupfor i in range(1000):Agent(name=f"worker_{i}",model_config=config,goal="Temporary worker",instruction="Handle temporary workload.")# Creates 1000 agents with open aiohttp sessions, registry entries!
4. Monitoring
# GOOD - Monitor registry healthasync def monitor_registry():"""Regular registry monitoring."""while True:agents = list(AgentRegistry.all().keys())logger.info(f"Active agents: {len(agents)}")if len(agents) > 100:logger.warning("High agent count - possible leak")# Health checksfor agent_name in agents[:10]: # Sample checkhealthy = await HealthMonitor.check_agent_health(agent_name)if not healthy:logger.error(f"Agent {agent_name} unhealthy")await asyncio.sleep(60) # Check every minute
Next Steps
Agents
Learn about agent creation
Communication
How agents communicate
Topology
Agent organization patterns
Registry System Ready!
You now understand the AgentRegistry in MARSYS. The registry provides robust service discovery and lifecycle management for dynamic multi-agent systems.