State Management API
Complete API reference for state persistence, checkpointing, and session management in multi-agent workflows.
See Also
For state management patterns, see the State Management Concepts guide.
StateManager
Main interface for state persistence and recovery.
Import
from marsys.coordination.state import StateManager, FileStorageBackend
Constructor
StateManager(
    storage_backend: StorageBackend,
    enable_compression: bool = True,
    enable_checksum: bool = True,
    max_checkpoints_per_session: int = 10
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| storage_backend | StorageBackend | Required | Storage implementation |
| enable_compression | bool | True | Compress state data |
| enable_checksum | bool | True | Validate state integrity |
| max_checkpoints_per_session | int | 10 | Max checkpoints per session |
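The reference does not say how the max_checkpoints_per_session cap is enforced. As an illustration only, the sketch below shows one plausible policy: keep the newest N checkpoint IDs per session and evict the oldest. CheckpointIndex and its methods are hypothetical names and are not part of the marsys API.

```python
from collections import deque
from typing import Deque, Dict, List


class CheckpointIndex:
    """Hypothetical index that keeps only the newest N checkpoint IDs per session."""

    def __init__(self, max_per_session: int = 10) -> None:
        self.max_per_session = max_per_session
        self._by_session: Dict[str, Deque[str]] = {}

    def add(self, session_id: str, checkpoint_id: str) -> List[str]:
        """Record a checkpoint and return the IDs evicted to stay within the cap."""
        ids = self._by_session.setdefault(session_id, deque())
        ids.append(checkpoint_id)
        evicted = []
        while len(ids) > self.max_per_session:
            evicted.append(ids.popleft())  # oldest first
        return evicted

    def ids_for(self, session_id: str) -> List[str]:
        """Return the retained checkpoint IDs, oldest to newest."""
        return list(self._by_session.get(session_id, ()))


index = CheckpointIndex(max_per_session=2)
for cp in ("cp_1", "cp_2", "cp_3"):
    evicted = index.add("session_123", cp)

print(evicted)                        # ['cp_1']
print(index.ids_for("session_123"))   # ['cp_2', 'cp_3']
```

Whether the real implementation evicts the oldest checkpoint, rejects new ones, or does something else should be confirmed against the library itself.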
Key Methods
save_state
async def save_state(session_id: str, snapshot: StateSnapshot) -> None
Save execution state for a session.
load_state
async def load_state(session_id: str) -> Optional[StateSnapshot]
Load execution state for a session.
pause_execution
async def pause_execution(session_id: str, state: Dict[str, Any]) -> None
Pause execution and save current state.
resume_execution
async def resume_execution(session_id: str) -> Optional[Dict[str, Any]]
Resume execution from saved state.
create_checkpoint
async def create_checkpoint(
    session_id: str,
    checkpoint_name: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> str
Create a checkpoint of current state. Returns checkpoint ID.
restore_checkpoint
async def restore_checkpoint(checkpoint_id: str) -> Optional[StateSnapshot]
Restore state from checkpoint.
Example
import time
from pathlib import Path

# Assumes StateManager, FileStorageBackend, and StateSnapshot are already imported,
# and that serialized_branches holds the serialized branch data for the session.

# Initialize with file storage
storage = FileStorageBackend(Path("./state"))
state_manager = StateManager(storage)

# Save state
snapshot = StateSnapshot(
    session_id="session_123",
    timestamp=time.time(),
    branches=serialized_branches,
    active_branches={"branch_1", "branch_2"},
    completed_branches={"branch_0"}
)
await state_manager.save_state("session_123", snapshot)

# Create checkpoint
checkpoint_id = await state_manager.create_checkpoint(
    "session_123",
    checkpoint_name="before_critical_section"
)

# Resume later
state = await state_manager.resume_execution("session_123")
StateSnapshot
Snapshot of execution state at a point in time.
Attributes
| Attribute | Type | Description |
|---|---|---|
| session_id | str | Session identifier |
| timestamp | float | Snapshot timestamp |
| branches | Dict[str, Dict] | Serialized branch data |
| active_branches | Set[str] | Currently active branch IDs |
| completed_branches | Set[str] | Completed branch IDs |
| branch_results | Dict[str, Dict] | Branch results |
| checksum | str | Integrity checksum |
Methods
# Calculate integrity checksum
snapshot.checksum = snapshot.calculate_checksum()

# Validate later
if not snapshot.validate_checksum():
    raise StateError("State corruption detected")
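The reference does not state which algorithm calculate_checksum uses. As a rough sketch of how such a pair of methods could work, the example below hashes a canonical JSON form of the snapshot fields with SHA-256. SnapshotSketch is a hypothetical stand-in, not the real StateSnapshot, and the choice of SHA-256 over sorted JSON is an assumption.

```python
import hashlib
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Set


@dataclass
class SnapshotSketch:
    """Hypothetical stand-in for StateSnapshot, for illustration only."""
    session_id: str
    timestamp: float
    branches: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    active_branches: Set[str] = field(default_factory=set)
    completed_branches: Set[str] = field(default_factory=set)
    checksum: str = ""

    def _payload(self) -> bytes:
        # Sets are sorted and dict keys are ordered so equal states hash equally.
        body = {
            "session_id": self.session_id,
            "timestamp": self.timestamp,
            "branches": self.branches,
            "active_branches": sorted(self.active_branches),
            "completed_branches": sorted(self.completed_branches),
        }
        return json.dumps(body, sort_keys=True).encode("utf-8")

    def calculate_checksum(self) -> str:
        # The checksum field itself is excluded from the hashed payload.
        return hashlib.sha256(self._payload()).hexdigest()

    def validate_checksum(self) -> bool:
        return self.checksum == self.calculate_checksum()


snap = SnapshotSketch("session_123", 1700000000.0, active_branches={"branch_1"})
snap.checksum = snap.calculate_checksum()
print(snap.validate_checksum())   # True: nothing changed since the checksum

snap.completed_branches.add("branch_0")
print(snap.validate_checksum())   # False: state was modified afterwards
```

The second call illustrates why state should not be modified after the checksum is calculated: any change to the hashed fields makes validation fail.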
FileStorageBackend
File-based storage implementation.
Constructor
FileStorageBackend(
    base_path: Path,
    compression: Optional[str] = None,
    encryption_key: Optional[str] = None
)
Directory Structure
base_path/
├── sessions/      # Active session states
├── checkpoints/   # Named checkpoints
└── metadata/      # Session metadata
Example
storage = FileStorageBackend(
    base_path=Path("./state"),
    compression="gzip"
)

# Save data
await storage.save("session_123", state_data)

# Load data
data = await storage.load("session_123")

# List sessions
sessions = await storage.list_keys(prefix="session_")

# Check existence
if await storage.exists("session_123"):
    print("Session exists")
Custom Storage Backend
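import json
from typing import Any, Dict, List, Optional

class RedisStorageBackend(StorageBackend):
    """Redis-based storage backend.

    Assumes an asyncio Redis client (for example redis.asyncio.Redis) created
    with decode_responses=True, so keys and values come back as str.
    """

    def __init__(self, redis_client):
        self.client = redis_client

    async def save(self, key: str, data: Dict[str, Any]) -> None:
        serialized = json.dumps(data)
        await self.client.set(key, serialized)

    async def load(self, key: str) -> Optional[Dict[str, Any]]:
        data = await self.client.get(key)
        return json.loads(data) if data else None

    async def delete(self, key: str) -> None:
        await self.client.delete(key)

    async def list_keys(self, prefix: str = "") -> List[str]:
        # KEYS walks the whole keyspace; consider SCAN for large databases
        pattern = f"{prefix}*" if prefix else "*"
        return await self.client.keys(pattern)

    async def exists(self, key: str) -> bool:
        # EXISTS returns the number of matching keys, so coerce it to bool
        return bool(await self.client.exists(key))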
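The same five methods can also be backed by a plain dictionary, which is convenient in unit tests where no external service should be touched. The following is a minimal sketch under that assumption. InMemoryStorageBackend is a hypothetical name, and it is written as a standalone class so the snippet runs on its own; in real code it would subclass StorageBackend.

```python
import asyncio
import copy
from typing import Any, Dict, List, Optional


class InMemoryStorageBackend:
    """Dict-backed backend for tests (hypothetical, not part of marsys)."""

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, Any]] = {}

    async def save(self, key: str, data: Dict[str, Any]) -> None:
        # Deep-copy so later mutation by the caller does not alter stored state.
        self._data[key] = copy.deepcopy(data)

    async def load(self, key: str) -> Optional[Dict[str, Any]]:
        stored = self._data.get(key)
        return copy.deepcopy(stored) if stored is not None else None

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)

    async def list_keys(self, prefix: str = "") -> List[str]:
        return sorted(k for k in self._data if k.startswith(prefix))

    async def exists(self, key: str) -> bool:
        return key in self._data


async def demo() -> List[str]:
    storage = InMemoryStorageBackend()
    await storage.save("session_123", {"step": 1})
    await storage.save("session_456", {"step": 7})
    await storage.delete("session_456")
    return await storage.list_keys(prefix="session_")


print(asyncio.run(demo()))  # ['session_123']
```

Copying on both save and load mimics the isolation a real serializing backend provides, so tests do not pass by accident because two callers share one mutable dictionary.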
Checkpoint Management
Creating Checkpoints
from datetime import datetime

# Manual checkpoint
checkpoint_id = await state_manager.create_checkpoint(
    session_id="session_123",
    checkpoint_name="milestone_1",
    metadata={
        "progress": 0.5,
        "stage": "data_processing",
        "timestamp": datetime.now().isoformat()
    }
)
print(f"Checkpoint created: {checkpoint_id}")
Automatic Checkpointing
import time

class AutoCheckpointManager:
    """Automatic checkpoint creation."""

    def __init__(self, state_manager, interval_seconds=300):
        self.state_manager = state_manager
        self.interval = interval_seconds
        self.last_checkpoint = time.time()

    async def maybe_checkpoint(self, session_id: str, state: StateSnapshot):
        """Create checkpoint if interval elapsed."""
        if time.time() - self.last_checkpoint > self.interval:
            await self.state_manager.create_checkpoint(
                session_id,
                checkpoint_name=f"auto_{int(time.time())}"
            )
            self.last_checkpoint = time.time()
Restoring from Checkpoint
# List available checkpoints
checkpoints = await state_manager.list_checkpoints("session_123")
for cp in checkpoints:
    print(f"{cp['id']}: {cp['name']} - {cp['created_at']}")

# Restore specific checkpoint
state = await state_manager.restore_checkpoint("checkpoint_abc123")
if state:
    print(f"Restored from checkpoint: {state.session_id}")
Session Lifecycle
# 1. Start session with state management
state_manager = StateManager(storage)

result = await Orchestra.run(
    task="Long running task",
    topology=topology,
    state_manager=state_manager,
    context={"session_id": "session_123"}
)

# 2. Session can be paused (manually or on error)
await state_manager.pause_execution("session_123", current_state)

# 3. Resume later
state = await state_manager.resume_execution("session_123")
if state:
    result = await Orchestra.resume(
        state=state,
        topology=topology
    )

# 4. Clean up completed session
await state_manager.delete_session("session_123")
Best Practices
Do
- Create checkpoints before critical operations
- Validate checksums after loading state
- Clean up old sessions periodically
- Use compression for large states
- Include metadata in checkpoints
Don't
- Store sensitive data unencrypted
- Ignore storage failures
- Keep unlimited checkpoints
- Put non-serializable objects in state
- Modify state after checksum calculation
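The warning about non-serializable objects can be checked before a state dictionary is handed to the state manager. The sketch below assumes the state is ultimately encoded as JSON, as the custom Redis backend on this page does; find_unserializable is a hypothetical helper, not a marsys function.

```python
import json
from typing import Any, Dict, List


def find_unserializable(state: Dict[str, Any]) -> List[str]:
    """Return the top-level keys whose values json.dumps cannot encode."""
    bad = []
    for key, value in state.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            # TypeError: unsupported type; ValueError: e.g. circular reference
            bad.append(key)
    return bad


state = {
    "progress": 0.5,
    "stage": "data_processing",
    "lock": object(),       # arbitrary objects are not JSON-serializable
    "seen": {"a", "b"},     # neither are sets
}
print(find_unserializable(state))  # ['lock', 'seen']
```

Running a check like this before pause_execution makes the failure point at the offending key rather than surfacing later as a storage error.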
Pro Tip
Use automatic checkpointing with time or step intervals for long-running workflows. This provides recovery points without manual intervention.
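For the step-interval variant of this tip, a counter can replace the clock. The following is a minimal sketch: StepCheckpointer and FakeStateManager are hypothetical names, and the fake only mirrors the create_checkpoint signature documented above so the snippet runs without marsys installed.

```python
import asyncio
from typing import List, Optional


class StepCheckpointer:
    """Create a checkpoint every `every_n_steps` steps (hypothetical helper)."""

    def __init__(self, state_manager, every_n_steps: int = 10) -> None:
        if every_n_steps < 1:
            raise ValueError("every_n_steps must be at least 1")
        self.state_manager = state_manager
        self.every_n_steps = every_n_steps
        self.steps = 0

    async def step(self, session_id: str) -> Optional[str]:
        """Count one step; return a checkpoint ID when one was created."""
        self.steps += 1
        if self.steps % self.every_n_steps != 0:
            return None
        return await self.state_manager.create_checkpoint(
            session_id, checkpoint_name=f"step_{self.steps}"
        )


class FakeStateManager:
    """Stand-in that records checkpoint names instead of persisting anything."""

    def __init__(self) -> None:
        self.names: List[str] = []

    async def create_checkpoint(self, session_id, checkpoint_name=None, metadata=None):
        self.names.append(checkpoint_name)
        return f"{session_id}:{checkpoint_name}"


async def run_steps(total: int) -> List[str]:
    manager = FakeStateManager()
    checkpointer = StepCheckpointer(manager, every_n_steps=3)
    for _ in range(total):
        await checkpointer.step("session_123")
    return manager.names


print(asyncio.run(run_steps(7)))  # ['step_3', 'step_6']
```

Step-based intervals give evenly spaced recovery points when steps vary widely in duration, while time-based intervals bound how much wall-clock work can be lost.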
Storage Limits
The file storage backend creates one file per session or checkpoint. Monitor disk usage and implement cleanup for production deployments.
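A cleanup job for such a directory can be as simple as deleting files whose modification time is older than a retention window. The sketch below runs against a temporary directory; remove_stale_files and the file names are hypothetical, and the actual on-disk naming used by FileStorageBackend is not specified here.

```python
import os
import tempfile
import time
from pathlib import Path
from typing import List


def remove_stale_files(directory: Path, max_age_seconds: float) -> List[str]:
    """Delete regular files older than max_age_seconds; return their names."""
    cutoff = time.time() - max_age_seconds
    removed = []
    for path in sorted(directory.iterdir()):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path.name)
    return removed


# Demonstration against a throwaway directory rather than real state.
with tempfile.TemporaryDirectory() as tmp:
    sessions = Path(tmp)
    old_file = sessions / "session_old.json"
    new_file = sessions / "session_new.json"
    old_file.write_text("{}")
    new_file.write_text("{}")

    # Backdate one file by two days so it falls past the one-day cutoff.
    two_days_ago = time.time() - 2 * 86400
    os.utime(old_file, (two_days_ago, two_days_ago))

    print(remove_stale_files(sessions, max_age_seconds=86400))  # ['session_old.json']
    print(sorted(p.name for p in sessions.iterdir()))           # ['session_new.json']
```

Where the state manager exposes its own deletion call, such as the delete_session method used in the session lifecycle example, that is likely the safer route because it can keep any session metadata consistent.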