Dynamic Replanning: Adapting Plans When Reality Diverges

Even the best-laid plans go awry. An agent starts executing a plan: "Deploy service, run tests, monitor for 1 hour." But 20 minutes in, a critical service that the deployment depends on goes offline unexpectedly. The original plan can no longer be completed as written. Dynamic replanning is the practice of detecting plan failure mid-execution and generating a new plan that accounts for the changed world state.

When to Replan

An agent should consider replanning in three scenarios:

  1. Task failure: A subtask fails despite retries, blocking dependents.
  2. Constraint violation: A resource limit (time, cost, tokens) is exceeded.
  3. Goal update: The user provides new information or changes the objective mid-execution.
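The second trigger, constraint violation, can usually be checked without an LLM call. Below is a minimal sketch of such a check. The `ResourceLimits` and `ResourceUsage` names and their fields are illustrative assumptions, not part of any particular framework.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ResourceLimits:
    """Hypothetical per-run limits; pick units that match your own metering."""
    max_seconds: float
    max_cost_usd: float
    max_tokens: int


@dataclass
class ResourceUsage:
    """Hypothetical running totals collected by the executor."""
    seconds: float = 0.0
    cost_usd: float = 0.0
    tokens: int = 0


def constraint_violations(usage: ResourceUsage, limits: ResourceLimits) -> List[str]:
    """Return the name of every limit that has been exceeded (empty if none)."""
    violations = []
    if usage.seconds > limits.max_seconds:
        violations.append("time")
    if usage.cost_usd > limits.max_cost_usd:
        violations.append("cost")
    if usage.tokens > limits.max_tokens:
        violations.append("tokens")
    return violations
```

A non-empty result is a signal to consider replanning; which violated limit justifies a replan rather than a hard stop remains a policy decision.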

Replanning is expensive because it requires re-analyzing the entire goal, so you need decision logic to distinguish between "try again" (cheap) and "replan" (expensive).

import time


class ReplanningError(Exception):
    """Raised when the replan limit is exhausted."""


class ExecutionFailed(Exception):
    """Raised when a task still fails after its retry."""


# Assumed to be defined elsewhere: TaskFailure, executor, topological_sort,
# get_inputs, get_resources, check_dependencies.
class DynamicReplanningAgent:
    def __init__(self, llm_client, max_replans=3):
        self.llm = llm_client
        self.max_replans = max_replans
        self.replan_count = 0

    def should_replan(self, task_failure, context):
        """Decide if this is a recoverable failure or requires replanning."""

        # Recoverable: temporary network error, task timeout
        if task_failure.is_transient():
            return False  # Retry instead

        # Permanent: missing dependency, resource exhausted
        if task_failure.is_permanent():
            return True  # Replan

        # Uncertain: escalate to LLM
        eval_prompt = f"""
This task failed: {task_failure.description}
Context: {context}

Is this a temporary failure (retry) or permanent (replan)?
Reply with exactly one word: RETRY or REPLAN"""

        response = self.llm.completion(eval_prompt)
        # Match on the leading word so "RETRY, do not replan" is not misread.
        return response.strip().upper().startswith("REPLAN")

    def replan(self, original_goal: str, current_state: dict) -> dict:
        """Generate a new plan given current execution state."""

        if self.replan_count >= self.max_replans:
            raise ReplanningError("Max replan attempts exceeded")

        replan_prompt = f"""
ORIGINAL GOAL: {original_goal}

WHAT HAS BEEN ACCOMPLISHED:
{current_state['completed_tasks']}

WHAT FAILED:
{current_state['failed_task']}

CURRENT WORLD STATE:
{current_state['world_state']}

Generate a new plan to achieve the goal given these constraints:
- You cannot redo completed tasks (they're done).
- You cannot use the failed task; find an alternative.
- Minimize additional work; reuse completed outputs where possible.

FORMAT AS JSON:
{{
  "rationale": "Why this new plan works",
  "remaining_tasks": [
    {{"id": "...", "name": "...", "depends_on": [...], "alternative": true/false}}
  ]
}}"""

        # The client is assumed to return the parsed JSON as a dict.
        new_plan = self.llm.completion(replan_prompt, response_format="json")
        self.replan_count += 1

        return new_plan

    def execute_with_replanning(self, goal: str) -> dict:
        """Execute a plan, replanning if necessary."""

        plan = self.llm.generate_plan(goal)
        completed = set()
        results = {}

        # Use an explicit queue so a replan can swap out the remaining work.
        pending = list(topological_sort(plan["tasks"]))

        while pending:
            task = pending.pop(0)
            if task["id"] in completed:
                continue  # Never redo finished work

            try:
                # Attempt task
                results[task["id"]] = executor.run_task(task, get_inputs(task, results))
                completed.add(task["id"])

            except TaskFailure as e:
                # Check if we should replan
                if not self.should_replan(e, {"completed": completed, "goal": goal}):
                    # Retry once more
                    try:
                        results[task["id"]] = executor.run_task(task, get_inputs(task, results))
                        completed.add(task["id"])
                    except TaskFailure as retry_error:
                        raise ExecutionFailed(
                            f"Task {task['id']} failed after retry"
                        ) from retry_error
                else:
                    # Replan
                    current_state = {
                        "completed_tasks": sorted(completed),
                        "failed_task": task["id"],
                        "world_state": self.get_world_state(),
                    }

                    new_plan = self.replan(goal, current_state)

                    # Resume execution with the new plan's remaining tasks
                    # (already-completed tasks are skipped by the check above)
                    pending = list(topological_sort(new_plan["remaining_tasks"]))

        return {"status": "success", "results": results, "replans": self.replan_count}

    def get_world_state(self) -> dict:
        """Snapshot current environment state for replanning context."""
        # This is task-specific: available resources, running services, etc.
        return {
            "timestamp": time.time(),
            "available_resources": get_resources(),
            "external_service_status": check_dependencies(),
        }
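The class above calls `task_failure.is_transient()`, `task_failure.is_permanent()`, and `topological_sort(...)` without defining them. One possible shape for these helpers is sketched below. The `FailureKind` enum, the exception-to-kind mapping in `classify_exception`, and the task dictionary layout (`id`, `depends_on`) are assumptions made for illustration; `graphlib` is in the standard library from Python 3.9.

```python
from enum import Enum
from graphlib import TopologicalSorter
from typing import List


class FailureKind(Enum):
    TRANSIENT = "transient"  # e.g. network blip, timeout
    PERMANENT = "permanent"  # e.g. missing dependency, denied permission
    UNKNOWN = "unknown"      # needs a judgment call (the LLM path)


class TaskFailure(Exception):
    """Hypothetical failure type carrying a coarse classification."""

    def __init__(self, description: str, kind: FailureKind = FailureKind.UNKNOWN):
        super().__init__(description)
        self.description = description
        self.kind = kind

    def is_transient(self) -> bool:
        return self.kind is FailureKind.TRANSIENT

    def is_permanent(self) -> bool:
        return self.kind is FailureKind.PERMANENT


def classify_exception(exc: Exception) -> TaskFailure:
    """Map a few built-in exceptions onto a TaskFailure (illustrative rules only)."""
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return TaskFailure(str(exc), FailureKind.TRANSIENT)
    if isinstance(exc, (FileNotFoundError, PermissionError)):
        return TaskFailure(str(exc), FailureKind.PERMANENT)
    return TaskFailure(str(exc), FailureKind.UNKNOWN)


def topological_sort(tasks: List[dict]) -> List[dict]:
    """Order tasks so that each one comes after the tasks it depends on."""
    by_id = {t["id"]: t for t in tasks}
    # Ignore dependencies on tasks outside this list (e.g. already completed).
    graph = {t["id"]: set(t.get("depends_on", [])) & set(by_id) for t in tasks}
    return [by_id[task_id] for task_id in TopologicalSorter(graph).static_order()]
```

Keeping an explicit `UNKNOWN` kind means the LLM call in `should_replan` is reached only when the cheap rules cannot decide.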

Incremental Replanning

Full replanning is expensive. Incremental replanning computes only the affected portion of the plan:

def incremental_replan(plan: dict, failed_task_id: str) -> dict:
    """Replan only the subtree blocked by failed_task_id."""

    # Find all tasks that depend (directly or indirectly) on the failed task
    blocked_tasks = find_dependents(plan["tasks"], failed_task_id)

    # The failed task itself must be replaced along with everything it blocks
    removed = set(blocked_tasks) | {failed_task_id}

    # Find alternative paths to the original goal
    # (This is complex; approximated here)
    replan_prompt = f"""
The task "{failed_task_id}" failed. These tasks are now blocked: {sorted(blocked_tasks)}

Original downstream goal: {plan['final_goal']}

What's an alternative way to achieve that goal, given this failure?
Reply as JSON: {{"tasks": [...]}}"""

    # `llm` is an assumed module-level client that returns parsed JSON as a dict
    alternative_subplan = llm.completion(replan_prompt, response_format="json")

    # Merge: keep all unaffected tasks, replace the removed ones with the alternative
    kept_tasks = [t for t in plan["tasks"] if t["id"] not in removed]
    merged_plan = {
        "final_goal": plan["final_goal"],
        "tasks": kept_tasks + alternative_subplan["tasks"],
    }

    return merged_plan

This is usually much cheaper than full replanning: if a failure blocks only 2 of 20 tasks, the LLM regenerates the failed task and those 2 dependents, not all 20.
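`incremental_replan` relies on a `find_dependents` helper that is not shown. A minimal sketch follows, assuming each task is a dict with an `id` and an optional `depends_on` list (the same shape used in the replanning prompt). It walks the dependency graph breadth-first from the failed task.

```python
from collections import defaultdict, deque
from typing import Dict, List, Set


def find_dependents(tasks: List[dict], failed_task_id: str) -> Set[str]:
    """Return ids of all tasks that depend, directly or transitively, on failed_task_id."""
    # Invert the depends_on edges: prerequisite id -> ids of tasks that need it.
    children: Dict[str, List[str]] = defaultdict(list)
    for task in tasks:
        for dep in task.get("depends_on", []):
            children[dep].append(task["id"])

    blocked: Set[str] = set()
    queue = deque([failed_task_id])
    while queue:
        current = queue.popleft()
        for child in children.get(current, []):
            if child not in blocked:
                blocked.add(child)
                queue.append(child)
    return blocked
```

The result excludes the failed task itself, so a caller that wants to replace it must handle it separately.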

Handling User-Driven Replanning

Sometimes the user provides new information mid-execution: "Actually, you don't need to support PostgreSQL, only MySQL," or "We have budget for AWS now, not just open-source."

def handle_goal_update(current_plan: dict, old_goal: str, new_goal: str, context: dict):
    """User changed the goal; replan with new constraints."""

    # Step 1: ask the LLM to summarize what the goal change implies
    diff_prompt = f"""
OLD GOAL: {old_goal}
NEW GOAL: {new_goal}

What changed? What can we skip? What must we add?"""

    # `llm` is an assumed module-level client
    diff = llm.completion(diff_prompt)

    # Step 2: replan only the affected parts, feeding the summary back in
    replan_prompt = f"""
Goal changed from:
{old_goal}
to:
{new_goal}

Summary of the change: {diff}

Current plan: {current_plan}

These tasks are already complete: {context['completed']}

Generate a new plan that:
1. Keeps completed tasks (don't redo them).
2. Skips tasks no longer needed.
3. Adds tasks required by the new goal.
"""

    new_plan = llm.completion(replan_prompt, response_format="json")
    return new_plan
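Once the LLM returns a new plan, it can help to check deterministically which tasks are reused, skipped, or still to run before executing anything. The sketch below assumes task ids stay stable between the old and new plans, which the prompt above does not guarantee; `reconcile_plans` and its result keys are hypothetical names.

```python
from typing import Dict, List, Set


def reconcile_plans(old_tasks: List[dict], new_tasks: List[dict],
                    completed: Set[str]) -> Dict[str, List[str]]:
    """Classify task ids after a goal change, preserving plan order."""
    old_ids = [t["id"] for t in old_tasks]
    new_ids = [t["id"] for t in new_tasks]
    new_set = set(new_ids)
    return {
        # Finished work the new plan still relies on: reuse its results.
        "reused": [i for i in new_ids if i in completed],
        # Unfinished old tasks the new goal no longer needs: skip them.
        "skipped": [i for i in old_ids if i not in new_set and i not in completed],
        # Finished work the new goal makes unnecessary (the cost is already sunk).
        "obsolete": [i for i in old_ids if i not in new_set and i in completed],
        # Everything in the new plan that has not run yet.
        "to_run": [i for i in new_ids if i not in completed],
    }
```

For the PostgreSQL-to-MySQL example, an unfinished `postgres_adapter` task would land in `skipped`, while a completed `schema` task would land in `reused`.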

Cost Management and Backoff

Replanning is expensive (1–2 LLM calls). In production, you need strategies to limit how often you replan:

Exponential backoff: After each replan, double the minimum wait before another replan is allowed. If a failure arrives inside that window, escalate instead of replanning again.

replan_cooldown = 1  # seconds
last_replan_time = float("-inf")  # no replan has happened yet

for attempt in range(max_replans):
    if should_replan(...):
        if time.time() - last_replan_time < replan_cooldown:
            log("Replan cooldown active, escalating instead")
            escalate_to_human()
        else:
            replan()
            last_replan_time = time.time()
            replan_cooldown *= 2  # exponential backoff
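The same cooldown logic can be wrapped in a small object so that it is testable without sleeping. This is a sketch, not a prescribed design: the `ReplanCooldown` name and the injectable `clock` parameter are assumptions, and `time.monotonic` is used as the default because it is not affected by wall-clock adjustments.

```python
import time
from typing import Callable, Optional


class ReplanCooldown:
    """Tracks when the next replan is allowed, doubling the wait after each one."""

    def __init__(self, initial_seconds: float = 1.0, factor: float = 2.0,
                 clock: Callable[[], float] = time.monotonic):
        self.cooldown = initial_seconds
        self.factor = factor
        self.clock = clock
        self.last_replan: Optional[float] = None

    def ready(self) -> bool:
        """True if no replan has happened yet or the cooldown window has passed."""
        if self.last_replan is None:
            return True
        return self.clock() - self.last_replan >= self.cooldown

    def record_replan(self) -> None:
        """Call right after a replan: stamp the time and lengthen the window."""
        self.last_replan = self.clock()
        self.cooldown *= self.factor
```

As in the loop above, the window is multiplied immediately after every replan, so with an initial value of 1 second the first enforced wait is 2 seconds. Call `ready()` before replanning and escalate when it returns `False`.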

Replan budgets: Allocate a token budget for replanning.

replan_token_budget = 5000  # tokens
tokens_used = 0

if should_replan(...) and tokens_used < replan_token_budget:
    new_plan = replan()
    tokens_used += count_tokens(new_plan)
else:
    escalate_to_human()
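The snippet above counts only the tokens in the returned plan and checks the budget before spending. A slightly stricter sketch charges for the prompt as well and refuses a replan that would overshoot. `ReplanBudget` is a hypothetical helper, and `estimate_tokens` uses a rough four-characters-per-token heuristic that is an assumption, not a real tokenizer; swap in your provider's token counter where one is available.

```python
class ReplanBudget:
    """Tracks tokens spent on replanning against a fixed cap."""

    def __init__(self, max_tokens: int = 5000):
        self.max_tokens = max_tokens
        self.tokens_used = 0

    def can_afford(self, estimated_tokens: int) -> bool:
        """True if spending estimated_tokens would stay within the cap."""
        return self.tokens_used + estimated_tokens <= self.max_tokens

    def charge(self, tokens: int) -> None:
        """Record tokens actually consumed (prompt plus response)."""
        self.tokens_used += tokens

    @property
    def remaining(self) -> int:
        return max(0, self.max_tokens - self.tokens_used)


def estimate_tokens(text: str) -> int:
    """Very rough estimate: about 4 characters per token (heuristic assumption)."""
    return max(1, len(text) // 4)
```

A caller would estimate the replan prompt, check `can_afford`, and after the call `charge` the estimated prompt and response tokens together, escalating to a human once `can_afford` returns `False`.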

Key Takeaways

  • Detect replanning moments: task failures, constraint violations, goal updates.
  • Full replanning regenerates the entire plan; incremental replanning fixes only blocked tasks.
  • Incremental replanning is 3–5x cheaper and faster than full replanning.
  • Use the LLM as a decision-maker only for ambiguous failures: should we retry or replan?
  • Set replan budgets and cooldowns to prevent infinite loops.

Frequently Asked Questions

How do I know if I've entered a replan loop (oscillating)?

Track replan history. If you replan for the same goal two or more times and hit the same failure each time, escalate to a human. This suggests the goal may be unsatisfiable with the available resources.
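One way to track that history is to count how often the same combination of goal, failed task, and failure reason triggers a replan. The sketch below is an illustration under that assumption; `ReplanHistory` and its `max_repeats` threshold are hypothetical names, with the default of 2 taken from the guidance above.

```python
from collections import Counter


class ReplanHistory:
    """Counts repeated replan triggers to detect oscillation."""

    def __init__(self, max_repeats: int = 2):
        self.max_repeats = max_repeats
        self._seen: Counter = Counter()

    def record(self, goal: str, failed_task_id: str, reason: str) -> bool:
        """Record a replan trigger; return True once it has recurred enough to escalate."""
        key = (goal, failed_task_id, reason)
        self._seen[key] += 1
        return self._seen[key] >= self.max_repeats
```

Exact string matching on `reason` is brittle when error messages contain timestamps or request ids, so normalizing the reason (for example, to an error class name) before recording is likely worthwhile.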

Can I replan automatically or should I ask the user?

For transient failures (a network glitch), retry automatically; no replan is needed. For goal changes that come from the user, confirm the new plan before executing it. For uncertain cases, attempt one automatic replan; if that fails, escalate.

How much of the plan should I recompute?

Only the affected part. If task 5 fails and tasks 1–4 are done, recompute from task 5 onward, keeping 1–4's results. Full replanning is rarely necessary.

What if replanning itself fails (LLM can't generate a valid plan)?

Escalate to a human. Log the goal, current state, and failure reason. A human can often spot a solution the LLM missed.
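Before escalating, it helps to decide mechanically whether the LLM's output counts as a valid plan. The sketch below checks a raw JSON reply for the structure requested in the replanning prompt (`remaining_tasks` with `id` and `depends_on`), unknown dependencies, and cycles. `validate_plan` is a hypothetical helper; `graphlib` requires Python 3.9 or later.

```python
import json
from graphlib import CycleError, TopologicalSorter
from typing import List, Set


def validate_plan(raw: str, completed_ids: Set[str]) -> List[str]:
    """Return a list of problems; an empty list means the plan is structurally usable."""
    try:
        plan = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]

    tasks = plan.get("remaining_tasks") if isinstance(plan, dict) else None
    if not isinstance(tasks, list) or not tasks:
        return ["missing or empty 'remaining_tasks' list"]
    if not all(isinstance(t, dict) and isinstance(t.get("id"), str) for t in tasks):
        return ["every task must be an object with a string 'id'"]

    problems = []
    ids = [t["id"] for t in tasks]
    if len(ids) != len(set(ids)):
        problems.append("duplicate task ids")

    # Dependencies may point at new tasks or at work that is already done.
    known = set(ids) | completed_ids
    graph = {}
    for t in tasks:
        deps = t.get("depends_on", [])
        if not isinstance(deps, list) or not all(isinstance(d, str) for d in deps):
            problems.append(f"task {t['id']} has a malformed 'depends_on'")
            continue
        unknown = [d for d in deps if d not in known]
        if unknown:
            problems.append(f"task {t['id']} depends on unknown tasks: {unknown}")
        graph[t["id"]] = set(deps)

    try:
        TopologicalSorter(graph).prepare()  # raises CycleError on circular dependencies
    except CycleError:
        problems.append("dependency cycle detected")
    return problems
```

The returned problem list can be attached to the escalation log alongside the goal and current state, or fed back to the LLM for one corrective attempt if the replan budget allows.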

Further Reading