Backpressure and Flow Control in LLM Systems
Backpressure is the mechanism that keeps LLM systems from cascading into failure under load. Without backpressure, requests pile up in unbounded queues, memory runs out, and the entire system falls over. With backpressure, your system degrades gracefully: it rejects new work when overloaded, gives clients fast failure signals, and protects the infrastructure behind it. Backpressure and related load-shedding techniques are part of how high-traffic services such as Netflix, Uber, and Discord handle very large request volumes without melting down.
What Is Backpressure and Why It Matters
Backpressure is simple: when a system is at capacity, it stops accepting new work and signals back to the producer, "I'm full. Stop sending." Without it:
- Producers keep enqueuing requests (queue grows unbounded).
- Memory fills up (often within minutes on a busy service).
- System crashes or becomes unresponsive.
- Clients hang indefinitely waiting for responses.
With backpressure:
- Queue reaches a limit (e.g., 1000 pending jobs).
- System rejects incoming requests with a 503 (Service Unavailable) or queues them with a timeout.
- Clients get fast failure signals and can retry or choose an alternate provider.
- System remains stable and recovers gracefully when load drops.
Contrast these two scenarios (illustrative numbers, not measurements):
| Without Backpressure | With Backpressure |
|---|---|
| Request queue grows to 10,000 jobs | Queue caps at 1,000 jobs |
| Memory usage: 10 GB → OOM crash | Memory: stable 2 GB |
| Response time: 60 seconds (or timeout) | Response time: 2 seconds or fast 503 rejection |
| Recovery: manual restart needed | Recovery: automatic as queue drains |
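The mechanism behind the table can be seen in a toy, discrete-time model. This is a sketch with made-up rates, not a benchmark: each tick a producer submits 20 requests and workers finish 10, and the function `simulate` (a hypothetical helper) compares an unbounded queue with one capped at 100.

```python
from typing import Optional, Tuple


def simulate(ticks: int, arrivals_per_tick: int, service_per_tick: int,
             max_size: Optional[int]) -> Tuple[int, int]:
    """Toy model: return (final queue depth, total rejected requests)."""
    depth = 0
    rejected = 0
    for _ in range(ticks):
        # Admit arrivals; with a bound, anything that doesn't fit is rejected.
        if max_size is None:
            depth += arrivals_per_tick
        else:
            admitted = min(arrivals_per_tick, max_size - depth)
            depth += admitted
            rejected += arrivals_per_tick - admitted
        # Workers finish up to service_per_tick requests this tick.
        depth -= min(depth, service_per_tick)
    return depth, rejected


print(simulate(60, 20, 10, max_size=None))  # (600, 0)
print(simulate(60, 20, 10, max_size=100))   # (90, 510)
```

The unbounded queue grows by 10 every tick for as long as the overload lasts; the bounded one levels off and converts the excess into rejections the client can see and act on.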
Bounded Queues and Rejection Policies
The simplest backpressure is a bounded queue with a rejection policy:
```python
import asyncio


class BoundedLLMQueue:
    """
    FIFO queue with backpressure: reject new work if full.
    """

    def __init__(self, max_size: int = 1000):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self.rejected_count = 0

    async def enqueue(self, request: dict, timeout_sec: float = 0.1) -> bool:
        """
        Try to enqueue; return False if rejected (still full after timeout_sec).
        This gives the producer fast feedback instead of hanging.
        """
        try:
            await asyncio.wait_for(self.queue.put(request), timeout=timeout_sec)
            return True
        except asyncio.TimeoutError:
            self.rejected_count += 1
            return False

    async def dequeue(self) -> dict:
        """Pull next request for processing."""
        return await self.queue.get()

    def size(self) -> int:
        """Current queue depth."""
        return self.queue.qsize()

    def load_factor(self) -> float:
        """Current utilization (0.0 to 1.0)."""
        # Public maxsize property, not the private _maxsize attribute.
        return self.queue.qsize() / self.queue.maxsize


# Shared queue; workers (not shown) call llm_queue.dequeue().
# On Python 3.10+, asyncio.Queue may be created before the event loop starts.
llm_queue = BoundedLLMQueue(max_size=1000)


# Web handler with backpressure.
async def handle_llm_request(request_data: dict) -> dict:
    """HTTP handler that respects backpressure."""
    if not await llm_queue.enqueue(request_data):
        # Queue is full; return 503 Service Unavailable.
        return {
            "status": 503,
            "error": "Service temporarily overloaded. Retry in 5 seconds.",
            "queue_depth": llm_queue.size(),
        }
    return {
        "status": 202,  # Accepted, processing async.
        "message": "Your request has been queued.",
    }
```
This pattern gives clients fast feedback (within `timeout_sec`): either "accepted" (202) or "service overloaded" (503). Clients know to back off and retry instead of hanging.
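The handler above only enqueues; something still has to drain the queue. Here is a minimal sketch of the consumer side, assuming a fixed pool of worker tasks and a hypothetical `call_llm` coroutine standing in for your model client. It also shows a fail-fast alternative to the timed `put()`: `put_nowait()` raises `asyncio.QueueFull` immediately instead of waiting.

```python
import asyncio
from typing import List, Tuple


async def call_llm(request: dict) -> str:
    """Hypothetical stand-in for a real LLM client call."""
    await asyncio.sleep(0.01)
    return f"answer to {request['prompt']}"


def try_enqueue(q: asyncio.Queue, request: dict) -> bool:
    """Fail-fast admission: reject immediately if the queue is full."""
    try:
        q.put_nowait(request)
        return True
    except asyncio.QueueFull:
        return False


async def worker(q: asyncio.Queue, results: List[str]) -> None:
    """Drain the queue forever; the pool size caps concurrent LLM calls."""
    while True:
        request = await q.get()
        try:
            results.append(await call_llm(request))
        except Exception as exc:  # keep the worker alive on per-request errors
            print(f"request {request.get('id')} failed: {exc}")
        finally:
            q.task_done()


async def demo() -> Tuple[int, int]:
    q: asyncio.Queue = asyncio.Queue(maxsize=5)
    results: List[str] = []
    # Enqueue before starting workers so the bound is hit deterministically.
    accepted = sum(try_enqueue(q, {"id": i, "prompt": f"Q{i}"}) for i in range(8))
    workers = [asyncio.create_task(worker(q, results)) for _ in range(2)]
    await q.join()  # wait until every accepted request has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return accepted, len(results)


print(asyncio.run(demo()))  # (5, 5)
```

Eight requests arrive, five fit, three are rejected up front, and the two workers process exactly the five that were accepted. The number of workers, not the queue size, sets how many LLM calls run at once.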
Adaptive Load Shedding with Latency-Based Rejection
For advanced backpressure, reject requests when latency gets too high, not just when the queue is full:
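```python
import time
from collections import deque


class AdaptiveLoadShedder:
    """
    Reject requests if recent processing latency exceeds a threshold.
    Protects against cascading slowdowns.
    """

    def __init__(self, max_latency_ms: float = 5000, window_sec: float = 10.0,
                 max_samples: int = 1000):
        self.max_latency_ms = max_latency_ms
        self.window_sec = window_sec
        # (completion_time, latency_ms) pairs; deque drops the oldest in O(1).
        self.samples: deque = deque(maxlen=max_samples)

    def _prune(self, now: float) -> None:
        """Forget samples older than window_sec."""
        while self.samples and now - self.samples[0][0] > self.window_sec:
            self.samples.popleft()

    def should_accept_request(self) -> bool:
        """
        Accept only if recent requests complete within target latency.
        Samples expire, so shedding stops once the slow ones age out,
        even though rejected requests never record a latency.
        """
        self._prune(time.monotonic())
        if not self.samples:
            return True
        # P99 latency (99th percentile) over the last 100 samples in the window.
        recent = sorted(latency for _, latency in list(self.samples)[-100:])
        p99_latency = recent[int(0.99 * len(recent))]
        if p99_latency > self.max_latency_ms:
            print(f"⚠️ P99 latency {p99_latency:.0f} ms exceeds {self.max_latency_ms} ms. Shedding load.")
            return False
        return True

    def record_request(self, latency_ms: float) -> None:
        """Record latency for future admission decisions."""
        self.samples.append((time.monotonic(), latency_ms))


shedder = AdaptiveLoadShedder(max_latency_ms=1000)


async def process_with_shedding(request: dict) -> dict:
    """Process only if latency is acceptable."""
    if not shedder.should_accept_request():
        return {"status": 503, "error": "Service busy. Please retry."}
    start = time.monotonic()  # monotonic clock: immune to wall-clock changes
    try:
        # fetch_llm_response is your LLM client call (not defined here).
        result = await fetch_llm_response(request["prompt"])
    finally:
        # Record latency even when the call fails, so slow errors count too.
        latency_ms = (time.monotonic() - start) * 1000
        shedder.record_request(latency_ms)
    return {"status": 200, "result": result, "latency_ms": latency_ms}
```
With latency-based shedding, your system throttles itself under load. When P99 latency exceeds 1 second, new requests are rejected, giving the backlog time to drain. Because samples expire after `window_sec`, the shedder resumes admitting requests once the slow samples have aged out, and keeps admitting them as long as new requests complete within 1 second. Without some form of expiry (or a trickle of probe requests), a shedder that learns only from completed requests could reject traffic forever, since rejected requests never produce new latency samples. Shedding on latency keeps the queue from building up far enough to cause cascading failures.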
Token Bucket for Rate-Based Backpressure
Token bucket is a classic algorithm for enforcing a rate limit: new requests consume tokens from a bucket that refills at a fixed rate. When empty, requests are rejected:
```python
import time


class TokenBucket:
    """
    Token bucket for rate limiting.
    Holds at most `capacity` tokens (the allowed burst) and refills at
    `refill_rate` tokens per second (the sustained rate).
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = float(capacity)  # start full
        self.refill_rate = refill_rate
        self.last_refill = time.monotonic()

    def consume(self, tokens: int = 1) -> bool:
        """Try to consume tokens. Return True if granted, False if denied."""
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def _refill(self) -> None:
        """Refill tokens based on elapsed time, never beyond capacity."""
        now = time.monotonic()  # monotonic: unaffected by system clock changes
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now


# Example: sustain 100 requests per second, with bursts of up to 100.
bucket = TokenBucket(capacity=100, refill_rate=100)


async def request_with_rate_limit(request: dict) -> dict:
    """Accept requests up to ~100 per second; reject beyond that."""
    if not bucket.consume(1):
        return {
            "status": 429,
            "error": "Rate limit exceeded. Max 100 req/sec.",
        }
    # process_llm_request is your request handler (not defined here).
    return await process_llm_request(request)
```
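Token bucket is a good fit when you want to enforce a throughput ceiling (e.g., "about 100 req/sec sustained"). Note that a full bucket lets a burst of up to `capacity` requests through at once, so over a short window the rate can briefly exceed `refill_rate`; shrink `capacity` if bursts matter. It's simpler than latency-based shedding but less adaptive.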
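Because `consume()` takes a cost argument, the same idea can meter LLM tokens instead of requests, which matters when a provider limits tokens per minute. The sketch below is a variation written for illustration, not the class above: `TokenCostBucket` and `estimated_cost` are hypothetical names, the clock is injectable so the example is deterministic, the 60,000 tokens-per-minute budget is made up, and charging prompt tokens plus `max_output_tokens` up front is an assumption. It also reports how long a caller would have to wait, which a handler could send back as a `Retry-After` value.

```python
import math
import time
from typing import Callable


class TokenCostBucket:
    """Token bucket measured in LLM tokens (e.g., a tokens-per-minute budget)."""

    def __init__(self, capacity: float, refill_per_sec: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.clock = clock  # injectable so tests can control time
        self.tokens = capacity
        self.last = clock()

    def _refill(self) -> None:
        now = self.clock()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.refill_per_sec)
        self.last = now

    def try_consume(self, cost: float) -> bool:
        """Deduct `cost` tokens if available; otherwise leave the bucket untouched."""
        self._refill()
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

    def seconds_until(self, cost: float) -> float:
        """Wait time until `cost` tokens are available (inf if it can never fit)."""
        self._refill()
        if cost > self.capacity:
            return math.inf
        return max(0.0, (cost - self.tokens) / self.refill_per_sec)


def estimated_cost(prompt_tokens: int, max_output_tokens: int) -> int:
    # Assumption: charge the worst case up front (prompt + maximum completion).
    return prompt_tokens + max_output_tokens


# Made-up budget: 60,000 LLM tokens per minute, i.e. 1,000 tokens/sec refill.
fake_now = [0.0]  # fake clock for a deterministic example
bucket = TokenCostBucket(capacity=60_000, refill_per_sec=1_000, clock=lambda: fake_now[0])

print(bucket.try_consume(estimated_cost(50_000, 8_000)))  # True (2,000 tokens left)
cost = estimated_cost(4_000, 1_000)                        # 5,000 tokens needed
print(bucket.try_consume(cost), math.ceil(bucket.seconds_until(cost)))  # False 3
```

A 429 response could carry that 3 as `Retry-After`. Charging the worst case up front over-throttles when responses come back short; refunding the unused portion after the response arrives is one refinement this sketch leaves out.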
Reactive Streams and Backpressure Signaling
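In complex systems with multiple stages (request → validation → LLM call → post-processing → response), each stage can signal backpressure upstream. Reactive libraries such as RxPY (ReactiveX for Python) target this style of pipeline, but plain asyncio primitives are enough: a bounded asyncio.Queue between stages makes a fast producer wait whenever the next stage falls behind.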
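```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class PipelineStage:
    """
    A stage in a multi-stage pipeline with backpressure.
    Each stage owns a bounded input queue and writes its results into the
    next stage's input queue, so a slow stage stalls the stages before it.
    """

    def __init__(self, name: str, max_queue_size: int = 100):
        self.name = name
        self.input_queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)

    async def run(self, process_fn: Callable[[Any], Awaitable[Any]],
                  downstream: Optional["PipelineStage"] = None,
                  sink: Optional[List[Any]] = None) -> None:
        """
        Continuously process items from the input queue.
        put() on a full downstream queue blocks, so this stage stops pulling
        work; its own queue then fills and upstream gets backpressure.
        """
        while True:
            item = await self.input_queue.get()
            try:
                result = await process_fn(item)
                if downstream is not None:
                    await downstream.input_queue.put(result)
                elif sink is not None:
                    sink.append(result)
            except Exception as e:
                print(f"✗ {self.name} error: {e}")
            finally:
                self.input_queue.task_done()


async def validate_request(req: dict) -> dict:
    """Validation stage."""
    await asyncio.sleep(0.01)  # Simulate work.
    return req


async def fetch_llm_response(prompt: str) -> str:
    """Hypothetical stand-in for a real LLM client call."""
    await asyncio.sleep(0.01)  # Simulate model latency.
    return f"answer to {prompt}"


async def format_response(resp: str) -> str:
    """Post-processing stage."""
    return resp.upper()


async def main() -> None:
    # Build a pipeline: validation → LLM call → post-processing.
    # Queues are created inside the running event loop.
    validation = PipelineStage("validation", max_queue_size=100)
    llm_stage = PipelineStage("llm_call", max_queue_size=50)
    postprocess = PipelineStage("postprocess", max_queue_size=100)
    results: List[str] = []

    # Start stages, each wired to the next one.
    workers = [
        asyncio.create_task(validation.run(validate_request, downstream=llm_stage)),
        asyncio.create_task(llm_stage.run(
            lambda req: fetch_llm_response(req["prompt"]), downstream=postprocess)),
        asyncio.create_task(postprocess.run(format_response, sink=results)),
    ]

    # Enqueue work.
    for i in range(1000):
        # If the validation queue is full (100 items), this blocks.
        # Backpressure: the producer slows to the pipeline's pace.
        await validation.input_queue.put({"id": i, "prompt": f"Q{i}"})

    # Wait for each stage to drain, in order, then stop the workers.
    for stage in (validation, llm_stage, postprocess):
        await stage.input_queue.join()
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    print(f"Processed {len(results)} requests")


asyncio.run(main())
```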
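Each stage can set its own queue size based on its capacity. If the LLM call stage is slow (say, 500 ms per call), give it a smaller queue (50 items) so requests don't pile up in front of it. When that queue fills, the validation stage blocks on `put()`, its own queue fills in turn, and the producer (for example, an upstream HTTP handler) blocks too. At that edge, a handler would typically use a timeout or `put_nowait()` and return 503 rather than wait.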
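The Reactive Streams specification makes the upstream signal explicit instead of relying on a full queue: a subscriber calls `request(n)` on its subscription, and the publisher may emit at most that many items until more demand arrives. The following is a rough asyncio analogue for illustration only, not an implementation of the spec's interfaces; `CreditChannel` is a hypothetical name, and outstanding demand is modeled as a semaphore of credits.

```python
import asyncio
from typing import List


class CreditChannel:
    """Producer may send only while it holds credits granted by the consumer."""

    def __init__(self) -> None:
        self._credits = asyncio.Semaphore(0)  # outstanding demand
        self._items: asyncio.Queue = asyncio.Queue()  # never holds more than granted

    def request(self, n: int) -> None:
        """Consumer signals demand for n more items (like Subscription.request(n))."""
        for _ in range(n):
            self._credits.release()

    async def send(self, item: object) -> None:
        """Producer waits here until the consumer has asked for more."""
        await self._credits.acquire()
        self._items.put_nowait(item)

    async def receive(self) -> object:
        return await self._items.get()


async def demo() -> List[object]:
    ch = CreditChannel()
    received: List[object] = []

    async def producer() -> None:
        for i in range(10):
            await ch.send(i)

    async def consumer() -> None:
        ch.request(4)  # initial demand: a batch of 4
        for n in range(10):
            received.append(await ch.receive())
            if (n + 1) % 4 == 0:
                ch.request(4)  # ask for the next batch only after finishing this one

    await asyncio.gather(producer(), consumer())
    return received


print(asyncio.run(demo()))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The buffer between the two sides can never exceed the demand the consumer has signaled, which is the same guarantee a bounded queue gives, expressed as "send me n more" instead of "I'm full".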
Monitoring and Alerting for Queue Saturation
In production, monitor queue depth and alert when saturation is approaching:
```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def monitor_queue_health(queue: asyncio.Queue, threshold: float = 0.8,
                               interval_sec: float = 5.0) -> None:
    """
    Monitor queue depth; alert if near capacity.
    """
    while True:
        depth = queue.qsize()
        capacity = queue.maxsize  # public property; 0 means unbounded
        utilization = depth / capacity if capacity else 0.0
        if utilization > threshold:
            # Lazy %-formatting: the message is only built if it is emitted.
            logger.warning(
                "Queue saturation alert: %d/%d items (threshold: %.0f%%)",
                depth, capacity, threshold * 100,
            )
        await asyncio.sleep(interval_sec)
```
Set alerts at 80–90% capacity to give ops teams time to scale before hitting 100%.
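The 80 to 90% guidance maps naturally onto two alert levels. A small pure function keeps the classification separate from the polling loop, so it is easy to test and to reuse in a metrics exporter. This is a sketch: `saturation_level` is a hypothetical helper, and the default thresholds are simply the range suggested above, to be tuned to how long scaling takes in your environment.

```python
def saturation_level(depth: int, capacity: int,
                     warn: float = 0.8, critical: float = 0.9) -> str:
    """Classify queue utilization as 'ok', 'warning', 'critical', or 'unbounded'."""
    if capacity <= 0:
        # asyncio.Queue uses maxsize=0 for "no limit": there is no ceiling to
        # measure against, which is itself worth flagging.
        return "unbounded"
    utilization = depth / capacity
    if utilization >= critical:
        return "critical"
    if utilization >= warn:
        return "warning"
    return "ok"


for depth in (500, 850, 950):
    print(depth, saturation_level(depth, capacity=1000))
# 500 ok
# 850 warning
# 950 critical
```

Treating an unbounded queue as its own state, rather than as "ok", surfaces exactly the configuration this section argues against.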
Key Takeaways
- Backpressure prevents cascading failure: Reject or queue with timeout instead of accepting unbounded requests.
- Bounded queues are the foundation: Set `max_size` to a reasonable limit (1000–10000 based on memory budget).
- Latency-based rejection adapts automatically: Reject requests when P99 latency exceeds a threshold.
- Token bucket enforces rate limits: Simple, predictable; suitable for fixed-throughput constraints.
- Monitor queue depth constantly: Alert at 80%+ utilization; scale workers before hitting capacity.
Frequently Asked Questions
What's a good queue max_size?
Start with 1000, then check the memory cost by multiplying max_size by the average request size. If requests are ~1 KB, 1000 requests take ~1 MB; if requests are ~10 KB, a max_size of 100 keeps the queue at about 1 MB. Adjust to your memory budget.
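The arithmetic above fits in a small helper. Memory is one bound; a second bound, added here rather than taken from the FAQ, comes from queueing delay: a FIFO queue holding N requests drained at λ requests per second takes about N / λ seconds to empty, so entries beyond λ × (acceptable wait) mostly belong to requests that will time out before they are served. Taking the smaller of the two is one reasonable starting point. `suggest_max_size` is a hypothetical helper, and payload size is only a lower bound on real per-request memory in Python.

```python
def suggest_max_size(memory_budget_bytes: int, avg_request_bytes: int,
                     throughput_rps: float, max_wait_sec: float) -> int:
    """Smaller of a memory bound and a wait-time bound (both rough estimates)."""
    by_memory = memory_budget_bytes // avg_request_bytes
    # A FIFO queue of N items drains in about N / throughput_rps seconds.
    by_wait = int(throughput_rps * max_wait_sec)
    return max(1, min(by_memory, by_wait))


KB = 1000
MB = 1000 * KB
print(suggest_max_size(1 * MB, 1 * KB, throughput_rps=50, max_wait_sec=30))   # 1000
print(suggest_max_size(1 * MB, 10 * KB, throughput_rps=50, max_wait_sec=30))  # 100
print(suggest_max_size(1 * MB, 1 * KB, throughput_rps=5, max_wait_sec=10))    # 50
```

The first two calls reproduce the FAQ's numbers; the third shows the wait-time bound taking over when workers are slow, even though memory would allow 1000.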
Should I use reject (503) or queue indefinitely?
Reject. Clients have timeout budgets; if a request waits 30 seconds in a queue, it will likely fail on the client side anyway. It is better to fail fast (503) so clients can retry elsewhere or back off immediately.
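Failing fast at admission has a counterpart on the consumer side: a request that has already outlived its client's timeout is not worth an LLM call. A minimal sketch, assuming each request carries a hypothetical `deadline` field (a `time.monotonic()` timestamp set at enqueue time from the client's timeout budget) and using a hypothetical `call_llm` stub:

```python
import asyncio
import time
from typing import List, Tuple


async def call_llm(prompt: str) -> str:
    """Hypothetical stand-in for a real LLM client call."""
    await asyncio.sleep(0)
    return prompt.lower()


async def worker(q: asyncio.Queue, served: List[int], expired: List[int]) -> None:
    while True:
        request = await q.get()
        try:
            if time.monotonic() >= request["deadline"]:
                expired.append(request["id"])  # client already gave up: skip the LLM call
            else:
                await call_llm(request["prompt"])
                served.append(request["id"])
        finally:
            q.task_done()


async def demo() -> Tuple[List[int], List[int]]:
    q: asyncio.Queue = asyncio.Queue(maxsize=10)
    served: List[int] = []
    expired: List[int] = []
    now = time.monotonic()
    for i, budget in enumerate([30.0, -1.0, 30.0], start=1):  # -1.0: already past deadline
        q.put_nowait({"id": i, "prompt": f"Q{i}", "deadline": now + budget})
    task = asyncio.create_task(worker(q, served, expired))
    await q.join()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return served, expired


print(asyncio.run(demo()))  # ([1, 3], [2])
```

Skipping expired work matters most right after an overload, when the queue is full of requests whose clients have already retried or given up.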
How do I measure if backpressure is working?
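Track the rejection count (rejected_count in BoundedLLMQueue) and the peak queue depth over time. As a rule of thumb, rejections should stay below 1% at normal load. If rejections spike, it indicates either a traffic surge or a performance regression somewhere in the processing path (for example, a slower LLM provider).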
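One way to track these numbers is to record every admission outcome and the peak queue depth within each reporting interval. A minimal sketch; `AdmissionStats` and its fields are hypothetical names, and in production you would export these values to your metrics system rather than keep them in memory:

```python
class AdmissionStats:
    """Counts accepted/rejected requests and peak queue depth for one interval."""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Start a new reporting interval."""
        self.accepted = 0
        self.rejected = 0
        self.peak_depth = 0

    def record(self, accepted: bool, depth: int) -> None:
        if accepted:
            self.accepted += 1
        else:
            self.rejected += 1
        self.peak_depth = max(self.peak_depth, depth)

    def rejection_rate(self) -> float:
        total = self.accepted + self.rejected
        return self.rejected / total if total else 0.0


stats = AdmissionStats()
for i in range(200):
    stats.record(accepted=(i != 0), depth=min(i, 120))  # 1 rejection out of 200
print(f"{stats.rejection_rate():.1%}", stats.peak_depth)  # 0.5% 120
```

Peak depth per interval is more informative than a point-in-time sample, because a queue can fill and drain between two scrapes of a gauge.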
Can I queue low-priority requests even when high-priority ones are rejected?
Yes, use separate queues per priority. Process high-priority queue first; low-priority only when high-priority is empty. This is the multi-queue backpressure pattern.
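A minimal sketch of that pattern with asyncio, under the assumption of two priority levels; `PriorityAdmission` is a hypothetical name. Each priority gets its own bounded queue (so each has its own rejection point), and a semaphore counts items across both so `dequeue()` can wait on either.

```python
import asyncio


class PriorityAdmission:
    """Two bounded queues, one per priority; each has its own rejection point."""

    def __init__(self, high_max: int = 100, low_max: int = 1000):
        self.high: asyncio.Queue = asyncio.Queue(maxsize=high_max)
        self.low: asyncio.Queue = asyncio.Queue(maxsize=low_max)
        # Counts items across both queues so dequeue() can wait on either.
        self._available = asyncio.Semaphore(0)

    def try_enqueue(self, request: dict, high_priority: bool) -> bool:
        """Fail fast: False means this priority's queue is full (send 503)."""
        target = self.high if high_priority else self.low
        try:
            target.put_nowait(request)
        except asyncio.QueueFull:
            return False
        self._available.release()
        return True

    async def dequeue(self) -> dict:
        """Return the oldest high-priority item, else the oldest low-priority one."""
        await self._available.acquire()
        if not self.high.empty():
            return self.high.get_nowait()
        return self.low.get_nowait()


async def demo() -> None:
    pa = PriorityAdmission(high_max=1, low_max=2)
    print(pa.try_enqueue({"id": "low-1"}, high_priority=False))  # True
    print(pa.try_enqueue({"id": "high-1"}, high_priority=True))  # True
    print(pa.try_enqueue({"id": "high-2"}, high_priority=True))  # False: high queue full
    print(pa.try_enqueue({"id": "low-2"}, high_priority=False))  # True: low queue has room
    for _ in range(3):
        print((await pa.dequeue())["id"])  # high-1, then low-1, then low-2


asyncio.run(demo())
```

Strict priority can starve low-priority work while high-priority traffic stays heavy; a common mitigation is to serve the low queue every Nth dequeue.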
What if clients ignore 503 and retry immediately?
Clients should implement exponential backoff (standard practice), ideally with random jitter so retries don't arrive in synchronized waves. On the server side, add a Retry-After header to guide clients, e.g. Retry-After: 5 (seconds).
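A client-side sketch of the retry delay, assuming the "full jitter" variant of exponential backoff (a uniform draw between zero and the exponential ceiling) and treating a numeric Retry-After value as a floor. `retry_delay` and its defaults are hypothetical; Retry-After may also be sent as an HTTP date, which this sketch does not parse.

```python
import random
from typing import Optional


def retry_delay(attempt: int, retry_after: Optional[str] = None,
                base: float = 0.5, cap: float = 30.0,
                rng: Optional[random.Random] = None) -> float:
    """
    Seconds to wait before retry number `attempt` (0-based).

    Full jitter: a uniform draw from [0, min(cap, base * 2**attempt)], so
    clients that failed together don't all retry at the same instant.
    A numeric Retry-After acts as a floor: never retry sooner than asked.
    """
    rng = rng or random.Random()
    delay = rng.uniform(0.0, min(cap, base * 2 ** attempt))
    if retry_after is not None:
        try:
            delay = max(delay, float(retry_after))
        except ValueError:
            pass  # HTTP-date form of Retry-After is not handled in this sketch.
    return delay


print(retry_delay(0, retry_after="5"))  # 5.0
print(retry_delay(3))                   # somewhere in [0.0, 4.0]
```

The `cap` keeps late attempts from waiting unreasonably long; together with a maximum attempt count, it bounds the total time a client spends retrying.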
Further Reading
- Reactive Streams Specification: standardized backpressure for JVM languages.
- The Tail at Scale (Dean and Barroso): managing tail latency in large-scale services.
- Netflix Hystrix (bulkheads): compartmentalization to prevent cascading failure.
- Kafka Backpressure: backpressure in streaming systems.