State Management

State persistence and checkpointing for long-running workflows: pause, resume, and recover from failures.

Overview

The state management system enables:

  • Pause and Resume: Stop execution and continue later
  • Checkpointing: Save snapshots at critical points
  • Session Recovery: Recover from failures
  • State Persistence: Long-term workflow storage

Core Components

StateManager

The main interface for state management:

from marsys.coordination.state import StateManager, FileStorageBackend
from pathlib import Path
# Initialize with storage backend
storage = FileStorageBackend(Path("./state"))
state_manager = StateManager(storage_backend=storage)

StorageBackend

Abstract base class for storage implementations:

from abc import ABC
from typing import Any, Dict, List, Optional
class StorageBackend(ABC):
    # Interface summary; method bodies are omitted here
    async def save(self, key: str, data: Dict[str, Any]) -> None: ...
    async def load(self, key: str) -> Optional[Dict[str, Any]]: ...
    async def delete(self, key: str) -> None: ...
    async def list_keys(self, prefix: str = "") -> List[str]: ...
    async def exists(self, key: str) -> bool: ...
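
To make the interface concrete, here is a minimal sketch of a custom backend that keeps everything in a dictionary, which can be handy in unit tests. It is an illustration only: InMemoryStorageBackend is a hypothetical name, and the StorageBackend class below is a local stand-in that mirrors the five methods listed above rather than the class imported from marsys. Whether the real base class marks these methods with @abstractmethod is an assumption.

```python
import asyncio
import copy
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class StorageBackend(ABC):
    """Local stand-in mirroring the documented interface (not the marsys class)."""

    @abstractmethod
    async def save(self, key: str, data: Dict[str, Any]) -> None: ...

    @abstractmethod
    async def load(self, key: str) -> Optional[Dict[str, Any]]: ...

    @abstractmethod
    async def delete(self, key: str) -> None: ...

    @abstractmethod
    async def list_keys(self, prefix: str = "") -> List[str]: ...

    @abstractmethod
    async def exists(self, key: str) -> bool: ...


class InMemoryStorageBackend(StorageBackend):
    """Hypothetical backend that stores state in a dict; nothing survives a restart."""

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, Any]] = {}

    async def save(self, key: str, data: Dict[str, Any]) -> None:
        # Copy on write so later mutation by the caller does not alter stored state
        self._data[key] = copy.deepcopy(data)

    async def load(self, key: str) -> Optional[Dict[str, Any]]:
        value = self._data.get(key)
        return copy.deepcopy(value) if value is not None else None

    async def delete(self, key: str) -> None:
        # Deleting a missing key is treated as a no-op
        self._data.pop(key, None)

    async def list_keys(self, prefix: str = "") -> List[str]:
        return sorted(k for k in self._data if k.startswith(prefix))

    async def exists(self, key: str) -> bool:
        return key in self._data


async def demo() -> List[str]:
    backend = InMemoryStorageBackend()
    await backend.save("sessions/a", {"step": 1})
    await backend.save("checkpoints/a/1", {"step": 1})
    return await backend.list_keys("sessions/")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The key names ("sessions/a", "checkpoints/a/1") are invented for the demo; the key scheme StateManager actually uses is not described here.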

FileStorageBackend

Local file system storage (the default backend):

from marsys.coordination.state import FileStorageBackend
from pathlib import Path
storage = FileStorageBackend(base_path=Path("./state"))
# Creates subdirectories for sessions and checkpoints
# Stores data as JSON files
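
As a rough illustration of what "JSON files in subdirectories" can look like, the sketch below maps a key to a file under a base directory using only the standard library. The function names (key_to_path, save_json, load_json) and the on-disk layout are assumptions made for this example; the layout FileStorageBackend really uses may differ.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional


def key_to_path(base_path: Path, key: str) -> Path:
    """Map a key such as 'sessions/research_2024' to <base_path>/sessions/research_2024.json."""
    return base_path / f"{key}.json"


def save_json(base_path: Path, key: str, data: Dict[str, Any]) -> None:
    path = key_to_path(base_path, key)
    # Create the subdirectory (e.g. sessions/ or checkpoints/) on first use
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temporary file first, then rename, so a crash mid-write
    # does not leave a truncated JSON file in place of the previous one
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
    tmp.replace(path)


def load_json(base_path: Path, key: str) -> Optional[Dict[str, Any]]:
    path = key_to_path(base_path, key)
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))
```

One consequence of JSON storage worth keeping in mind: only JSON-serializable values (dicts, lists, strings, numbers, booleans, None) round-trip cleanly, so objects such as datetimes need converting before they are saved.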

Basic Usage

Enable State Management

from pathlib import Path
from marsys.coordination import Orchestra
from marsys.coordination.state import StateManager, FileStorageBackend
# Create state manager
state_manager = StateManager(
    storage_backend=FileStorageBackend(Path("./state"))
)
# Run with state management
result = await Orchestra.run(
    task="Long-running research task",
    topology=topology,
    state_manager=state_manager,
    context={"session_id": "research_2024"}
)

StateManager Methods

# Save session state
await state_manager.save_session(session_id, state)
# Load session state
state = await state_manager.load_session(session_id)
# Create checkpoint
checkpoint_id = await state_manager.create_checkpoint(
    session_id,
    name="before_critical_section"
)
# Restore from checkpoint
state = await state_manager.restore_checkpoint(checkpoint_id)
# List sessions
sessions = await state_manager.list_sessions()
# List checkpoints
checkpoints = await state_manager.list_checkpoints(session_id)

Checkpointing

Manual Checkpoints

Create checkpoints at critical points for recovery:

# Create checkpoint before risky operation
checkpoint_id = await state_manager.create_checkpoint(
    session_id="session_123",
    name="before_data_processing"
)
try:
    # Risky operation
    result = await process_data()
except Exception as e:
    # Restore from checkpoint
    state = await state_manager.restore_checkpoint(checkpoint_id)
    # Try alternative approach
    result = await process_data_alternative()

Checkpoint Strategy

Create checkpoints before external API calls, database operations, or any operation that might fail or take a long time.
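
The strategy above can be wrapped in a small helper so that every risky call follows the same pattern. The sketch below is one possible shape, not part of marsys: run_with_checkpoint and FakeStateManager are hypothetical names, and the only StateManager methods it relies on are create_checkpoint and restore_checkpoint as shown earlier.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_with_checkpoint(
    state_manager: Any,
    session_id: str,
    name: str,
    operation: Callable[[], Awaitable[Any]],
    fallback: Callable[[], Awaitable[Any]],
) -> Any:
    """Checkpoint, try `operation`, and on failure restore and run `fallback`."""
    checkpoint_id = await state_manager.create_checkpoint(session_id, name=name)
    try:
        return await operation()
    except Exception:
        # Roll back to the state captured before the risky call
        await state_manager.restore_checkpoint(checkpoint_id)
        return await fallback()


class FakeStateManager:
    """Hypothetical stand-in used only to make this example self-contained."""

    def __init__(self) -> None:
        self.restored: List[str] = []

    async def create_checkpoint(self, session_id: str, name: str) -> str:
        return f"{session_id}:{name}"

    async def restore_checkpoint(self, checkpoint_id: str) -> dict:
        self.restored.append(checkpoint_id)
        return {}


async def failing_api_call() -> str:
    raise TimeoutError("simulated external API failure")


async def cached_result() -> str:
    return "cached"


async def demo() -> str:
    manager = FakeStateManager()
    return await run_with_checkpoint(
        manager, "session_123", "before_external_api_call",
        failing_api_call, cached_result,
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Catching the broad Exception keeps the sketch short; in real code it is usually better to catch only the errors the operation is known to raise, so that programming mistakes are not silently routed to the fallback.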

Example Workflow

from marsys.coordination import Orchestra
from marsys.coordination.state import StateManager, FileStorageBackend
from pathlib import Path
# Setup
storage = FileStorageBackend(Path("./workflow_state"))
state_manager = StateManager(storage_backend=storage)
# Start workflow
result = await Orchestra.run(
    task="Analyze large dataset",
    topology=topology,
    state_manager=state_manager,
    context={"session_id": "analysis_123"}
)
# Create checkpoint after first phase
checkpoint_id = await state_manager.create_checkpoint(
    "analysis_123",
    name="after_initial_analysis"
)
# Continue with more processing
# If something fails, restore from checkpoint

Best Practices

1. Use Meaningful Session IDs

# Good - descriptive and unique
context = {"session_id": "market_analysis_2024_q1"}
# Bad - generic
context = {"session_id": "session1"}

2. Checkpoint Before Critical Operations

checkpoint_id = await state_manager.create_checkpoint(
    session_id,
    name="before_external_api_call"
)

3. Clean Up Old Sessions

# List and delete old sessions
# cutoff_time must be defined by the caller, in the same type as session.timestamp
sessions = await state_manager.list_sessions()
for session in sessions:
    if session.timestamp < cutoff_time:
        await state_manager.delete_session(session.session_id)
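
The cleanup loop depends on a cutoff value that is not defined in the snippet. One way to compute it is sketched below, under the assumption that session.timestamp is a Unix timestamp in seconds; if it is a datetime instead, the comparison needs a datetime cutoff. SessionInfo and stale_session_ids are hypothetical names used only for this illustration.

```python
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional

SECONDS_PER_DAY = 86_400


@dataclass
class SessionInfo:
    """Hypothetical record shape; assumes timestamp is seconds since the epoch."""
    session_id: str
    timestamp: float


def stale_session_ids(
    sessions: Iterable[SessionInfo],
    max_age_days: float,
    now: Optional[float] = None,
) -> List[str]:
    """Return the IDs of sessions whose timestamp is older than max_age_days."""
    current = time.time() if now is None else now
    cutoff_time = current - max_age_days * SECONDS_PER_DAY
    return [s.session_id for s in sessions if s.timestamp < cutoff_time]


if __name__ == "__main__":
    reference = 1_700_000_000.0  # fixed "now" so the example is deterministic
    sessions = [
        SessionInfo("old_session", reference - 40 * SECONDS_PER_DAY),
        SessionInfo("recent_session", reference - 2 * SECONDS_PER_DAY),
    ]
    print(stale_session_ids(sessions, max_age_days=30, now=reference))
```

Separating the selection step from the deletion step makes it easy to log or review the list before anything is removed.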

Limitations

Current Limitations

The current implementation supports only FileStorageBackend; Redis and database backends are planned for future releases. Automatic checkpointing is not yet implemented, so checkpoints must be created manually.
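
Until automatic checkpointing is available, a periodic background task can approximate it. The sketch below is one possible workaround using plain asyncio, not a marsys feature: checkpoint_periodically, run_with_auto_checkpoints, and RecordingStateManager are hypothetical names, and the only StateManager method assumed is create_checkpoint(session_id, name=...). It also assumes that a checkpoint taken while the workflow is running captures a usable state, which the documentation does not confirm.

```python
import asyncio
import contextlib
from typing import Any, Awaitable, Callable, List


async def checkpoint_periodically(
    state_manager: Any, session_id: str, interval_seconds: float
) -> None:
    """Create a numbered checkpoint every interval_seconds until cancelled."""
    counter = 0
    while True:
        await asyncio.sleep(interval_seconds)
        counter += 1
        await state_manager.create_checkpoint(session_id, name=f"auto_{counter}")


async def run_with_auto_checkpoints(
    state_manager: Any,
    session_id: str,
    work: Callable[[], Awaitable[Any]],
    interval_seconds: float,
) -> Any:
    """Run `work` while a background task checkpoints the session periodically."""
    task = asyncio.create_task(
        checkpoint_periodically(state_manager, session_id, interval_seconds)
    )
    try:
        return await work()
    finally:
        # Stop the background task whether the work succeeded or failed
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


class RecordingStateManager:
    """Hypothetical stand-in that only records checkpoint names."""

    def __init__(self) -> None:
        self.names: List[str] = []

    async def create_checkpoint(self, session_id: str, name: str) -> str:
        self.names.append(name)
        return f"{session_id}:{name}"


async def simulated_work() -> str:
    await asyncio.sleep(0.05)
    return "done"


if __name__ == "__main__":
    manager = RecordingStateManager()
    result = asyncio.run(
        run_with_auto_checkpoints(manager, "analysis_123", simulated_work, 0.01)
    )
    print(result, manager.names)
```

If a checkpoint attempt itself fails, the error surfaces when the background task is awaited in the finally block, so checkpoint failures are not silently dropped.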