Production Agent Orchestration: Scaling Task Decomposition

Moving an agent from a Jupyter notebook to production requires solving new problems: how do you orchestrate 100 parallel agents? How do you handle resource constraints (API rate limits, memory)? How do you monitor agent health and debug failures? This article covers the systems engineering needed to ship planning-based agents at scale.

Production Agent Architecture

A production agent system has several layers:

┌──────────────────────────────────────────────────┐
│  API / User Interface Layer                      │
│  (REST, gRPC, webhook)                           │
└────────────────────────┬─────────────────────────┘
                         │
┌────────────────────────v─────────────────────────┐
│  Orchestrator / Controller Layer                 │
│  (task scheduling, resource allocation)          │
└────────────────────────┬─────────────────────────┘
                         │
┌────────────────────────v─────────────────────────┐
│  Agent Worker Layer                              │
│  (execute tasks, call LLMs, use tools)           │
└────────────────────────┬─────────────────────────┘
                         │
┌────────────────────────v─────────────────────────┐
│  Observability Layer                             │
│  (logging, metrics, tracing, alerting)           │
└────────────────────────┬─────────────────────────┘
                         │
┌────────────────────────v─────────────────────────┐
│  Storage / State Layer                           │
│  (persistent state, plans, results)              │
└──────────────────────────────────────────────────┘

Orchestration: Scheduling Multiple Agents

The orchestrator assigns tasks to available agent workers, respecting dependencies and resource limits:

import asyncio
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

class WorkerState(Enum):
    IDLE = "idle"
    BUSY = "busy"
    ERROR = "error"

@dataclass
class Worker:
    id: str
    state: WorkerState
    current_task: Optional[str] = None
    completed_tasks: int = 0
    error_count: int = 0

class AgentOrchestrator:
    def __init__(self, max_workers: int = 10, rate_limit_rpm: int = 60, max_attempts: int = 2):
        self.workers: Dict[str, Worker] = {}
        self.max_workers = max_workers
        self.rate_limit_rpm = rate_limit_rpm
        self.max_attempts = max_attempts
        # RateLimiter is defined in the next section
        self.rate_limiter = RateLimiter(rate_limit_rpm)
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.completed_tasks = set()
        self.failed_tasks = set()
        self.attempts: Dict[str, int] = {}  # Failed attempts per task id

        # Initialize workers
        for i in range(max_workers):
            self.workers[f"worker_{i}"] = Worker(id=f"worker_{i}", state=WorkerState.IDLE)

    async def schedule_plan(self, plan: dict) -> dict:
        """Execute a plan using available workers."""

        # Build dependency graph (build_task_graph is assumed to be defined elsewhere;
        # the loop below reads dependencies from each task's "depends_on" field)
        graph = build_task_graph(plan)

        # Add all tasks to queue
        for task in plan["tasks"]:
            await self.task_queue.put(task)

        # Spawn worker coroutines
        tasks = [
            asyncio.create_task(self.worker_loop(worker_id))
            for worker_id in self.workers.keys()
        ]

        # Wait until every queued task has been marked done
        await self.task_queue.join()

        # Cancel worker loops and wait for them to exit
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

        return {
            "status": "success" if not self.failed_tasks else "partial_failure",
            "completed_tasks": len(self.completed_tasks),
            "failed_tasks": len(self.failed_tasks),
            "worker_stats": {
                w_id: {
                    "completed": self.workers[w_id].completed_tasks,
                    "errors": self.workers[w_id].error_count
                }
                for w_id in self.workers.keys()
            }
        }

    async def worker_loop(self, worker_id: str):
        """Loop for a single worker: get tasks, execute, report results."""

        worker = self.workers[worker_id]

        while True:
            # Blocks until a task is available; cancelling the worker interrupts this await
            task = await self.task_queue.get()

            # A task whose dependency failed can never run; escalate it instead of spinning
            if any(dep in self.failed_tasks for dep in task.get("depends_on", [])):
                self.failed_tasks.add(task["id"])
                escalate_task(task)  # Assumed helper, defined elsewhere
                self.task_queue.task_done()
                continue

            # Dependencies still in progress: re-queue and back off
            if not self.dependencies_satisfied(task):
                await self.task_queue.put(task)
                self.task_queue.task_done()  # Balance the get() above so join() can return
                await asyncio.sleep(0.5)
                continue

            # Execute task
            worker.state = WorkerState.BUSY
            worker.current_task = task["id"]

            try:
                await self.execute_task_async(task)
                self.completed_tasks.add(task["id"])
                worker.completed_tasks += 1

            except Exception as e:
                worker.error_count += 1
                worker.state = WorkerState.ERROR  # Transient; reset to IDLE in finally
                logger.error(f"Task {task['id']} failed in {worker_id}: {e}")

                # Decide: retry or escalate? Count attempts per task, not per worker
                attempts = self.attempts.get(task["id"], 0) + 1
                self.attempts[task["id"]] = attempts
                if attempts < self.max_attempts:
                    await self.task_queue.put(task)  # Retry
                else:
                    self.failed_tasks.add(task["id"])
                    escalate_task(task)  # Human review

            finally:
                worker.state = WorkerState.IDLE
                worker.current_task = None
                self.task_queue.task_done()

    def dependencies_satisfied(self, task: dict) -> bool:
        """Return True if all of task's dependencies are complete."""
        for dep in task.get("depends_on", []):
            if dep not in self.completed_tasks:
                return False
        return True

    async def execute_task_async(self, task: dict) -> str:
        """Execute a single task (rate-limited)."""

        # Rate limiting: ensure we don't exceed max RPM
        await self.rate_limiter.acquire(1)  # Claim a request slot

        # Call LLM or tool (llm_client is assumed to be an async client configured elsewhere)
        result = await llm_client.completion_async(task["prompt"])

        return result
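
The scheduling loop above assumes the plan's dependencies form a valid graph. As a minimal sketch, assuming each task is a dict with an "id" and an optional "depends_on" list (the shape used above), the standard library's graphlib module can reject unknown dependencies and cycles before anything is queued. The name execution_order is hypothetical; it is not the build_task_graph helper referenced in the orchestrator.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(plan: dict) -> list:
    """Return task ids in an order that respects depends_on.

    Hypothetical helper: raises ValueError for unknown dependencies or cycles.
    """
    tasks = {task["id"]: task.get("depends_on", []) for task in plan["tasks"]}

    # Every dependency must refer to a task that exists in the plan
    for task_id, deps in tasks.items():
        missing = [dep for dep in deps if dep not in tasks]
        if missing:
            raise ValueError(f"Task {task_id} depends on unknown tasks: {missing}")

    try:
        # static_order() yields each node only after all of its predecessors
        return list(TopologicalSorter(tasks).static_order())
    except CycleError as exc:
        # The detected cycle is the second element of the exception's args
        raise ValueError(f"Plan contains a dependency cycle: {exc.args[1]}") from exc
```

Running a check like this up front turns a plan that would otherwise stall the queue into an immediate, explainable error.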

Resource Management and Rate Limiting

LLM APIs enforce rate limits, usually expressed in requests per minute (RPM) and tokens per minute; the exact values depend on the provider, model, and account tier. The orchestrator must stay within them:

import asyncio
import time
from collections import deque

class RateLimiter:
    """Sliding-window rate limiter: at most N requests in any 60-second window."""

    def __init__(self, max_requests_per_minute: int):
        self.max_rpm = max_requests_per_minute
        self.request_times: deque = deque(maxlen=max_requests_per_minute)

    async def acquire(self, tokens: int = 1):
        """Wait until we can make 'tokens' requests."""

        if tokens > self.max_rpm:
            raise ValueError("tokens exceeds max_requests_per_minute")

        while True:
            now = time.monotonic()

            # Remove old timestamps (older than 1 minute); repeated after every sleep
            while self.request_times and self.request_times[0] <= now - 60:
                self.request_times.popleft()

            # Enough room in the current window: proceed
            if len(self.request_times) + tokens <= self.max_rpm:
                break

            # Otherwise wait until the oldest request leaves the window
            sleep_time = (self.request_times[0] + 60) - now
            await asyncio.sleep(max(0.1, sleep_time))

        # Add new timestamps
        for _ in range(tokens):
            self.request_times.append(now)
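
The limiter above tracks request timestamps in a 60-second window. A token bucket is a common alternative that needs only two numbers of state and allows short bursts up to a fixed capacity. The following is a minimal sketch of that technique, not the article's implementation; the class name TokenBucket, its parameters, and the injectable clock are assumptions made for illustration and testing.

```python
import asyncio
import time


class TokenBucket:
    """Token-bucket limiter: tokens refill continuously up to a fixed capacity."""

    def __init__(self, rate_per_minute: float, capacity: float, clock=time.monotonic):
        self.rate_per_sec = rate_per_minute / 60.0
        self.capacity = capacity
        self.tokens = capacity      # Start full so an initial burst is allowed
        self.clock = clock          # Injectable clock (assumption, eases testing)
        self.last_refill = clock()

    def _refill(self) -> None:
        """Credit tokens for the time elapsed since the last refill."""
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate_per_sec)
        self.last_refill = now

    def try_acquire(self, tokens: float = 1) -> bool:
        """Take tokens if available; return False without waiting otherwise."""
        self._refill()
        if tokens <= self.tokens:
            self.tokens -= tokens
            return True
        return False

    async def acquire(self, tokens: float = 1) -> None:
        """Wait until the requested tokens are available, then take them."""
        if tokens > self.capacity:
            raise ValueError("requested tokens exceed bucket capacity")
        while not self.try_acquire(tokens):
            # Sleep roughly long enough for the missing tokens to accumulate
            deficit = tokens - self.tokens
            await asyncio.sleep(deficit / self.rate_per_sec)
```

The capacity controls burst size and the rate controls sustained throughput, so the two can be tuned separately to match a provider's published limits.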

Monitoring and Observability

Production agents need visibility into execution:

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

@dataclass
class ExecutionMetrics:
    """Track metrics for observability."""
    agent_id: str
    task_id: str
    start_time: datetime
    end_time: Optional[datetime] = None
    status: str = "running"  # running, success, failed
    duration_seconds: Optional[float] = None
    tokens_used: int = 0
    cost_usd: float = 0.0
    error_message: Optional[str] = None

class MetricsCollector:
    """Collect and export metrics for monitoring."""

    def __init__(self, prometheus_endpoint: Optional[str] = None, usd_per_1k_tokens: float = 0.01):
        self.metrics: List[ExecutionMetrics] = []
        self.prometheus = prometheus_endpoint
        # Placeholder rate ($0.01 per 1k tokens); substitute your provider's pricing
        self.usd_per_1k_tokens = usd_per_1k_tokens

    def record_task_start(self, agent_id: str, task_id: str) -> ExecutionMetrics:
        """Record the start of a task."""
        metric = ExecutionMetrics(agent_id=agent_id, task_id=task_id, start_time=datetime.now())
        self.metrics.append(metric)
        return metric

    def record_task_end(self, metric: ExecutionMetrics, status: str, tokens: int, error: Optional[str] = None):
        """Record task completion."""
        metric.end_time = datetime.now()
        metric.status = status
        metric.duration_seconds = (metric.end_time - metric.start_time).total_seconds()
        metric.tokens_used = tokens
        metric.cost_usd = (tokens / 1000) * self.usd_per_1k_tokens
        metric.error_message = error

        # Export to Prometheus/Datadog/CloudWatch
        if self.prometheus:
            self.export_to_prometheus(metric)

    def export_to_prometheus(self, metric: ExecutionMetrics):
        """Push metric to Prometheus."""
        # In practice: use prometheus_client.push_to_gateway()
        pass

    def summary(self) -> dict:
        """Return summary statistics."""

        completed = [m for m in self.metrics if m.status != "running"]
        successful = [m for m in completed if m.status == "success"]
        failed = [m for m in completed if m.status == "failed"]

        return {
            "total_tasks": len(self.metrics),
            "completed_tasks": len(completed),
            "successful_tasks": len(successful),
            "failed_tasks": len(failed),
            "success_rate": len(successful) / len(completed) if completed else 0,
            "avg_duration_sec": sum(m.duration_seconds for m in completed) / len(completed) if completed else 0,
            "total_cost_usd": sum(m.cost_usd for m in completed),
            "total_tokens_used": sum(m.tokens_used for m in completed)
        }
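
The summary above reports average duration, which can hide slow outliers. A minimal sketch of tail-latency reporting follows, assuming you have a list of task durations in seconds (for example, the duration_seconds values collected above). The function name latency_percentiles is hypothetical.

```python
import statistics
from typing import Dict, List


def latency_percentiles(durations_sec: List[float]) -> Dict[str, float]:
    """Return p50, p95, and max for a list of task durations in seconds."""
    if len(durations_sec) < 2:
        # statistics.quantiles needs at least two data points
        value = durations_sec[0] if durations_sec else 0.0
        return {"p50": value, "p95": value, "max": value}

    # n=100 returns 99 cut points; index k-1 holds the k-th percentile
    cuts = statistics.quantiles(durations_sec, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "max": max(durations_sec)}
```

Alerting on p95 rather than the mean tends to surface stuck or rate-limited tasks earlier, because a few very slow tasks barely move an average over many fast ones.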

Error Handling and Recovery

Production systems must handle failures gracefully:

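from datetime import datetime

# Placeholder exception types; in practice, map these to the errors raised by
# your LLM SDK and your own validation code
class RateLimitError(Exception):
    """Transient: the API rejected the call because of rate limits."""

class TaskValidationError(Exception):
    """The task's output failed validation."""

class MissingDependencyError(Exception):
    """A task this one depends on is missing or broken."""

class FailureHandler:
    """Implement retry logic and escalation policies."""

    def __init__(self, max_retries: int = 3, escalation_threshold: int = 2):
        self.max_retries = max_retries
        self.escalation_threshold = escalation_threshold

    async def handle_failure(self, task: dict, error: Exception, attempt: int) -> str:
        """Return 'retry' or 'escalate'."""

        # Categorize error
        if isinstance(error, RateLimitError):
            if attempt < self.max_retries:
                return "retry"  # Transient; wait and retry
            return "escalate"  # Still rate-limited after max_retries attempts

        elif isinstance(error, TaskValidationError):
            if attempt < self.max_retries:
                return "retry"  # Validation failed; regenerate and retry
            return "escalate"  # Too many retries

        elif isinstance(error, MissingDependencyError):
            return "escalate"  # Dependency is broken; needs human review

        else:
            return "escalate"  # Unknown error; escalate

    async def escalate_to_human(self, task: dict, error: Exception):
        """Send task to human review queue."""

        escalation_item = {
            "task_id": task["id"],
            "error": str(error),
            "task_details": task,
            "timestamp": datetime.now().isoformat(),
            "action_needed": "Human review and decision"
        }

        # Queue to Slack, email, or ticket system (send_to_slack is assumed to be defined elsewhere)
        send_to_slack(escalation_item)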
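
The handler above decides whether to retry but not how long to wait first. A common choice for transient errors is exponential backoff with jitter, sketched below under stated assumptions: the helper names backoff_delay and retry_with_backoff, the one-second base, and the 60-second cap are all illustrative, and is_transient is a caller-supplied predicate.

```python
import asyncio
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0, rng=random.random) -> float:
    """Delay in seconds before retry number `attempt` (1-based), with full jitter."""
    # The ceiling doubles with each attempt, up to the cap
    ceiling = min(cap, base * (2 ** (attempt - 1)))
    # Full jitter: pick a uniform delay in [0, ceiling) to spread out retries
    return rng() * ceiling


async def retry_with_backoff(operation, is_transient, max_attempts: int = 3, base: float = 1.0):
    """Run an async operation, retrying only errors that is_transient() accepts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception as error:
            # Permanent failure or out of attempts: re-raise so the caller can escalate
            if attempt == max_attempts or not is_transient(error):
                raise
            await asyncio.sleep(backoff_delay(attempt, base=base))
```

Jitter matters most when many workers hit the same rate limit at once; without it they tend to retry in lockstep and collide again.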

Multi-Agent Workflows

For complex goals, you might need specialized agents working together:

class MultiAgentWorkflow:
    """Orchestrate multiple specialized agents."""

    def __init__(self):
        self.planner_agent = PlannerAgent()        # Decomposes goals
        self.researcher_agent = ResearcherAgent()  # Gathers data
        self.analyst_agent = AnalystAgent()        # Analyzes findings
        self.writer_agent = WriterAgent()          # Summarizes results

    async def run(self, goal: str) -> str:
        """Run the multi-agent workflow."""

        # Step 1: Planner decomposes goal
        plan = await self.planner_agent.plan(goal)

        # Step 2: Researcher gathers data
        research = await self.researcher_agent.research(plan["research_tasks"])

        # Step 3: Analyst processes research
        analysis = await self.analyst_agent.analyze(research)

        # Step 4: Writer produces final output
        report = await self.writer_agent.write(analysis)

        return report

Key Takeaways

  • Production architecture has layers: API, orchestrator, workers, observability, storage.
  • Orchestrator schedules tasks across workers, respecting dependencies and rate limits.
  • Rate limiting prevents exceeding API quotas; a sliding-window log or a token bucket is a simple, effective algorithm.
  • Monitoring: track task completion, tokens used, cost, latency, and error rates.
  • Error handling: classify failures (transient vs. permanent) and retry or escalate accordingly.
  • Multi-agent workflows: specialized agents cooperate on complex goals.

Frequently Asked Questions

How many workers should I run in production?

Start with 5–10. Monitor CPU and API rate limits. Increase workers until rate limits become the bottleneck. Beyond that, adding workers won't help; instead, reduce task complexity or increase API quota.
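
The point at which the rate limit becomes the bottleneck can be estimated before you tune. As a rough sketch, assuming workers spend nearly all their time waiting on the API and each task makes a known number of requests, the worker count that saturates a requests-per-minute limit follows from dividing the limit by one worker's request rate. The function name and parameters are illustrative, and the estimate ignores token-based limits.

```python
import math


def workers_to_saturate(rate_limit_rpm: float, avg_task_seconds: float, requests_per_task: float = 1.0) -> int:
    """Estimate how many concurrent workers it takes to reach the RPM limit."""
    # One worker finishes 60 / avg_task_seconds tasks per minute
    per_worker_rpm = requests_per_task * 60.0 / avg_task_seconds
    return math.ceil(rate_limit_rpm / per_worker_rpm)
```

For example, with a 60 RPM limit and tasks that average 10 seconds and make one request each, a single worker issues 6 requests per minute, so about 10 workers reach the limit.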

What happens if the orchestrator crashes?

Save the plan and completed tasks to a database. On restart, load the state and resume from where you left off. For critical systems, run the orchestrator in high-availability mode (replicated across regions).
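
A minimal sketch of that checkpointing idea, using SQLite from the standard library. The class name PlanCheckpoint, the table layout, and the plan shape (a dict with a "tasks" list of dicts that each have an "id") are assumptions for illustration; a production system would likely use its existing database.

```python
import json
import sqlite3


class PlanCheckpoint:
    """Persist a plan and its completed task ids so a restart can resume."""

    def __init__(self, path: str = ":memory:"):
        # Pass a file path in practice; ":memory:" does not survive a crash
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS plans (plan_id TEXT PRIMARY KEY, plan_json TEXT NOT NULL)"
        )
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS completed ("
            "plan_id TEXT, task_id TEXT, result TEXT, PRIMARY KEY (plan_id, task_id))"
        )
        self.conn.commit()

    def save_plan(self, plan_id: str, plan: dict) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO plans VALUES (?, ?)", (plan_id, json.dumps(plan))
        )
        self.conn.commit()

    def mark_completed(self, plan_id: str, task_id: str, result: str) -> None:
        # Commit after every task so at most the in-flight task is lost on a crash
        self.conn.execute(
            "INSERT OR REPLACE INTO completed VALUES (?, ?, ?)", (plan_id, task_id, result)
        )
        self.conn.commit()

    def remaining_tasks(self, plan_id: str) -> list:
        """Return the tasks of a saved plan that have not been marked completed."""
        row = self.conn.execute(
            "SELECT plan_json FROM plans WHERE plan_id = ?", (plan_id,)
        ).fetchone()
        if row is None:
            raise KeyError(plan_id)
        done = {
            r[0]
            for r in self.conn.execute(
                "SELECT task_id FROM completed WHERE plan_id = ?", (plan_id,)
            )
        }
        return [task for task in json.loads(row[0])["tasks"] if task["id"] not in done]
```

On restart, the orchestrator would queue only what remaining_tasks returns and seed its completed set from the stored ids, so dependency checks still pass for work finished before the crash.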

Can I mix different LLM models in a single workflow?

Yes. Some tasks might use fast, cheap models (Claude 3.5 Haiku); others might need reasoning (Claude Opus). The orchestrator can route tasks based on complexity. In practice, query the LLM once to ask "How complex is this task?" and route accordingly.
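
Routing can be sketched as a classifier plus a lookup table. In the example below the tier names are placeholders rather than real model identifiers, and heuristic_complexity is a crude stand-in for the LLM complexity query described above; its thresholds are arbitrary assumptions. The classifier is passed in as a parameter so an LLM-backed version can replace it.

```python
from typing import Callable, Dict

# Placeholder tier names, not real model identifiers
MODEL_TIERS: Dict[str, str] = {
    "simple": "fast-cheap-model",
    "complex": "strong-reasoning-model",
}


def heuristic_complexity(task: dict) -> str:
    """Stand-in classifier: long prompts or many dependencies count as complex."""
    prompt = task.get("prompt", "")
    if len(prompt) > 2000 or len(task.get("depends_on", [])) > 2:
        return "complex"
    return "simple"


def route_task(task: dict, classify: Callable[[dict], str] = heuristic_complexity) -> str:
    """Return the model tier to use for a task."""
    label = classify(task)
    # Unknown labels fall back to the stronger model to protect quality
    return MODEL_TIERS.get(label, MODEL_TIERS["complex"])
```

Falling back to the stronger tier on an unrecognized label trades cost for safety; the opposite default may suit cost-sensitive batch workloads.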

How do I debug a failed multi-agent workflow?

Log every task start/end with inputs and outputs. On failure, you can replay from any checkpoint. For multi-agent systems, also log inter-agent communication (what one agent passed to the next).
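
One way to make that logging replayable is to record each event as a structured JSON line. The sketch below keeps events in memory for brevity; the class name TraceLog, the field names, and the phase values are assumptions, and a real system would write to durable storage.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


class TraceLog:
    """In-memory trace of task events (illustrative; not durable)."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    def record(self, workflow_id: str, agent: str, task_id: str, phase: str, payload: Any) -> str:
        """Append one event and return it as a JSON line.

        phase is 'start', 'end', or 'error'.
        """
        event = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "workflow_id": workflow_id,
            "agent": agent,
            "task_id": task_id,
            "phase": phase,
            "payload": payload,  # Inputs on 'start', outputs on 'end', message on 'error'
        }
        self.events.append(event)
        return json.dumps(event, default=str)

    def last_good_output(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """Return the most recent 'end' event: the checkpoint to replay from."""
        for event in reversed(self.events):
            if event["workflow_id"] == workflow_id and event["phase"] == "end":
                return event
        return None
```

Because each 'end' payload is what the next agent received as input, the last good event is enough to re-run only the failed step.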
