State Management
State persistence and checkpointing for long-running workflows - pause, resume, and recover from failures.
Overview
The state management system enables:
- Pause and Resume: Stop execution and continue later
- Checkpointing: Save snapshots at critical points
- Session Recovery: Recover from failures
- State Persistence: Long-term workflow storage
Core Components
StateManager
The main interface for state management:
    from marsys.coordination.state import StateManager, FileStorageBackend
    from pathlib import Path

    # Initialize with storage backend
    storage = FileStorageBackend(Path("./state"))
    state_manager = StateManager(storage_backend=storage)
StorageBackend
Abstract base class for storage implementations:
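    from abc import ABC
    from typing import Any, Dict, List, Optional

    class StorageBackend(ABC):
        # Method signatures only; concrete backends supply the bodies
        async def save(self, key: str, data: Dict[str, Any]) -> None: ...
        async def load(self, key: str) -> Optional[Dict[str, Any]]: ...
        async def delete(self, key: str) -> None: ...
        async def list_keys(self, prefix: str = "") -> List[str]: ...
        async def exists(self, key: str) -> bool: ...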
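To make the interface concrete, here is a minimal sketch of a backend that implements the five methods listed above using a plain dictionary. The `StorageBackend` class in this block is a local stand-in that mirrors the documented signatures so the example runs without marsys installed, and `InMemoryStorageBackend` is a hypothetical name, not part of the library. A real custom backend would presumably subclass the marsys class instead.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class StorageBackend(ABC):
    """Local stand-in mirroring the documented interface (not the marsys class)."""

    @abstractmethod
    async def save(self, key: str, data: Dict[str, Any]) -> None: ...

    @abstractmethod
    async def load(self, key: str) -> Optional[Dict[str, Any]]: ...

    @abstractmethod
    async def delete(self, key: str) -> None: ...

    @abstractmethod
    async def list_keys(self, prefix: str = "") -> List[str]: ...

    @abstractmethod
    async def exists(self, key: str) -> bool: ...


class InMemoryStorageBackend(StorageBackend):
    """Hypothetical backend that keeps all data in a dict (lost on exit)."""

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, Any]] = {}

    async def save(self, key: str, data: Dict[str, Any]) -> None:
        self._data[key] = dict(data)  # shallow copy so later caller edits don't leak in

    async def load(self, key: str) -> Optional[Dict[str, Any]]:
        value = self._data.get(key)
        return dict(value) if value is not None else None

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)  # deleting a missing key is a no-op

    async def list_keys(self, prefix: str = "") -> List[str]:
        return sorted(k for k in self._data if k.startswith(prefix))

    async def exists(self, key: str) -> bool:
        return key in self._data


async def demo():
    backend = InMemoryStorageBackend()
    await backend.save("sessions/research_2024", {"step": 1})
    await backend.save("checkpoints/before_api", {"step": 0})
    keys = await backend.list_keys(prefix="sessions/")
    loaded = await backend.load("sessions/research_2024")
    await backend.delete("sessions/research_2024")
    still_there = await backend.exists("sessions/research_2024")
    return keys, loaded, still_there


keys, loaded, still_there = asyncio.run(demo())
```

An in-memory backend like this is mainly useful in tests, where persistence across process restarts is not needed.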
FileStorageBackend
Local file system storage (the default backend):
    from marsys.coordination.state import FileStorageBackend
    from pathlib import Path

    storage = FileStorageBackend(base_path=Path("./state"))
    # Creates subdirectories for sessions and checkpoints
    # Stores data as JSON files
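As a rough illustration of what "subdirectories plus JSON files" can look like, the sketch below writes and reads state using only the standard library. The helper names `write_json_state` and `read_json_state`, and the `<base_path>/<kind>/<key>.json` layout, are assumptions made for this example. The actual on-disk layout used by `FileStorageBackend` may differ.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional


def write_json_state(base_path: Path, kind: str, key: str, data: Dict[str, Any]) -> Path:
    """Write `data` to <base_path>/<kind>/<key>.json and return the file path."""
    target_dir = base_path / kind  # e.g. "sessions" or "checkpoints"
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"{key}.json"
    target.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return target


def read_json_state(base_path: Path, kind: str, key: str) -> Optional[Dict[str, Any]]:
    """Return the stored dict, or None if no file exists for this key."""
    target = base_path / kind / f"{key}.json"
    if not target.exists():
        return None
    return json.loads(target.read_text(encoding="utf-8"))


# Round-trip inside a temporary directory so nothing is left on disk
with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    path = write_json_state(base, "sessions", "research_2024", {"step": 3})
    restored = read_json_state(base, "sessions", "research_2024")
    missing = read_json_state(base, "checkpoints", "does_not_exist")
```

Because the data goes through `json.dumps`, only JSON-serializable values (strings, numbers, lists, dicts, booleans, `None`) survive the round trip in this sketch.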
Basic Usage
Enable State Management
    from pathlib import Path

    from marsys.coordination import Orchestra
    from marsys.coordination.state import StateManager, FileStorageBackend

    # Create state manager
    state_manager = StateManager(
        storage_backend=FileStorageBackend(Path("./state"))
    )

    # Run with state management
    # (`topology` is assumed to be defined elsewhere; call from an async function)
    result = await Orchestra.run(
        task="Long-running research task",
        topology=topology,
        state_manager=state_manager,
        context={"session_id": "research_2024"}
    )
StateManager Methods
    # Save session state
    await state_manager.save_session(session_id, state)

    # Load session state
    state = await state_manager.load_session(session_id)

    # Create checkpoint
    checkpoint_id = await state_manager.create_checkpoint(
        session_id,
        name="before_critical_section"
    )

    # Restore from checkpoint
    state = await state_manager.restore_checkpoint(checkpoint_id)

    # List sessions
    sessions = await state_manager.list_sessions()

    # List checkpoints
    checkpoints = await state_manager.list_checkpoints(session_id)
Checkpointing
Manual Checkpoints
Create checkpoints at critical points for recovery:
    # Create checkpoint before risky operation
    checkpoint_id = await state_manager.create_checkpoint(
        session_id="session_123",
        name="before_data_processing"
    )

    try:
        # Risky operation
        result = await process_data()
    except Exception as e:
        # Restore from checkpoint
        state = await state_manager.restore_checkpoint(checkpoint_id)
        # Try alternative approach
        result = await process_data_alternative()
Checkpoint Strategy
Create checkpoints before external API calls, database operations, or any operation that might fail or take a long time.
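One way to apply this strategy consistently is to wrap the checkpoint, try, and restore steps in a small helper. The sketch below is illustrative only: `run_with_checkpoint` is a hypothetical helper, and `FakeStateManager` is a stand-in that imitates just the `create_checkpoint` and `restore_checkpoint` calls shown earlier so the example runs without marsys.

```python
import asyncio


class FakeStateManager:
    """Stand-in for StateManager so the sketch runs without marsys installed."""

    def __init__(self):
        self.state = {"phase": "initial"}
        self.checkpoints = {}

    async def create_checkpoint(self, session_id, name):
        checkpoint_id = f"{session_id}:{name}"
        self.checkpoints[checkpoint_id] = dict(self.state)  # snapshot current state
        return checkpoint_id

    async def restore_checkpoint(self, checkpoint_id):
        return dict(self.checkpoints[checkpoint_id])


async def run_with_checkpoint(state_manager, session_id, name, operation, fallback):
    """Checkpoint, run `operation`; on failure restore and run `fallback(state)`."""
    checkpoint_id = await state_manager.create_checkpoint(session_id, name=name)
    try:
        return await operation()
    except Exception:
        state = await state_manager.restore_checkpoint(checkpoint_id)
        return await fallback(state)


async def flaky_api_call():
    # Simulates an external call that fails
    raise ConnectionError("simulated outage")


async def use_cached_data(state):
    # Alternative path that works from the restored state
    return f"fallback from phase={state['phase']}"


result = asyncio.run(
    run_with_checkpoint(
        FakeStateManager(),
        "session_123",
        "before_external_api_call",
        flaky_api_call,
        use_cached_data,
    )
)
```

Catching bare `Exception` keeps the sketch short. In real code it is usually better to catch only the failures you expect from the risky operation.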
Example Workflow
    from marsys.coordination import Orchestra
    from marsys.coordination.state import StateManager, FileStorageBackend
    from pathlib import Path

    # Setup
    storage = FileStorageBackend(Path("./workflow_state"))
    state_manager = StateManager(storage_backend=storage)

    # Start workflow
    result = await Orchestra.run(
        task="Analyze large dataset",
        topology=topology,
        state_manager=state_manager,
        context={"session_id": "analysis_123"}
    )

    # Create checkpoint after first phase
    checkpoint_id = await state_manager.create_checkpoint(
        "analysis_123",
        name="after_initial_analysis"
    )

    # Continue with more processing
    # If something fails, restore from checkpoint
Best Practices
1. Use Meaningful Session IDs
    # Good - descriptive and unique
    context = {"session_id": "market_analysis_2024_q1"}

    # Bad - generic
    context = {"session_id": "session1"}
2. Checkpoint Before Critical Operations
    checkpoint_id = await state_manager.create_checkpoint(
        session_id,
        name="before_external_api_call"
    )
3. Clean Up Old Sessions
    # List and delete old sessions
    # (cutoff_time is a threshold you define; sessions older than it are removed)
    sessions = await state_manager.list_sessions()
    for session in sessions:
        if session.timestamp < cutoff_time:
            await state_manager.delete_session(session.session_id)
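The cleanup loop leaves `cutoff_time` undefined, so here is one possible way to compute it and package the loop as a reusable function. This is a sketch under assumptions: it treats `session.timestamp` as a timezone-aware `datetime`, which the source does not confirm, and `SessionInfo`, `FakeStateManager`, and `delete_sessions_older_than` are hypothetical names used only to make the example runnable.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class SessionInfo:
    """Hypothetical shape of the items returned by list_sessions()."""
    session_id: str
    timestamp: datetime


class FakeStateManager:
    """Stand-in exposing only list_sessions() and delete_session()."""

    def __init__(self, sessions):
        self._sessions = {s.session_id: s for s in sessions}

    async def list_sessions(self):
        return list(self._sessions.values())

    async def delete_session(self, session_id):
        self._sessions.pop(session_id, None)


async def delete_sessions_older_than(state_manager, max_age, now=None):
    """Delete sessions whose timestamp is older than `now - max_age`."""
    now = now or datetime.now(timezone.utc)
    cutoff_time = now - max_age
    deleted = []
    for session in await state_manager.list_sessions():
        if session.timestamp < cutoff_time:
            await state_manager.delete_session(session.session_id)
            deleted.append(session.session_id)
    return deleted


async def demo():
    # Fixed "now" so the example is deterministic
    now = datetime(2024, 4, 1, tzinfo=timezone.utc)
    manager = FakeStateManager([
        SessionInfo("market_analysis_2024_q1", now - timedelta(days=45)),
        SessionInfo("research_2024", now - timedelta(days=2)),
    ])
    deleted = await delete_sessions_older_than(manager, timedelta(days=30), now=now)
    remaining = [s.session_id for s in await manager.list_sessions()]
    return deleted, remaining


deleted, remaining = asyncio.run(demo())
```

If the real timestamps turn out to be epoch floats rather than `datetime` objects, the same structure applies with `cutoff_time = time.time() - max_age_seconds`.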
Limitations
Current Limitations
The current implementation supports only FileStorageBackend; Redis and database backends are planned for future releases. Automatic checkpointing is not yet implemented, so checkpoints must be created manually.
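Until automatic checkpointing exists, periodic checkpoints can be approximated in user code with a background task that calls `create_checkpoint` on a timer. The following is a minimal sketch of that idea, not a library feature: `checkpoint_periodically` is a hypothetical helper, and `FakeStateManager` stands in for the real `StateManager` so the example is self-contained.

```python
import asyncio


class FakeStateManager:
    """Stand-in for StateManager; records the names of created checkpoints."""

    def __init__(self):
        self.created = []

    async def create_checkpoint(self, session_id, name):
        self.created.append(name)
        return f"{session_id}:{name}"


async def checkpoint_periodically(state_manager, session_id, interval_s, stop):
    """Create a numbered checkpoint every `interval_s` seconds until `stop` is set."""
    count = 0
    while not stop.is_set():
        try:
            # Wake up early if the workflow signals completion
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            # Interval elapsed without a stop signal: take a checkpoint
            count += 1
            await state_manager.create_checkpoint(session_id, name=f"auto_{count}")


async def main():
    manager = FakeStateManager()
    stop = asyncio.Event()
    task = asyncio.create_task(
        checkpoint_periodically(manager, "analysis_123", 0.01, stop)
    )
    await asyncio.sleep(0.1)  # stands in for the real workflow
    stop.set()                # tell the background task to finish
    await task
    return manager.created


created = asyncio.run(main())
```

Timer-based checkpoints capture state at arbitrary moments, so they complement rather than replace explicit checkpoints placed before critical operations.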