State Management API

Complete API reference for state persistence, checkpointing, and session management in multi-agent workflows.

See Also

For state management patterns, see the State Management Concepts guide.

StateManager

Main interface for state persistence and recovery.

Import

from marsys.coordination.state import StateManager, FileStorageBackend

Constructor

StateManager(
storage_backend: StorageBackend,
enable_compression: bool = True,
enable_checksum: bool = True,
max_checkpoints_per_session: int = 10
)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| storage_backend | StorageBackend | Required | Storage implementation |
| enable_compression | bool | True | Compress state data |
| enable_checksum | bool | True | Validate state integrity |
| max_checkpoints_per_session | int | 10 | Maximum number of checkpoints per session |

Key Methods

save_state

async def save_state(
session_id: str,
snapshot: StateSnapshot
) -> None

Save execution state for a session.

load_state

async def load_state(
session_id: str
) -> Optional[StateSnapshot]

Load execution state for a session.

pause_execution

async def pause_execution(
session_id: str,
state: Dict[str, Any]
) -> None

Pause execution and save current state.

resume_execution

async def resume_execution(
session_id: str
) -> Optional[Dict[str, Any]]

Resume execution from saved state.

create_checkpoint

async def create_checkpoint(
session_id: str,
checkpoint_name: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> str

Create a checkpoint of current state. Returns checkpoint ID.

restore_checkpoint

async def restore_checkpoint(
checkpoint_id: str
) -> Optional[StateSnapshot]

Restore state from checkpoint.

Example

from pathlib import Path
# Initialize with file storage
storage = FileStorageBackend(Path("./state"))
state_manager = StateManager(storage)
# Save state
snapshot = StateSnapshot(
session_id="session_123",
timestamp=time.time(),
branches=serialized_branches,
active_branches={"branch_1", "branch_2"},
completed_branches={"branch_0"}
)
await state_manager.save_state("session_123", snapshot)
# Create checkpoint
checkpoint_id = await state_manager.create_checkpoint(
"session_123",
checkpoint_name="before_critical_section"
)
# Resume later
state = await state_manager.resume_execution("session_123")

StateSnapshot

Snapshot of execution state at a point in time.

Attributes

| Attribute | Type | Description |
|---|---|---|
| session_id | str | Session identifier |
| timestamp | float | Snapshot timestamp |
| branches | Dict[str, Dict] | Serialized branch data |
| active_branches | Set[str] | Currently active branch IDs |
| completed_branches | Set[str] | Completed branch IDs |
| branch_results | Dict[str, Dict] | Branch results |
| checksum | str | Integrity checksum |

Methods

# Calculate integrity checksum once the snapshot is fully populated;
# modifying the snapshot afterwards invalidates the checksum.
snapshot.checksum = snapshot.calculate_checksum()

# Validate later (e.g. after loading the snapshot back from storage)
if not snapshot.validate_checksum():
    # NOTE: StateError is not defined on this page; use the library's
    # state error type or your own exception class.
    raise StateError("State corruption detected")

FileStorageBackend

File-based storage implementation.

Constructor

FileStorageBackend(
base_path: Path,
compression: Optional[str] = None,
encryption_key: Optional[str] = None
)

Directory Structure

base_path/
├── sessions/ # Active session states
├── checkpoints/ # Named checkpoints
└── metadata/ # Session metadata

Example

storage = FileStorageBackend(
base_path=Path("./state"),
compression="gzip"
)
# Save data
await storage.save("session_123", state_data)
# Load data
data = await storage.load("session_123")
# List sessions
sessions = await storage.list_keys(prefix="session_")
# Check existence
if await storage.exists("session_123"):
print("Session exists")

Custom Storage Backend

import json
import re
from typing import Any, Dict, List, Optional

# NOTE: StorageBackend is the library's abstract storage interface; import it
# from the library as well (module path not shown on this page — confirm).


class RedisStorageBackend(StorageBackend):
    """Redis-based storage backend.

    Each state dict is stored as a JSON string under its key. ``redis_client``
    is assumed to be an asyncio Redis client (such as ``redis.asyncio.Redis``)
    with awaitable ``get``/``set``/``delete``/``exists`` and an async
    ``scan_iter``.
    """

    def __init__(self, redis_client):
        self.client = redis_client

    async def save(self, key: str, data: Dict[str, Any]) -> None:
        """Serialize ``data`` to JSON and store it under ``key``."""
        serialized = json.dumps(data)
        await self.client.set(key, serialized)

    async def load(self, key: str) -> Optional[Dict[str, Any]]:
        """Return the dict stored under ``key``, or None if the key is missing."""
        data = await self.client.get(key)
        if data is None:
            return None
        # json.loads accepts both str and bytes, so decoding mode doesn't matter.
        return json.loads(data)

    async def delete(self, key: str) -> None:
        """Remove ``key`` from Redis."""
        await self.client.delete(key)

    async def list_keys(self, prefix: str = "") -> List[str]:
        """Return every key that starts with ``prefix`` (all keys if empty).

        SCAN is used instead of KEYS so a large keyspace doesn't block the
        server. Glob metacharacters in ``prefix`` are escaped so the prefix is
        matched literally.
        """
        pattern = re.sub(r"([*?\[\]\\])", r"\\\1", prefix) + "*"
        keys: List[str] = []
        async for key in self.client.scan_iter(match=pattern):
            # Clients created without decode_responses=True yield bytes.
            keys.append(key.decode() if isinstance(key, bytes) else key)
        # SCAN may return the same key more than once; dedupe, keeping order.
        return list(dict.fromkeys(keys))

    async def exists(self, key: str) -> bool:
        """Return True if ``key`` is present."""
        # EXISTS replies with the number of matching keys (an int), not a bool.
        return await self.client.exists(key) > 0

Checkpoint Management

Creating Checkpoints

# Manual checkpoint
checkpoint_id = await state_manager.create_checkpoint(
session_id="session_123",
checkpoint_name="milestone_1",
metadata={
"progress": 0.5,
"stage": "data_processing",
"timestamp": datetime.now().isoformat()
}
)
print(f"Checkpoint created: {checkpoint_id}")

Automatic Checkpointing

import time
from typing import Dict, Optional


class AutoCheckpointManager:
    """Automatic checkpoint creation.

    A checkpoint is created for a session once more than ``interval_seconds``
    have elapsed since that session's previous automatic checkpoint (or since
    this manager was created).

    Elapsed time is measured with ``time.monotonic()``, so wall-clock
    adjustments cannot trigger or suppress checkpoints. The interval is
    tracked per session, so checkpointing one session does not reset the
    timer for the others.
    """

    def __init__(self, state_manager, interval_seconds=300):
        self.state_manager = state_manager
        self.interval = interval_seconds
        # Wall-clock time of the most recent automatic checkpoint (any session);
        # kept for callers that read this attribute.
        self.last_checkpoint = time.time()
        # Monotonic reference points for interval measurement.
        self._started = time.monotonic()
        self._last_by_session: Dict[str, float] = {}

    async def maybe_checkpoint(
        self, session_id: str, state: "StateSnapshot"
    ) -> Optional[str]:
        """Create a checkpoint for ``session_id`` if its interval has elapsed.

        Args:
            session_id: Session to checkpoint.
            state: Current snapshot. Not read here, because
                ``create_checkpoint`` does not take a snapshot argument.
                Persist it with ``save_state`` first if it may be newer than
                the stored state.

        Returns:
            The checkpoint ID from ``create_checkpoint``, or None when no
            checkpoint was due.
        """
        last = self._last_by_session.get(session_id, self._started)
        if time.monotonic() - last <= self.interval:
            return None

        wall_now = time.time()
        checkpoint_id = await self.state_manager.create_checkpoint(
            session_id,
            checkpoint_name=f"auto_{int(wall_now)}",
        )
        # Update the timers only after a successful checkpoint, so a failed
        # attempt is retried on the next call.
        self._last_by_session[session_id] = time.monotonic()
        self.last_checkpoint = wall_now
        return checkpoint_id

Restoring from Checkpoint

# List available checkpoints
checkpoints = await state_manager.list_checkpoints("session_123")
for cp in checkpoints:
print(f"{cp['id']}: {cp['name']} - {cp['created_at']}")
# Restore specific checkpoint
state = await state_manager.restore_checkpoint("checkpoint_abc123")
if state:
print(f"Restored from checkpoint: {state.session_id}")

Session Lifecycle

# 1. Start session with state management
state_manager = StateManager(storage)
result = await Orchestra.run(
task="Long running task",
topology=topology,
state_manager=state_manager,
context={"session_id": "session_123"}
)
# 2. Session can be paused (manually or on error)
await state_manager.pause_execution("session_123", current_state)
# 3. Resume later
state = await state_manager.resume_execution("session_123")
if state:
result = await Orchestra.resume(
state=state,
topology=topology
)
# 4. Clean up completed session
await state_manager.delete_session("session_123")

Best Practices

Do

  • Create checkpoints before critical operations
  • Validate checksums after loading state
  • Clean up old sessions periodically
  • Use compression for large states
  • Include metadata in checkpoints

Don't

  • Store sensitive data unencrypted
  • Ignore storage failures
  • Keep unlimited checkpoints
  • Serialize non-serializable objects
  • Modify state after checksum calculation

Pro Tip

Use automatic checkpointing with time or step intervals for long-running workflows. This provides recovery points without manual intervention.

Storage Limits

The file storage backend creates one file per session and one per checkpoint. Monitor disk usage and implement cleanup for production deployments.