Graceful Degradation: Maintain Service Under Load
Graceful degradation means your service continues to function when conditions degrade, but at reduced quality or capacity. Instead of failing completely when an LLM API is overloaded or slow, you might serve a cached response, reduce token limits, switch to a faster model, or queue requests for later processing. Users see slower or lower-quality results, but they see something instead of an error. This is a fundamental principle of resilient systems.
The insight is that 80% of a good result is better than a complete failure. If you cannot generate a full response within 5 seconds, return a partial response or a cached result instead of timing out. If demand spikes, queue new requests up to a fixed limit instead of rejecting them outright. Graceful degradation keeps your system available even under stress.
Degradation Strategies
1. Result Caching
Cache successful responses and serve them when the API is slow or down:
import time
from typing import Optional

import requests


class CachedLLMClient:
    """LLM client with caching for graceful degradation."""

    def __init__(self, api_key: str, cache_ttl_seconds: int = 3600):
        self.api_key = api_key
        self.cache_ttl_seconds = cache_ttl_seconds
        self.cache = {}  # {prompt_hash: (result, timestamp)}

    def call_llm(self, prompt: str, use_cache: bool = True, force_refresh: bool = False) -> dict:
        """
        Call the LLM API, falling back to the cache when allowed.

        Args:
            prompt: User prompt
            use_cache: Serve fresh cache hits, and stale entries if the API fails
            force_refresh: Skip the fresh-cache lookup and call the API first

        Returns:
            LLM response (fresh or cached)
        """
        # hash() of a str is stable only within one process, which is
        # sufficient for this in-memory cache.
        prompt_hash = hash(prompt)

        # Serve a fresh cache hit without calling the API
        if use_cache and not force_refresh:
            cached = self._get_cached_result(prompt_hash)
            if cached is not None:
                print(f"Serving cached result for: {prompt[:50]}...")
                return cached

        # Try to fetch a fresh result
        try:
            result = self._call_api(prompt)
        except requests.RequestException as e:
            # API failed; fall back to a cached entry even if it has expired
            if use_cache:
                cached = self._get_cached_result(prompt_hash, ignore_ttl=True)
                if cached is not None:
                    print(f"API failed. Serving stale cache: {e}")
                    return cached
            raise

        # Cache the successful result
        self.cache[prompt_hash] = (result, time.time())
        return result

    def _get_cached_result(self, prompt_hash: int, ignore_ttl: bool = False) -> Optional[dict]:
        """Return the cached result, or None if it is missing or expired."""
        if prompt_hash not in self.cache:
            return None
        result, timestamp = self.cache[prompt_hash]
        elapsed = time.time() - timestamp
        if not ignore_ttl and elapsed > self.cache_ttl_seconds:
            return None  # Cache expired
        return result

    def _call_api(self, prompt: str) -> dict:
        """Call the LLM API."""
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"model": "gpt-4", "messages": [{"role": "user", "content": prompt}]},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()


# Usage
client = CachedLLMClient(api_key="sk-...", cache_ttl_seconds=3600)
# The first success is cached; later calls for the same prompt are served
# from the cache, including a stale entry if the API fails.
result = client.call_llm("What is AI?")
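The cache above is keyed on the prompt alone and lives in one process. If the cache is shared between workers, or the same prompt is sent with different models or sampling parameters, the key needs to be deterministic and cover those settings. Python randomizes `hash()` for strings per process by default, so it does not qualify. The following is a minimal sketch under those assumptions; the function name `cache_key` and its parameters are illustrative and not part of any library.

```python
import hashlib
import json


def cache_key(prompt: str, model: str, **params) -> str:
    """Build a deterministic cache key from the prompt, model, and parameters."""
    # sort_keys makes the serialized form independent of argument order
    payload = json.dumps(
        {"prompt": prompt, "model": model, "params": params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = cache_key("What is AI?", "gpt-4", temperature=0.0)
key_b = cache_key("What is AI?", "gpt-4", temperature=0.0)
key_c = cache_key("What is AI?", "gpt-3.5-turbo", temperature=0.0)
print(key_a == key_b)  # True: same inputs give the same key
print(key_a == key_c)  # False: a different model gets its own entry
```

Including the model in the key also keeps a response from a downgraded model from being served later as if it came from the stronger one.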
2. Model Downgrade
When a powerful model is overloaded, switch to a faster, cheaper model:
import time

import requests


class AdaptiveModelClient:
    """Switch models based on load conditions."""

    # Model names are illustrative; substitute chat models your provider serves.
    # "davinci-003" stands in for the cheapest tier and is not a current
    # chat-completions model.
    MODELS_BY_QUALITY = {
        "high": ["gpt-4", "gpt-3.5-turbo"],
        "medium": ["gpt-3.5-turbo", "davinci-003"],
        "low": ["davinci-003"],
    }
    FASTEST_MODEL = "davinci-003"
    MAX_LATENCY_MS = 5000  # Skip models slower than this on their last call

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.api_latencies = {}  # {model: latency of the most recent call, in ms}

    def select_model(self, quality_requirement: str = "high") -> str:
        """Select a model based on quality and current latency."""
        preferred = self.MODELS_BY_QUALITY.get(
            quality_requirement, self.MODELS_BY_QUALITY["low"]
        )
        for model in preferred:
            # Treat unmeasured models as healthy; defaulting to infinity would
            # mean no model could ever be selected for its first call.
            latency = self.api_latencies.get(model, 0.0)
            if latency < self.MAX_LATENCY_MS:
                return model
        # All preferred models are slow; use the fastest
        return self.FASTEST_MODEL

    def call_llm(self, prompt: str, quality_requirement: str = "high") -> dict:
        """Call LLM with adaptive model selection."""
        model = self.select_model(quality_requirement)
        print(f"Using model: {model}")
        start = time.monotonic()
        try:
            return self._call_api(prompt, model)
        finally:
            # Record latency even when the call fails or times out, so a
            # struggling model is skipped next time. Production code should
            # also expire old measurements so a recovered model is retried.
            self.api_latencies[model] = (time.monotonic() - start) * 1000

    def _call_api(self, prompt: str, model: str) -> dict:
        """Call the API with the specified model."""
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"model": model, "messages": [{"role": "user", "content": prompt}]},
            timeout=10,
        )
        response.raise_for_status()
        return response.json()


# Usage
client = AdaptiveModelClient(api_key="sk-...")
# Prefer the strongest model while it is responsive
result = client.call_llm("Analyze this code", quality_requirement="high")
# Accept the cheapest tier for tasks that tolerate lower quality
result = client.call_llm("Summarize this", quality_requirement="low")
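The client above decides from the single most recent latency sample per model, so one slow call is enough to trigger a downgrade. A common way to smooth this is an exponential moving average. The sketch below is one possible shape, not part of the client above; the class name `LatencyTracker`, the smoothing weight `alpha`, and the 5000 ms threshold are assumptions chosen for illustration.

```python
class LatencyTracker:
    """Track a smoothed latency per model with an exponential moving average."""

    def __init__(self, alpha: float = 0.3):
        self.alpha = alpha  # Weight of the newest sample (assumed value)
        self.averages = {}  # {model: smoothed latency in ms}

    def record(self, model: str, latency_ms: float) -> float:
        """Fold a new sample into the average and return the updated value."""
        previous = self.averages.get(model)
        if previous is None:
            smoothed = latency_ms  # First sample seeds the average
        else:
            smoothed = self.alpha * latency_ms + (1 - self.alpha) * previous
        self.averages[model] = smoothed
        return smoothed

    def is_healthy(self, model: str, threshold_ms: float = 5000.0) -> bool:
        """Unmeasured models count as healthy so they can receive a first call."""
        return self.averages.get(model, 0.0) < threshold_ms


tracker = LatencyTracker(alpha=0.3)
tracker.record("gpt-4", 1000.0)
tracker.record("gpt-4", 9000.0)     # One slow call: the average stays under 5000 ms
print(tracker.is_healthy("gpt-4"))  # True
tracker.record("gpt-4", 9000.0)     # Sustained slowness pushes the average over
print(tracker.is_healthy("gpt-4"))  # False
```

A smaller `alpha` reacts more slowly to spikes and to recovery alike, so the value is a trade-off rather than a fixed recommendation.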
3. Partial Results and Streaming Fallback
When full processing takes too long, return a partial result or stream chunks:
import asyncio


async def call_llm_with_timeout_fallback(
    prompt: str,
    timeout_seconds: float = 5.0,
) -> dict:
    """
    Call the LLM with a strict timeout. If it is exceeded, return a
    degraded placeholder instead of raising.
    """
    try:
        # Try to get the full result within the timeout
        result = await asyncio.wait_for(
            call_llm_full(prompt),
            timeout=timeout_seconds,
        )
        return {"status": "complete", "result": result, "partial": False}
    except asyncio.TimeoutError:
        # Timed out. With a streaming API, return the text received so far
        # here instead of a fixed message.
        print("Full result timed out. Returning degraded response...")
        return {
            "status": "partial",
            "result": "Response generation timed out.",
            "partial": True,
        }


async def call_llm_full(prompt: str) -> str:
    """Simulate a long LLM call."""
    # In reality, this calls an API
    await asyncio.sleep(10)  # Simulate slow processing
    return "Full response"


# Usage
async def main():
    result = await call_llm_with_timeout_fallback("What is AI?", timeout_seconds=5.0)
    if result["status"] == "partial":
        print("Serving degraded response")


asyncio.run(main())
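The timeout handler above returns only a status message. When the API streams its output, the text that arrived before the deadline can be returned instead. The sketch below shows that idea with a simulated stream; `fake_token_stream` is a hypothetical stand-in for a real streaming client, and `collect_until_deadline` is an illustrative name.

```python
import asyncio
from typing import AsyncIterator


async def fake_token_stream(n_chunks: int, delay: float) -> AsyncIterator[str]:
    """Hypothetical stand-in for a streaming API: one chunk every `delay` seconds."""
    for i in range(n_chunks):
        await asyncio.sleep(delay)
        yield f"chunk{i} "


async def collect_until_deadline(stream: AsyncIterator[str], timeout_seconds: float) -> dict:
    """Accumulate streamed chunks; on timeout, return whatever arrived so far."""
    chunks = []

    async def consume():
        async for chunk in stream:
            chunks.append(chunk)

    try:
        await asyncio.wait_for(consume(), timeout=timeout_seconds)
        status = "complete"
    except asyncio.TimeoutError:
        status = "partial"  # The chunks list still holds the text received in time
    return {"status": status, "result": "".join(chunks)}


# The stream needs about 1 second in total, but the deadline is 0.3 seconds
result = asyncio.run(collect_until_deadline(fake_token_stream(20, 0.05), 0.3))
print(result["status"])  # partial
print(result["result"])  # The chunks that arrived before the deadline
```

Callers should check `status` before treating the text as a finished answer, since a partial result may stop mid-sentence.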
4. Request Queueing and Backpressure
Instead of rejecting requests the moment you reach capacity, hold them in a bounded queue and reject only when the queue is full:
import asyncio
import heapq
import itertools


class QueueFull(Exception):
    """Raised when the queue has no room for another request."""


class QueuedLLMService:
    """Process LLM requests with a bounded priority queue to handle load spikes."""

    def __init__(self, max_queue_size: int = 100, max_concurrent: int = 5):
        self.queue = []  # Heap of (-priority, sequence, prompt, future)
        self.sequence = itertools.count()  # Keeps FIFO order within a priority
        self.max_queue_size = max_queue_size
        self.max_concurrent = max_concurrent
        self.active_requests = 0
        self.tasks = set()  # Hold references so running tasks are not garbage-collected

    async def submit_request(self, prompt: str, priority: int = 0) -> str:
        """
        Submit a request. If the queue is full, reject immediately.
        Requests are processed in priority order.

        Args:
            prompt: User prompt
            priority: Higher = more urgent (0-10)

        Returns:
            The result once the request has been processed

        Raises:
            QueueFull: If the queue is already at max_queue_size
        """
        if len(self.queue) >= self.max_queue_size:
            raise QueueFull(f"Queue full ({len(self.queue)} items)")

        # Create a future the caller can await for the result
        future = asyncio.get_running_loop().create_future()
        heapq.heappush(self.queue, (-priority, next(self.sequence), prompt, future))

        # Start a worker if we have capacity
        if self.active_requests < self.max_concurrent:
            self.active_requests += 1  # Reserve the slot before the task runs
            task = asyncio.create_task(self._process_next())
            self.tasks.add(task)
            task.add_done_callback(self.tasks.discard)

        # Wait for the result
        return await future

    async def _process_next(self):
        """Process queued requests, highest priority first, until the queue is empty."""
        try:
            while self.queue:
                _, _, prompt, future = heapq.heappop(self.queue)
                if future.done():
                    continue  # The caller gave up (for example, was cancelled)
                try:
                    result = await self._call_api(prompt)
                except Exception as exc:
                    # Pass the failure to the waiting caller instead of leaving it hanging
                    if not future.done():
                        future.set_exception(exc)
                else:
                    if not future.done():
                        future.set_result(result)
        finally:
            self.active_requests -= 1

    async def _call_api(self, prompt: str) -> str:
        """Call the LLM API."""
        await asyncio.sleep(1)  # Simulate API latency
        return f"Response to: {prompt}"


# Usage
async def main():
    service = QueuedLLMService(max_queue_size=100, max_concurrent=5)
    try:
        result = await service.submit_request("What is AI?", priority=5)
        print(result)
    except QueueFull as e:
        print(f"Queue full: {e}")
        # Return "service busy" to user


asyncio.run(main())
5. Feature Flags for Degradation
Control degradation behavior via feature flags:
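class FeatureFlagClient:
    """LLM client with feature flags for graceful degradation."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.cache = {}  # {prompt: response}
        self.flags = {
            "use_cache": True,
            "allow_model_downgrade": False,
            "max_tokens": 2000,
            "timeout_seconds": 30,
        }

    def set_flag(self, flag_name: str, value):
        """Dynamically adjust degradation behavior."""
        if flag_name not in self.flags:
            raise KeyError(f"Unknown flag: {flag_name}")
        self.flags[flag_name] = value
        print(f"Flag {flag_name} set to {value}")

    def call_llm(self, prompt: str) -> dict:
        """Call LLM respecting feature flags."""
        # Try the cache first unless caching is disabled
        if self.flags["use_cache"]:
            cached = self._get_cache(prompt)
            if cached is not None:
                return cached

        # Fetch a fresh result
        try:
            result = self._call_api_fresh(prompt)
        except TimeoutError:
            # Timed out; return a degraded response if the flag allows it.
            # This stands in for retrying on a cheaper model.
            if self.flags["allow_model_downgrade"]:
                return {"status": "degraded", "message": "Service busy"}
            raise

        self.cache[prompt] = result
        return result

    def _call_api_fresh(self, prompt: str) -> dict:
        """Placeholder for the real API call.

        A real implementation would send the request with max_tokens and
        timeout_seconds taken from self.flags and raise TimeoutError when
        the deadline passes.
        """
        return {
            "status": "ok",
            "max_tokens": self.flags["max_tokens"],
            "content": f"Response to: {prompt}",
        }

    def _get_cache(self, prompt: str):
        """Return the cached response for this prompt, or None."""
        return self.cache.get(prompt)


# Usage
client = FeatureFlagClient(api_key="sk-...")
# Keep use_cache on under load: cache hits are the cheapest responses.
# Turn it off only when responses must be fresh.
# Under load: cap response length so each request finishes sooner
client.set_flag("max_tokens", 500)
# Still more load: fail fast instead of holding slow requests open
client.set_flag("timeout_seconds", 10)
# Critical load: allow a degraded response when the API times out
client.set_flag("allow_model_downgrade", True)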
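The flags above live in process memory, so changing them requires some way to call into the running service. One option for adjusting them without a redeploy is to read overrides from a file that operators can edit. The sketch below assumes a JSON file of flag overrides; the class name `FileBackedFlags` and the file layout are illustrative. A dedicated flag service or configuration store would fill the same role.

```python
import json
import os
import tempfile

DEFAULT_FLAGS = {"use_cache": True, "max_tokens": 2000, "timeout_seconds": 30}


class FileBackedFlags:
    """Serve flags from defaults, overridden by a JSON file that is re-read on change."""

    def __init__(self, path: str, defaults: dict):
        self.path = path
        self.defaults = dict(defaults)
        self._flags = dict(defaults)
        self._mtime = None  # Modification time of the last file successfully loaded

    def get(self, name: str):
        self._reload_if_changed()
        return self._flags[name]

    def _reload_if_changed(self):
        try:
            mtime = os.stat(self.path).st_mtime_ns
        except FileNotFoundError:
            return  # No override file: keep the current values
        if mtime == self._mtime:
            return
        try:
            with open(self.path) as f:
                overrides = json.load(f)
        except (OSError, json.JSONDecodeError):
            return  # Unreadable or half-written file: keep the last good values
        if not isinstance(overrides, dict):
            return
        # Ignore unknown keys so a typo cannot introduce a new flag
        known = {k: v for k, v in overrides.items() if k in self.defaults}
        self._flags = {**self.defaults, **known}
        self._mtime = mtime


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "flags.json")
    flags = FileBackedFlags(path, DEFAULT_FLAGS)
    print(flags.get("max_tokens"))  # 2000 (file absent, defaults apply)
    with open(path, "w") as f:
        json.dump({"max_tokens": 500}, f)
    print(flags.get("max_tokens"))  # 500
```

Keeping the last good values when the file is malformed means a bad edit degrades to "no change" rather than taking the service down.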
Key Takeaways
- Graceful degradation keeps services available at reduced quality rather than failing completely.
- Cache responses to serve stale data on API failure.
- Downgrade to faster models under load instead of rejecting requests.
- Queue requests with backpressure to handle demand spikes.
- Use feature flags to dynamically adjust degradation behavior without redeploying.
Frequently Asked Questions
Should I always degrade, or only under load?
Degrade only when necessary (API slow, overloaded, or down). During normal operation, serve full-quality results. Monitor latency and error rates to detect when degradation should activate.
How stale is too stale for cached results?
It depends on your use case. For real-time information (stock prices, news), even a short-lived cache may be too stale. For stable information (documentation, FAQs), a week-old cache is fine. Set the TTL accordingly: shorter for volatile data, longer for stable data.
Can I queue indefinitely?
No. Set a maximum queue size and reject new requests when it is full. A queue that grows without bound behaves like a memory leak and eventually crashes your service. It is better to reject early and let the client retry later.
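Rejecting early works best when clients space out their retries, otherwise the retries themselves add to the spike. A minimal client-side sketch using exponential backoff with full jitter follows. `ServiceBusy` is a hypothetical exception standing in for whatever "queue full" signal your service returns, and the delay values are assumptions.

```python
import random
import time


class ServiceBusy(Exception):
    """Hypothetical error a client sees when the server rejects a request."""


def call_with_backoff(send, max_attempts=4, base_delay=0.5, max_delay=8.0, sleep=time.sleep):
    """Call send(); on ServiceBusy, wait a random, growing delay and try again."""
    for attempt in range(max_attempts):
        try:
            return send()
        except ServiceBusy:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the error to the caller
            # Double the upper bound per attempt, capped at max_delay
            upper = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: pick uniformly in [0, upper] so clients do not retry in lockstep
            sleep(random.uniform(0, upper))


# Usage with a stub that is busy twice, then succeeds
calls = {"count": 0}


def flaky_send():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ServiceBusy("queue full")
    return "ok"


print(call_with_backoff(flaky_send, base_delay=0.01))  # ok
```

The `sleep` parameter exists only so the delay can be replaced in tests; production callers can leave it at the default.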
How do I know when to trigger degradation?
Monitor key metrics: API response latency, error rate, queue length. Trigger degradation when latency exceeds a threshold (e.g., >5 seconds) or error rate spikes (e.g., >5%). Use adaptive thresholds that adjust based on recent history.
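As a rough illustration of such a trigger, the sketch below keeps a sliding window of recent calls and reports when either the approximate 95th-percentile latency or the error rate crosses a threshold. The class name `DegradationMonitor`, the window size, and the minimum sample count are assumptions; the thresholds are fixed here, and an adaptive version would derive them from a longer history.

```python
from collections import deque


class DegradationMonitor:
    """Decide when to degrade from a sliding window of recent call outcomes."""

    def __init__(
        self,
        window: int = 100,
        latency_threshold_ms: float = 5000.0,
        error_rate_threshold: float = 0.05,
        min_samples: int = 20,
    ):
        self.samples = deque(maxlen=window)  # (latency_ms, succeeded); oldest dropped first
        self.latency_threshold_ms = latency_threshold_ms
        self.error_rate_threshold = error_rate_threshold
        self.min_samples = min_samples  # Avoid reacting to a handful of calls

    def record(self, latency_ms: float, succeeded: bool) -> None:
        self.samples.append((latency_ms, succeeded))

    def should_degrade(self) -> bool:
        n = len(self.samples)
        if n < self.min_samples:
            return False
        latencies = sorted(latency for latency, _ in self.samples)
        p95 = latencies[(n * 95) // 100]  # Approximate 95th percentile
        error_rate = sum(1 for _, ok in self.samples if not ok) / n
        return p95 > self.latency_threshold_ms or error_rate > self.error_rate_threshold


monitor = DegradationMonitor()
for _ in range(30):
    monitor.record(300.0, True)
print(monitor.should_degrade())  # False: fast and error-free
for _ in range(3):
    monitor.record(300.0, False)
print(monitor.should_degrade())  # True: 3 failures in 33 calls exceeds 5%
```

Using a percentile rather than the mean keeps a few very fast cache hits from hiding a slow tail.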