
State Management in Agent Planning: Consistency Across Steps

State management is the practice of maintaining consistent, correct context as an agent executes multi-step plans. Naive agents simply accumulate context: every task appends its output, so by task 20 the context window can be full, self-contradictory, or both. Production agents instead rely on explicit state schemas, immutable inputs, and validation to keep that context from being corrupted.

The State Problem

Consider an agent executing: (1) Query customer data, (2) Segment by region, (3) Create segment for "US-West", (4) Assign sales team to "US-West". If step 2's segmentation is wrong or changes (for example, because of a bug in its query), step 4 could assign the team to the wrong group. This is state inconsistency: later tasks depend on earlier outputs, so when an earlier output is wrong, everything downstream of it is wrong too.

Worse: if the agent is told mid-execution to "use Europe instead of US," it must revisit the completed steps the change touches (rerun steps 2–3) and then update everything downstream of them (step 4). Without explicit state management, the agent has no record of which tasks consumed which outputs, so it cannot tell which tasks are affected.
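Finding the affected tasks requires the task graph. The sketch below assumes the plan is stored as a hypothetical `DEPENDS_ON` mapping from each task to the tasks whose output it reads. It collects everything downstream of the changed task with a breadth-first search, then orders that subset with Kahn's topological sort so each task reruns only after its inputs have been rerun.

```python
from collections import deque
from typing import Dict, List

# Hypothetical plan for the example above: task -> tasks whose output it reads
DEPENDS_ON: Dict[str, List[str]] = {
    "query_customers": [],
    "segment_by_region": ["query_customers"],
    "create_segment": ["segment_by_region"],
    "assign_sales_team": ["create_segment"],
}

def tasks_to_rerun(changed: str, depends_on: Dict[str, List[str]]) -> List[str]:
    """Return `changed` plus everything downstream of it, in a valid rerun order."""
    # Invert the graph: task -> tasks that consume its output
    consumers: Dict[str, List[str]] = {t: [] for t in depends_on}
    for task, deps in depends_on.items():
        for dep in deps:
            consumers[dep].append(task)

    # Breadth-first search collects the affected set
    affected = {changed}
    queue = deque([changed])
    while queue:
        for nxt in consumers[queue.popleft()]:
            if nxt not in affected:
                affected.add(nxt)
                queue.append(nxt)

    # Kahn's algorithm restricted to the affected subgraph yields the order
    indegree = {t: sum(d in affected for d in depends_on[t]) for t in affected}
    ready = deque(sorted(t for t, n in indegree.items() if n == 0))
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for nxt in consumers[task]:
            if nxt in affected:
                indegree[nxt] -= 1
                if indegree[nxt] == 0:
                    ready.append(nxt)
    return order

print(tasks_to_rerun("segment_by_region", DEPENDS_ON))
# ['segment_by_region', 'create_segment', 'assign_sales_team']
```

The unchanged `query_customers` step is not rerun. Only the changed task and its consumers are.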

State Schema Design

Define a state schema—the data structures an agent maintains and their invariants:

```python
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

@dataclass
class AgentState:
    """Central state for an agent executing a plan."""

    # Inputs: set once at construction, never reassigned
    # (a convention only; a plain dataclass does not enforce it)
    goal: str
    user_id: str
    start_time: datetime

    # Mutable working state
    current_task_id: Optional[str] = None
    completed_tasks: Dict[str, dict] = field(default_factory=dict)  # task_id -> record

    # Derived/cached values
    _segment_cache: Dict[str, List[str]] = field(default_factory=dict)
    _team_assignments: Dict[str, str] = field(default_factory=dict)

    def mark_task_complete(self, task_id: str, output: dict,
                           depends_on: Optional[List[str]] = None) -> None:
        """Mark a task as done, recording its output and upstream tasks."""
        self.completed_tasks[task_id] = {
            "output": output,
            "depends_on": list(depends_on or []),  # read by find_dependents()
            "timestamp": datetime.now(),
            "version": len(self.completed_tasks),
        }

    def get_task_output(self, task_id: str) -> Optional[dict]:
        """Retrieve output from a previous task, or None if it hasn't completed."""
        record = self.completed_tasks.get(task_id)
        return record["output"] if record is not None else None

    def invalidate_cache(self, task_id: str) -> None:
        """If task_id's output changes, drop every completed task downstream of it."""
        for dep_id in self.find_dependents(task_id):
            self.completed_tasks.pop(dep_id, None)
            logging.info("Invalidated %s due to change in %s", dep_id, task_id)

    def find_dependents(self, task_id: str) -> List[str]:
        """Return completed tasks that depend on task_id, directly or transitively."""
        # Uses the depends_on lists recorded at completion; tasks that have not
        # run yet are invisible here and need the full task graph.
        dependents: List[str] = []
        frontier = [task_id]
        while frontier:
            current = frontier.pop()
            for t_id, record in self.completed_tasks.items():
                if current in record.get("depends_on", []) and t_id not in dependents:
                    dependents.append(t_id)
                    frontier.append(t_id)
        return dependents
```
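The schema above labels `goal`, `user_id`, and `start_time` as immutable, but a plain dataclass lets any code reassign them. One way to enforce that split is to move the inputs into a frozen dataclass, which raises `dataclasses.FrozenInstanceError` on assignment. This is a sketch with hypothetical `PlanInputs` and `WorkingState` names.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime
from typing import Dict

@dataclass(frozen=True)
class PlanInputs:
    """Inputs fixed for the lifetime of a plan; attribute assignment raises."""
    goal: str
    user_id: str
    start_time: datetime

@dataclass
class WorkingState:
    """Mutable working state holding a reference to the frozen inputs."""
    inputs: PlanInputs
    completed_tasks: Dict[str, dict] = field(default_factory=dict)

state = WorkingState(PlanInputs("Assign sales teams", "u-123", datetime(2024, 1, 1)))
state.completed_tasks["t1"] = {"output": {"rows": 42}}  # allowed: working state

try:
    state.inputs.goal = "something else"  # rejected: inputs are frozen
except FrozenInstanceError:
    print("inputs are read-only")
```

Freezing is shallow. A mutable value stored inside `PlanInputs`, such as a list, could still be modified in place, so keep input fields to immutable types.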

Isolation and Transactions

Treat agent state like a database: use transactions to ensure atomic updates. If a task fails mid-execution and leaves state partially updated, roll back.

```python
import copy
import logging
from typing import Any, Callable, Dict

class ValidationError(Exception):
    """Raised when a task's output fails its success criterion."""

class StateTransaction:
    """Context manager for atomic updates to state.completed_tasks."""

    def __init__(self, state: AgentState):
        self.state = state
        self.snapshot = None

    def __enter__(self):
        # Snapshot exactly the part of the state that is restored on failure
        self.snapshot = copy.deepcopy(self.state.completed_tasks)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # Roll back on exception
            self.state.completed_tasks = self.snapshot
            logging.warning("Rolled back state due to %s", exc_type.__name__)
        return False  # never suppress the exception

def execute_task_transactionally(
    state: AgentState,
    task: dict,
    run_task: Callable[[dict, Dict[str, Any]], dict],
    validate: Callable[[dict, Any], bool],
) -> dict:
    """Execute a task with automatic rollback on failure.

    run_task (the executor) and validate (the success-criterion check)
    are caller-supplied hooks.
    """
    with StateTransaction(state):
        # Fetch inputs from upstream tasks
        inputs = {dep: state.get_task_output(dep) for dep in task.get("depends_on", [])}

        result = run_task(task, inputs)

        if not validate(result, task["success_criterion"]):
            raise ValidationError(f"Task {task['id']} output invalid")

        # Commit: record the result (undone if anything above raised)
        state.mark_task_complete(task["id"], result)

    return result
```
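Snapshot-and-restore copies the state before every task. A common alternative is copy-on-write. The task writes into a private copy, and that copy replaces the live state only after validation succeeds, so a failure never touches live state. The sketch below uses a hypothetical `commit_task` helper, with plain dicts standing in for `completed_tasks`.

```python
import copy
from typing import Callable, Dict

def commit_task(
    completed_tasks: Dict[str, dict],
    task_id: str,
    work: Callable[[Dict[str, dict]], dict],
    is_valid: Callable[[dict], bool],
) -> Dict[str, dict]:
    """Run `work` against a private copy; return the new state only if valid."""
    draft = copy.deepcopy(completed_tasks)  # live state is never mutated
    output = work(draft)                    # may raise: live state is unaffected
    if not is_valid(output):
        raise ValueError(f"Task {task_id} output invalid")
    draft[task_id] = {"output": output}
    return draft                            # caller swaps this in as the new state

live = {"t1": {"output": {"regions": ["US-West", "EU"]}}}

# Success: the caller rebinds `live` to the committed copy
live = commit_task(live, "t2", lambda s: {"segment": "US-West"}, lambda o: "segment" in o)

# Failure: validation rejects the output and `live` is left exactly as it was
try:
    live = commit_task(live, "t3", lambda s: {}, lambda o: "segment" in o)
except ValueError:
    pass
print(sorted(live))  # ['t1', 't2']
```

The cost is the same deep copy per task. The benefit is that a crash mid-task cannot leave live state half-written, because the commit is a single reference assignment.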

State Versioning

Track changes to state over time. If replanning is needed, you can restore an earlier state:

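```python
import copy
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

def _json_default(obj):
    """Encode datetimes (e.g. task timestamps) as ISO-8601 strings."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"{type(obj).__name__} is not JSON-serializable")

@dataclass
class StateVersion:
    version: int
    timestamp: datetime
    completed_tasks: Dict[str, dict]

    def dump(self) -> str:
        """Serialize to JSON for storage (nested datetimes become ISO strings)."""
        return json.dumps({
            "version": self.version,
            "timestamp": self.timestamp.isoformat(),
            "completed_tasks": self.completed_tasks,
        }, default=_json_default)

    @staticmethod
    def load(json_str: str) -> "StateVersion":
        # Nested task timestamps come back as ISO strings, not datetimes
        data = json.loads(json_str)
        return StateVersion(
            version=data["version"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            completed_tasks=data["completed_tasks"],
        )

class VersionedAgentState:
    """State with versioning for rollback and recovery."""

    def __init__(self, state: AgentState):
        self.current_state = state
        self.versions: List[StateVersion] = []

    def checkpoint(self) -> int:
        """Save a deep copy of the current state; return its version number."""
        version = StateVersion(
            version=len(self.versions),
            timestamp=datetime.now(),
            # Deep copy: later mutations must not alter the saved version
            completed_tasks=copy.deepcopy(self.current_state.completed_tasks),
        )
        self.versions.append(version)
        return version.version

    def rollback(self, version_num: int) -> None:
        """Restore state to a previous version."""
        if not 0 <= version_num < len(self.versions):
            raise ValueError(f"Version {version_num} not found")
        # Copy again so the stored version stays intact for future rollbacks
        self.current_state.completed_tasks = copy.deepcopy(
            self.versions[version_num].completed_tasks)
        logging.info("Rolled back to version %d", version_num)
```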
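A checkpoint only helps recovery if it survives a crash. The sketch below persists checkpoints to disk, assuming one JSON file per version in a directory of your choosing. `save_checkpoint` and `latest_checkpoint` are hypothetical helpers. Each write goes to a temporary file first and is then moved into place with `os.replace`, so a crash mid-write never leaves a truncated checkpoint behind.

```python
import json
import os
import tempfile
from typing import Optional

def save_checkpoint(directory: str, version: int, payload: dict) -> str:
    """Atomically write one checkpoint file and return its path."""
    os.makedirs(directory, exist_ok=True)
    # Zero-padded so lexicographic order matches version order
    path = os.path.join(directory, f"state-{version:06d}.json")
    # Temp file in the same directory: os.replace is atomic on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
    return path

def latest_checkpoint(directory: str) -> Optional[dict]:
    """Load the highest-numbered checkpoint, or None if there are none."""
    names = sorted(n for n in os.listdir(directory)
                   if n.startswith("state-") and n.endswith(".json"))
    if not names:
        return None
    with open(os.path.join(directory, names[-1])) as f:
        return json.load(f)

with tempfile.TemporaryDirectory() as ckpt_dir:
    save_checkpoint(ckpt_dir, 0, {"completed_tasks": {"t1": {"output": 1}}})
    save_checkpoint(ckpt_dir, 1, {"completed_tasks": {"t1": {"output": 1}, "t2": {"output": 2}}})
    print(sorted(latest_checkpoint(ckpt_dir)["completed_tasks"]))  # ['t1', 't2']
```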

Preventing State Conflicts

In concurrent scenarios (multiple agent tasks running in parallel), ensure state updates don't collide:

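```python
import threading
from dataclasses import dataclass
from typing import Optional

@dataclass
class ThreadSafeAgentState(AgentState):
    """AgentState with locking for concurrent access.

    Each method call is atomic; a multi-step read-then-write sequence must
    still hold self._lock across all of its steps.
    """

    def __post_init__(self) -> None:
        # Runs after the inherited dataclass __init__, so construction is
        # unchanged: ThreadSafeAgentState(goal, user_id, start_time)
        self._lock = threading.RLock()

    def mark_task_complete(self, task_id: str, output: dict, *args, **kwargs) -> None:
        with self._lock:
            super().mark_task_complete(task_id, output, *args, **kwargs)

    def get_task_output(self, task_id: str) -> Optional[dict]:
        with self._lock:
            return super().get_task_output(task_id)

    def invalidate_cache(self, task_id: str) -> None:
        with self._lock:
            super().invalidate_cache(task_id)
```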
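Per-method locks make each call atomic. A compound step, such as "record this result only if no other worker already did," must hold the lock across both the check and the write. The self-contained sketch below uses hypothetical names (`SharedResults`, `record_once`) and runs tasks in parallel with `concurrent.futures.ThreadPoolExecutor`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict

class SharedResults:
    """Task results shared by worker threads."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._results: Dict[str, dict] = {}

    def record_once(self, task_id: str, output: dict) -> bool:
        """Store output unless task_id is already recorded; True if this call won."""
        with self._lock:  # check and write happen under one lock acquisition
            if task_id in self._results:
                return False
            self._results[task_id] = output
            return True

    def snapshot(self) -> Dict[str, dict]:
        with self._lock:
            return dict(self._results)

shared = SharedResults()

def worker(i: int) -> bool:
    # Two workers race on each task id ("task-0" ... "task-3")
    return shared.record_once(f"task-{i % 4}", {"worker": i})

with ThreadPoolExecutor(max_workers=8) as pool:
    wins = list(pool.map(worker, range(8)))

print(sum(wins), len(shared.snapshot()))  # 4 4
```

Exactly one worker wins each task id, whichever thread gets there first. Without the lock around the check, both racers could see the id as missing and both write.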

For distributed agents (running on different machines), use a database:

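```python
import json
import sqlite3
from datetime import datetime
from typing import Optional

class DatabaseAgentState:
    """AgentState backed by a database.

    Sketched with sqlite3 (the upsert needs SQLite 3.24+) so it runs standalone;
    agents on different machines need a networked database such as PostgreSQL,
    whose Python drivers use %s rather than ? placeholders.
    """

    def __init__(self, agent_id: str, db_connection: sqlite3.Connection):
        self.agent_id = agent_id
        self.db = db_connection
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS agent_state (
                agent_id TEXT NOT NULL, task_id TEXT NOT NULL,
                output TEXT NOT NULL, completed_at TEXT NOT NULL,
                PRIMARY KEY (agent_id, task_id)
            )
        """)
        self.db.commit()

    def mark_task_complete(self, task_id: str, output: dict) -> None:
        """Atomically insert a task result (overwriting it if the task reruns)."""
        with self.db:  # commits on success, rolls back on exception
            self.db.execute("""
                INSERT INTO agent_state (agent_id, task_id, output, completed_at)
                VALUES (?, ?, ?, ?)
                ON CONFLICT (agent_id, task_id) DO UPDATE
                SET output = excluded.output, completed_at = excluded.completed_at
            """, (self.agent_id, task_id, json.dumps(output),
                  datetime.now().isoformat()))

    def get_task_output(self, task_id: str) -> Optional[dict]:
        """Fetch task result from database."""
        row = self.db.execute("""
            SELECT output FROM agent_state
            WHERE agent_id = ? AND task_id = ?
        """, (self.agent_id, task_id)).fetchone()
        return json.loads(row[0]) if row else None
```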
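An atomic insert does not stop two machines from reading the same row, computing different results, and overwriting each other. One common remedy is optimistic concurrency control: a compare-and-set update that writes only if the row's version is still the one you read. The sketch below assumes a hypothetical `version` column on a simplified `agent_state` table, and uses `sqlite3` with an in-memory database to stand in for the shared one.

```python
import json
import sqlite3
from typing import Tuple

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE agent_state (
        agent_id TEXT, task_id TEXT, output TEXT, version INTEGER,
        PRIMARY KEY (agent_id, task_id)
    )
""")
conn.execute("INSERT INTO agent_state VALUES ('a1', 't1', ?, 0)", (json.dumps({"n": 0}),))
conn.commit()

def read(task_id: str) -> Tuple[dict, int]:
    """Return (output, version) for a task."""
    output, version = conn.execute(
        "SELECT output, version FROM agent_state WHERE agent_id = 'a1' AND task_id = ?",
        (task_id,)).fetchone()
    return json.loads(output), version

def compare_and_set(task_id: str, expected_version: int, output: dict) -> bool:
    """Write output only if nobody changed the row since we read it."""
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            """UPDATE agent_state SET output = ?, version = version + 1
               WHERE agent_id = 'a1' AND task_id = ? AND version = ?""",
            (json.dumps(output), task_id, expected_version))
    return cur.rowcount == 1  # 0 rows updated: someone else won; re-read and retry

# Two writers both read version 0 ...
out_a, ver_a = read("t1")
out_b, ver_b = read("t1")
# ... the first write succeeds and bumps the version; the second is rejected
print(compare_and_set("t1", ver_a, {"n": 1}))  # True
print(compare_and_set("t1", ver_b, {"n": 2}))  # False
print(read("t1"))                              # ({'n': 1}, 1)
```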

State Validation and Invariants

Assert that state respects invariants (rules that must always be true):

```python
import json
from typing import Any, Callable, List, Optional

def validate_agent_state(
    state: AgentState,
    get_task_schema: Callable[[str], Optional[dict]] = lambda task_id: None,
    matches_schema: Callable[[Any, dict], bool] = lambda output, schema: True,
) -> List[str]:
    """Check state invariants. Return list of violations.

    get_task_schema and matches_schema are caller-supplied hooks; by default
    no schemas are declared, so invariant 3 is skipped.
    """
    violations = []

    for task_id, task_result in state.completed_tasks.items():
        # Invariant 1: task outputs must be JSON-serializable
        try:
            json.dumps(task_result["output"])
        except (TypeError, ValueError):
            violations.append(f"Task {task_id} output not JSON-serializable")

        # Invariant 2: every completed task must carry a timestamp
        if "timestamp" not in task_result:
            violations.append(f"Task {task_id} missing timestamp")

        # Invariant 3: task outputs must match their declared schema
        schema = get_task_schema(task_id)
        if schema and not matches_schema(task_result["output"], schema):
            violations.append(f"Task {task_id} output doesn't match schema")

    return violations
```

Run this validator periodically or after state mutations.
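To run the validator after every mutation without relying on each caller to remember, you can wrap the mutating methods. The sketch below uses a hypothetical `checked` decorator and a minimal `MiniState` class. The invariant it checks is an illustrative example: every recorded dependency of a completed task must itself be completed. A mutation that breaks the invariant is undone and reported.

```python
import functools
from typing import Callable, Dict, List

class InvariantViolation(Exception):
    pass

def checked(validator: Callable[["MiniState"], List[str]]):
    """Decorator: run validator after the wrapped mutation; undo and raise on violations."""
    def decorate(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            before = dict(self.completed_tasks)  # shallow copy undoes top-level writes
            result = method(self, *args, **kwargs)
            violations = validator(self)
            if violations:
                self.completed_tasks = before
                raise InvariantViolation("; ".join(violations))
            return result
        return wrapper
    return decorate

def dependencies_completed(state: "MiniState") -> List[str]:
    """Invariant: a task can only be complete if everything it depends on is."""
    return [f"{t} depends on unfinished {d}"
            for t, rec in state.completed_tasks.items()
            for d in rec["depends_on"] if d not in state.completed_tasks]

class MiniState:
    def __init__(self) -> None:
        self.completed_tasks: Dict[str, dict] = {}

    @checked(dependencies_completed)
    def mark_task_complete(self, task_id: str, output: dict, depends_on: List[str]) -> None:
        self.completed_tasks[task_id] = {"output": output, "depends_on": depends_on}

s = MiniState()
s.mark_task_complete("segment", {"regions": 3}, depends_on=[])
try:
    s.mark_task_complete("assign", {"team": "west"}, depends_on=["create_segment"])
except InvariantViolation as e:
    print(e)  # assign depends on unfinished create_segment
print(sorted(s.completed_tasks))  # ['segment']
```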

Key Takeaways

  • State schema defines immutable inputs, mutable working state, and derived values.
  • Treat state updates as transactions: atomic, with rollback on failure.
  • Version state regularly; enable rollback to earlier versions for recovery.
  • For concurrent/distributed agents, use thread-safe or database-backed state.
  • Validate state invariants to catch corruption early.

Frequently Asked Questions

How often should I checkpoint state?

After each completed stage (in long-horizon planning) or every 5–10 tasks (in shorter plans). Checkpointing more often is safer but adds overhead. Checkpointing less often is faster, but more work is lost and must be redone when you recover.
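This rule of thumb can be expressed as a small policy object. The sketch below uses a hypothetical `CheckpointPolicy` class with `every_n_tasks` defaulting to 5, the low end of the suggested range. It checkpoints when a stage finishes or when N tasks have completed since the last checkpoint, whichever comes first.

```python
from dataclasses import dataclass

@dataclass
class CheckpointPolicy:
    every_n_tasks: int = 5          # lower end of the 5-10 task range
    tasks_since_checkpoint: int = 0

    def should_checkpoint(self, stage_finished: bool = False) -> bool:
        """Call once per completed task; True means take a checkpoint now."""
        self.tasks_since_checkpoint += 1
        if stage_finished or self.tasks_since_checkpoint >= self.every_n_tasks:
            self.tasks_since_checkpoint = 0
            return True
        return False

policy = CheckpointPolicy(every_n_tasks=5)
# Twelve tasks, with a stage boundary after task 3
decisions = [policy.should_checkpoint(stage_finished=(i == 3)) for i in range(1, 13)]
print([i for i, d in zip(range(1, 13), decisions) if d])  # [3, 8]
```

The stage boundary resets the counter, so the next task-count checkpoint comes five tasks later, at task 8, rather than at task 5.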

Should I store all intermediate outputs or just the latest?

Store all up to a size limit (e.g., 100 MB). Intermediate results help with debugging. Prune old versions when approaching the limit.
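Pruning as you approach the limit can be as simple as dropping the oldest versions until the serialized total fits a byte budget. The sketch below uses a hypothetical `prune_versions` helper and assumes each version is kept as a JSON string, as `StateVersion.dump()` produces, in oldest-first order. The newest version is always kept so recovery remains possible.

```python
from typing import List

def prune_versions(versions: List[str], max_bytes: int) -> List[str]:
    """Drop oldest serialized versions until the total size fits max_bytes.

    `versions` is ordered oldest first; the newest entry is never dropped,
    even if it alone exceeds the budget.
    """
    sizes = [len(v.encode("utf-8")) for v in versions]
    total = sum(sizes)
    start = 0
    while total > max_bytes and start < len(versions) - 1:
        total -= sizes[start]
        start += 1
    return versions[start:]

history = ['{"v": 0}', '{"v": 1}', '{"v": 2}', '{"v": 3}']  # 8 bytes each
print(prune_versions(history, max_bytes=20))  # ['{"v": 2}', '{"v": 3}']
```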

What if two parallel tasks try to update the same state variable?

Use locks (for single-machine concurrency) or database transactions (for distributed). Design task graphs to minimize write conflicts: if tasks A and B both modify the same variable, make one depend on the other.
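This last suggestion can be checked mechanically before execution. The sketch below assumes each task declares a hypothetical `writes` set of state keys alongside its `depends_on` list. Any two tasks that write the same key must be ordered by the graph, meaning one transitively depends on the other. If neither does, they may run in parallel and collide.

```python
from itertools import combinations
from typing import Dict, List, Set, Tuple

def reachable(src: str, dst: str, depends_on: Dict[str, List[str]]) -> bool:
    """True if dst transitively depends on src (src must finish before dst)."""
    stack, seen = [dst], set()
    while stack:
        task = stack.pop()
        if task == src:
            return True
        if task not in seen:
            seen.add(task)
            stack.extend(depends_on.get(task, []))
    return False

def write_conflicts(writes: Dict[str, Set[str]],
                    depends_on: Dict[str, List[str]]) -> List[Tuple[str, str, Set[str]]]:
    """Return (task_a, task_b, shared_keys) for unordered tasks writing the same keys."""
    conflicts = []
    for a, b in combinations(sorted(writes), 2):
        shared = writes[a] & writes[b]
        if shared and not (reachable(a, b, depends_on) or reachable(b, a, depends_on)):
            conflicts.append((a, b, shared))
    return conflicts

writes = {"A": {"segment"}, "B": {"segment"}, "C": {"segment"}, "D": {"team"}}
depends_on = {"A": [], "B": ["A"], "C": [], "D": []}
print(write_conflicts(writes, depends_on))
# [('A', 'C', {'segment'}), ('B', 'C', {'segment'})]
```

A and B do not conflict because B depends on A. Making C depend on B would remove both reported conflicts.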

How do I handle state that's too large for memory?

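Move task outputs to an external store: a key-value store such as Redis, an object store such as S3, or a database. The state object then holds references instead of values: state.get_task_output("task_1") fetches the output from external storage rather than from memory.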
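The sketch below illustrates the reference-based design with a hypothetical `ExternalTaskStore` that keeps each output as a JSON file on local disk. With Redis or S3, the file calls would be replaced by those services' own client APIs, which are not shown here. In memory, the store holds only a map from task ID to location, and `get_task_output` loads the payload on demand.

```python
import json
import os
import tempfile
from typing import Dict, Optional

class ExternalTaskStore:
    """Task outputs live on disk; memory holds only task_id -> file path."""

    def __init__(self, root: str) -> None:
        self.root = root
        self.refs: Dict[str, str] = {}  # the only per-task data kept in memory

    def mark_task_complete(self, task_id: str, output: dict) -> None:
        # Assumes task IDs are safe to use as file names
        path = os.path.join(self.root, f"{task_id}.json")
        with open(path, "w") as f:
            json.dump(output, f)
        self.refs[task_id] = path

    def get_task_output(self, task_id: str) -> Optional[dict]:
        path = self.refs.get(task_id)
        if path is None:
            return None
        with open(path) as f:  # read from storage on every call, not cached
            return json.load(f)

with tempfile.TemporaryDirectory() as root:
    store = ExternalTaskStore(root)
    store.mark_task_complete("task_1", {"customers": list(range(5))})
    print(store.get_task_output("task_1"))  # {'customers': [0, 1, 2, 3, 4]}
    print(store.get_task_output("task_2"))  # None
```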

Further Reading