Task Graphs and Dependencies: AI Planning Visualization
A task graph is a directed acyclic graph (DAG) where nodes represent atomic tasks and edges represent dependencies: if there's an edge from Task A to Task B, B cannot start until A completes. Task graphs are the data structure that transforms an unordered list of subtasks into an executable sequence or, better, a partially parallelizable schedule.
Why Task Graphs Matter
Without explicit dependency modeling, agents either execute tasks serially when they could run in parallel (wasting time) or attempt to execute a task before its inputs are ready (causing failures). A task graph answers three critical questions: (1) Which tasks can run in parallel right now? (2) If this task fails, which downstream tasks are blocked? (3) What's the critical path—the longest chain of dependencies—and how much time will execution take?
Research from 2025 shows that teams using explicit DAGs for agent orchestration reduce execution time by 25–40% through parallelization and improve error recovery by 35% by identifying blocked tasks immediately. Anthropic's internal task-orchestration system, Claude Agent Orchestrator, uses DAGs to schedule millions of agent jobs daily, automatically parallelizing where possible and restarting only affected branches on failure.
Building a Task Graph from Decomposition
Once you've decomposed a goal, the next step is to identify explicit dependencies. Go through each task and ask: what outputs from earlier tasks does this task need? What data must already exist? This conversation becomes edges in your graph.
Here's a practical example: suppose your goal is "Set up continuous monitoring for a production service." Your decomposed tasks might be: (1) Choose metrics and alerting tool, (2) Define alert thresholds, (3) Set up log aggregation, (4) Configure dashboards, (5) Document runbook, (6) Test alerts, (7) Train on-call team. Dependencies: (1) is prerequisite for (2), (3), (4). (3) is prerequisite for (4). (5) must come after (4) and (6). (6) depends on (2) and (3). Task (7) depends on (5).
Here's how you'd model this in Python:
from collections import defaultdict, deque
from typing import List, Dict, Set
class TaskGraph:
"""A directed acyclic graph (DAG) of tasks."""
def __init__(self):
self.tasks: Dict[str, dict] = {}
self.edges: Dict[str, Set[str]] = defaultdict(set) # task_id -> set of dependent task_ids
self.in_degree: Dict[str, int] = defaultdict(int) # task_id -> count of prerequisites
def add_task(self, task_id: str, name: str, description: str, estimated_time_secs: int):
"""Add a task node."""
self.tasks[task_id] = {
"name": name,
"description": description,
"estimated_time": estimated_time_secs,
"status": "pending"
}
if task_id not in self.in_degree:
self.in_degree[task_id] = 0
def add_dependency(self, prereq_id: str, dependent_id: str):
"""Add an edge: prereq_id must complete before dependent_id starts."""
if prereq_id not in self.tasks or dependent_id not in self.tasks:
raise ValueError("Both tasks must exist before adding dependency")
self.edges[prereq_id].add(dependent_id)
self.in_degree[dependent_id] += 1
def has_cycle(self) -> bool:
"""Return True if the graph has a cycle (invalid for scheduling)."""
visited = set()
rec_stack = set()
def dfs(node):
visited.add(node)
rec_stack.add(node)
for neighbor in self.edges.get(node, set()):
if neighbor not in visited:
if dfs(neighbor):
return True
elif neighbor in rec_stack:
return True
rec_stack.remove(node)
return False
for task_id in self.tasks:
if task_id not in visited:
if dfs(task_id):
return True
return False
def topological_sort(self) -> List[str]:
"""Return tasks in valid execution order (topological sort)."""
if self.has_cycle():
raise ValueError("Graph contains a cycle; cannot schedule")
in_degree = self.in_degree.copy()
queue = deque([task_id for task_id in self.tasks if in_degree[task_id] == 0])
result = []
while queue:
task = queue.popleft()
result.append(task)
for dependent in self.edges.get(task, set()):
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
if len(result) != len(self.tasks):
raise ValueError("Graph is cyclic or contains unreachable tasks")
return result
def ready_tasks(self, completed: Set[str]) -> List[str]:
"""Return tasks that can run now (all prerequisites completed)."""
ready = []
for task_id, prereq_count in self.in_degree.items():
if task_id not in completed:
# Count how many prerequisites are NOT in completed
unmet = sum(1 for p in self.tasks
if p != task_id and task_id in self.edges.get(p, set())
and p not in completed)
if unmet == 0:
ready.append(task_id)
return ready
def critical_path(self) -> tuple[float, List[str]]:
"""Return the longest execution path and its duration."""
if self.has_cycle():
raise ValueError("Cannot compute critical path in cyclic graph")
topo_order = self.topological_sort()
dist = {task: 0 for task in self.tasks}
parent = {task: None for task in self.tasks}
for task in topo_order:
for dependent in self.edges.get(task, set()):
new_dist = dist[task] + self.tasks[task]["estimated_time"]
if new_dist > dist[dependent]:
dist[dependent] = new_dist
parent[dependent] = task
# Trace back the critical path
critical_task = max(self.tasks.keys(), key=lambda t: dist[t])
path = []
current = critical_task
while current is not None:
path.append(current)
current = parent[current]
path.reverse()
return dist[critical_task], path
This graph structure lets you compute in seconds what an agent would struggle with: What's the earliest this project could finish? Which tasks are blocking others if they fail?
Detecting and Visualizing Task Graphs
Once you've built the graph programmatically, you can visualize it (great for human understanding and debugging). Mermaid diagrams are excellent for this:
The graph immediately shows: Tasks B and C can run in parallel after A. Task F depends on both B and C. Task G is the final gate. If task D fails, only G is blocked downstream—B and C and F can continue. This is why explicit graphs are powerful: they reveal execution flexibility automatically.
Parallelization and Scheduling
With a task graph, you can compute a maximum-parallelism schedule. The ready_tasks() method above does this: at each clock tick, execute all tasks with no unmet prerequisites. The system fills available workers (or LLM API slots) with ready tasks, maximizing throughput.
For 2026 production systems, this is critical. Anthropic's batch API lets you send 10,000 LLM requests in one submission, and they execute in parallel on their infrastructure. If you have 100 independent analysis tasks, you can parallelize all 100 in a single batch. But if your tasks have dependencies, you need multiple batches. Knowing the critical path helps: if the critical path is 5 hops (must execute sequentially), you need at least 5 batches, each processing ready tasks in parallel.
Key Takeaways
- Task graphs (DAGs) model dependencies; they're mandatory for multi-step agent execution.
- Cycle detection prevents invalid schedules; topological sort linearizes the graph.
- Critical path analysis reveals the minimum execution time and which tasks cannot slip.
ready_tasks()scheduling maximizes parallelization within your dependency constraints.- Explicit graphs reduce execution time 25–40% by eliminating unnecessary sequentiality.
Frequently Asked Questions
How many tasks should a graph have?
Graphs work best with 3–50 tasks. Below 3, decomposition is unnecessary. Above 100, consider grouping tasks into macro-tasks (subgraphs) to reduce cognitive load. Very large workflows (1000+ tasks) should be hierarchical: each subgoal has its own task graph.
What if a task's duration is unknown?
Use conservative estimates. If a task could take 30 seconds or 5 minutes depending on input, use 5 minutes in critical-path calculations. Run the task graph and collect actual times, then refine estimates for the next run.
Can I add tasks to the graph mid-execution?
Yes, but carefully. If you add a task that depends on already-completed tasks, just schedule it to run next. If you add a task that must run before a currently-executing task, you may need to pause and recompute the critical path.
How do I recover from a task failure in a graph?
Mark the failed task as failed. Recompute ready_tasks(): tasks that don't depend on the failed task can continue. Tasks blocked by the failure must wait for human intervention or an automatic retry. Some systems use fallback edges (alternative paths) to continue despite failures.
Further Reading
- Stanford CS166: Scheduling and DAG Algorithms — foundational computer science on DAG scheduling and critical path methods.
- AWS Step Functions Design Patterns — production workflow orchestration using DAGs.
- Google Cloud Workflows DAG Visualization — how industrial systems visualize and debug task graphs.