Skip to main content

Persisting Agent State Across Sessions: Step-by-Step Setup

Session persistence enables an agent to pause a task, user disconnects, and later resume as if no break occurred. Without persistence, long-running workflows (multi-day procurement approvals, week-long research projects) would restart from zero each time the user reconnects. State persistence includes the agent's progress (what step of a workflow we're on), working memory (current context), episodic records (what happened so far), and semantic facts (what we learned). This article covers practical patterns for saving and restoring full agent state.

What Agent State Includes and Why It Matters

Agent state comprises: (1) the task workflow state (step counter, completed subtasks), (2) working memory (current conversation and reasoning), (3) episodic records logged so far, (4) learned semantic facts, and (5) metadata (user, timestamp, error log). Persistence is critical for:

  • Cost reduction: Resuming a task costs far less than recomputing from scratch.
  • User experience: Users expect continuity; re-asking questions is frustrating.
  • Resilience: If the agent crashes mid-task, persistence enables recovery.
  • Auditing: Complete state history proves the agent followed proper protocols.

For example, a procurement agent approving a $50K invoice over 3 days: Day 1, manager approves budget; Day 2, compliance reviews; Day 3, CFO signs. Persistence tracks all three approvals. Without it, the agent restarts and re-asks the manager on day 2, burning time and creating frustration.

Session State Structure

Define a comprehensive session state object:

# Example: Session state structure
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, List, Any, Optional

@dataclass
class AgentSessionState:
"""Complete capture of agent state for a session."""
session_id: str # Unique identifier
user_id: str
task_id: str # Which task/workflow
task_type: str # e.g., "procurement_approval", "customer_support"

# Workflow progress
current_step: int # Which step of the workflow
workflow_steps: List[str] # [ "manager_approval", "compliance_review", "cfo_sign" ]
completed_steps: List[str] # Filled as we progress
step_results: Dict[str, Any] # { step_name: {success: bool, result: ...} }

# Working memory snapshot
working_memory: List[Dict[str, str]] # Recent messages and reasoning
task_goal: str # Current task description
scratch_pad: Dict[str, Any] # Intermediate calculations

# Episodic records accumulated during session
session_events: List[Dict[str, Any]] # Events logged this session

# Semantic facts learned
learned_facts: Dict[str, Any] # Preferences, patterns discovered

# Metadata
created_at: datetime
last_updated: datetime
paused_at: Optional[datetime] = None
resume_count: int = 0 # How many times resumed
error_log: List[str] = None

def to_dict(self) -> dict:
"""Convert to JSON-serializable dict."""
d = asdict(self)
d["created_at"] = self.created_at.isoformat()
d["last_updated"] = self.last_updated.isoformat()
if self.paused_at:
d["paused_at"] = self.paused_at.isoformat()
if not self.error_log:
self.error_log = []
return d

@staticmethod
def from_dict(d: dict):
"""Reconstruct from JSON dict."""
d["created_at"] = datetime.fromisoformat(d["created_at"])
d["last_updated"] = datetime.fromisoformat(d["last_updated"])
if d.get("paused_at"):
d["paused_at"] = datetime.fromisoformat(d["paused_at"])
return AgentSessionState(**d)

Saving State: Checkpointing Strategy

Save state at critical checkpoints: after each workflow step, after every N turns in a conversation, and on user disconnect. Save to persistent storage (database, cloud storage).

import json
import pickle
from typing import Optional

class SessionPersister:
def __init__(self, storage_backend):
"""
storage_backend: object with save(session_id, state_dict) and load(session_id) -> state_dict methods.
Can be database, file system, or cloud storage (S3, GCS).
"""
self.storage = storage_backend

def save_checkpoint(self, state: AgentSessionState, checkpoint_type: str = "auto"):
"""
Save a checkpoint of the current session state.
checkpoint_type: "auto" (routine save), "step_complete" (workflow step done), "error" (before crash).
"""
state.last_updated = datetime.now()

checkpoint_record = {
"session_id": state.session_id,
"timestamp": datetime.now().isoformat(),
"checkpoint_type": checkpoint_type,
"state": state.to_dict()
}

# Save to persistent storage
self.storage.save(state.session_id, checkpoint_record)

# Log checkpoint for auditing
print(f"[CHECKPOINT] Session {state.session_id} saved at step {state.current_step}")

def load_session(self, session_id: str) -> Optional[AgentSessionState]:
"""Load a session from persistent storage."""
checkpoint = self.storage.load(session_id)
if not checkpoint:
return None

state = AgentSessionState.from_dict(checkpoint["state"])
state.resume_count += 1
print(f"[RESUME] Session {session_id} resumed (resume #{state.resume_count})")

return state

def list_sessions(self, user_id: str) -> list:
"""List all sessions for a user (for resuming old tasks)."""
return self.storage.list_by_user(user_id)

Restoring State and Resuming Execution

When a user reconnects, load the state and resume from where we left off:

class AgentWithPersistence:
def __init__(self, model_client, persister: SessionPersister):
self.model = model_client
self.persister = persister
self.current_state: Optional[AgentSessionState] = None

def start_or_resume_task(self, user_id: str, task_id: str, resume_session_id: Optional[str] = None):
"""
Start a new task or resume an existing session.
"""
if resume_session_id:
# Resume existing session
self.current_state = self.persister.load_session(resume_session_id)
if not self.current_state:
raise ValueError(f"Session {resume_session_id} not found")

print(f"Resuming session: task '{self.current_state.task_id}' at step {self.current_state.current_step}")
else:
# Start new session
self.current_state = AgentSessionState(
session_id=self._generate_session_id(),
user_id=user_id,
task_id=task_id,
task_type="workflow",
current_step=0,
workflow_steps=[],
completed_steps=[],
step_results={},
working_memory=[],
task_goal="",
scratch_pad={},
session_events=[],
learned_facts={},
created_at=datetime.now(),
last_updated=datetime.now(),
error_log=[]
)
print(f"Starting new session {self.current_state.session_id}")

def execute_step(self) -> bool:
"""
Execute the current workflow step.
Save checkpoint after each step.
Return True if step succeeded, False if failed.
"""
if self.current_state.current_step >= len(self.current_state.workflow_steps):
print("Workflow complete")
return True

step_name = self.current_state.workflow_steps[self.current_state.current_step]
print(f"[EXECUTING] Step {self.current_state.current_step + 1}/{len(self.current_state.workflow_steps)}: {step_name}")

try:
# Execute the step (call agent, tool, etc.)
result = self._execute_step_internal(step_name)

# Record success
self.current_state.completed_steps.append(step_name)
self.current_state.step_results[step_name] = {"success": True, "result": result}
self.current_state.current_step += 1

# Save checkpoint after step completion
self.persister.save_checkpoint(self.current_state, checkpoint_type="step_complete")

return True

except Exception as e:
# Record failure
error_msg = str(e)
self.current_state.error_log.append({
"step": step_name,
"error": error_msg,
"timestamp": datetime.now().isoformat()
})
self.current_state.step_results[step_name] = {"success": False, "error": error_msg}

# Save checkpoint for debugging
self.persister.save_checkpoint(self.current_state, checkpoint_type="error")

print(f"[ERROR] Step {step_name} failed: {error_msg}")
return False

def _execute_step_internal(self, step_name: str):
"""Implement the actual step logic."""
# Example: "manager_approval" step
if step_name == "manager_approval":
# Call agent to fetch approval
approval = self._request_manager_approval()
return approval
elif step_name == "compliance_review":
# Call compliance check
return self._run_compliance_check()
else:
raise ValueError(f"Unknown step: {step_name}")

def pause_session(self):
"""Pause the session (e.g., waiting for external input)."""
self.current_state.paused_at = datetime.now()
self.persister.save_checkpoint(self.current_state, checkpoint_type="paused")
print(f"Session {self.current_state.session_id} paused")

def _generate_session_id(self) -> str:
"""Generate unique session ID."""
import uuid
return str(uuid.uuid4())

Handling Concurrent Sessions and Conflicts

If a user opens the same task in two browser tabs, both might try to save state. Use optimistic locking to prevent lost updates:

class ConcurrentSessionPersister(SessionPersister):
def save_checkpoint(self, state: AgentSessionState, expected_version: int = None):
"""Save with version checking to prevent concurrent overwrites."""
state.last_updated = datetime.now()

checkpoint_record = {
"session_id": state.session_id,
"version": state.resume_count, # Use resume count as version
"timestamp": datetime.now().isoformat(),
"state": state.to_dict()
}

# Try to save if version matches
success = self.storage.save_if_version_matches(
state.session_id,
checkpoint_record,
expected_version or state.resume_count
)

if not success:
raise VersionConflictError(
f"Session {state.session_id} was modified by another process. Reload and retry."
)

def load_session(self, session_id: str) -> Optional[AgentSessionState]:
"""Load session and its current version."""
checkpoint = self.storage.load(session_id)
if not checkpoint:
return None

state = AgentSessionState.from_dict(checkpoint["state"])
state.resume_count = checkpoint.get("version", 0)

return state

Storage Backend Options

# Example 1: Database storage (PostgreSQL)
class PostgresSessionStore:
def __init__(self, db_connection):
self.db = db_connection
self._ensure_table()

def save(self, session_id: str, checkpoint: dict):
"""Save checkpoint to PostgreSQL."""
import json
self.db.execute("""
INSERT INTO agent_sessions (session_id, checkpoint, updated_at)
VALUES (?, ?, NOW())
ON CONFLICT (session_id) DO UPDATE SET checkpoint = ?, updated_at = NOW()
""", (session_id, json.dumps(checkpoint), json.dumps(checkpoint)))

def load(self, session_id: str) -> Optional[dict]:
"""Load checkpoint from PostgreSQL."""
import json
row = self.db.execute(
"SELECT checkpoint FROM agent_sessions WHERE session_id = ?",
(session_id,)
).fetchone()
return json.loads(row[0]) if row else None


# Example 2: Cloud storage (AWS S3)
class S3SessionStore:
def __init__(self, s3_client, bucket: str):
self.s3 = s3_client
self.bucket = bucket

def save(self, session_id: str, checkpoint: dict):
"""Save checkpoint to S3."""
import json
key = f"sessions/{session_id}/latest.json"
self.s3.put_object(
Bucket=self.bucket,
Key=key,
Body=json.dumps(checkpoint),
ContentType="application/json"
)

def load(self, session_id: str) -> Optional[dict]:
"""Load checkpoint from S3."""
import json
try:
key = f"sessions/{session_id}/latest.json"
response = self.s3.get_object(Bucket=self.bucket, Key=key)
return json.loads(response["Body"].read())
except self.s3.exceptions.NoSuchKey:
return None

Key Takeaways

  • Define a comprehensive session state object capturing workflow progress, working memory, episodic records, and learned facts.
  • Save checkpoints at critical points: after each workflow step, on disconnect, and on error.
  • Load and restore state on reconnection; resume from where the agent left off, avoiding redundant work.
  • Use optimistic locking to prevent concurrent session conflicts; detect and resolve with version checking.
  • Store checkpoints in persistent backend: database (simple, transactional) or cloud storage (scalable, durable).

Frequently Asked Questions

How frequently should I checkpoint?

After each workflow step (guaranteed consistency). For conversational agents, every 5–10 turns. For long-running tasks, every minute or after user input. Balance between safety (frequent checkpoints, high overhead) and efficiency (infrequent, high risk of data loss).

What if the checkpoint is corrupted or incomplete?

Log checkpoints in a write-ahead format: write checkpoint, then mark as "committed" only after successful write. If a checkpoint is corrupted, revert to the last committed checkpoint (rollback).

Can I checkpoint working memory or just the high-level task state?

Checkpoint both. Working memory enables the agent to resume mid-conversation seamlessly. High-level state (completed steps) enables resuming after a long break. A tiered approach: checkpoint working memory frequently (every turn), checkpoint task state less often (every step).

How long should I keep old sessions?

Keep for the lifetime of the task plus 30 days (recovery window). Archive old sessions to cold storage (S3 Glacier) for auditing. Delete very old sessions per retention policy (GDPR, CCPA).

How do I handle sessions that are truly broken (unrecoverable error)?

Mark the session as "failed" and create a "post-mortem" record for auditing. Offer the user the choice to: (1) retry the failed step, (2) start a new task, or (3) contact support.

Further Reading