Building a Resilient LLM Client: Production Architecture

A production-grade resilient LLM client combines the patterns from earlier articles into a single cohesive library. Instead of reinventing these patterns for each project, you build once and reuse everywhere. The client integrates exponential backoff, rate limiting, circuit breakers, timeouts, failover, idempotency, and graceful degradation behind a clean, composable API.

The key is layering: each resilience pattern operates independently but interacts gracefully with the others. Timeouts prevent individual requests from hanging. Exponential backoff spaces out retries. Circuit breakers stop a failing provider from dragging down everything that depends on it. Failover routes requests to another provider when the primary is unavailable. Together, they keep a single slow or failing dependency from taking the whole system down.
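To make the backoff step concrete, the sketch below computes a capped exponential schedule. The function name `backoff_delays` and its default values are assumptions chosen for illustration; production code often adds random jitter as well, which is omitted here.

```python
from typing import List


def backoff_delays(max_retries: int = 3, base_delay: float = 1.0,
                   max_delay: float = 60.0) -> List[float]:
    """Return the wait before each retry: base_delay * 2**attempt, capped at max_delay."""
    return [min(max_delay, base_delay * (2 ** attempt))
            for attempt in range(max_retries)]


print(backoff_delays())                # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=8))   # later delays are capped at max_delay
```

The cap matters once the retry count grows: without it, the eighth retry would wait over two minutes.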

Composable Architecture

A resilient client has layers of protection:

User Request
    ↓
[Idempotency Check]: return cached result if key already processed
    ↓
[Rate Limiter]: wait if quota exceeded
    ↓
[Circuit Breaker]: fail fast if API is broken
    ↓
[Timeout Wrapper]: kill request after N seconds
    ↓
[Retry Loop]: retry with exponential backoff
    ├─ [Primary Provider]: try API A
    ├─ [Secondary Provider]: fall back to API B
    └─ [Cache Fallback]: return stale cache
    ↓
Result

Each layer is independent and can be toggled on/off. This modular design makes testing easier and reasoning clearer.
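One way to get this kind of toggling is to write each layer as a wrapper around an async call and build the client from a list of wrappers. The sketch below is a simplified illustration, not the implementation that follows: `with_timeout`, `with_retries`, `compose`, and `flaky_provider` are hypothetical names introduced here.

```python
import asyncio
from typing import Awaitable, Callable, List

Handler = Callable[[str], Awaitable[str]]
Layer = Callable[[Handler], Handler]


def with_timeout(seconds: float) -> Layer:
    """Layer that cancels the wrapped call after `seconds`."""
    def layer(next_handler: Handler) -> Handler:
        async def handler(prompt: str) -> str:
            return await asyncio.wait_for(next_handler(prompt), timeout=seconds)
        return handler
    return layer


def with_retries(max_retries: int, base_delay: float) -> Layer:
    """Layer that retries the wrapped call with exponential backoff."""
    def layer(next_handler: Handler) -> Handler:
        async def handler(prompt: str) -> str:
            for attempt in range(max_retries + 1):
                try:
                    return await next_handler(prompt)
                except Exception:
                    if attempt == max_retries:
                        raise
                    await asyncio.sleep(base_delay * (2 ** attempt))
            raise RuntimeError("unreachable")
        return handler
    return layer


def compose(call: Handler, layers: List[Layer]) -> Handler:
    """Wrap `call` so that the first layer in the list is the outermost one."""
    for layer in reversed(layers):
        call = layer(call)
    return call


async def flaky_provider(prompt: str) -> str:
    """Hypothetical provider that fails twice, then succeeds."""
    flaky_provider.calls += 1
    if flaky_provider.calls < 3:
        raise ConnectionError("simulated outage")
    return f"echo: {prompt}"

flaky_provider.calls = 0

# Disable a layer by leaving it out of the list.
client = compose(flaky_provider, [with_retries(3, 0.01), with_timeout(1.0)])
print(asyncio.run(client("hello")))  # echo: hello
```

Because every layer has the same shape, a test can build a client with only the layer under test and a fake provider underneath it.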

Production-Grade Python Implementation

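Here is a reference implementation that wires the layers together. The CircuitBreaker and TokenBucket classes at the end are minimal stubs; the earlier articles cover fuller versions.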

import asyncio
import time
import uuid
from typing import Dict, Optional
from dataclasses import dataclass
from enum import Enum
import logging

logger = logging.getLogger(__name__)

class ProviderType(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    COHERE = "cohere"

@dataclass
class ResilientClientConfig:
    """Configuration for resilient LLM client."""
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    timeout_seconds: float = 60.0
    circuit_breaker_threshold: int = 5
    circuit_breaker_recovery_timeout: float = 30.0
    rate_limit_capacity: int = 3500
    rate_limit_refill_rate: float = 58.33  # tokens per second
    enable_caching: bool = True
    cache_ttl_seconds: int = 3600

class ResilientLLMClient:
    """Production-grade resilient LLM client combining all patterns."""

    def __init__(
        self,
        providers: Dict[ProviderType, str],  # {provider: api_key}, in failover order
        config: Optional[ResilientClientConfig] = None
    ):
        self.providers = providers
        # Create a fresh config per client rather than sharing one default instance
        self.config = config if config is not None else ResilientClientConfig()

        # Initialize components
        self.rate_limiter = TokenBucket(
            self.config.rate_limit_capacity,
            self.config.rate_limit_refill_rate
        )

        self.circuit_breakers = {
            provider: CircuitBreaker(
                failure_threshold=self.config.circuit_breaker_threshold,
                recovery_timeout=self.config.circuit_breaker_recovery_timeout
            )
            for provider in providers
        }

        # Idempotency cache: {idempotency_key: (result, timestamp)}
        self.cache = {} if self.config.enable_caching else None
        # Last successful result per prompt, used only as a stale fallback
        self.stale_cache = {} if self.config.enable_caching else None

    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4",
        idempotency_key: Optional[str] = None,
        **kwargs
    ) -> str:
        """
        Complete a prompt with full resilience.

        Args:
            prompt: User prompt
            model: Model name (e.g., "gpt-4"). The same name is sent to every
                provider, so map it per provider if the names differ.
            idempotency_key: Unique request ID; generated if None
            **kwargs: Additional args (max_tokens, temperature, etc.)

        Returns:
            Completion text

        Raises:
            RuntimeError if all providers fail and no cached result exists
        """
        if idempotency_key is None:
            idempotency_key = str(uuid.uuid4())

        # 1. Check idempotency (compare to None so an empty completion still counts)
        cached = self._get_idempotency_cache(idempotency_key)
        if cached is not None:
            logger.info(f"Returning idempotency cache for {idempotency_key}")
            return cached

        # 2. Rate limit
        await self.rate_limiter.wait_for_quota(1)

        # 3. Try providers in order with failover
        for provider in self.providers:
            # 4. Check circuit breaker
            breaker = self.circuit_breakers[provider]
            if breaker.is_open():
                logger.warning(f"{provider.value} circuit is open; skipping")
                continue

            try:
                # 5. Call provider with timeout and retries
                result = await self._call_with_timeout(
                    provider,
                    prompt,
                    model,
                    idempotency_key,
                    **kwargs
                )
            except Exception as e:
                logger.error(f"{provider.value} failed: {e}")
                breaker.record_failure()
                continue

            # 6. Cache successful result
            self._set_idempotency_cache(idempotency_key, result)
            self._set_stale_cache(prompt, result)
            return result

        # 7. Fall back to cache
        stale = self._get_cache_stale(prompt)
        if stale is not None:
            logger.warning("Using stale cache as last resort")
            return stale

        raise RuntimeError("All providers failed and no cache available")

    async def _call_with_timeout(
        self,
        provider: ProviderType,
        prompt: str,
        model: str,
        idempotency_key: str,
        **kwargs
    ) -> str:
        """Call provider with timeout and retries."""

        for attempt in range(self.config.max_retries + 1):
            try:
                # Bound each attempt with a timeout
                result = await asyncio.wait_for(
                    self._call_provider(
                        provider,
                        prompt,
                        model,
                        idempotency_key,
                        **kwargs
                    ),
                    timeout=self.config.timeout_seconds
                )
                self.circuit_breakers[provider].record_success()
                return result

            except Exception as e:
                if isinstance(e, asyncio.TimeoutError):
                    logger.warning(f"Timeout on {provider.value} attempt {attempt + 1}")
                else:
                    logger.error(f"{provider.value} request failed: {e}")

                # Out of retries: let the caller fail over to the next provider
                if attempt >= self.config.max_retries:
                    raise

                wait_time = min(
                    self.config.max_delay,
                    self.config.base_delay * (2 ** attempt)
                )
                logger.info(f"Retrying in {wait_time:.1f}s...")
                await asyncio.sleep(wait_time)

    async def _call_provider(
        self,
        provider: ProviderType,
        prompt: str,
        model: str,
        idempotency_key: str,
        **kwargs
    ) -> str:
        """Call a specific provider."""
        if provider == ProviderType.OPENAI:
            return await self._call_openai(prompt, model, idempotency_key, **kwargs)
        elif provider == ProviderType.ANTHROPIC:
            return await self._call_anthropic(prompt, model, idempotency_key, **kwargs)
        else:
            raise ValueError(f"Unknown provider: {provider}")

    async def _call_openai(self, prompt: str, model: str, idempotency_key: str, **kwargs) -> str:
        """Call OpenAI API."""
        import aiohttp

        api_key = self.providers[ProviderType.OPENAI]

        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    # Confirm that the provider honors this header before relying on it
                    "Idempotency-Key": idempotency_key
                },
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": kwargs.get("max_tokens", 1024)
                }
            ) as resp:
                if resp.status != 200:
                    raise Exception(f"OpenAI API error: {resp.status}")
                data = await resp.json()
                return data["choices"][0]["message"]["content"]

    async def _call_anthropic(self, prompt: str, model: str, idempotency_key: str, **kwargs) -> str:
        """Call Anthropic API."""
        import aiohttp

        api_key = self.providers[ProviderType.ANTHROPIC]

        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.anthropic.com/v1/messages",
                headers={
                    "x-api-key": api_key,
                    # The Messages API requires a version header
                    "anthropic-version": "2023-06-01",
                    # Confirm that the provider honors this header before relying on it
                    "Idempotency-Key": idempotency_key
                },
                json={
                    "model": model,
                    "max_tokens": kwargs.get("max_tokens", 1024),
                    "messages": [{"role": "user", "content": prompt}]
                }
            ) as resp:
                if resp.status != 200:
                    raise Exception(f"Anthropic API error: {resp.status}")
                data = await resp.json()
                return data["content"][0]["text"]

    def _get_idempotency_cache(self, key: str) -> Optional[str]:
        """Get result from idempotency cache."""
        # Compare to None: an empty dict is falsy but is still a usable cache
        if self.cache is None or key not in self.cache:
            return None

        result, timestamp = self.cache[key]
        # Idempotency cache should expire in 24 hours
        if time.time() - timestamp > 86400:
            del self.cache[key]
            return None

        return result

    def _set_idempotency_cache(self, key: str, result: str):
        """Cache result by idempotency key."""
        if self.cache is None:
            return

        self.cache[key] = (result, time.time())

    def _set_stale_cache(self, prompt: str, result: str):
        """Remember the latest successful result for a prompt."""
        if self.stale_cache is None:
            return

        self.stale_cache[prompt] = result

    def _get_cache_stale(self, prompt: str) -> Optional[str]:
        """Get the last successful result for this exact prompt (no TTL check)."""
        if self.stale_cache is None:
            return None

        return self.stale_cache.get(prompt)

# Minimal CircuitBreaker and TokenBucket stubs (see earlier articles)
class CircuitBreaker:
    def __init__(self, failure_threshold, recovery_timeout):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.is_open_state = False
        self.last_failure = 0.0

    def is_open(self) -> bool:
        if self.is_open_state:
            # After the recovery timeout, let a trial request through
            if time.monotonic() - self.last_failure > self.recovery_timeout:
                self.is_open_state = False
            return self.is_open_state
        return False

    def record_failure(self):
        self.failures += 1
        self.last_failure = time.monotonic()
        if self.failures >= self.failure_threshold:
            self.is_open_state = True

    def record_success(self):
        self.failures = 0

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def _refill(self):
        """Credit tokens for the time elapsed since the last refill."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    async def wait_for_quota(self, tokens_needed: int):
        self._refill()
        while self.tokens < tokens_needed:
            await asyncio.sleep(0.01)
            self._refill()
        self.tokens -= tokens_needed
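The default rate_limit_capacity of 3500 and rate_limit_refill_rate of 58.33 in ResilientClientConfig look like a quota of 3,500 requests per minute expressed per second. That reading is an assumption, and the helper names below are hypothetical. A short sketch of the arithmetic:

```python
def refill_rate_per_second(requests_per_minute: int) -> float:
    """Convert a per-minute quota into a token-bucket refill rate."""
    return requests_per_minute / 60.0


def seconds_until_available(tokens: float, needed: float, refill_rate: float) -> float:
    """How long an empty-ish bucket must refill before `needed` tokens exist."""
    deficit = max(0.0, needed - tokens)
    return deficit / refill_rate


rate = refill_rate_per_second(3500)
print(round(rate, 2))                                      # 58.33
print(round(seconds_until_available(0.0, 1.0, rate), 4))   # 0.0171
```

If your provider publishes a different quota, derive both config values from it rather than copying the defaults.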

Usage:

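import asyncio

async def main():
    # Initialize
    config = ResilientClientConfig(
        max_retries=3,
        timeout_seconds=60,
        enable_caching=True
    )

    client = ResilientLLMClient(
        providers={
            ProviderType.OPENAI: "sk-...",
            ProviderType.ANTHROPIC: "sk-ant-..."
        },
        config=config
    )

    # Use it
    result = await client.complete("What is AI?", model="gpt-4")
    print(result)

# await is only valid inside a coroutine, so run the example through an event loop
asyncio.run(main())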

Key Takeaways

  • A resilient client layers patterns: idempotency → rate limiting → circuit breaker → timeout → retry → failover → cache.
  • Each layer is independent and can be disabled for testing.
  • Use configuration objects to tune behavior without code changes.
  • Implement observability (logging, metrics) at each layer.
  • Test each pattern in isolation and integration.

Frequently Asked Questions

Should I build this or use a library?

For simple use cases, use a library like litellm or LangChain. For production systems with strict requirements, build your own so you control every detail. Most teams benefit from building their own after outgrowing libraries.

How do I test a resilient client?

Test each pattern in isolation (unit tests) and end-to-end (integration tests with chaos injection). See article 10 for detailed testing strategies.
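As a small illustration of testing one pattern in isolation, the sketch below exercises a circuit breaker against a fake clock so the test never sleeps. `FakeClock` and `ClockedBreaker` are hypothetical classes written for this example; the injectable clock is an assumption and is not part of the CircuitBreaker stub above.

```python
from typing import Callable, Optional


class FakeClock:
    """Manually advanced clock so tests do not depend on real time."""
    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


class ClockedBreaker:
    """Illustrative circuit breaker that reads time from an injected clock."""
    def __init__(self, failure_threshold: int, recovery_timeout: float,
                 clock: Callable[[], float]) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if self.clock() - self.opened_at > self.recovery_timeout:
            self.opened_at = None  # allow a trial request through
            return False
        return True


def test_breaker_opens_and_recovers() -> None:
    clock = FakeClock()
    breaker = ClockedBreaker(failure_threshold=2, recovery_timeout=30.0, clock=clock)
    breaker.record_failure()
    assert not breaker.is_open()   # still below the threshold
    breaker.record_failure()
    assert breaker.is_open()       # threshold reached, circuit opens
    clock.now = 31.0
    assert not breaker.is_open()   # recovery timeout has elapsed


test_breaker_opens_and_recovers()
print("breaker test passed")
```

The same idea extends to integration tests: swap the real provider call for a fake that raises or stalls on demand, then assert on which layer reacted.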

Can I reuse this across teams?

Yes. Package the client as an internal library. Document configuration options and provide sensible defaults. Version carefully since breaking changes affect multiple teams.

What metrics should I track?

Track per-provider: request latency, error rate, circuit breaker state. Aggregate: total throughput, end-to-end latency, cache hit rate. Alert on: error rate spikes, circuit breaker opens, queue growth.
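A minimal in-memory sketch of those counters is shown below. `ProviderStats`, `ClientMetrics`, and the 20% alert threshold are assumptions made for illustration; a real deployment would more likely export these values to a metrics backend than hold them in process.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import DefaultDict, List


@dataclass
class ProviderStats:
    """Per-provider counters (hypothetical structure for illustration)."""
    requests: int = 0
    errors: int = 0
    latencies: List[float] = field(default_factory=list)

    @property
    def error_rate(self) -> float:
        return self.errors / self.requests if self.requests else 0.0

    @property
    def mean_latency(self) -> float:
        return sum(self.latencies) / len(self.latencies) if self.latencies else 0.0


class ClientMetrics:
    """Aggregates per-provider stats and cache hit rate."""
    def __init__(self) -> None:
        self.providers: DefaultDict[str, ProviderStats] = defaultdict(ProviderStats)
        self.cache_hits = 0
        self.cache_lookups = 0

    def record_request(self, provider: str, latency: float, ok: bool) -> None:
        stats = self.providers[provider]
        stats.requests += 1
        stats.latencies.append(latency)
        if not ok:
            stats.errors += 1

    def record_cache_lookup(self, hit: bool) -> None:
        self.cache_lookups += 1
        if hit:
            self.cache_hits += 1

    @property
    def cache_hit_rate(self) -> float:
        return self.cache_hits / self.cache_lookups if self.cache_lookups else 0.0

    def providers_to_alert(self, max_error_rate: float = 0.2) -> List[str]:
        """Names of providers whose error rate exceeds the threshold."""
        return [name for name, stats in self.providers.items()
                if stats.error_rate > max_error_rate]


metrics = ClientMetrics()
for latency, ok in [(0.4, True), (0.5, True), (2.0, False), (0.6, True)]:
    metrics.record_request("openai", latency, ok)
metrics.record_request("anthropic", 0.7, True)
metrics.record_cache_lookup(hit=True)
metrics.record_cache_lookup(hit=False)

print(metrics.providers["openai"].error_rate)  # 0.25
print(metrics.cache_hit_rate)                  # 0.5
print(metrics.providers_to_alert())            # ['openai']
```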

Further Reading