Message Queues for LLM Workloads: Design Patterns
Message queues are the backbone of many production LLM systems at scale. In-memory queues (like queue.Queue in Python) work within a single process. Message brokers such as Redis, RabbitMQ, or AWS SQS decouple producers and consumers across a cluster of machines. A broker accepts requests from any producer, stores them (durably, depending on the broker and its configuration), and distributes them to consumer workers across multiple servers. This pattern scales horizontally by adding workers and survives worker failures. It also supports features like priority queues, dead-letter handling, and retries that are safe as long as consumers are idempotent.
Why Message Queues Beat In-Memory Queues
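In-memory queues live in a single Python process. If the process crashes, every request still waiting in the queue vanishes. The loss equals the backlog at the moment of the crash. At 1,000 requests/s, a backlog of 1 to 100 seconds of traffic means 1,000 to 100,000 lost requests. If workers fall behind, the backlog (and the potential loss) grows the longer the process runs. For production LLM systems, this is unacceptable.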
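The size of that loss follows from a simple backlog model. When requests arrive faster than workers drain them, the in-memory backlog grows linearly, and everything in it is lost on a crash. The sketch below assumes constant arrival and service rates. The function name and the 1,000/900 req/s figures are illustrative, not measurements.

```python
def backlog_at(arrival_rate: float, service_rate: float, seconds: float, initial: float = 0.0) -> float:
    """Requests still queued after `seconds`, assuming constant rates in requests/s.

    The backlog grows by (arrival_rate - service_rate) every second and never drops
    below zero. Everything in it is lost if the process holding the queue crashes.
    """
    return max(0.0, initial + (arrival_rate - service_rate) * seconds)


# 1,000 req/s arriving, workers finishing 900 req/s: 100 req/s pile up.
for uptime in (10, 100, 1000):
    print(f"{uptime:>5} s uptime -> {backlog_at(1000, 900, uptime):,.0f} requests at risk")
```

With these assumed rates, the backlog reaches 1,000, 10,000, and 100,000 requests after 10, 100, and 1,000 seconds.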
Message queues (Redis, RabbitMQ, AWS SQS) provide:
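- Durability: Messages can be persisted to disk (SQS stores them durably; RabbitMQ needs durable queues and persistent messages; Redis needs RDB/AOF persistence enabled), so they survive process crashes.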
- Scalability: Multiple producers and consumers on different machines.
- Fault tolerance: Acknowledgments, redelivery of unacknowledged messages, and dead-letter queues.
- Advanced routing: Priority queues, topic-based subscriptions, routing keys.
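Acknowledgments are what make redelivery possible. A received message is hidden rather than deleted, and it is only removed once the consumer acks it. The toy model below is closest to SQS's visibility timeout. RabbitMQ instead redelivers unacknowledged messages when the consumer's channel or connection closes. ToyAckQueue is a hypothetical in-memory class for illustration, not any broker's client API.

```python
import itertools
from collections import deque
from typing import Optional, Tuple


class ToyAckQueue:
    """In-memory model of acknowledgment-based delivery (not a real broker client).

    A received message stays "in flight". If it isn't acked within
    `visibility_timeout` seconds, it becomes visible again and is redelivered.
    """

    def __init__(self, visibility_timeout: float) -> None:
        self.visibility_timeout = visibility_timeout
        self._ready: deque = deque()   # (msg_id, body) waiting for a consumer
        self._in_flight: dict = {}     # msg_id -> (body, redelivery deadline)
        self._ids = itertools.count(1)

    def send(self, body: str) -> int:
        msg_id = next(self._ids)
        self._ready.append((msg_id, body))
        return msg_id

    def receive(self, now: float) -> Optional[Tuple[int, str]]:
        """Return (msg_id, body), or None if nothing is visible at time `now`."""
        self._requeue_expired(now)
        if not self._ready:
            return None
        msg_id, body = self._ready.popleft()
        self._in_flight[msg_id] = (body, now + self.visibility_timeout)
        return msg_id, body

    def ack(self, msg_id: int) -> None:
        """The consumer finished: delete the message for good."""
        self._in_flight.pop(msg_id, None)

    def _requeue_expired(self, now: float) -> None:
        expired = [m for m, (_, deadline) in self._in_flight.items() if deadline <= now]
        for msg_id in expired:
            body, _ = self._in_flight.pop(msg_id)
            self._ready.append((msg_id, body))


q = ToyAckQueue(visibility_timeout=30)
q.send("summarize doc 42")
print(q.receive(now=0))    # (1, 'summarize doc 42'): worker A takes it, then crashes
print(q.receive(now=10))   # None: still hidden while A might be working
print(q.receive(now=31))   # (1, 'summarize doc 42'): redelivered to worker B
```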
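A rough comparison (actual behavior depends on the broker and its configuration):
| Aspect | In-Memory Queue | Message Queue |
|---|---|---|
| Durability | Lost on crash | Persisted to disk (if configured) |
| Machines | Single process | Producers and consumers on many machines |
| Throughput | Bounded by one process | Scales out by adding workers and broker capacity |
| Failure recovery | Manual restart; queued items lost | Redelivery of unacknowledged messages |
| Ordering | FIFO per process | FIFO per queue/partition (best-effort for SQS standard queues) |
| Setup cost | ~0 min | 10–30 min for a local broker; more to operate in production |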
Redis Queue (RQ): Simple Job Queue Pattern
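Redis Queue (RQ) is one of the simplest job queues for Python. It uses Redis as the backend and runs plain synchronous functions in worker processes. You can call it from async code, but queue.enqueue is a blocking Redis call, so it should not run directly on the event loop: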
```python
# tasks.py: RQ workers import job functions by module path, so keep them in an
# importable module rather than in __main__.
import asyncio
import os

import aiohttp
import redis
from rq import Queue, Worker

# Connect to Redis.
redis_conn = redis.Redis(host="localhost", port=6379)
queue = Queue(connection=redis_conn)


def process_llm_request(prompt: str) -> str:
    """
    Worker function: process a single LLM request.
    This runs on a dedicated worker machine. RQ workers call plain (sync)
    functions, so the async HTTP call is driven with asyncio.run().
    """
    async def fetch() -> str:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.openai.com/v1/chat/completions",
                json={
                    "model": "gpt-4",
                    "messages": [{"role": "user", "content": prompt}],
                },
                # API key read from the environment instead of hard-coded.
                headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"},
            ) as resp:
                resp.raise_for_status()  # Raise on 4xx/5xx so RQ marks the job failed.
                data = await resp.json()
                return data["choices"][0]["message"]["content"]

    return asyncio.run(fetch())


# Producer: enqueue a job from a web request.
async def handle_user_request(user_prompt: str) -> str:
    """Enqueue a job and return immediately with its ID (fire-and-forget)."""
    # queue.enqueue blocks on Redis; run it in a thread so the event loop stays free.
    job = await asyncio.to_thread(queue.enqueue, process_llm_request, user_prompt)
    return f"Job {job.id} queued. Check status later."


# Consumer: a worker process pops jobs from the queue and runs them.
def start_worker() -> None:
    """Run one blocking worker; start more processes (or `rq worker`) to scale out."""
    worker = Worker([queue], connection=redis_conn)
    worker.work()
```
This pattern decouples the producer (which returns immediately) from the consumer (which processes later). The producer doesn't wait for the LLM API response.
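Because the producer only gets a job ID back, clients need a second step to check on the result. In RQ that is typically Job.fetch(job_id, connection=redis_conn) followed by job.get_status(). The sketch below is a hypothetical in-process stand-in (ToyJobQueue is not part of RQ) that shows the same enqueue-now, look-up-later flow with a worker thread, so it runs without Redis.

```python
import queue
import threading
import uuid


class ToyJobQueue:
    """In-process stand-in for the enqueue-now, check-later flow (not RQ's API)."""

    def __init__(self) -> None:
        self._pending: queue.Queue = queue.Queue()
        self.status: dict = {}    # job_id -> "queued" | "started" | "finished" | "failed"
        self.results: dict = {}   # job_id -> return value

    def enqueue(self, func, *args) -> str:
        job_id = uuid.uuid4().hex
        self.status[job_id] = "queued"
        self._pending.put((job_id, func, args))
        return job_id  # The producer returns this immediately.

    def work(self) -> None:
        """Worker loop: runs jobs until it receives a None sentinel."""
        while (item := self._pending.get()) is not None:
            job_id, func, args = item
            self.status[job_id] = "started"
            try:
                self.results[job_id] = func(*args)
                self.status[job_id] = "finished"
            except Exception:
                self.status[job_id] = "failed"

    def stop(self) -> None:
        self._pending.put(None)


jobs = ToyJobQueue()
job_id = jobs.enqueue(str.upper, "hello")   # Producer: returns at once.
print(jobs.status[job_id])                  # queued
worker = threading.Thread(target=jobs.work)
worker.start()
jobs.stop()
worker.join()
print(jobs.status[job_id], jobs.results[job_id])   # finished HELLO
```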
RabbitMQ for Advanced Routing
RabbitMQ is a full-featured message broker with topic exchanges, priority queues, and dead-letter routing. It's overkill for simple use cases but well suited to complex routing. The example below uses the aio_pika client:
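```python
import json
import time

from aio_pika import DeliveryMode, ExchangeType, Message, connect_robust

AMQP_URL = "amqp://guest:guest@localhost/"
QUEUE_ARGS = {"x-max-priority": 10}  # Enable priorities 0-10 on this queue.


async def rabbitmq_producer(prompt: str, priority: int = 5) -> None:
    """
    Publish LLM request to RabbitMQ with priority.
    Priority 10 = highest, 0 = lowest.
    (Opens a connection per call for brevity; reuse one connection in production.)
    """
    connection = await connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        # Declare a durable direct exchange.
        exchange = await channel.declare_exchange(
            "llm_requests", ExchangeType.DIRECT, durable=True,
        )
        # Declare a durable queue with priority support and bind it to the exchange.
        queue = await channel.declare_queue(
            "llm_processing", durable=True, arguments=QUEUE_ARGS,
        )
        await queue.bind(exchange, routing_key="process")
        # Priority and persistence are properties of the message itself.
        message_body = json.dumps({"prompt": prompt, "timestamp": time.time()})
        await exchange.publish(
            Message(
                body=message_body.encode(),
                priority=priority,
                delivery_mode=DeliveryMode.PERSISTENT,  # Survive broker restarts.
            ),
            routing_key="process",
        )
        print(f"✓ Published prompt (priority {priority})")


async def fetch_llm_response(prompt: str) -> str:
    """Placeholder for your async LLM client call (hypothetical, not part of aio_pika)."""
    raise NotImplementedError


async def rabbitmq_consumer() -> None:
    """
    Consume LLM requests from RabbitMQ in priority order.
    Higher-priority messages waiting in the queue are delivered first.
    """
    connection = await connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        # Keep prefetch small: messages already buffered on this consumer
        # are not reordered when a higher-priority message arrives.
        await channel.set_qos(prefetch_count=1)
        # Re-declaring with identical arguments is idempotent and ensures the queue exists.
        queue = await channel.declare_queue(
            "llm_processing", durable=True, arguments=QUEUE_ARGS,
        )
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                # process() acks on success and rejects (without requeue) on exception.
                async with message.process():
                    data = json.loads(message.body.decode())
                    prompt = data["prompt"]
                    response = await fetch_llm_response(prompt)
                    print(f"✓ Processed: {prompt[:50]}... ({len(response)} chars)")
```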
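RabbitMQ delivers higher-priority messages ahead of lower-priority ones waiting in the same queue. This matters for user-facing LLM applications where some requests (e.g., customer support) matter more than others (e.g., batch analytics). It does not preempt messages already delivered to a consumer, so keep the prefetch count low for priorities to take effect.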
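The ordering rule can be modeled with a heap. Among messages still waiting in the queue, the highest priority is picked first, and equal priorities stay FIFO. Messages already handed to a consumer (up to its prefetch count) are not reordered. ToyPriorityQueue is a hypothetical illustration of these semantics, not RabbitMQ's implementation.

```python
import heapq
import itertools


class ToyPriorityQueue:
    """Illustrative model of a priority queue with consumer prefetch (not RabbitMQ itself)."""

    def __init__(self) -> None:
        self._heap: list = []
        self._seq = itertools.count()  # Tie-breaker keeps FIFO order within a priority.

    def publish(self, body: str, priority: int) -> None:
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._seq), body))

    def deliver(self, prefetch: int) -> list:
        """Hand up to `prefetch` waiting messages to a consumer, highest priority first."""
        n = min(prefetch, len(self._heap))
        return [heapq.heappop(self._heap)[2] for _ in range(n)]


q = ToyPriorityQueue()
q.publish("batch analytics", priority=1)
q.publish("customer support", priority=9)
q.publish("nightly report", priority=1)
print(q.deliver(prefetch=3))  # ['customer support', 'batch analytics', 'nightly report']

q.publish("batch analytics #2", priority=1)
buffered = q.deliver(prefetch=1)          # Consumer already holds this low-priority message.
q.publish("urgent ticket", priority=9)    # Arrives too late to overtake it.
print(buffered, q.deliver(prefetch=1))    # ['batch analytics #2'] ['urgent ticket']
```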
Dead-Letter Queue Pattern for Reliability
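When a job fails repeatedly (e.g., the LLM API is down), don't retry it forever. Send it to a dead-letter queue (DLQ) for manual inspection. In RQ, jobs that exhaust their retries are moved to the queue's FailedJobRegistry, which can serve as the DLQ: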
```python
import time

import redis
from rq import Queue, Retry
from rq.registry import FailedJobRegistry

from tasks import process_llm_request  # Hypothetical module holding the worker function.

redis_conn = redis.Redis()
main_queue = Queue("llm_requests", connection=redis_conn)
# Jobs that exhaust their retries land here: this registry acts as the DLQ.
dlq = FailedJobRegistry(queue=main_queue)


def process_with_dlq(prompt: str, max_retries: int = 3) -> str:
    """Enqueue with retries; after max_retries failed retries, RQ moves the job to the DLQ."""
    job = main_queue.enqueue(
        process_llm_request,
        prompt,
        # Back off 10 s, 30 s, 60 s between attempts.
        # Interval retries need workers started with --with-scheduler.
        retry=Retry(max=max_retries, interval=[10, 30, 60]),
    )
    return job.id


def monitor_dlq(poll_seconds: int = 60) -> None:
    """Periodically inspect the DLQ for failures."""
    while True:
        job_ids = dlq.get_job_ids()
        if job_ids:
            print(f"⚠️ {len(job_ids)} jobs in DLQ. Investigate:")
            for job_id in job_ids[:5]:  # Show first 5
                job = main_queue.fetch_job(job_id)
                if job is not None:
                    # A failed job has no result; show its arguments and traceback instead.
                    print(f"  - {job.args}: {job.exc_info}")
        time.sleep(poll_seconds)
```
This pattern prevents infinite retry loops and gives you visibility into systemic failures (e.g., API quota exhaustion).
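The retry-then-park logic is independent of any broker. The stdlib sketch below retries a call a fixed number of times and then records it in a dead-letter list instead of looping forever. run_with_dlq and make_flaky are hypothetical helpers. Backoff sleeps are omitted to keep it short.

```python
def run_with_dlq(func, payload, max_retries: int, dead_letters: list):
    """Call func(payload); if the first try and all retries fail, park it in dead_letters.

    Returns the result, or None if the job was dead-lettered.
    A real worker would sleep (with backoff) between attempts.
    """
    last_error = None
    for _ in range(1 + max_retries):  # First try plus retries.
        try:
            return func(payload)
        except Exception as exc:
            last_error = exc
    dead_letters.append(
        {"payload": payload, "attempts": 1 + max_retries, "error": repr(last_error)}
    )
    return None


def make_flaky(failures: int):
    """Hypothetical LLM call that raises `failures` times, then succeeds."""
    calls = {"n": 0}

    def call(prompt):
        calls["n"] += 1
        if calls["n"] <= failures:
            raise TimeoutError("LLM API timed out")
        return f"answer to {prompt!r}"

    return call


dlq = []
print(run_with_dlq(make_flaky(2), "q1", max_retries=3, dead_letters=dlq))   # answer to 'q1'
print(run_with_dlq(make_flaky(10), "q2", max_retries=3, dead_letters=dlq))  # None
print(len(dlq))                                                             # 1
```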
Idempotent Job Processing
Most message queues deliver at least once, so the same job can run more than once (for example, after a worker crash or a retry). To handle this, make your job functions idempotent:
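```python
import redis

# Module-level connection, reused across jobs in the same worker process.
cache = redis.Redis()


def process_idempotent_llm_request(prompt: str, request_id: str) -> str:
    """
    Idempotent worker: if the same request_id is processed twice,
    return the cached result instead of calling the API again.
    """
    result_key = f"llm_result:{request_id}"
    cached = cache.get(result_key)
    if cached is not None:
        print(f"Cache hit for {request_id}")
        return cached.decode()
    # Claim the request atomically (SET NX) so two concurrent deliveries don't both
    # call the API. The claim expires after 5 minutes in case this worker dies.
    if not cache.set(f"llm_lock:{request_id}", "1", nx=True, ex=300):
        raise RuntimeError(f"{request_id} is already being processed; retry later")
    # synchronous_llm_call is a placeholder for your blocking LLM client call.
    response = synchronous_llm_call(prompt)
    # Store in cache with a 1-hour TTL.
    cache.setex(result_key, 3600, response)
    return response
```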
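With this pattern, a duplicate delivery that arrives after the first run has cached its result (and within the TTL) returns the cached value instead of calling the LLM API again, no matter how many times the broker redelivers it. A worker that crashes after the API call but before caching the result still causes a second call, so this pattern limits duplicate API calls rather than eliminating them.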
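The same check-claim-store logic can be exercised without Redis. In the sketch below, a dict stands in for the result cache, and a lock makes the check-and-claim step atomic, playing the role of SET NX. IdempotentProcessor and fake_llm are hypothetical names for illustration.

```python
import threading


class IdempotentProcessor:
    """In-memory model of the result-cache + claim pattern (a dict stands in for Redis)."""

    def __init__(self, llm_call) -> None:
        self.llm_call = llm_call
        self.results: dict = {}         # request_id -> cached result
        self.claimed: set = set()       # request_ids currently being processed
        self._lock = threading.Lock()   # Makes check-and-claim atomic, like SET NX.

    def process(self, prompt: str, request_id: str) -> str:
        with self._lock:
            if request_id in self.results:
                return self.results[request_id]   # Duplicate delivery: reuse the result.
            if request_id in self.claimed:
                raise RuntimeError("in progress elsewhere; retry later")
            self.claimed.add(request_id)
        try:
            result = self.llm_call(prompt)
        except Exception:
            with self._lock:
                self.claimed.discard(request_id)  # Let a later retry claim it again.
            raise
        with self._lock:
            self.results[request_id] = result
            self.claimed.discard(request_id)
        return result


api_calls = []


def fake_llm(prompt: str) -> str:
    """Hypothetical stand-in for the real API call; records each invocation."""
    api_calls.append(prompt)
    return prompt[::-1]


proc = IdempotentProcessor(fake_llm)
for _ in range(10):                 # The broker redelivers the same message 10 times.
    proc.process("hello", request_id="req-1")
print(len(api_calls))               # 1
```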
Batching Jobs for Efficiency
Some LLM tasks benefit from batching (e.g., embeddings). Queue batch jobs and process them together:
```python
from rq import Queue
import redis

redis_conn = redis.Redis()
batch_queue = Queue("llm_embeddings", connection=redis_conn)


def embed_batch(texts: list) -> dict:
    """
    Embed a batch of texts in a single model call.
    """
    from sentence_transformers import SentenceTransformer
    # Loading the model dominates small jobs; larger batches amortize this cost.
    # (To load it only once, preload it in the worker process before jobs run.)
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(texts)
    return {
        "batch_size": len(texts),
        "embeddings": embeddings.tolist(),
    }


def enqueue_embedding_batch(texts: list, batch_size: int = 32) -> None:
    """Enqueue texts for embedding, one job per batch of batch_size texts."""
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        batch_queue.enqueue(embed_batch, batch)
```
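Batching shrinks the number of jobs and API calls. With batch_size=32, 1,000 texts become 32 jobs instead of 1,000 (about 97% fewer). Whether costs drop proportionally depends on pricing. APIs that bill per token charge for the same tokens either way, so savings come mainly from lower per-request overhead and, for local models like the one above, better hardware utilization.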
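The job-count arithmetic is easy to check. The sketch below splits a list into consecutive batches the same way the enqueue loop does. make_batches is a hypothetical helper, and the 1,000 placeholder texts are illustrative.

```python
def make_batches(items: list, batch_size: int) -> list:
    """Split items into consecutive batches; the last one may be shorter."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


texts = [f"doc {n}" for n in range(1000)]
batches = make_batches(texts, batch_size=32)
print(len(batches), len(batches[-1]))                       # 32 8
print(f"{1 - len(batches) / len(texts):.1%} fewer jobs")   # 96.8% fewer jobs
```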
Key Takeaways
- Message queues decouple producers and consumers: They let you scale across multiple machines and survive failures.
- Redis Queue is simple; RabbitMQ is advanced: Use RQ for straightforward FIFO; use RabbitMQ for priority queues and complex routing.
- Dead-letter queues prevent infinite retries: Jobs that exhaust their retries are parked in a DLQ for manual inspection.
- Idempotent processing handles at-least-once delivery: Cache results by request_id to avoid duplicate API calls.
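- Batch jobs to reduce per-request overhead: Processing 32–100 items per job instead of one at a time cuts the number of jobs and API calls by roughly 97–99%; cost savings depend on how the provider prices requests.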
Frequently Asked Questions
Should I use RQ or RabbitMQ?
Choose RQ if you're prototyping or have simple FIFO requirements. Choose RabbitMQ if you need priority queues, topic routing, or complex failure handling. For many LLM systems RQ is sufficient; RabbitMQ tends to be overkill unless you have heterogeneous job types.
How do I scale workers horizontally?
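Start multiple worker processes on different machines, all connected to the same Redis or RabbitMQ broker. RQ workers pop jobs from Redis, and RabbitMQ pushes messages to subscribed consumers. Either way, the broker hands each message to one worker at a time, so no extra coordination is needed.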
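This is the competing-consumers pattern: several workers pull from one shared queue, and each message goes to exactly one of them. The sketch below models it in-process with threads and the thread-safe queue.Queue standing in for the broker. run_competing_consumers is a hypothetical helper.

```python
import queue
import threading


def run_competing_consumers(messages: list, num_workers: int) -> dict:
    """Toy model: N workers drain one shared queue; each message goes to exactly one worker."""
    shared: queue.Queue = queue.Queue()
    for m in messages:
        shared.put(m)
    handled: dict = {w: [] for w in range(num_workers)}

    def worker(worker_id: int) -> None:
        while True:
            try:
                msg = shared.get_nowait()
            except queue.Empty:
                return                      # Queue drained; this worker exits.
            handled[worker_id].append(msg)  # Stand-in for calling the LLM.

    threads = [threading.Thread(target=worker, args=(w,)) for w in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return handled


result = run_competing_consumers([f"job-{i}" for i in range(100)], num_workers=4)
processed = [m for msgs in result.values() for m in msgs]
print(len(processed), len(set(processed)))   # 100 100: nothing lost, nothing duplicated
```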
What happens if a consumer crashes mid-job?
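It depends on the broker. With RabbitMQ, an unacknowledged message is redelivered to another consumer when the crashed consumer's channel or connection closes. With Redis Streams, the entry stays in the consumer group's pending list until another consumer claims it (XCLAIM or XAUTOCLAIM). With RQ, the abandoned job is eventually marked failed and moved to the FailedJobRegistry, from which it can be requeued. In every case a job may run twice, so make your jobs idempotent.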
Can I prioritize certain requests?
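Yes. RabbitMQ supports priority queues natively (per queue, via the x-max-priority argument). With RQ, use separate queues (e.g., high and default) and start workers with the queue names in priority order, e.g., rq worker high default, so each worker checks high before default every time it picks a job.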
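Strict queue ordering means a worker always looks at the first queue before the next one. The sketch below models that pick rule with deques. next_job is a hypothetical helper, not RQ code. With strict ordering, a steady stream of high-priority work can starve lower queues.

```python
from collections import deque


def next_job(queues: list):
    """Pop from the first non-empty queue, like a worker listening on `high default` in order."""
    for name, q in queues:
        if q:
            return name, q.popleft()
    return None


high, default = deque(), deque(["report-1", "report-2"])
queues = [("high", high), ("default", default)]
print(next_job(queues))      # ('default', 'report-1')
high.append("support-ticket")
print(next_job(queues))      # ('high', 'support-ticket'): checked first on every pick
print(next_job(queues))      # ('default', 'report-2')
```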
What's the latency impact of queueing?
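The broker round trip to enqueue and dequeue is usually small (often a few milliseconds on a local network; measure it in your environment). The larger cost is time spent waiting behind other messages. That wait depends on throughput, not only on depth: 100 queued messages mean a 10-second wait at 10 messages/s but only 0.1 s at 1,000 messages/s. For real-time applications, keep queue depth low relative to throughput by scaling workers with load.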
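In steady state, Little's Law gives the expected wait as queue depth divided by the rate at which workers complete messages. The sketch below applies it to a depth of 100. expected_wait_seconds is a hypothetical helper, and the throughput values are illustrative.

```python
def expected_wait_seconds(queue_depth: int, throughput_per_s: float) -> float:
    """Approximate time a new message waits: depth / drain rate (Little's Law, steady state)."""
    return queue_depth / throughput_per_s


for throughput in (10, 100, 1000):   # Messages/s the worker pool completes.
    wait = expected_wait_seconds(100, throughput)
    print(f"{throughput:>4}/s: {wait:.1f} s behind 100 queued messages")
```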