
Deploy customer support AI: Production scaling

The difference between a working agent and a production agent is reliability. I've seen companies deploy support agents that work fine in testing but fail at scale: timeouts under load, cascading API failures, dropped conversations, memory leaks. This article covers production-grade deployment: infrastructure decisions, graceful degradation, failover, monitoring, and disaster recovery patterns used by Stripe, Intercom, and Zendesk.

Production architecture: High-level overview

A production support agent spans multiple systems:

Customer Message
      ↓
[API Gateway - Rate limiting, auth, validation]
      ↓
[Queue - SQS/Kafka for buffering]
      ↓
[Agent Service - Claude API calls, tool execution]
      ↓
[Tool Service Layer - Refunds, tickets, lookups]
      ↓
[Persistence - Conversation storage, audit logs]
      ↓
[Monitoring - Metrics, alerts, tracing]

Each layer must be independently scalable and resilient.

Infrastructure decisions

import os
from typing import Optional

class ProductionConfig:
    """Configuration for production deployment."""

    # API & Load Balancing
    API_PORT = int(os.getenv("API_PORT", 8080))
    API_WORKERS = int(os.getenv("API_WORKERS", 32))
    API_TIMEOUT_SECONDS = int(os.getenv("API_TIMEOUT_SECONDS", 30))
    API_RATE_LIMIT_PER_MINUTE = int(os.getenv("RATE_LIMIT", 1000))

    # Claude API
    CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-3-5-sonnet-20241022")
    CLAUDE_API_KEY = os.getenv("ANTHROPIC_API_KEY")
    CLAUDE_TIMEOUT_SECONDS = int(os.getenv("CLAUDE_TIMEOUT", 20))
    CLAUDE_MAX_RETRIES = int(os.getenv("CLAUDE_RETRIES", 3))

    # Queue
    QUEUE_TYPE = os.getenv("QUEUE_TYPE", "sqs")  # sqs, kafka, rabbitmq
    QUEUE_URL = os.getenv("QUEUE_URL")
    QUEUE_BATCH_SIZE = int(os.getenv("QUEUE_BATCH", 100))
    QUEUE_VISIBILITY_TIMEOUT = int(os.getenv("QUEUE_VISIBILITY", 300))  # 5 min

    # Database
    DB_TYPE = os.getenv("DB_TYPE", "postgres")
    DB_URL = os.getenv("DATABASE_URL")
    DB_POOL_SIZE = int(os.getenv("DB_POOL", 20))
    DB_TIMEOUT_SECONDS = int(os.getenv("DB_TIMEOUT", 5))

    # Caching (Redis)
    REDIS_URL = os.getenv("REDIS_URL")
    REDIS_TTL_SECONDS = int(os.getenv("REDIS_TTL", 3600))

    # Monitoring
    DATADOG_API_KEY = os.getenv("DATADOG_API_KEY")
    LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
    ENABLE_TRACING = os.getenv("ENABLE_TRACING", "true").lower() == "true"

    # Graceful Degradation
    FALLBACK_MODEL = os.getenv("FALLBACK_MODEL", "claude-3-5-haiku-20241022")  # Cheaper/faster
    FALLBACK_ENABLED = os.getenv("FALLBACK_ENABLED", "true").lower() == "true"

    @classmethod
    def validate(cls) -> list[str]:
        """Validate required config before startup."""
        errors = []

        if not cls.CLAUDE_API_KEY:
            errors.append("ANTHROPIC_API_KEY not set")
        if not cls.DB_URL:
            errors.append("DATABASE_URL not set")
        if not cls.QUEUE_URL:
            errors.append("QUEUE_URL not set")

        return errors
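
Validation only helps if it runs before the service accepts traffic. Here is a minimal sketch of a fail-fast startup check. The `missing_env` and `fail_fast` helpers are hypothetical names introduced for illustration; the variable names are the three that `validate()` checks.

```python
import os

# Hypothetical helper: the names mirror the variables that validate() requires.
REQUIRED_ENV = ("ANTHROPIC_API_KEY", "DATABASE_URL", "QUEUE_URL")

def missing_env(required=REQUIRED_ENV, env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]

def fail_fast(env=None):
    """Raise before serving traffic if any required variable is missing."""
    missing = missing_env(env=env)
    if missing:
        raise RuntimeError("Missing required config: " + ", ".join(missing))
```

Calling `fail_fast()` as the first line of your entry point turns a misconfigured deploy into an immediate crash rather than a slow stream of failed requests.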

Rate limiting and request queuing

Never let customer requests directly hit Claude. Buffer them:

from datetime import datetime
from collections import defaultdict
import asyncio
import time

class RateLimiter:
    """Token-bucket rate limiter, keyed by customer."""

    def __init__(self, rate_per_minute: int):
        self.rate_per_minute = rate_per_minute
        # Each customer starts with a full bucket. time.monotonic() is used
        # because it is unaffected by wall-clock adjustments.
        self.buckets = defaultdict(
            lambda: {"tokens": float(rate_per_minute), "last_refill": time.monotonic()}
        )

    def allow_request(self, customer_id: str) -> bool:
        """Check if request is allowed for customer."""
        bucket = self.buckets[customer_id]

        # Refill tokens based on time elapsed
        now = time.monotonic()
        seconds_elapsed = now - bucket["last_refill"]
        tokens_to_add = seconds_elapsed * (self.rate_per_minute / 60.0)

        bucket["tokens"] = min(
            self.rate_per_minute,
            bucket["tokens"] + tokens_to_add
        )
        bucket["last_refill"] = now

        # Check if request is allowed
        if bucket["tokens"] >= 1:
            bucket["tokens"] -= 1
            return True

        return False

class RequestQueue:
    """Queue incoming requests; process asynchronously."""

    def __init__(self, queue_url: str, batch_size: int = 100):
        self.queue_url = queue_url
        self.batch_size = batch_size
        # In-process stand-in; in production, read from the SQS/Kafka
        # queue at queue_url instead.
        self.local_queue = asyncio.Queue()

    async def enqueue(self, message: dict):
        """Add message to queue."""
        await self.local_queue.put({
            "timestamp": datetime.now().isoformat(),
            "message": message
        })

    async def dequeue_batch(self) -> list[dict]:
        """Dequeue up to batch_size messages without blocking."""
        batch = []
        for _ in range(self.batch_size):
            try:
                msg = self.local_queue.get_nowait()
                batch.append(msg)
            except asyncio.QueueEmpty:
                break
        return batch

    async def process_batch(self, batch: list[dict], agent):
        """Process a batch of queued requests, one at a time."""
        for item in batch:
            try:
                result = await agent.handle_message(item["message"])
                # Store result in database
                await self._persist_result(result)
            except Exception as e:
                # Log error and move on; don't crash the queue processor
                await self._log_error(e, item)

    async def _persist_result(self, result: dict):
        """Save result to database."""
        pass  # Implement in production

    async def _log_error(self, error: Exception, item: dict):
        """Log error for investigation."""
        pass  # Implement in production
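
`process_batch` above handles one message at a time, so a single slow Claude call stalls everything behind it. One possible alternative is a small pool of workers that drain the queue concurrently. This is a sketch under assumptions: `run_workers` and its `handler` argument are hypothetical names, and the in-process `asyncio.Queue` again stands in for a real broker.

```python
import asyncio

async def run_workers(queue, handler, concurrency=4):
    """Drain `queue` with at most `concurrency` handler calls in flight."""
    results = []

    async def worker():
        while True:
            item = await queue.get()
            try:
                results.append(await handler(item))
            except Exception as exc:
                # Record the failure and keep the worker alive.
                results.append(exc)
            finally:
                queue.task_done()

    tasks = [asyncio.create_task(worker()) for _ in range(concurrency)]
    await queue.join()   # returns once every queued item has been processed
    for task in tasks:
        task.cancel()    # workers loop forever, so stop them explicitly
    await asyncio.gather(*tasks, return_exceptions=True)
    return results
```

The `concurrency` value caps how many requests reach Claude at once, which makes it a convenient knob for staying under API rate limits.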

Resilience: Failover and circuit breakers

When the Claude API fails, fall back gracefully:

from enum import Enum
import time

class CircuitBreakerState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing; reject requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    """Protect against cascading failures."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout_seconds: int = 60,
        expected_exception: type[Exception] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout_seconds
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitBreakerState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""

        if self.state == CircuitBreakerState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitBreakerState.HALF_OPEN
            else:
                raise CircuitOpenError(
                    f"Circuit breaker OPEN; will retry in {self._time_until_retry()}s"
                )

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise  # Re-raise with the original traceback

    def _on_success(self):
        """Handle successful call."""
        self.failure_count = 0
        self.state = CircuitBreakerState.CLOSED

    def _on_failure(self):
        """Handle failed call."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        # A failure in HALF_OPEN also lands here: the count is still at or
        # above the threshold, so the circuit re-opens immediately.
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitBreakerState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to retry."""
        if self.last_failure_time is None:
            return False

        return (time.time() - self.last_failure_time) >= self.recovery_timeout

    def _time_until_retry(self) -> int:
        """Seconds until the next recovery attempt."""
        if self.last_failure_time is None:
            return 0

        return max(0, self.recovery_timeout - int(time.time() - self.last_failure_time))

# Usage. `claude_api` and `fallback_api` are placeholder clients that expose
# a complete(prompt) method; substitute your own wrappers.
claude_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout_seconds=60)

def call_claude_with_protection(prompt: str) -> str:
    """Call Claude with circuit breaker protection."""
    try:
        return claude_breaker.call(claude_api.complete, prompt)
    except Exception:
        # Either the circuit is OPEN or the primary call failed; use fallback model
        return fallback_api.complete(prompt)
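
The config defines `CLAUDE_MAX_RETRIES`, but nothing above uses it. A circuit breaker handles sustained outages; short transient errors are usually better served by a few retries first. Below is a minimal sketch of retry with exponential backoff and full jitter. The `retry_with_backoff` name, the delay values, and the injectable `sleep` parameter are all assumptions for illustration, not part of any SDK.

```python
import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=0.5, max_delay=8.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on failure, wait a jittered, exponentially growing delay and retry."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries: surface the last error to the caller
            # The delay cap doubles each attempt, bounded by max_delay.
            delay = min(max_delay, base_delay * (2 ** attempt))
            # Full jitter: pick a random wait so clients don't retry in lockstep.
            sleep(random.uniform(0, delay))
```

If you combine the two, put the retries inside the function the breaker wraps, so that one exhausted retry sequence counts as a single failure toward the threshold.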

Graceful degradation

When the primary system fails, degrade gracefully:

import logging
from typing import Optional

logger = logging.getLogger(__name__)

class SupportAgentWithFallbacks:
    """Support agent with graceful degradation."""

    def __init__(self):
        self.primary_model = "claude-3-5-sonnet-20241022"
        self.fallback_model = "claude-3-5-haiku-20241022"
        self.max_tool_calls = 3

    async def respond(self, message: str, customer_id: str) -> dict:
        """Respond to customer with fallback layers."""

        # Layer 1: Try primary model with full tools
        try:
            result = await self._respond_with_model(
                message,
                self.primary_model,
                enable_tools=True
            )
            return {
                "response": result,
                "model": self.primary_model,
                "tools_enabled": True,
                "fallback_level": 0
            }
        except TimeoutError:
            logger.warning("Primary model with tools timed out; degrading")
        except Exception:
            # Log error; try fallback
            logger.exception("Primary model with tools failed; degrading")

        # Layer 2: Try primary model without tools (faster)
        try:
            result = await self._respond_with_model(
                message,
                self.primary_model,
                enable_tools=False
            )
            return {
                "response": result,
                "model": self.primary_model,
                "tools_enabled": False,
                "fallback_level": 1
            }
        except Exception:
            logger.exception("Primary model without tools failed; degrading")

        # Layer 3: Try fallback model (cheaper/faster)
        try:
            result = await self._respond_with_model(
                message,
                self.fallback_model,
                enable_tools=False
            )
            return {
                "response": result,
                "model": self.fallback_model,
                "tools_enabled": False,
                "fallback_level": 2
            }
        except Exception:
            logger.exception("Fallback model failed; degrading")

        # Layer 4: Return cached response or generic message
        cached = await self._get_cached_response(message, customer_id)
        if cached:
            return {
                "response": cached,
                "model": "cache",
                "tools_enabled": False,
                "fallback_level": 3
            }

        # Layer 5: Last resort
        return {
            "response": "We're experiencing high volume. Please contact support at [email protected] or try again in a few minutes.",
            "model": "none",
            "tools_enabled": False,
            "fallback_level": 4
        }

    async def _respond_with_model(self, message: str, model: str, enable_tools: bool) -> str:
        """Call a specific model."""
        # Implementation
        raise NotImplementedError

    async def _get_cached_response(self, message: str, customer_id: str) -> Optional[str]:
        """Get cached response for similar messages."""
        # Implementation; until a cache is wired up, report a miss
        return None
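
The fallback chain only degrades quickly if each layer has a deadline; otherwise a hung primary call consumes the whole request budget before layer 2 ever runs. Here is a rough sketch of the same try-in-order idea with a per-layer timeout via `asyncio.wait_for`. The `first_successful` helper and the `(label, factory)` pairs are hypothetical, and the sketch does not call any real model.

```python
import asyncio

async def first_successful(attempts, timeout_seconds):
    """Try each (label, coroutine factory) in order; return the first result in time."""
    for level, (label, make_call) in enumerate(attempts):
        try:
            # wait_for cancels the call and raises a timeout error if it overruns.
            response = await asyncio.wait_for(make_call(), timeout=timeout_seconds)
            return {"response": response, "layer": label, "fallback_level": level}
        except Exception:
            # Timed out or failed outright; move on to the next layer.
            continue
    # Every layer failed; the caller falls through to cache or a generic message.
    return None
```

Size the per-layer timeout so that the sum across layers still fits inside the overall API timeout.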

Monitoring and alerting

Every production system needs observability:

from dataclasses import dataclass
from typing import Optional

@dataclass
class HealthCheck:
    """System health status."""
    api_up: bool
    claude_api_up: bool
    database_up: bool
    queue_depth: int
    error_rate_percent: float
    p99_latency_ms: float
    overall_healthy: bool

class MonitoringClient:
    """Send metrics to monitoring service (Datadog, New Relic, etc.)."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    def record_latency(self, operation: str, duration_ms: float):
        """Record operation latency."""
        # Send to Datadog/New Relic
        pass

    def record_error(self, error_type: str, context: dict):
        """Record an error."""
        # Send to monitoring service
        pass

    def record_metric(self, metric_name: str, value: float, tags: Optional[dict] = None):
        """Record a metric."""
        # Send gauge metric to monitoring service
        pass

    def health_check(self) -> HealthCheck:
        """Check system health."""
        # Verify all dependencies
        api_up = self._check_api()
        claude_up = self._check_claude_api()
        db_up = self._check_database()
        queue_depth = self._check_queue_depth()
        error_rate = self._calculate_error_rate()
        p99_latency = self._get_p99_latency()

        overall = api_up and claude_up and db_up and queue_depth < 10000

        return HealthCheck(
            api_up=api_up,
            claude_api_up=claude_up,
            database_up=db_up,
            queue_depth=queue_depth,
            error_rate_percent=error_rate,
            p99_latency_ms=p99_latency,
            overall_healthy=overall
        )

    # The probes below are stubs. They raise rather than return None so that
    # health_check() cannot silently report on checks that were never run.

    def _check_api(self) -> bool:
        # Ping API server
        raise NotImplementedError

    def _check_claude_api(self) -> bool:
        # Test Claude API connectivity
        raise NotImplementedError

    def _check_database(self) -> bool:
        # Test database connectivity
        raise NotImplementedError

    def _check_queue_depth(self) -> int:
        # Get number of messages in queue
        raise NotImplementedError

    def _calculate_error_rate(self) -> float:
        # Calculate errors / total requests in last 5 minutes
        raise NotImplementedError

    def _get_p99_latency(self) -> float:
        # Get 99th percentile latency
        raise NotImplementedError
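
The error-rate and p99 stubs both need a window of recent samples. If your monitoring vendor does not compute these for you, one way to do it in-process is a rolling window like the sketch below. `RollingStats` is a hypothetical class; the 5-minute default mirrors the comment in `_calculate_error_rate`, and the percentile uses the simple nearest-rank method.

```python
import time
from collections import deque

class RollingStats:
    """Keep (timestamp, latency_ms, ok) samples for the last `window_seconds`."""

    def __init__(self, window_seconds=300.0, clock=time.monotonic):
        self.window_seconds = window_seconds
        self.clock = clock
        self.samples = deque()

    def record(self, latency_ms, ok):
        """Store one request outcome and drop samples that left the window."""
        self.samples.append((self.clock(), latency_ms, ok))
        self._evict()

    def _evict(self):
        cutoff = self.clock() - self.window_seconds
        while self.samples and self.samples[0][0] < cutoff:
            self.samples.popleft()

    def error_rate_percent(self):
        """Failed requests as a percentage of all requests in the window."""
        self._evict()
        if not self.samples:
            return 0.0
        failures = sum(1 for _, _, ok in self.samples if not ok)
        return 100.0 * failures / len(self.samples)

    def p99_latency_ms(self):
        """Nearest-rank 99th percentile of latencies in the window."""
        self._evict()
        if not self.samples:
            return 0.0
        latencies = sorted(latency for _, latency, _ in self.samples)
        # 1-based nearest rank: ceil(0.99 * n), done in integer arithmetic.
        rank = -(-99 * len(latencies) // 100)
        return latencies[rank - 1]
```

Sorting on every read is fine for a health endpoint polled every few seconds; at higher volumes a streaming quantile estimator would be the better fit.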

Deployment checklist

Before going live, verify:

class DeploymentChecklist:
    """Pre-deployment validation."""

    # The check_* and test_* functions are placeholders to implement for your
    # stack. The lambdas defer name lookup until run(), so a missing function
    # is reported as an ERROR for that check instead of failing at import.
    checks = [
        ("Configuration validated", lambda: ProductionConfig.validate() == []),
        ("Database migrations applied", lambda: check_migrations()),
        ("Circuit breakers configured", lambda: check_circuit_breakers()),
        ("Monitoring endpoints live", lambda: check_monitoring()),
        ("Fallback models available", lambda: test_fallback_models()),
        ("Rate limiter configured", lambda: check_rate_limiter()),
        ("Queue set up and tested", lambda: test_queue()),
        ("API authentication enabled", lambda: check_auth()),
        ("Secrets manager configured", lambda: check_secrets()),
        ("Logging configured", lambda: check_logging()),
        ("Alerting rules set", lambda: check_alerting()),
        ("Runbooks written", lambda: check_runbooks()),
        ("Team trained on incidents", lambda: check_team_readiness()),
    ]

    def run(self) -> dict:
        """Run all checks."""
        results = {}
        for check_name, check_func in self.checks:
            try:
                passed = check_func()
                results[check_name] = "PASS" if passed else "FAIL"
            except Exception as e:
                results[check_name] = f"ERROR: {e}"

        return results

# Usage
checklist = DeploymentChecklist()
results = checklist.run()
for check, status in results.items():
    print(f"{check}: {status}")

# Only deploy if all checks PASS
if all(v == "PASS" for v in results.values()):
    print("Ready to deploy!")
else:
    print("Fix failing checks before deploying")

Key Takeaways

  • Multi-layer architecture: API gateway, queue, agent service, tool service, persistence, and monitoring. Each layer must be independently scalable.
  • Rate limiting and queueing: buffer requests; never let customer traffic hit the Claude API directly. Use token-bucket limiting per customer.
  • Circuit breakers: protect against cascading failures. When calls to Claude keep failing, open the circuit and use the fallback model.
  • Graceful degradation: a 5-layer fallback (primary with tools → primary without tools → fallback model → cache → generic message). Every layer must work independently of the ones above it.
  • Comprehensive monitoring: health checks, latency tracking, error rates, queue depth. Alert on anomalies. Have runbooks for common failures.

Frequently Asked Questions

How do I handle traffic spikes?

Queue incoming requests rather than sending them straight to the Claude API. Use auto-scaling: if queue depth exceeds 1000, spin up more worker pods. Monitor Claude API usage; if you are approaching rate limits, switch to the fallback model or queue with longer delays.

What's a safe timeout for Claude API calls?

20 seconds for production, matching the CLAUDE_TIMEOUT default in the config above (Claude typically responds in 1–5 seconds). If a call times out, escalate to a human or use the fallback. Never wait more than 30 seconds; the customer will abandon the conversation.

Should I cache all conversations or just recent ones?

Just recent ones. Cache conversations in Redis for 1 hour and archive them to the database for long-term storage. This speeds up escalations (a human agent can read recent context quickly) without storing everything in RAM.

How often should I run health checks?

Every 10 seconds for critical dependencies (Claude API, database), every 30 seconds for others. Alert if a health check fails twice in a row (ignore transient glitches).
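
The "twice in a row" rule is easy to get subtly wrong, for example by alerting on every failure after the second. A small sketch of one way to track it follows; `ConsecutiveFailureAlert` is a hypothetical name, and you would keep one instance per dependency.

```python
class ConsecutiveFailureAlert:
    """Fire once when a check fails `threshold` times in a row; reset on success."""

    def __init__(self, threshold=2):
        self.threshold = threshold
        self.failures = 0

    def observe(self, healthy):
        """Record one health-check result; return True when an alert should fire."""
        if healthy:
            self.failures = 0
            return False
        self.failures += 1
        # Fire only on the crossing, not on every later failure.
        return self.failures == self.threshold
```

A single failed probe followed by a success never fires, which is exactly the transient glitch you want to ignore.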

What's a good queue depth threshold?

<100: healthy. 100–500: monitor but OK. 500–1000: auto-scale up. >1000: page on-call. If queue is consistently deep, you need more capacity.
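
These thresholds translate directly into a small decision function that an autoscaler or alert rule could call. This is a sketch only: `queue_depth_action` and its return labels are hypothetical, and since the ranges above overlap at exactly 500, the code assumes that value still counts as "monitor".

```python
def queue_depth_action(depth):
    """Map queue depth to the action suggested by the thresholds above."""
    if depth < 100:
        return "healthy"
    if depth <= 500:       # assumption: 500 itself stays in the "monitor" band
        return "monitor"
    if depth <= 1000:
        return "scale_up"
    return "page_on_call"
```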

Further Reading