Agents
Agents are the fundamental building blocks of MARSYS - autonomous AI entities that can perceive, reason, and take actions to accomplish tasks.
See Also
For detailed class signatures, method parameters, and API reference, see the Agent API Reference.
Overview
Agents in MARSYS are designed with a pure execution model - they implement stateless _run() methods with no side effects, enabling:
- True Parallel Execution: Multiple instances run concurrently without conflicts
- Branch Isolation: Each execution branch maintains independent state
- State Persistence: Clean serialization for pause/resume capabilities
- Testability: Pure functions are easier to test and debug
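The properties above can be illustrated without MARSYS itself. The following is a minimal sketch, assuming a hypothetical `Reply` type in place of the real Message class and a hypothetical `pure_run` function in place of an agent's _run() method; it only shows why a function that depends solely on its inputs can run concurrently across branches.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Reply:
    """Hypothetical stand-in for a MARSYS Message (not the real class)."""
    role: str
    content: str


async def pure_run(prompt: str, context: Dict[str, Any]) -> Reply:
    """Derive the reply from the inputs only; no shared state is touched."""
    await asyncio.sleep(0)  # stands in for an awaited model call
    branch = context.get("branch_id", "unknown")
    return Reply(role="assistant", content=f"[{branch}] {prompt.upper()}")


async def run_branches() -> List[Reply]:
    # Each branch passes its own context, so concurrent calls cannot interfere.
    return await asyncio.gather(
        pure_run("hello", {"branch_id": "a"}),
        pure_run("world", {"branch_id": "b"}),
    )


replies = asyncio.run(run_branches())
print([reply.content for reply in replies])
```

Because `pure_run` reads nothing but its arguments, the same call can be repeated in a unit test with no setup or teardown.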
Architecture
The agent system consists of several interconnected components:
- Agent System: BaseAgent (Abstract Interface), Agent (Standard Implementation), BrowserAgent (Web Automation), LearnableAgent (Fine-tunable), AgentPool (Parallel Instances)
- Core Components: Memory (ConversationMemory), Tools (Function Registry), Model (LLM/VLM/API), AgentRegistry (Global Registry)
Agent Classes
BaseAgent (Abstract)
The foundation interface all agents must implement:
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Union

from marsys.agents.memory import Message


class BaseAgent(ABC):
    def __init__(
        self,
        model: Union[BaseLocalModel, BaseAPIModel],
        name: str,
        goal: str,
        instruction: str,
        tools: Optional[Dict[str, Callable]] = None,
        max_tokens: Optional[int] = 10000,
        allowed_peers: Optional[List[str]] = None,
        bidirectional_peers: bool = False,
        is_convergence_point: Optional[bool] = None,
        input_schema: Optional[Any] = None,
        output_schema: Optional[Any] = None,
        memory_retention: str = "session",
        memory_storage_path: Optional[str] = None
    ):
        # Auto-generates tool schemas from function signatures
        # Registers with AgentRegistry
        # Initializes memory manager
        pass

    @abstractmethod
    async def _run(
        self,
        prompt: Any,
        context: Dict[str, Any],
        **kwargs
    ) -> Message:
        """Pure execution logic - NO side effects"""
        pass
Pure Execution Rule
The _run() method must be pure - no memory manipulation, no logging to files, no global state changes. All side effects are handled by the coordination layer.
Agent (Standard Implementation)
The standard agent with built-in capabilities:
import os

from marsys.agents import Agent
from marsys.models import ModelConfig

agent = Agent(
    model_config=ModelConfig(
        type="api",
        name="anthropic/claude-haiku-4.5",
        provider="openrouter",
        api_key=os.getenv("OPENROUTER_API_KEY"),
        parameters={"temperature": 0.7},
        max_tokens=12000
    ),
    name="DataAnalyst",
    goal="Analyze data and extract meaningful trends and insights",
    instruction="You are a thorough data analyst...",
    tools={"analyze_data": analyze_data, "create_chart": create_chart},
    memory_retention="session"
)
BrowserAgent
Specialized for web automation tasks:
from marsys.agents import BrowserAgent

browser = await BrowserAgent.create_safe(
    model_config=config,
    name="WebResearcher",
    goal="Research and extract information from web pages",
    instruction="Web research specialist with browser access and automation capabilities",
    headless=True,
    viewport_width=1920,
    viewport_height=1080
)

# Browser-specific capabilities (via browser_tool)
await browser.browser_tool.goto("https://example.com")
await browser.browser_tool.mouse_click(x=500, y=300)  # Click by coordinates
content = await browser.browser_tool.fetch_url("https://example.com")
LearnableAgent
Supports fine-tuning with PEFT methods (HuggingFace backend required):
from marsys.agents import LearnableAgent
from marsys.models import ModelConfig

# Configure local model (HuggingFace backend required for training)
model_config = ModelConfig(
    type="local",
    model_class="llm",
    name="Qwen/Qwen3-4B-Instruct-2507",
    backend="huggingface",  # Required for training
    torch_dtype="bfloat16",
    device_map="auto",
    max_tokens=4096
)

learnable = LearnableAgent(
    model_config=model_config,
    name="AdaptiveAssistant",
    goal="Learn from interactions and adapt to user preferences",
    instruction="Adaptive assistant that improves through fine-tuning",
    learning_head="peft",  # PEFT (LoRA) fine-tuning
    learning_head_config={
        "r": 8,  # LoRA rank
        "lora_alpha": 32,
        "target_modules": ["q_proj", "v_proj"],
        "lora_dropout": 0.1
    }
)

# Access model internals for training
pytorch_model = learnable.model.trainable_model  # PEFT-wrapped model
tokenizer = learnable.model.tokenizer  # HuggingFace tokenizer
AgentPool
For true parallel execution with isolated instances:
from marsys.agents import BrowserAgent
from marsys.agents.agent_pool import AgentPool

# Create pool of 3 browser agents
pool = AgentPool(
    agent_class=BrowserAgent,
    num_instances=3,
    model_config=config,
    name="BrowserPool",
    goal="Perform parallel web automation tasks",
    instruction="Browser automation agents for concurrent web operations",
    headless=True
)

# Acquire instance for branch (automatic queue management)
async with pool.acquire(branch_id="branch_123") as agent:
    result = await agent.run(task)

# Pool handles:
# - Instance creation with unique names (BrowserPool_0, BrowserPool_1, etc.)
# - Fair allocation with queue-based waiting
# - Automatic release after use
# - Statistics tracking
# - Cleanup on destruction
When to Use AgentPool
Use AgentPool when you need:
- True parallel execution of the same agent type
- Resource isolation (browser instances, API connections)
- Fair allocation across multiple branches
- Prevention of state conflicts between parallel executions
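The allocation behavior described above can be approximated with a queue-backed async context manager. This is a minimal sketch, not the MARSYS AgentPool implementation: `SimplePool` and the instance names are hypothetical, and plain strings stand in for agent instances.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List, Set


class SimplePool:
    """Hypothetical pool that lends each instance to one branch at a time."""

    def __init__(self, instances: List[str]) -> None:
        self._free: asyncio.Queue = asyncio.Queue()
        for instance in instances:
            self._free.put_nowait(instance)

    @asynccontextmanager
    async def acquire(self, branch_id: str) -> AsyncIterator[str]:
        # branch_id is unused here; a real pool might record it for statistics.
        instance = await self._free.get()  # waits in FIFO order when all are busy
        try:
            yield instance
        finally:
            self._free.put_nowait(instance)  # released even if the body raises


async def demo() -> List[str]:
    pool = SimplePool(["Pool_0", "Pool_1"])
    in_use: Set[str] = set()

    async def work(branch_id: str) -> str:
        async with pool.acquire(branch_id) as instance:
            assert instance not in in_use  # never shared between branches
            in_use.add(instance)
            await asyncio.sleep(0.01)  # stands in for real work
            in_use.discard(instance)
            return instance

    # Four branches compete for two instances; two of them must wait.
    return await asyncio.gather(*(work(f"branch_{i}") for i in range(4)))


used = asyncio.run(demo())
```

The queue gives waiting branches first-come, first-served access, and the `finally` clause ensures an instance returns to the pool even when the borrowing branch fails.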
Creating Agents
Basic Agent
from marsys.agents import Agent
from marsys.models import ModelConfig

# Simple agent
assistant = Agent(
    model_config=ModelConfig(
        type="api",
        name="anthropic/claude-haiku-4.5",
        provider="openrouter",
        max_tokens=12000
    ),
    name="Assistant",
    goal="Provide helpful assistance to users",
    instruction="A helpful AI assistant that responds thoughtfully to queries"
)

# Run the agent
response = await assistant.run(
    prompt="Explain quantum computing",
    context={"user_id": "123"}
)
Agent with Tools
from typing import Dict, List


def search_web(query: str, max_results: int = 5) -> List[Dict]:
    """
    Search the web for information.

    Args:
        query: Search query string
        max_results: Maximum number of results

    Returns:
        List of search results with title, url, snippet
    """
    # Implementation here
    return results


def analyze_data(data: List[float], method: str = "mean") -> float:
    """
    Analyze numerical data.

    Args:
        data: List of numerical values
        method: Analysis method (mean, median, std)

    Returns:
        Analysis result
    """
    # Implementation here
    return result


researcher = Agent(
    model_config=config,
    name="Researcher",
    goal="Research specialist with web search and analysis capabilities",
    instruction="You are a research specialist. Use search_web to find information and analyze_data to process results.",
    tools={"search_web": search_web, "analyze_data": analyze_data}  # Auto-generates OpenAI-compatible schemas
)
Automatic Schema Generation
Tool schemas are automatically generated from function signatures and docstrings. Ensure your functions have proper type hints and docstrings for accurate schema generation.
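To show why type hints and docstrings matter here, the sketch below derives an OpenAI-style function schema using only the standard `inspect` module. It is an illustration under stated assumptions, not the MARSYS generator: `sketch_tool_schema` and the `_JSON_TYPES` mapping are hypothetical, and only simple scalar annotations are handled.

```python
import inspect
from typing import Any, Callable, Dict, List

# Hypothetical mapping from Python annotations to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def sketch_tool_schema(func: Callable[..., Any]) -> Dict[str, Any]:
    """Build an OpenAI-style function schema from a signature and docstring."""
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        # Unannotated or unsupported types fall back to "string" in this sketch.
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default means the caller must supply it
    doc = inspect.getdoc(func) or ""
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": doc.split("\n")[0],  # first docstring line only
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def search_web(query: str, max_results: int = 5) -> list:
    """Search the web for information."""
    return []


schema = sketch_tool_schema(search_web)
```

Without the `int` annotation on `max_results`, this sketch would describe the parameter as a string, which is the kind of inaccuracy that missing type hints can introduce.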
Custom Agent Class
from marsys.agents import BaseAgent
from marsys.agents.memory import Message


class CustomAnalyzer(BaseAgent):
    def __init__(self, model, **kwargs):
        super().__init__(model, **kwargs)
        # NO instance variables that change during execution
        self.analysis_patterns = self._load_patterns()  # OK - static config

    async def _run(self, prompt, context, **kwargs):
        """Pure execution - no side effects"""
        # Prepare messages
        messages = self._prepare_messages(prompt)

        # Add analysis context
        if context.get("analysis_type") == "detailed":
            messages.append({
                "role": "system",
                "content": "Provide detailed analysis with examples"
            })

        # Call model (pure)
        response = await self.model.run(messages)

        # Return pure Message
        return Message(
            role="assistant",
            content=response.content,
            structured_data={"analyzed": True}
        )
Response Formats
Agents communicate using standardized response formats:
Sequential Agent Invocation
{
    "thought": "I need help from the DataAnalyzer",
    "next_action": "invoke_agent",
    "action_input": "DataAnalyzer"
}
Parallel Agent Invocation
{
    "thought": "These analyses can run in parallel",
    "next_action": "parallel_invoke",
    "agents": ["Analyzer1", "Analyzer2", "Analyzer3"],
    "agent_requests": {
        "Analyzer1": "Analyze sales data for Q1",
        "Analyzer2": "Analyze sales data for Q2",
        "Analyzer3": "Analyze sales data for Q3"
    }
}
Tool Call
{
    "next_action": "call_tool",
    "tool_calls": [
        {
            "id": "call_abc123",
            "type": "function",
            "function": {
                "name": "search_web",
                "arguments": "{\"query\": \"latest AI research\", \"max_results\": 10}"
            }
        }
    ]
}
Final Response
{
    "next_action": "final_response",
    "content": "Here is my analysis..."
}

# Or structured response
{
    "next_action": "final_response",
    "content": {
        "summary": "Key findings",
        "details": [...],
        "recommendations": [...]
    }
}
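A coordination layer has to tell these four formats apart before acting on them. The sketch below shows one plausible way to dispatch on the `next_action` field; `route_response` is a hypothetical helper, not part of the MARSYS API, and it relies only on the field names shown in the examples above.

```python
import json
from typing import Any, Dict, List, Tuple


def route_response(raw: str) -> Tuple[str, List[Any]]:
    """Return the action name and the targets or payload it carries."""
    data: Dict[str, Any] = json.loads(raw)
    action = data.get("next_action")
    if action == "invoke_agent":
        return action, [data["action_input"]]  # a single agent name
    if action == "parallel_invoke":
        return action, list(data["agents"])  # several agent names
    if action == "call_tool":
        # One entry per requested tool, identified by function name.
        return action, [call["function"]["name"] for call in data["tool_calls"]]
    if action == "final_response":
        return action, [data["content"]]  # plain text or a structured object
    raise ValueError(f"Unknown next_action: {action!r}")


action, targets = route_response(
    '{"thought": "I need help", "next_action": "invoke_agent", '
    '"action_input": "DataAnalyzer"}'
)
```

Raising on an unrecognized action keeps malformed responses from being silently treated as final answers.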
Memory Management
Each agent maintains its conversation history through a ConversationMemory:
# Memory retention policies
agent = Agent(
    model_config=config,
    name="Assistant",
    goal="Provide helpful assistance",
    instruction="A helpful AI assistant",
    memory_retention="session"  # Options: single_run, session, persistent
)

# Memory is automatically managed during execution
# - single_run: Cleared after each run
# - session: Maintained for workflow duration
# - persistent: Saved to disk for long-term retention
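The difference between the three retention policies can be sketched with a small stand-in class. `SketchMemory`, its method names, and the JSON file format are all assumptions made for illustration; they do not describe how ConversationMemory is actually implemented.

```python
import json
from pathlib import Path
from typing import Dict, List, Optional


class SketchMemory:
    """Hypothetical store illustrating the three retention policies."""

    POLICIES = ("single_run", "session", "persistent")

    def __init__(self, retention: str = "session",
                 storage_path: Optional[str] = None) -> None:
        if retention not in self.POLICIES:
            raise ValueError(f"Unknown retention policy: {retention!r}")
        self.retention = retention
        self.storage_path = storage_path
        self.messages: List[Dict[str, str]] = []

    def add(self, role: str, content: str) -> None:
        self.messages.append({"role": role, "content": content})

    def end_run(self) -> None:
        # single_run: nothing survives past one run.
        if self.retention == "single_run":
            self.messages.clear()

    def end_session(self) -> None:
        # persistent: write the history to disk before in-memory state is dropped.
        if self.retention == "persistent" and self.storage_path:
            Path(self.storage_path).write_text(json.dumps(self.messages))
        self.messages.clear()


memory = SketchMemory(retention="session")
memory.add("user", "hello")
memory.end_run()  # history is kept because the policy is "session"
```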
For more details, see Memory Documentation.
Agent Configuration
Model Configuration
import os

from marsys.models import ModelConfig

# API Model (OpenAI, Anthropic, Google)
api_config = ModelConfig(
    type="api",
    provider="openrouter",
    name="anthropic/claude-haiku-4.5",
    api_key=os.getenv("OPENROUTER_API_KEY"),
    parameters={"temperature": 0.7},
    max_tokens=12000
)

# Local LLM
local_config = ModelConfig(
    type="local",
    name="llama-2-7b",
    model_path="/models/llama-2-7b",
    device="cuda",
    parameters={
        "temperature": 0.8,
        "max_length": 1024
    }
)

# Vision-Language Model
vlm_config = ModelConfig(
    type="api",
    provider="openrouter",
    name="google/gemini-2.5-pro",
    max_tokens=12000
)
Agent Parameters
agent = Agent(
    model_config=config,
    name="Expert",                           # Unique identifier
    goal="Domain expert in...",              # Role definition
    instruction="Detailed instructions...",  # System message
    tools={"tool_name": tool_func},          # Available functions (dict format)
    max_tokens=2000,                         # Response limit
    allowed_peers=["Agent1", "Agent2"],      # Can invoke these agents
    memory_retention="session",              # Memory policy
    input_schema=InputSchema,                # Validate inputs
    output_schema=OutputSchema               # Validate outputs
)
Best Practices
1. Keep _run() Pure
# ✅ GOOD - Pure function
async def _run(self, prompt, context, **kwargs):
    messages = self._prepare_messages(prompt)
    response = await self.model.run(messages)
    return Message(role="assistant", content=response.content)

# ❌ BAD - Side effects
async def _run(self, prompt, context, **kwargs):
    self.memory.add(...)  # NO! Memory handled externally
    await self.save_to_database(...)  # NO! Side effects
    self.state_counter += 1  # NO! Mutable state
    return response
2. Clear Agent Descriptions
# ✅ GOOD - Specific and actionable
goal="""
You are a Python code reviewer specializing in security and performance.

Your responsibilities:
1. Identify security vulnerabilities (SQL injection, XSS, etc.)
2. Suggest performance optimizations
3. Ensure PEP 8 compliance
4. Provide actionable feedback with code examples

Output format: Markdown with code blocks
"""

# ❌ BAD - Vague
goal="You review code"
3. Robust Tool Design
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


# ✅ GOOD - Type hints, docstring, error handling
def fetch_data(
    source: str,
    filters: Optional[Dict[str, Any]] = None,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Fetch data from specified source.

    Args:
        source: Data source identifier
        filters: Optional filtering criteria
        limit: Maximum records to return

    Returns:
        List of data records

    Raises:
        ValueError: If source is invalid
        ConnectionError: If source is unreachable
    """
    try:
        # Implementation
        return data
    except Exception as e:
        logger.error(f"Failed to fetch data: {e}")
        raise


# ❌ BAD - No types, poor docs, no error handling
def fetch_data(source, filters=None):
    """Get data"""
    return get_from_db(source)
4. Use AgentPool for Parallelism
import asyncio

# ✅ GOOD - Pool for parallel execution
pool = AgentPool(BrowserAgent, num_instances=3, ...)
tasks = ["url1", "url2", "url3"]

async def process_url(url, branch_id):
    async with pool.acquire(branch_id) as agent:
        return await agent.scrape(url)

results = await asyncio.gather(*[
    process_url(url, f"branch_{i}")
    for i, url in enumerate(tasks)
])

# ❌ BAD - Reusing single instance
agent = BrowserAgent(...)
# This will have conflicts with parallel execution
results = await asyncio.gather(*[
    agent.scrape(url) for url in tasks
])
Common Patterns
Research Team Pattern
# Specialized agents
data_collector = Agent(
    model_config=config,
    name="DataCollector",
    goal="Collect data from web sources",
    instruction="You collect data by searching the web and scraping pages.",
    tools={"search_web": search_web, "scrape_page": scrape_page}
)

analyzer = Agent(
    model_config=config,
    name="Analyzer",
    goal="Analyze data statistically",
    instruction="You analyze data using statistical methods and create charts.",
    tools={"statistical_analysis": statistical_analysis, "create_charts": create_charts}
)

writer = Agent(
    model_config=config,
    name="Writer",
    goal="Technical writer creating reports",
    instruction="You write clear technical reports based on analyzed data."
)

# Coordinator orchestrates them
coordinator = Agent(
    model_config=config,
    name="Coordinator",
    goal="Coordinate research projects between specialized agents",
    instruction="""You coordinate research projects. Your workflow:
1. Ask DataCollector to gather information
2. Ask Analyzer to process the data
3. Ask Writer to create the report""",
    allowed_peers=["DataCollector", "Analyzer", "Writer"]
)
Error Recovery Pattern
class ResilientAgent(BaseAgent):
    async def _run(self, prompt, context, **kwargs):
        try:
            # Primary logic
            response = await self.primary_approach(prompt)
            return Message(role="assistant", content=response)
        except SpecificError as e:
            # Fallback approach
            response = await self.fallback_approach(prompt)
            return Message(
                role="assistant",
                content=response,
                structured_data={"fallback_used": True}
            )
Validation Pattern
from typing import Any, Dict, List

from pydantic import BaseModel, Field


class AnalysisInput(BaseModel):
    # min_length and pattern are the Pydantic v2 constraint names
    data: List[float] = Field(..., min_length=1)
    method: str = Field(..., pattern="^(mean|median|std)$")


class AnalysisOutput(BaseModel):
    result: float
    confidence: float = Field(..., ge=0, le=1)
    metadata: Dict[str, Any]


validator_agent = Agent(
    model_config=config,
    name="ValidatedAnalyzer",
    goal="Analyze data with validation",
    instruction="Analyze input data and return validated results",
    input_schema=AnalysisInput,
    output_schema=AnalysisOutput
)
Next Steps
- Memory System - Learn how agents maintain context and conversation history
- Tool Integration - Extend agent capabilities with custom functions
- Messages - Understand the message format and types
- Agent API Reference - Complete API documentation for agent classes
Agent System Ready!
You now understand the agent architecture in MARSYS. Agents provide the intelligence, while the Orchestra provides the coordination.