Building High-Throughput LLM Inference Pipelines
High-throughput LLM inference pipelines are the final frontier of scaling. While earlier articles focused on concurrency and distribution, this article tackles the hard physics: making each request faster and cheaper while using less network bandwidth. Four techniques do most of the work: request batching (3–5x throughput gain), semantic caching (70–90% cache hits on workloads with many repeated questions), quantization (2–4x memory reduction), and response streaming (perceived latency cut by 50% or more). Combined, they can push a pipeline toward 10,000+ requests per second on modest hardware, although the actual gains depend heavily on the workload, the model, and the infrastructure. Variants of these techniques are widely used in production LLM serving.
Request Batching for Throughput Maximization
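Batching groups multiple requests so that per-call overhead (connection setup, scheduling and, on self-hosted models, GPU kernel launches) is paid once per batch rather than once per request. Instead of 100 individual calls, process 4 batches of 25 requests. This can raise throughput by 3–5x, at the cost of a short queueing delay for each request.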
import asyncio
import os
from typing import List, Optional, Set

import aiohttp

# Assumption: the API key is read from the environment.
API_KEY = os.environ.get("OPENAI_API_KEY", "")


class BatchProcessor:
    """
    Accumulate requests and process in batches.
    Balance: larger batches = more throughput, longer wait time.
    """

    def __init__(self, batch_size: int = 32, max_wait_ms: int = 100):
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.pending_requests: List[dict] = []
        self.pending_futures: List[asyncio.Future] = []
        # Fires once the oldest pending request has waited max_wait_ms.
        self._flush_timer: Optional[asyncio.TimerHandle] = None
        # Keep references so in-flight batch tasks are not garbage-collected.
        self._tasks: Set[asyncio.Task] = set()

    async def enqueue(self, request: dict) -> dict:
        """Enqueue a request and wait for its result."""
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self.pending_requests.append(request)
        self.pending_futures.append(future)
        if len(self.pending_requests) >= self.batch_size:
            self._flush()  # Batch is full.
        elif self._flush_timer is None:
            # First request of a new batch: start the timeout clock.
            self._flush_timer = loop.call_later(self.max_wait_ms / 1000, self._flush)
        return await future

    def _flush(self) -> None:
        """Hand the accumulated batch to a background task."""
        if self._flush_timer is not None:
            self._flush_timer.cancel()
            self._flush_timer = None
        if not self.pending_requests:
            return
        batch, futures = self.pending_requests, self.pending_futures
        self.pending_requests, self.pending_futures = [], []
        task = asyncio.create_task(self._process_batch(batch, futures))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _process_batch(self, batch: List[dict], futures: List[asyncio.Future]) -> None:
        """Process one batch and distribute results to the waiting callers."""
        try:
            results = await self._llm_batch_call(batch)
            for future, result in zip(futures, results):
                if not future.done():
                    future.set_result(result)
        except Exception as e:
            for future in futures:
                if not future.done():
                    future.set_exception(e)

    async def _llm_batch_call(self, batch: List[dict]) -> List[dict]:
        """
        Send the batch over one shared HTTP session.
        The chat completions endpoint accepts one conversation per request,
        so the batch is issued as concurrent calls on a single connection pool.
        For offline workloads, OpenAI and Anthropic also offer asynchronous
        batch APIs; a self-hosted model server can run a true batched forward pass.
        """
        async with aiohttp.ClientSession(
            headers={"Authorization": f"Bearer {API_KEY}"}
        ) as session:

            async def call_one(req: dict) -> dict:
                async with session.post(
                    "https://api.openai.com/v1/chat/completions",
                    json={
                        "model": "gpt-4",
                        "messages": [{"role": "user", "content": req["prompt"]}],
                    },
                ) as resp:
                    resp.raise_for_status()
                    data = await resp.json()
                return {
                    "prompt": req["prompt"],
                    "response": data["choices"][0]["message"]["content"],
                }

            # gather() preserves input order, so results line up with futures.
            return list(await asyncio.gather(*(call_one(req) for req in batch)))


# Example: process requests as they arrive, batch when full or on timeout.
processor = BatchProcessor(batch_size=32, max_wait_ms=100)


async def handle_request(prompt: str) -> str:
    result = await processor.enqueue({"prompt": prompt})
    return result["response"]
Batching trades latency for throughput: requests wait up to 100 ms to batch together, but throughput increases 3–5x. For non-real-time applications, this is a great tradeoff.
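How large a batch actually gets depends on traffic, not only on the configured limit. The sketch below estimates the typical batch size and the worst-case queueing delay under a steady arrival rate. It assumes the flush rule described above (flush when the batch is full or when the oldest request has waited max_wait_ms); `estimate_batch` and its parameters are illustrative names, not part of any library, and real traffic is burstier than this model.

```python
from typing import Tuple


def estimate_batch(
    arrival_rate_rps: float, batch_size: int = 32, max_wait_ms: float = 100.0
) -> Tuple[int, float]:
    """Return (typical batch size, worst-case queueing delay in ms)."""
    if arrival_rate_rps <= 0:
        raise ValueError("arrival_rate_rps must be positive")
    # Time needed for batch_size requests to arrive, in milliseconds.
    fill_time_ms = batch_size * 1000.0 / arrival_rate_rps
    if fill_time_ms <= max_wait_ms:
        # The batch fills before the timer fires.
        return batch_size, fill_time_ms
    # The timer fires first; the batch holds whatever arrived in the window.
    arrived = max(1, int(arrival_rate_rps * max_wait_ms / 1000.0))
    return arrived, max_wait_ms


for rate in (50, 400, 1000):
    size, wait = estimate_batch(rate)
    print(f"{rate} req/s: about {size} requests per batch, up to {wait:.0f} ms wait")
```

At low traffic the timer dominates and batches stay small, so the throughput gain is smaller than the configured batch_size suggests.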
Semantic Caching to Avoid Redundant API Calls
In workloads dominated by duplicate questions and common patterns, 70–90% of LLM requests can be near-duplicates of earlier ones. Semantic caching stores an embedding of each prompt alongside its response and reuses that response for sufficiently similar queries, avoiding the API call entirely:
import hashlib
import json
import time
from typing import Optional

import numpy as np
import redis
from sentence_transformers import SentenceTransformer


class SemanticCache:
    """
    Cache LLM responses by semantic similarity.
    Avoid API calls for similar prompts.
    Note: the synchronous Redis client and the encoder block the event loop;
    that is acceptable for a sketch but not for a busy server.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.encoder = SentenceTransformer("all-MiniLM-L6-v2")
        self.similarity_threshold = 0.95  # Cosine similarity >= 0.95 = cache hit

    async def get_or_generate(
        self,
        prompt: str,
        generate_fn,  # Async function to call LLM
    ) -> dict:
        """
        Check cache; return cached response if a similar prompt exists.
        Otherwise call generate_fn and cache the result.
        """
        start = time.perf_counter()
        # Normalized embedding, so a dot product equals cosine similarity.
        embedding = self.encoder.encode(prompt, normalize_embeddings=True)
        cached_key = self._find_similar_prompt(embedding)
        if cached_key:
            cached = self.redis.hgetall(cached_key)
            # Redis returns bytes; the key may also have expired since the scan.
            if b"response" in cached:
                return {
                    "response": cached[b"response"].decode(),
                    "cached": True,
                    "latency_ms": (time.perf_counter() - start) * 1000,
                }
        # Cache miss: generate new response.
        result = await generate_fn(prompt)
        # Store in cache. hashlib gives a key that is stable across processes,
        # unlike the built-in hash().
        digest = hashlib.sha256(prompt.encode()).hexdigest()
        cache_key = f"llm_cache:{digest}"
        self.redis.hset(
            cache_key,
            mapping={
                "prompt": prompt,
                "response": result["response"],
                "embedding": json.dumps(embedding.tolist()),
                "timestamp": time.time(),
            },
        )
        self.redis.expire(cache_key, 86400)  # TTL: 24 hours
        return {
            "response": result["response"],
            "cached": False,
            "latency_ms": result["latency_ms"],
        }

    def _find_similar_prompt(self, embedding: np.ndarray) -> Optional[bytes]:
        """
        Return the key of the most similar cached prompt, or None.
        Linear scan for illustration only: it is O(n) in the number of
        cached prompts. Use a vector index in production.
        """
        best_key, best_score = None, self.similarity_threshold
        for key in self.redis.scan_iter(match="llm_cache:*"):
            raw = self.redis.hget(key, "embedding")
            if raw is None:
                continue
            score = float(np.dot(embedding, np.array(json.loads(raw))))
            if score >= best_score:
                best_key, best_score = key, score
        return best_key


cache = SemanticCache()


async def fetch_with_cache(prompt: str) -> dict:
    """Fetch with semantic caching."""
    # fetch_llm_response is assumed to be defined elsewhere: an async function
    # that returns {"response": str, "latency_ms": float}.
    return await cache.get_or_generate(prompt, generate_fn=fetch_llm_response)
In production, use a vector index (Pinecone, Weaviate, or Redis Stack vector search) for efficient similarity search rather than scanning every key. On workloads with 70–90% hit rates, semantic caching cuts API calls, and therefore API costs, by about the same fraction.
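The core of the lookup is a cosine-similarity comparison against a threshold. The following is a minimal in-memory sketch of that step, using toy three-dimensional vectors in place of real encoder output; `cosine_similarity` and `lookup` are illustrative helpers, and a production system would delegate this search to a vector index.

```python
import math
from typing import List, Optional, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def lookup(
    query: List[float],
    entries: List[Tuple[List[float], str]],
    threshold: float = 0.95,
) -> Optional[str]:
    """Return the response of the most similar entry at or above the threshold."""
    best_response, best_score = None, threshold
    for embedding, response in entries:
        score = cosine_similarity(query, embedding)
        if score >= best_score:
            best_response, best_score = response, score
    return best_response


# Toy "embeddings"; real ones come from an encoder model.
entries = [
    ([1.0, 0.0, 0.0], "answer about refunds"),
    ([0.0, 1.0, 0.0], "answer about shipping"),
]
print(lookup([0.99, 0.05, 0.0], entries))  # Close to the first entry: cache hit.
print(lookup([0.6, 0.6, 0.5], entries))    # Not close to anything: None.
```

Raising the threshold lowers the hit rate but also lowers the risk of answering a different question than the one asked.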
Model Quantization for Edge Inference
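Quantization reduces model size by storing weights at lower precision (8-bit or 4-bit integers instead of 16-bit floats). Relative to 16-bit weights, an 8-bit model is about 2x smaller and a 4-bit model about 4x smaller. Whether inference also gets faster depends on the hardware and kernels, so measure before assuming a speedup: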
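# Quantize and run locally (using transformers + bitsandbytes).
import asyncio

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

MODEL_ID = "mistralai/Mistral-7B-v0.1"

# Load model with 8-bit weights (roughly half the 16-bit memory footprint).
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),  # Quantize to 8-bit
    device_map="auto",  # Distribute across available GPUs/CPU
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)


def _generate(prompt: str) -> str:
    """Blocking generation call; run it off the event loop."""
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():  # Disable gradients for inference.
        outputs = model.generate(
            **inputs,
            max_new_tokens=256,
            do_sample=True,  # Required for temperature to take effect.
            temperature=0.7,
        )
    # The decoded text includes the prompt followed by the completion.
    return tokenizer.decode(outputs[0], skip_special_tokens=True)


async def quantized_inference(prompt: str) -> str:
    """
    Run inference on the quantized model (about 50% of 16-bit memory).
    Useful for high-volume, moderate-quality inference.
    """
    # generate() is blocking, so run it in a worker thread.
    return await asyncio.to_thread(_generate, prompt)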
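Quantization is ideal for high-throughput, cost-sensitive scenarios: you trade a small amount of model quality for a smaller memory footprint. Any throughput gain (the 2–3x figure is a best case) comes mainly from fitting larger batches or more model replicas into the same memory.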
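To make the idea concrete, here is a minimal sketch of the simplest quantization scheme, symmetric absmax quantization to 8-bit integers, in plain Python. It is an illustration of the principle under simplifying assumptions (one scale for the whole vector), not a description of what bitsandbytes does internally; its LLM.int8() method is more elaborate and treats outlier values separately. The function names are illustrative.

```python
from typing import List, Tuple


def quantize_int8(weights: List[float]) -> Tuple[List[int], float]:
    """Map floats to integers in [-127, 127] using one shared scale."""
    max_abs = max(abs(w) for w in weights)
    scale = max_abs / 127 if max_abs else 1.0
    return [round(w / scale) for w in weights], scale


def dequantize(quantized: List[int], scale: float) -> List[float]:
    """Recover approximate floats from the integers."""
    return [q * scale for q in quantized]


weights = [0.5, -1.27, 0.003, 1.0]
quantized, scale = quantize_int8(weights)
restored = dequantize(quantized, scale)
max_error = max(abs(w - r) for w, r in zip(weights, restored))
print("quantized:", quantized)
print("scale:", scale)
print("max round-trip error:", max_error)
```

Each weight now needs one byte instead of two, and the rounding error per weight is bounded by half the scale. Small weights such as 0.003 collapse to zero here, which is the kind of quality loss the trade-off refers to.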
Streaming Responses to Reduce Perceived Latency
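Instead of waiting for the entire response, stream tokens as they arrive. Total generation time is unchanged, but the user sees the first token almost immediately, which typically cuts perceived latency by 50% or more: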
import json
import os
from typing import AsyncIterator

import aiohttp

# Assumption: the API key is read from the environment.
API_KEY = os.environ.get("OPENAI_API_KEY", "")


async def stream_llm_response(prompt: str) -> AsyncIterator[str]:
    """
    Stream LLM response token-by-token.
    Reduces perceived latency: e.g., user sees first token in 200 ms instead of 2 s.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.openai.com/v1/chat/completions",
            json={
                "model": "gpt-4",
                "messages": [{"role": "user", "content": prompt}],
                "stream": True,  # Enable streaming
            },
            headers={"Authorization": f"Bearer {API_KEY}"},
        ) as resp:
            resp.raise_for_status()
            # The body is a server-sent event stream: one "data: {...}" line per chunk.
            async for line in resp.content:
                if not line.startswith(b"data:"):
                    continue  # Skip blank separator lines.
                data_str = line[len(b"data:"):].decode().strip()
                if data_str == "[DONE]":
                    break
                choices = json.loads(data_str).get("choices") or []
                token = choices[0]["delta"].get("content") if choices else None
                if token:
                    yield token


# Web handler that streams response.
async def handle_stream(prompt: str) -> AsyncIterator[str]:
    """Return streaming response to client."""
    async for token in stream_llm_response(prompt):
        yield token  # Forward each token as soon as it arrives.
On the frontend, render tokens as they arrive:
const response = await fetch('/api/stream', { method: 'POST', body: prompt });
const reader = response.body.getReader();
// Reuse one decoder so multi-byte characters split across chunks decode correctly.
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const token = decoder.decode(value, { stream: true });
  document.body.append(token); // Append token to page
}
Streaming is critical for user-facing LLM applications: it replaces a long blank wait with text that starts appearing almost immediately.
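The parsing step is the part of streaming that most often goes wrong, so it helps to isolate it in a function that can be tested without a network call. The sketch below assumes OpenAI-style server-sent events (lines of the form `data: {json}`, terminated by `data: [DONE]`); `parse_sse_line` is an illustrative helper name, and the sample lines are hand-written rather than captured from a real response.

```python
import json
from typing import Optional


def parse_sse_line(line: bytes) -> Optional[str]:
    """Return the text fragment carried by one SSE line, or None if there is none."""
    if not line.startswith(b"data:"):
        return None  # Blank separator lines, comments, other SSE fields.
    payload = line[len(b"data:"):].decode("utf-8").strip()
    if payload == "[DONE]":
        return None  # End-of-stream marker.
    choices = json.loads(payload).get("choices") or []
    if not choices:
        return None
    return choices[0].get("delta", {}).get("content") or None


lines = [
    b'data: {"choices": [{"delta": {"role": "assistant"}}]}\n',
    b"\n",
    b'data: {"choices": [{"delta": {"content": "Hel"}}]}\n',
    b'data: {"choices": [{"delta": {"content": "lo"}}]}\n',
    b"data: [DONE]\n",
]
text = "".join(token for token in map(parse_sse_line, lines) if token)
print(text)  # Hello
```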
Pipeline Optimization: Parallel Stages
Process requests through multiple pipeline stages concurrently:
import asyncio
from typing import List


class InferencePipeline:
    """
    Multi-stage inference pipeline:
    1. Pre-processing (normalize prompt)
    2. Model inference
    3. Post-processing (cleanup)
    Supports one process() call at a time, because request IDs are batch-local.
    """

    def __init__(self, num_embed_workers: int = 2, num_infer_workers: int = 4):
        self.num_embed_workers = num_embed_workers
        self.num_infer_workers = num_infer_workers
        self.embed_queue: asyncio.Queue = asyncio.Queue()
        self.infer_queue: asyncio.Queue = asyncio.Queue()
        self.output_queue: asyncio.Queue = asyncio.Queue()
        self._workers: List[asyncio.Task] = []

    async def start(self) -> None:
        """Start worker tasks, keeping references so they are not garbage-collected."""
        # Pre-processing workers
        for _ in range(self.num_embed_workers):
            self._workers.append(asyncio.create_task(self._embedding_worker()))
        # Inference workers
        for _ in range(self.num_infer_workers):
            self._workers.append(asyncio.create_task(self._inference_worker()))

    async def _embedding_worker(self) -> None:
        """Stage 1: normalize prompts."""
        while True:
            request_id, prompt = await self.embed_queue.get()
            # Collapse runs of whitespace.
            cleaned = " ".join(prompt.split())
            # Forward to inference queue
            await self.infer_queue.put((request_id, cleaned))
            self.embed_queue.task_done()

    async def _inference_worker(self) -> None:
        """Stage 2: run model inference."""
        while True:
            request_id, prompt = await self.infer_queue.get()
            try:
                # quantized_inference (from the quantization section) tokenizes
                # the prompt itself.
                result = await quantized_inference(prompt)
            except Exception as exc:
                result = exc  # Report the failure instead of killing the worker.
            # Forward to output
            await self.output_queue.put((request_id, result))
            self.infer_queue.task_done()

    async def process(self, prompts: List[str]) -> List[str]:
        """Process batch of prompts through pipeline."""
        # Enqueue all prompts to the first stage.
        for i, prompt in enumerate(prompts):
            await self.embed_queue.put((i, prompt))
        # Wait for all to complete; results may arrive out of order.
        results: List[str] = [""] * len(prompts)
        for _ in prompts:
            request_id, result = await self.output_queue.get()
            if isinstance(result, Exception):
                raise result
            results[request_id] = result.strip()  # Stage 3: cleanup
        return results
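Multi-stage pipelines keep every stage busy: while one request is in inference, the next is already being pre-processed, which improves utilization and throughput over single-stage processing. Note that asyncio only overlaps stages that await I/O or offload work to threads; CPU-bound stages need a thread or process pool to run in parallel.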
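The queue mechanics can be tried without a model. The following self-contained variant is a sketch with stand-in stages (whitespace cleanup and an `asyncio.sleep` in place of a model call); it uses `Queue.join()` to detect completion and cancels the workers afterwards. `run_pipeline` and the stage functions are illustrative, not part of the pipeline class above.

```python
import asyncio
from typing import List


async def run_pipeline(prompts: List[str], num_infer_workers: int = 2) -> List[str]:
    stage1: asyncio.Queue = asyncio.Queue()
    stage2: asyncio.Queue = asyncio.Queue()
    results: List[str] = [""] * len(prompts)

    async def preprocess_worker() -> None:
        while True:
            idx, prompt = await stage1.get()
            await stage2.put((idx, " ".join(prompt.split())))
            stage1.task_done()

    async def infer_worker() -> None:
        while True:
            idx, prompt = await stage2.get()
            await asyncio.sleep(0.01)  # Stand-in for a model call.
            results[idx] = prompt.upper()
            stage2.task_done()

    workers = [asyncio.create_task(preprocess_worker())]
    workers += [asyncio.create_task(infer_worker()) for _ in range(num_infer_workers)]
    for item in enumerate(prompts):
        stage1.put_nowait(item)
    # join() returns once every queued item has been marked task_done().
    # Items reach stage2 before stage1 marks them done, so joining in
    # stage order guarantees that all work has finished.
    await stage1.join()
    await stage2.join()
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


print(asyncio.run(run_pipeline(["  hello   world ", "second prompt"])))
```

Writing each result into its original index keeps the output in input order even when workers finish out of order.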
Key Takeaways
- Request batching increases throughput 3–5x: Trade latency (up to 100 ms wait) for parallelism.
- Semantic caching can eliminate 70–90% of API calls on repetitive workloads: Store embeddings; reuse responses for similar queries.
- Quantization reduces model memory 2–4x: Run locally for high-throughput, moderate-quality inference; measure speed on your own hardware.
- Streaming responses reduce perceived latency by 50% or more: User sees the first token in roughly 200 ms, improving UX.
- Multi-stage pipelines parallelize work: Different stages run on different workers, improving CPU utilization.
Frequently Asked Questions
What batch size should I use?
Start with 32 and measure throughput. Larger batches = higher throughput, higher latency. For real-time applications, cap at 16–32. For batch processing, use 64–256.
How much latency does batching add?
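Each request waits at most max_wait_ms before its batch is dispatched. Choose that value as a fraction of your latency budget: with a 1-second SLA, max_wait_ms=100 adds at most about 100 ms of queueing delay and leaves the rest for inference. Requests in the same batch are dispatched together and complete at roughly the same time.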
Can I use semantic caching with real-time responses?
Yes, with caveats. The similarity threshold controls the trade-off between hit rate and the risk of returning an answer to a different question; set it to 0.95 or higher so that only near-identical prompts match. For real-time applications where freshness matters, use shorter TTLs (1–6 hours).
How much memory does quantization save?
Relative to 16-bit weights, 8-bit quantization gives a 50% reduction (13 GB model → 6.5 GB) and 4-bit gives a 75% reduction (13 GB → 3.25 GB). Trade-off: some quality loss, often small but task-dependent, so evaluate on your own data.
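These figures follow from parameters × bits per weight. A small sketch of the arithmetic, assuming 1 GB = 10^9 bytes and counting weights only (activations and the KV cache add to this); the parameter count is a hypothetical value chosen to reproduce the 13 GB example.

```python
def weight_memory_gb(num_params: float, bits_per_weight: int) -> float:
    """Approximate weight storage in GB; excludes activations and KV cache."""
    return num_params * bits_per_weight / 8 / 1e9


params = 6.5e9  # Hypothetical: 6.5 billion parameters is 13 GB at 16 bits.
for bits in (16, 8, 4):
    print(f"{bits}-bit: {weight_memory_gb(params, bits):.2f} GB")
```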
Should I stream all responses?
Stream for user-facing applications (web, mobile) to improve perceived latency. Don't stream for backend systems or batch processing: perceived latency is less critical there, and streaming adds overhead.
Further Reading
- OpenAI Batch Processing API — official batching for 50% cost discount.
- Semantic Caching with Vector Databases — Pinecone vector DB for similarity search.
- BitsandBytes 8-Bit Quantization — quantization library.
- Server-Sent Events (SSE) for Streaming — HTTP streaming standard.