Building a Resilient LLM Client: Production Architecture
A production-grade resilient LLM client combines the patterns from earlier articles into a single cohesive library. Instead of reimplementing them for each project, you build the client once and reuse it everywhere. It integrates exponential backoff, rate limiting, circuit breakers, timeouts, failover, idempotency, and graceful degradation behind a clean, composable API.
The key is layering: each resilience pattern operates independently but cooperates with the others. Timeouts prevent individual requests from hanging. Exponential backoff spaces out retries. Circuit breakers stop calls to a failing provider, which prevents cascading failures. Failover routes requests to a backup provider when the primary is unavailable. Together, they keep a single slow or failing dependency from taking the whole system down.
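As a small illustration of how backoff spaces retries, the sketch below computes the capped delay schedule using the same formula as the client later in this article (base delay doubled per attempt, capped at a maximum). The jitter helper is an addition that the article's client does not include; it is shown only as a common companion technique, and both function names are hypothetical.

```python
import random
from typing import List


def backoff_delays(max_retries: int, base_delay: float = 1.0,
                   max_delay: float = 60.0) -> List[float]:
    """Delay before each retry: base_delay * 2**attempt, capped at max_delay."""
    return [min(max_delay, base_delay * (2 ** attempt))
            for attempt in range(max_retries)]


def with_full_jitter(delay: float, rng: random.Random) -> float:
    """Assumption, not in the article's client: pick a random wait in
    [0, delay] so that many clients do not retry in lockstep."""
    return rng.uniform(0.0, delay)


delays = backoff_delays(max_retries=8)
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The cap matters once retries pile up: without it, the eighth delay would be 128 seconds instead of 60.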
Composable Architecture
A resilient client has layers of protection:
User Request
↓
[Idempotency Check] — Return cached result if key already processed
↓
[Rate Limiter] — Wait if quota exceeded
↓
[Circuit Breaker] — Fail fast if API is broken
↓
[Timeout Wrapper] — Kill request after N seconds
↓
[Retry Loop] — Retry with exponential backoff
↓
├─ [Primary Provider] — Try API A
├─ [Secondary Provider] — Fall back to API B
└─ [Cache Fallback] — Return stale cache
↓
Result
Each layer is independent and can be toggled on or off. This modular design makes the client easier to test and its behavior easier to reason about.
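One way to make the layers toggleable is to write each one as a wrapper around the next handler and compose them from a list. The following is a minimal sketch under that assumption; the names Handler, Layer, with_timeout, with_retries, compose, and flaky_provider are hypothetical and do not appear in the client shown later, which inlines these steps instead.

```python
import asyncio
from typing import Awaitable, Callable, List

Handler = Callable[[str], Awaitable[str]]
Layer = Callable[[Handler], Handler]


def with_timeout(seconds: float) -> Layer:
    """Layer that cancels the wrapped call after `seconds`."""
    def layer(next_handler: Handler) -> Handler:
        async def handler(prompt: str) -> str:
            return await asyncio.wait_for(next_handler(prompt), timeout=seconds)
        return handler
    return layer


def with_retries(max_retries: int, base_delay: float) -> Layer:
    """Layer that retries the wrapped call with exponential backoff."""
    def layer(next_handler: Handler) -> Handler:
        async def handler(prompt: str) -> str:
            for attempt in range(max_retries + 1):
                try:
                    return await next_handler(prompt)
                except Exception:
                    if attempt == max_retries:
                        raise
                    await asyncio.sleep(base_delay * (2 ** attempt))
            raise RuntimeError("unreachable")
        return handler
    return layer


def compose(layers: List[Layer], core: Handler) -> Handler:
    """Wrap `core` so that layers[0] is the outermost layer."""
    handler = core
    for layer in reversed(layers):
        handler = layer(handler)
    return handler


calls = 0


async def flaky_provider(prompt: str) -> str:
    """Stand-in for a real API call: fails twice, then succeeds."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("simulated outage")
    return f"echo: {prompt}"


# Retries wrap the timeout, so every attempt gets its own time budget.
client = compose([with_retries(3, 0.01), with_timeout(1.0)], flaky_provider)
print(asyncio.run(client("hi")))  # echo: hi
```

Disabling a layer in a test then amounts to leaving it out of the list passed to compose.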
Production-Grade Python Implementation
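Here is a reference implementation that wires these layers together. The CircuitBreaker and TokenBucket classes at the end are minimal stubs, and only the OpenAI and Anthropic call paths are implemented: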
import asyncio
import logging
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional, Tuple

import aiohttp

logger = logging.getLogger(__name__)

# Idempotency entries expire after 24 hours
IDEMPOTENCY_TTL_SECONDS = 86400


class ProviderType(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    COHERE = "cohere"  # listed for completeness; no call path is implemented below


class ProviderError(Exception):
    """HTTP-level failure returned by a provider."""

    def __init__(self, provider: str, status: int):
        super().__init__(f"{provider} API error: {status}")
        self.status = status
        # 429 and 5xx are usually transient; other 4xx will not succeed on retry
        self.retryable = status == 429 or status >= 500


@dataclass
class ResilientClientConfig:
    """Configuration for resilient LLM client."""
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    timeout_seconds: float = 60.0
    circuit_breaker_threshold: int = 5
    circuit_breaker_recovery_timeout: float = 30.0
    rate_limit_capacity: int = 3500
    rate_limit_refill_rate: float = 58.33  # tokens per second (3500 per minute)
    enable_caching: bool = True
    cache_ttl_seconds: int = 3600  # not read by this listing


class ResilientLLMClient:
    """Production-grade resilient LLM client combining all patterns."""

    def __init__(
        self,
        providers: Dict[ProviderType, str],  # {provider: api_key}
        config: Optional[ResilientClientConfig] = None,
    ):
        self.providers = providers
        # Build the default per instance so clients never share one config object
        self.config = config or ResilientClientConfig()

        # Initialize components
        self.rate_limiter = TokenBucket(
            self.config.rate_limit_capacity,
            self.config.rate_limit_refill_rate,
        )
        self.circuit_breakers = {
            provider: CircuitBreaker(
                failure_threshold=self.config.circuit_breaker_threshold,
                recovery_timeout=self.config.circuit_breaker_recovery_timeout,
            )
            for provider in providers
        }
        # Idempotency cache: {key: (result, stored_at)}; None when caching is off
        self.cache: Optional[Dict[str, Tuple[str, float]]] = (
            {} if self.config.enable_caching else None
        )
        # Last good result per prompt, used only as a last-resort fallback
        self.stale_cache: Dict[str, str] = {}

    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4",
        idempotency_key: Optional[str] = None,
        **kwargs,
    ) -> str:
        """
        Complete a prompt with full resilience.

        Args:
            prompt: User prompt
            model: Model name (e.g., "gpt-4"). The same name is sent to every
                provider, so failover only works if each provider accepts it.
            idempotency_key: Unique request ID; generated if None
            **kwargs: Additional args; this listing forwards only max_tokens

        Returns:
            Completion text

        Raises:
            RuntimeError if all providers fail and no cached result exists
        """
        if idempotency_key is None:
            idempotency_key = str(uuid.uuid4())

        # 1. Check idempotency (compare to None so an empty completion still counts)
        cached = self._get_idempotency_cache(idempotency_key)
        if cached is not None:
            logger.info(f"Returning idempotency cache for {idempotency_key}")
            return cached

        # 2. Rate limit
        await self.rate_limiter.wait_for_quota(1)

        # 3. Try providers in order with failover
        for provider in self.providers:
            # 4. Check circuit breaker (looked up outside the try block so the
            #    except branch can always reference it)
            breaker = self.circuit_breakers[provider]
            if breaker.is_open():
                logger.warning(f"{provider.value} circuit is open; skipping")
                continue
            try:
                # 5. Call provider with timeout and retries
                result = await self._call_with_timeout(
                    provider, prompt, model, idempotency_key, **kwargs
                )
            except Exception as e:
                logger.error(f"{provider.value} failed: {e}")
                breaker.record_failure()
                continue
            # 6. Cache successful result
            self._set_idempotency_cache(idempotency_key, result)
            self._set_cache_stale(prompt, result)
            return result

        # 7. Fall back to cache
        stale = self._get_cache_stale(prompt)
        if stale is not None:
            logger.warning("Using stale cache as last resort")
            return stale
        raise RuntimeError("All providers failed and no cache available")

    async def _call_with_timeout(
        self,
        provider: ProviderType,
        prompt: str,
        model: str,
        idempotency_key: str,
        **kwargs,
    ) -> str:
        """Call provider with timeout and retries."""
        for attempt in range(self.config.max_retries + 1):
            try:
                # Each attempt gets its own timeout budget
                result = await asyncio.wait_for(
                    self._call_provider(
                        provider, prompt, model, idempotency_key, **kwargs
                    ),
                    timeout=self.config.timeout_seconds,
                )
                self.circuit_breakers[provider].record_success()
                return result
            except (asyncio.TimeoutError, aiohttp.ClientError, ProviderError) as e:
                # Timeouts and connection errors are treated as transient;
                # ProviderError decides based on the HTTP status
                retryable = getattr(e, "retryable", True)
                logger.warning(
                    f"{provider.value} attempt {attempt + 1} failed: {e!r}"
                )
                if not retryable or attempt >= self.config.max_retries:
                    raise
                wait_time = min(
                    self.config.max_delay,
                    self.config.base_delay * (2 ** attempt),
                )
                logger.info(f"Retrying in {wait_time:.1f}s...")
                await asyncio.sleep(wait_time)
        raise RuntimeError("max_retries must be >= 0")

    async def _call_provider(
        self,
        provider: ProviderType,
        prompt: str,
        model: str,
        idempotency_key: str,
        **kwargs,
    ) -> str:
        """Call a specific provider."""
        if provider == ProviderType.OPENAI:
            return await self._call_openai(prompt, model, idempotency_key, **kwargs)
        elif provider == ProviderType.ANTHROPIC:
            return await self._call_anthropic(prompt, model, idempotency_key, **kwargs)
        else:
            raise ValueError(f"Unknown provider: {provider}")

    async def _call_openai(self, prompt: str, model: str, idempotency_key: str, **kwargs) -> str:
        """Call OpenAI API."""
        api_key = self.providers[ProviderType.OPENAI]
        # A session per call keeps the listing short; reuse one session in real code
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    # Assumes the provider honors this header
                    "Idempotency-Key": idempotency_key,
                },
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": kwargs.get("max_tokens", 1024),
                },
            ) as resp:
                if resp.status != 200:
                    raise ProviderError("OpenAI", resp.status)
                data = await resp.json()
                return data["choices"][0]["message"]["content"]

    async def _call_anthropic(self, prompt: str, model: str, idempotency_key: str, **kwargs) -> str:
        """Call Anthropic API."""
        api_key = self.providers[ProviderType.ANTHROPIC]
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.anthropic.com/v1/messages",
                headers={
                    "x-api-key": api_key,
                    "anthropic-version": "2023-06-01",  # required by the Messages API
                    # Assumes the provider honors this header
                    "Idempotency-Key": idempotency_key,
                },
                json={
                    "model": model,
                    "max_tokens": kwargs.get("max_tokens", 1024),
                    "messages": [{"role": "user", "content": prompt}],
                },
            ) as resp:
                if resp.status != 200:
                    raise ProviderError("Anthropic", resp.status)
                data = await resp.json()
                return data["content"][0]["text"]

    def _get_idempotency_cache(self, key: str) -> Optional[str]:
        """Get result from idempotency cache."""
        # Test against None: an empty dict is falsy but is still a valid cache
        if self.cache is None or key not in self.cache:
            return None
        result, timestamp = self.cache[key]
        if time.time() - timestamp > IDEMPOTENCY_TTL_SECONDS:
            del self.cache[key]
            return None
        return result

    def _set_idempotency_cache(self, key: str, result: str):
        """Cache result by idempotency key."""
        if self.cache is None:
            return
        self.cache[key] = (result, time.time())

    def _set_cache_stale(self, prompt: str, result: str):
        """Remember the last good result per prompt for the stale fallback."""
        if self.cache is not None:
            self.stale_cache[prompt] = result

    def _get_cache_stale(self, prompt: str) -> Optional[str]:
        """Get stale cache result (no TTL check)."""
        if self.cache is None:
            return None
        return self.stale_cache.get(prompt)


# Minimal CircuitBreaker and TokenBucket stubs (see earlier articles)
class CircuitBreaker:
    def __init__(self, failure_threshold, recovery_timeout):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.is_open_state = False
        self.last_failure = 0.0

    def is_open(self) -> bool:
        if (
            self.is_open_state
            and time.monotonic() - self.last_failure > self.recovery_timeout
        ):
            # Recovery window elapsed: let traffic through again. The failure
            # count is kept, so one more failure reopens the circuit at once.
            self.is_open_state = False
        return self.is_open_state

    def record_failure(self):
        self.failures += 1
        self.last_failure = time.monotonic()
        if self.failures >= self.failure_threshold:
            self.is_open_state = True

    def record_success(self):
        self.failures = 0


class TokenBucket:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.monotonic()

    async def wait_for_quota(self, tokens_needed: int):
        while True:
            # Refill in proportion to elapsed time, never above capacity
            now = time.monotonic()
            self.tokens = min(
                self.capacity,
                self.tokens + (now - self.last_refill) * self.refill_rate,
            )
            self.last_refill = now
            if self.tokens >= tokens_needed:
                break
            await asyncio.sleep(0.01)
        self.tokens -= tokens_needed
Usage:
import asyncio


async def main():
    # Initialize
    config = ResilientClientConfig(
        max_retries=3,
        timeout_seconds=60,
        enable_caching=True
    )
    client = ResilientLLMClient(
        providers={
            ProviderType.OPENAI: "sk-...",
            ProviderType.ANTHROPIC: "sk-ant-..."
        },
        config=config
    )
    # Use it (await is only valid inside a coroutine)
    result = await client.complete("What is AI?", model="gpt-4")
    print(result)


asyncio.run(main())
Key Takeaways
- A resilient client layers patterns: idempotency → rate limiting → circuit breaker → timeout → retry → failover → cache.
- Each layer is independent and can be disabled for testing.
- Use configuration objects to tune behavior without code changes.
- Implement observability (logging, metrics) at each layer.
- Test each pattern in isolation and integration.
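To illustrate tuning behavior without code changes, the sketch below overrides dataclass defaults from environment variables. The ClientConfig class is a trimmed stand-in for the article's configuration object, and the LLM_ prefix and the config_from_env helper are assumptions made for this example.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping, Optional


@dataclass
class ClientConfig:
    """Trimmed stand-in for the article's configuration object."""
    max_retries: int = 3
    base_delay: float = 1.0
    timeout_seconds: float = 60.0
    enable_caching: bool = True


def config_from_env(prefix: str = "LLM_",
                    env: Optional[Mapping[str, str]] = None) -> ClientConfig:
    """Override defaults from variables such as LLM_MAX_RETRIES (hypothetical names)."""
    env = os.environ if env is None else env
    defaults = ClientConfig()
    overrides = {}
    for f in fields(ClientConfig):
        raw = env.get(prefix + f.name.upper())
        if raw is None:
            continue
        kind = type(getattr(defaults, f.name))  # parse using the default's type
        if kind is bool:
            overrides[f.name] = raw.strip().lower() in {"1", "true", "yes"}
        else:
            overrides[f.name] = kind(raw)
    return ClientConfig(**overrides)


cfg = config_from_env(env={"LLM_MAX_RETRIES": "5", "LLM_ENABLE_CACHING": "false"})
print(cfg)
# ClientConfig(max_retries=5, base_delay=1.0, timeout_seconds=60.0, enable_caching=False)
```

Passing a mapping instead of reading os.environ directly keeps the helper easy to test.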
Frequently Asked Questions
Should I build this or use a library?
For simple use cases, use a library like litellm or LangChain. For production systems with strict requirements, consider building your own so you control every detail. A common path is to start with a library and move to your own client once its limits start to constrain you.
How do I test a resilient client?
Test each pattern in isolation (unit tests) and end-to-end (integration tests with chaos injection). See article 10 for detailed testing strategies.
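As an example of testing one pattern in isolation, the sketch below unit-tests a circuit breaker with an injected clock, so the test does not need to sleep through the recovery timeout. The clock parameter and the FakeClock class are assumptions added for testability; the stub in this article reads the system clock directly.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Same logic as the article's stub, plus an injectable clock (an assumption)."""

    def __init__(self, failure_threshold: int, recovery_timeout: float,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.is_open_state = False
        self.last_failure = 0.0

    def is_open(self) -> bool:
        if self.is_open_state and self.clock() - self.last_failure > self.recovery_timeout:
            self.is_open_state = False
        return self.is_open_state

    def record_failure(self) -> None:
        self.failures += 1
        self.last_failure = self.clock()
        if self.failures >= self.failure_threshold:
            self.is_open_state = True

    def record_success(self) -> None:
        self.failures = 0


class FakeClock:
    """Manually advanced clock for deterministic tests."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


def test_opens_after_threshold_and_recovers() -> None:
    clock = FakeClock()
    breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=30.0, clock=clock)
    breaker.record_failure()
    assert not breaker.is_open()   # below threshold
    breaker.record_failure()
    assert breaker.is_open()       # threshold reached
    clock.advance(31.0)
    assert not breaker.is_open()   # recovery window elapsed


test_opens_after_threshold_and_recovers()
print("circuit breaker tests passed")
```

The same injection approach works for the token bucket and for backoff sleeps, which keeps the unit suite fast.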
Can I reuse this across teams?
Yes. Package the client as an internal library. Document configuration options and provide sensible defaults. Version carefully since breaking changes affect multiple teams.
What metrics should I track?
Track per-provider: request latency, error rate, circuit breaker state. Aggregate: total throughput, end-to-end latency, cache hit rate. Alert on: error rate spikes, circuit breaker opens, queue growth.
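A minimal in-memory sketch of these metrics might look like the following. The Metrics and ProviderStats classes, their method names, and the alert thresholds are all hypothetical; a real deployment would more likely export counters to an existing metrics system.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ProviderStats:
    requests: int = 0
    errors: int = 0
    latencies: List[float] = field(default_factory=list)

    @property
    def error_rate(self) -> float:
        return self.errors / self.requests if self.requests else 0.0


class Metrics:
    """In-memory counters; illustrative only."""

    def __init__(self) -> None:
        self.providers: Dict[str, ProviderStats] = defaultdict(ProviderStats)
        self.cache_hits = 0
        self.cache_misses = 0

    def record_request(self, provider: str, latency_s: float, ok: bool) -> None:
        stats = self.providers[provider]
        stats.requests += 1
        stats.latencies.append(latency_s)
        if not ok:
            stats.errors += 1

    def record_cache(self, hit: bool) -> None:
        if hit:
            self.cache_hits += 1
        else:
            self.cache_misses += 1

    @property
    def cache_hit_rate(self) -> float:
        total = self.cache_hits + self.cache_misses
        return self.cache_hits / total if total else 0.0

    def should_alert(self, provider: str, max_error_rate: float = 0.5,
                     min_requests: int = 4) -> bool:
        """Flag a provider once it has enough traffic and too many errors."""
        stats = self.providers[provider]
        return stats.requests >= min_requests and stats.error_rate > max_error_rate


metrics = Metrics()
for ok in (True, True, True, False):
    metrics.record_request("openai", latency_s=0.8, ok=ok)
metrics.record_cache(hit=True)
metrics.record_cache(hit=False)
print(metrics.providers["openai"].error_rate)  # 0.25
print(metrics.cache_hit_rate)                  # 0.5
print(metrics.should_alert("openai"))          # False
```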