
Plan-and-Execute Architecture: Reliable AI Workflows

The plan-and-execute architecture is among the most reliable patterns for multi-step AI agent work. The idea is simple: first ask the LLM to think through the problem and produce a plan as structured data (a DAG or task list), then execute that plan with a deterministic executor, checking results at each step. This separation of concerns makes failures easier to detect and recover from than end-to-end prompting.

Why? Because planning and execution have different failure modes. LLMs can reason abstractly about a problem's structure but struggle with strict error handling and recovery. Executors can retry failed steps, validate outputs, and apply deterministic fallbacks. By separating these, you get the reasoning power of LLMs plus the robustness of traditional software.

The Plan-and-Execute Loop

A classic plan-and-execute loop has four phases:

  1. PLAN: LLM decomposes the user goal into a structured plan (JSON DAG).
  2. VALIDATE: Controller checks for cycles, missing dependencies, invalid tasks.
  3. EXECUTE: Deterministic scheduler runs ready tasks, collects outputs, handles failures.
  4. REPORT: Return results to user or loop back if recovery is needed.

Here's a pseudocode version:

from graphlib import CycleError, TopologicalSorter


class PlanError(Exception):
    """Raised when a plan fails structural validation."""


class ExecutionError(Exception):
    """Raised when a task fails and cannot be recovered."""


class PlanAndExecuteAgent:
    def __init__(self, llm_client, executor):
        self.llm = llm_client
        self.executor = executor

    def run(self, user_goal: str) -> dict:
        # PHASE 1: PLAN
        plan_prompt = f"""
        Break this goal into a structured task plan:
        {user_goal}

        Return a JSON object with:
        {{
          "tasks": [
            {{"id": "task_1", "name": "...", "description": "...", "depends_on": []}}
          ],
          "rationale": "Why this plan will work"
        }}
        """
        # Assumes the client returns the parsed JSON as a dict
        plan = self.llm.completion(plan_prompt, response_format="json")

        # PHASE 2: VALIDATE
        try:
            self.validate_plan(plan)
        except PlanError as e:
            # If the plan is invalid, ask the LLM to revise it once, then
            # re-validate; a second failure propagates to the caller.
            plan = self.llm.completion(
                f"{plan_prompt}\nYour previous plan had an issue: {e}. Revise it.",
                response_format="json",
            )
            self.validate_plan(plan)

        # PHASE 3: EXECUTE
        results = {}
        for task in self.topological_sort(plan["tasks"]):
            # Outputs of upstream tasks (an empty dict when there are none)
            inputs = {dep: results[dep] for dep in task.get("depends_on", [])}

            try:
                results[task["id"]] = self.executor.run_task(task, inputs)
            except ExecutionError as e:
                # Attempt recovery
                recovery = self.llm.completion(
                    f"Task {task['id']} failed: {e}. How should we recover? "
                    'Reply with one word: "retry", "skip", or "abort".',
                    context={"plan": plan, "results_so_far": results},
                )
                decision = str(recovery).strip().lower()
                if decision == "retry":
                    # A second failure propagates to the caller
                    results[task["id"]] = self.executor.run_task(task, inputs)
                elif decision == "skip":
                    results[task["id"]] = None
                else:
                    raise ExecutionError(
                        f"Unrecoverable failure in {task['id']}"
                    ) from e

        # PHASE 4: REPORT
        return {
            "status": "success",
            "plan": plan,
            "results": results,
        }

    def validate_plan(self, plan: dict):
        """Check for cycles, missing dependencies, invalid tasks."""
        task_ids = {t["id"] for t in plan["tasks"]}
        for task in plan["tasks"]:
            for dep in task.get("depends_on", []):
                if dep not in task_ids:
                    raise PlanError(
                        f"Task {task['id']} depends on non-existent {dep}"
                    )

        if self.has_cycles(plan["tasks"]):
            raise PlanError("Plan contains circular dependencies")

    def has_cycles(self, tasks):
        # Cycle detection via the standard library (Python 3.9+)
        graph = {t["id"]: set(t.get("depends_on", [])) for t in tasks}
        try:
            TopologicalSorter(graph).prepare()
        except CycleError:
            return True
        return False

    def topological_sort(self, tasks):
        # Return tasks ordered so every task comes after its dependencies
        by_id = {t["id"]: t for t in tasks}
        graph = {t["id"]: set(t.get("depends_on", [])) for t in tasks}
        return [by_id[tid] for tid in TopologicalSorter(graph).static_order()]
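The EXECUTE phase above runs tasks one at a time in topological order. If independent tasks should be grouped so they can later be dispatched together, Python's standard `graphlib` module (3.9+) exposes the set of currently ready tasks. The following is a minimal sketch under assumptions, not the article's own scheduler: `run_in_ready_batches` and the `run_task(task, inputs)` callable are hypothetical names, and the plan is assumed to have already passed validation.

```python
from graphlib import TopologicalSorter


def run_in_ready_batches(tasks, run_task):
    """Run tasks batch by batch; a batch holds tasks whose dependencies are done.

    `tasks` uses the plan format from this article ({"id", "depends_on"}).
    `run_task(task, inputs)` is a hypothetical stand-in for the executor.
    """
    by_id = {t["id"]: t for t in tasks}
    graph = {t["id"]: set(t.get("depends_on", [])) for t in tasks}

    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the plan has a cycle

    results, batches = {}, []
    while sorter.is_active():
        # Sorted only so the order within a batch is reproducible
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        for task_id in ready:
            task = by_id[task_id]
            inputs = {dep: results[dep] for dep in task.get("depends_on", [])}
            results[task_id] = run_task(task, inputs)
            sorter.done(task_id)  # unlocks tasks that depended on this one
    return results, batches
```

Each batch here still runs sequentially; the batches only mark where parallel dispatch would be safe.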

This pattern is much more robust than a single "do everything" prompt because:

  • Explicit plan: The user can review and critique the plan before execution starts.
  • Early validation: Errors are caught before time/tokens are spent executing.
  • Granular recovery: If step 5 fails, you retry step 5, not the entire workflow.
  • Result traceability: You know exactly which task produced which output.

Real-World Refinements

Production systems add several refinements:

1. Timeouts and SLAs: Each task has a maximum execution time. If it exceeds that, the executor cancels it and attempts recovery.

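import time

# `executor` and `TemporaryFailure` are assumed to be defined elsewhere

def run_task_with_timeout(task, inputs, timeout_secs=60):
    # Retry transient failures until the deadline passes. This bounds the
    # retries but cannot interrupt a call to executor.run already in progress.
    deadline = time.monotonic() + timeout_secs
    while time.monotonic() < deadline:
        try:
            return executor.run(task, inputs)
        except TemporaryFailure:
            time.sleep(2)  # brief pause before the next attempt
    raise TimeoutError(f"Task {task['id']} exceeded {timeout_secs}s timeout")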
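The loop above bounds retries but cannot interrupt a single call that hangs. One way to stop waiting on such a call is to run it in a worker thread and put a timeout on the result. This is a sketch under assumptions: `run_with_deadline` and `slow_task` are hypothetical names introduced for illustration, and the approach only abandons the call rather than cancelling it.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_with_deadline(fn, *args, timeout_secs=60.0):
    """Run fn(*args) in a worker thread and stop waiting after timeout_secs.

    Caveat: Python cannot kill a running thread, so a timed-out call keeps
    running in the background. True cancellation needs a subprocess or a
    cooperative cancel flag checked inside the task.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args)
    try:
        return future.result(timeout=timeout_secs)
    except FutureTimeout:
        raise TimeoutError(f"call exceeded {timeout_secs}s timeout") from None
    finally:
        # Do not block on the worker; let it finish on its own
        pool.shutdown(wait=False)


def slow_task():
    """Hypothetical task that takes longer than a short deadline allows."""
    time.sleep(0.5)
    return "late"
```

Calling `run_with_deadline(slow_task, timeout_secs=0.05)` raises `TimeoutError`, while a fast callable returns its value normally.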

2. Output validation: After each task, check that outputs match the declared success criterion (from article 1).

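def validate_output(task, output):
    criterion = task.get("success_criterion")
    if not criterion:
        return True  # No criterion, assume success

    validator_prompt = f"""
    Does this output satisfy the criterion?
    Criterion: {criterion}
    Output: {output}
    Reply with "yes" or "no" first, then a brief reason.
    """
    # `llm` is assumed to be a client defined elsewhere
    response = llm.completion(validator_prompt)
    # Accept only a leading "yes": a substring test would also pass
    # replies such as "No, although yes in part"
    return response.strip().lstrip("\"'").lower().startswith("yes")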
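Free-text yes/no replies are brittle to parse. One alternative is to ask the validator for a small JSON verdict and treat anything malformed as a failure. The sketch below assumes a reply format of `{"pass": true, "reason": "..."}`, which is an illustrative convention and not something this article or any particular LLM API prescribes; `parse_verdict` is a hypothetical helper.

```python
import json


def parse_verdict(raw_reply: str) -> tuple[bool, str]:
    """Parse a validator reply of the form {"pass": true, "reason": "..."}.

    Anything that is not a JSON object with a boolean "pass" field counts as
    a failed validation, so a malformed reply is never mistaken for success.
    """
    try:
        data = json.loads(raw_reply)
    except json.JSONDecodeError:
        return False, "validator reply was not valid JSON"
    if not isinstance(data, dict) or not isinstance(data.get("pass"), bool):
        return False, 'validator reply lacked a boolean "pass" field'
    return data["pass"], str(data.get("reason", ""))
```

Failing closed like this trades a few false negatives for the guarantee that downstream tasks never run on output the validator did not explicitly approve.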

3. Branching recovery: If a task fails and recovery is uncertain, ask the user before continuing.

async def run_with_human_checkpoint(task, inputs):
    # Declared async because it awaits the human decision below
    try:
        return executor.run(task, inputs)
    except ExecutionError as e:
        # Log for human review
        checkpoint = {
            "task": task,
            "error": str(e),
            "inputs": inputs,
            "recovery_options": ["retry", "skip", "escalate"],
        }
        # In production, this might queue a Slack message to on-call
        decision = await get_human_decision(checkpoint)
        if decision == "retry":
            return await run_with_human_checkpoint(task, inputs)
        elif decision == "skip":
            return None
        else:
            raise ExecutionError("User escalated") from e

Plan Caching and Reuse

If you're running similar goals repeatedly (e.g., "analyze sales data" weekly), you can cache plans and skip the planning phase on subsequent runs, saving ~2–3s of LLM latency.

import hashlib

# plan_cache, is_fresh, llm, and execute_plan are assumed to be defined elsewhere

def run_with_plan_cache(user_goal, cache_ttl_hours=168):
    # Normalize goal for cache lookup (case and whitespace)
    normalized = " ".join(user_goal.lower().split())
    cache_key = hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    cached_plan = plan_cache.get(cache_key)
    if cached_plan and is_fresh(cached_plan, ttl=cache_ttl_hours):
        plan = cached_plan
    else:
        plan = llm.generate_plan(user_goal)
        plan_cache.set(cache_key, plan)

    return execute_plan(plan)

This is especially powerful for recurring reports or monitoring: generate the plan once, then execute it on schedule without re-planning.
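The snippet above leaves `plan_cache` and `is_fresh` undefined. One way they might look is a small in-memory cache that stores a timestamp next to each plan. This is a sketch under assumptions: `PlanCache`, `plan_with_cache`, and the `generate_plan` callable are hypothetical names, and a production system would more likely use a shared store than a process-local dict.

```python
import hashlib
import time


class PlanCache:
    """In-memory plan cache keyed by a normalized goal, with a time-to-live."""

    def __init__(self, ttl_hours=168, clock=time.time):
        self.ttl_secs = ttl_hours * 3600
        self.clock = clock  # injectable so expiry can be tested
        self._entries = {}  # cache key -> (stored_at, plan)

    @staticmethod
    def key_for(goal: str) -> str:
        # Collapse case and whitespace so trivial rephrasings share a key
        normalized = " ".join(goal.lower().split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get(self, goal):
        entry = self._entries.get(self.key_for(goal))
        if entry is None:
            return None
        stored_at, plan = entry
        if self.clock() - stored_at > self.ttl_secs:
            return None  # stale: the caller should re-plan
        return plan

    def set(self, goal, plan):
        self._entries[self.key_for(goal)] = (self.clock(), plan)


def plan_with_cache(goal, cache, generate_plan):
    """Return (plan, was_cached); generate_plan is a hypothetical planner."""
    plan = cache.get(goal)
    if plan is not None:
        return plan, True
    plan = generate_plan(goal)
    cache.set(goal, plan)
    return plan, False
```

Normalizing only case and whitespace is deliberately conservative: goals that differ in wording still get separate plans, which avoids reusing a plan for a goal it does not actually fit.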

Key Takeaways

  • Plan-and-execute separates reasoning from execution, improving reliability by 35–50%.
  • Validate plans before execution: check for cycles, missing dependencies, and invalid tasks.
  • Execute deterministically: strict sequencing, timeouts, and output validation catch errors early.
  • Recovery strategies (retry, skip, escalate) handle real-world failures gracefully.
  • Plan caching reduces latency for recurring goals.

Frequently Asked Questions

What's the difference between plan-and-execute and ReAct?

ReAct (covered next) is think-act-observe in a loop: reason about the next step, execute it, observe the result, then reason again. Plan-and-execute plans once upfront, then executes. ReAct is more flexible for unexpected outcomes; plan-and-execute is more efficient for well-structured goals.

Should I always generate a plan?

For goals with 1–2 steps, direct execution is fine. For 3+ steps or goals requiring complex logic, planning saves tokens and improves reliability. A good heuristic: if you'd write a GitHub issue with 5+ bullet points, plan first.

How do I handle plans that are too complex?

Break the goal into smaller subgoals and run plan-and-execute separately for each. Or ask the LLM to generate a hierarchical plan: a top-level plan with 3 phases, each phase with its own task list.
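A hierarchical plan can still be run by a flat executor if it is flattened first. The sketch below assumes a plan shaped as `{"phases": [{"id": ..., "tasks": [...]}]}`, which is an illustrative format rather than one defined in this article, and `flatten_hierarchical_plan` is a hypothetical helper. Task ids are prefixed with their phase id, and each phase's entry tasks are made to depend on every task of the previous phase.

```python
def flatten_hierarchical_plan(plan):
    """Flatten {"phases": [{"id", "tasks": [...]}]} into one flat task list.

    Tasks with no in-phase dependencies depend on all tasks of the previous
    phase, so phases run strictly in order once the list is sorted.
    """
    flat, previous_phase_ids = [], []
    for phase in plan["phases"]:
        current_ids = []
        for task in phase["tasks"]:
            task_id = f"{phase['id']}.{task['id']}"
            # In-phase dependencies get the same phase prefix
            deps = [f"{phase['id']}.{d}" for d in task.get("depends_on", [])]
            if not deps:
                deps = list(previous_phase_ids)
            flat.append({**task, "id": task_id, "depends_on": deps})
            current_ids.append(task_id)
        previous_phase_ids = current_ids
    return flat
```

The flattened list uses the same `id` and `depends_on` fields as the plans shown earlier, so it can go through the same validation and execution steps.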

Can plans be generated incrementally?

Yes. Some systems use "rolling horizon planning": generate the plan for the next 2–3 steps, execute those, then generate the next batch. This reduces upfront planning cost but requires more LLM calls.
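Rolling horizon planning can be sketched as a loop that alternates short planning calls with execution. The names here are assumptions made for illustration: `plan_next_batch(goal, history, horizon)` stands in for an LLM planner that returns up to `horizon` tasks (or an empty list when the goal is complete), and `run_task(task)` stands in for the executor.

```python
def run_rolling_horizon(goal, plan_next_batch, run_task, horizon=3, max_rounds=10):
    """Plan a few steps, execute them, then plan again with the results so far.

    Both callables are hypothetical stand-ins: plan_next_batch for the LLM
    planner and run_task for the executor.
    """
    history = []  # (task, result) pairs, fed back to the planner each round
    for _ in range(max_rounds):
        batch = plan_next_batch(goal, history, horizon)
        if not batch:
            return history  # the planner reports nothing left to do
        for task in batch[:horizon]:
            history.append((task, run_task(task)))
    # Guard against a planner that never declares the goal finished
    raise RuntimeError(f"goal not finished after {max_rounds} planning rounds")
```

With five steps and a horizon of 2, this makes four planner calls (three batches plus the final empty one) where upfront planning would make one, which is the extra LLM cost mentioned above.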

Further Reading