Concurrent LLM Requests: Build Your First Parallel Pipeline
Building a concurrent LLM request pipeline is the next step beyond understanding async/await: it's where you actually ship fast, scalable systems. A naive synchronous pipeline processes one LLM request at a time, paying 200–2,000 ms of latency per request. A concurrent pipeline overlaps requests using asyncio.gather(), asyncio.as_completed(), or task queues. This can cut the total time for a batch by 50–100× and, where your provider's rate limits allow, sustain 100–1,000 requests per second on a single machine. This article walks you through building a concurrent pipeline that handles timeouts, retries, and error aggregation.
Designing a Concurrent Pipeline for LLM Requests
A concurrent pipeline has three stages: input collection, concurrent execution, and result aggregation. You collect all requests, spawn concurrent tasks, and gather results. The key is choosing the right concurrency primitive and batching strategy.
| Pipeline stage | Synchronous | Concurrent (all 100 in flight) | Gain |
|---|---|---|---|
| Input (collect 100 requests) | 1 ms | 1 ms | 1× |
| Execution (100 × 200 ms API latency) | 20,000 ms | 200 ms | 100× |
| Aggregation (parse results) | 10 ms | 10 ms | 1× |
| Total for 100 requests | 20,011 ms (≈20 s) | 211 ms (≈0.2 s) | ≈95× |
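Assume each LLM API call takes ~200 ms end to end (a short completion over a fast connection; longer generations take seconds). With 100 sequential requests, you wait about 20 seconds. With concurrent requests, all 100 are in flight at once, and the total time is roughly the latency of the slowest request (~200 ms). If you cap concurrency at N requests, the batch instead runs in about 100 / N waves, so a cap of 10 takes roughly 2 seconds. This overlap of network wait time is the entire value proposition of concurrency: the work is I/O-bound, so a single event loop can keep many requests waiting at once.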
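To see this arithmetic without calling a real API, here is a minimal simulation that replaces each request with asyncio.sleep(). The 50 ms fake latency, the 20-request batch, and the fake_request helper are assumptions chosen to keep the run short; real calls vary widely in latency.

```python
import asyncio
import time
from typing import Optional

FAKE_LATENCY_S = 0.05  # assumed per-request latency for the simulation


async def fake_request(i: int) -> int:
    """Hypothetical stand-in for an LLM call: waits, then echoes its input."""
    await asyncio.sleep(FAKE_LATENCY_S)
    return i


async def run_sequential(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        await fake_request(i)  # each call waits for the previous one to finish
    return time.perf_counter() - start


async def run_concurrent(n: int, max_concurrent: Optional[int] = None) -> float:
    start = time.perf_counter()
    if max_concurrent is None:
        # Unbounded: every request is in flight at once.
        await asyncio.gather(*(fake_request(i) for i in range(n)))
    else:
        semaphore = asyncio.Semaphore(max_concurrent)

        async def bounded(i: int) -> int:
            async with semaphore:
                return await fake_request(i)

        await asyncio.gather(*(bounded(i) for i in range(n)))
    return time.perf_counter() - start


async def main() -> dict:
    n = 20
    return {
        "sequential": await run_sequential(n),      # about n * latency
        "concurrent": await run_concurrent(n),      # about 1 * latency
        "capped_at_5": await run_concurrent(n, 5),  # about ceil(n / 5) * latency
    }


if __name__ == "__main__":
    for name, seconds in asyncio.run(main()).items():
        print(f"{name}: {seconds * 1000:.0f} ms")
```

Expect roughly 1,000 ms sequential, 50 ms fully concurrent, and 200 ms with a cap of 5 (four waves of five).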
Building a Basic Concurrent Pipeline with asyncio.gather
Here's a pipeline that fetches LLM responses concurrently:
```python
import asyncio
import os
import time
from typing import List

import aiohttp

# Read the key from the environment instead of hard-coding it.
API_KEY = os.environ["OPENAI_API_KEY"]
API_URL = "https://api.openai.com/v1/chat/completions"


async def fetch_llm_response(
    session: aiohttp.ClientSession,
    prompt: str,
    model: str = "gpt-4",
    timeout: float = 30,
) -> dict:
    """
    Fetch a single LLM response (many of these can run concurrently).

    Returns {"prompt": str, "response": str, "latency_ms": float} on success, or
    {"prompt": str, "response": None, "error": "Timeout", "latency_ms": float}
    if the request exceeds `timeout` seconds.
    Raises RuntimeError on a non-200 status and aiohttp.ClientError on network errors.
    """
    start = time.perf_counter()
    try:
        async with session.post(
            API_URL,
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.7,
            },
            headers={"Authorization": f"Bearer {API_KEY}"},
            timeout=aiohttp.ClientTimeout(total=timeout),
        ) as resp:
            if resp.status != 200:
                # Non-200 responses (e.g. 429 rate limits) raise so callers can retry or record them.
                raise RuntimeError(f"API returned {resp.status}: {await resp.text()}")
            data = await resp.json()
            latency_ms = (time.perf_counter() - start) * 1000
            return {
                "prompt": prompt,
                "response": data["choices"][0]["message"]["content"],
                "latency_ms": latency_ms,
            }
    except asyncio.TimeoutError:
        # Timeouts are reported as data rather than raised.
        latency_ms = (time.perf_counter() - start) * 1000
        return {
            "prompt": prompt,
            "response": None,
            "error": "Timeout",
            "latency_ms": latency_ms,
        }


async def concurrent_llm_pipeline(prompts: List[str], max_concurrent: int = 10) -> dict:
    """
    Execute LLM requests concurrently with a concurrency limit.

    Args:
        prompts: List of user prompts to send to the LLM.
        max_concurrent: Max number of simultaneous API calls (keep within your rate limits).

    Returns:
        {"results": [...], "total_latency_ms": float, "succeeded": int,
         "failed": int, "average_latency_ms": float}
    """
    start = time.perf_counter()
    # A semaphore caps how many requests are in flight at once.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_fetch(session: aiohttp.ClientSession, prompt: str) -> dict:
        """Fetch while respecting the concurrency limit."""
        async with semaphore:
            return await fetch_llm_response(session, prompt)

    async with aiohttp.ClientSession() as session:
        # Create one coroutine per prompt; gather() schedules them all as tasks.
        tasks = [bounded_fetch(session, prompt) for prompt in prompts]
        # gather() returns results in input order. return_exceptions=True keeps one
        # failure (e.g. a RuntimeError on a non-200 response) from discarding the rest.
        raw_results = await asyncio.gather(*tasks, return_exceptions=True)

    # Normalize exceptions into the same error-dict shape as timeouts.
    results = [
        r if isinstance(r, dict)
        else {"prompt": p, "response": None, "error": f"{type(r).__name__}: {r}"}
        for p, r in zip(prompts, raw_results)
    ]

    # Aggregate results.
    total_latency_ms = (time.perf_counter() - start) * 1000  # wall-clock time for the batch
    succeeded = sum(1 for r in results if r.get("response") is not None)
    latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
    return {
        "results": results,
        "total_latency_ms": total_latency_ms,
        "succeeded": succeeded,
        "failed": len(results) - succeeded,
        # Guard against an empty batch (or one where every request raised).
        "average_latency_ms": sum(latencies) / len(latencies) if latencies else 0.0,
    }


# Example usage
if __name__ == "__main__":
    prompts = [
        "What is the capital of France?",
        "Explain quantum computing in one sentence.",
        "Write a haiku about concurrency.",
    ] * 10  # 30 prompts total
    stats = asyncio.run(concurrent_llm_pipeline(prompts, max_concurrent=10))
    print(f"✓ Succeeded: {stats['succeeded']}")
    print(f"✗ Failed: {stats['failed']}")
    print(f"Total time: {stats['total_latency_ms']:.0f} ms")
    print(f"Average latency per request: {stats['average_latency_ms']:.0f} ms")
    print(f"Throughput: {len(prompts) / (stats['total_latency_ms'] / 1000):.1f} req/s")
```
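The key piece is the semaphore (asyncio.Semaphore). It caps how many requests are in flight so you don't overwhelm the LLM API (which enforces rate limits) or exhaust local resources such as sockets. With max_concurrent=10, at most 10 requests run at once; the rest wait to acquire the semaphore. Note that it limits concurrency, not requests per minute: if each request takes ~2 s, 10 in flight works out to about 5 req/s, or 300 requests per minute.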
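A quick way to convince yourself that the semaphore really caps concurrency is to count in-flight calls. This sketch swaps the HTTP call for a hypothetical fake_call that just sleeps; the counter logic is the point.

```python
import asyncio


async def measure_peak_concurrency(n_tasks: int, max_concurrent: int) -> int:
    """Run n_tasks fake calls behind a semaphore and return the peak number in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_call() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # hypothetical stand-in for the API round-trip
            in_flight -= 1

    await asyncio.gather(*(fake_call() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(measure_peak_concurrency(n_tasks=50, max_concurrent=10)))  # prints 10
```

Fifty tasks are submitted, but only ten ever hold the semaphore at the same time.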
Streaming Results with asyncio.as_completed
For large batches (hundreds or thousands of requests), waiting for every result with gather() means nothing can be processed until the slowest request finishes. Instead, handle each result as it arrives using as_completed():
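```python
import asyncio
from typing import List

import aiohttp


async def streaming_pipeline(prompts: List[str], max_concurrent: int = 20) -> None:
    """Process results as they complete, rather than waiting for all of them."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_fetch(session: aiohttp.ClientSession, prompt: str) -> dict:
        async with semaphore:
            try:
                return await fetch_llm_response(session, prompt)
            except Exception as exc:  # e.g. RuntimeError on a non-200 response
                return {"prompt": prompt, "response": None, "error": f"{type(exc).__name__}: {exc}"}

    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(bounded_fetch(session, p)) for p in prompts]
        # as_completed() yields awaitables in completion order, not submission order.
        # They are not the original task objects, so read the prompt from the result.
        for next_done in asyncio.as_completed(tasks):
            result = await next_done
            prompt = result["prompt"]
            # Process immediately (e.g., save to a database, log, update a UI).
            if result.get("response") is None:
                print(f"✗ {prompt[:50]}: {result.get('error')}")
            else:
                print(f"✓ {prompt[:50]}: {result['response'][:100]}...")
    # Output appears as each request finishes, so time-to-first-result is roughly
    # the latency of the fastest request instead of the slowest.
```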
This pattern is critical for user-facing applications. Instead of the user waiting, say, 5 seconds for all 100 results, they see the first result as soon as the fastest request finishes and the rest as they complete, which improves perceived performance dramatically.
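Here is a self-contained sketch of the streaming pattern with simulated latencies, so you can watch completion order differ from submission order. fake_fetch and its fixed delays are assumptions for illustration; in the real pipeline you would call fetch_llm_response instead.

```python
import asyncio
from typing import List, Tuple


async def fake_fetch(prompt: str, delay: float) -> dict:
    """Hypothetical stand-in for fetch_llm_response with a fixed latency."""
    await asyncio.sleep(delay)
    return {"prompt": prompt, "response": f"answer to {prompt!r}"}


async def stream_results(jobs: List[Tuple[str, float]]) -> List[str]:
    """Handle each result as soon as it finishes; return prompts in completion order."""
    tasks = [asyncio.create_task(fake_fetch(prompt, delay)) for prompt, delay in jobs]
    completed = []
    for next_done in asyncio.as_completed(tasks):
        result = await next_done
        # The result carries its own prompt, so there's no need to map back to the task.
        completed.append(result["prompt"])
        print(f"done: {result['prompt']}")
    return completed


if __name__ == "__main__":
    jobs = [("slow", 0.3), ("fast", 0.1), ("medium", 0.2)]
    print(asyncio.run(stream_results(jobs)))  # prints ['fast', 'medium', 'slow']
```

The jobs are submitted slow-first, yet the fast one is handled first; gather() would have returned them in submission order only after the 0.3 s job finished.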
Retry Logic for Fault Tolerance
LLM APIs occasionally fail (rate limits, transient network errors, service hiccups). A robust pipeline implements exponential backoff retries:
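```python
import asyncio
import random

import aiohttp

# Wraps fetch_llm_response from the first listing.


async def fetch_with_retries(
    session: aiohttp.ClientSession,
    prompt: str,
    max_retries: int = 3,
    base_delay: float = 0.5,
) -> dict:
    """Fetch with up to max_retries attempts, backing off exponentially between them."""
    result: dict = {"prompt": prompt, "response": None, "error": "max_retries must be >= 1"}
    for attempt in range(max_retries):
        try:
            result = await fetch_llm_response(session, prompt)
        except (aiohttp.ClientError, asyncio.TimeoutError, RuntimeError) as e:
            # Network errors and non-200 responses (e.g. 429 rate limits) land here.
            # Include the type name: str() of a TimeoutError is often empty.
            result = {"prompt": prompt, "response": None, "error": f"{type(e).__name__}: {e}"}
        # fetch_llm_response reports timeouts as an error dict instead of raising.
        if not result.get("error"):
            return result
        if attempt < max_retries - 1:
            # Exponential backoff: 0.5 s, 1 s, 2 s, ...
            delay = base_delay * (2 ** attempt)
            # Add jitter so many clients don't retry in lockstep (thundering herd).
            jitter = random.uniform(0, 0.1 * delay)
            await asyncio.sleep(delay + jitter)
    return result
```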
Retries raise reliability (for example, from ~99% to ~99.9% success when most failures are transient), at the cost of extra latency on the requests that fail. For production systems, this tradeoff is almost always worth making.
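To exercise the backoff logic without a network, here is a generic version of the same retry loop wrapped around a hypothetical flaky service that fails twice and then succeeds. The retry_with_backoff helper, the FlakyService class, and the tiny base delay in the usage example are assumptions chosen to keep the example fast and self-contained.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, asyncio.TimeoutError),
) -> T:
    """Await call(), retrying with exponential backoff plus jitter on retry_on errors."""
    for attempt in range(max_retries):
        try:
            return await call()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt)      # 0.5 s, 1 s, 2 s, ...
            jitter = random.uniform(0, 0.1 * delay)  # desynchronize competing clients
            await asyncio.sleep(delay + jitter)
    raise ValueError("max_retries must be at least 1")


class FlakyService:
    """Hypothetical service that fails `failures` times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def __call__(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError(f"transient failure #{self.calls}")
        return "ok"


if __name__ == "__main__":
    service = FlakyService(failures=2)
    print(asyncio.run(retry_with_backoff(service, base_delay=0.01)))  # prints ok
    print(service.calls)  # prints 3
```

Unlike fetch_with_retries, this helper re-raises the last error once attempts run out; which behavior you want depends on whether callers aggregate error dicts or expect exceptions.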
Batch Processing for Cost Efficiency
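Some LLM providers offer batch endpoints (e.g., the OpenAI Batch API) that cost 50% less but return results asynchronously, within a 24-hour completion window. A hybrid pipeline routes requests by urgency: time-sensitive ones go to the real-time API, the rest to the batch queue.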
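```python
from typing import List

# Assumption for illustration: ~1 cent saved per request moved to the batch API.
# Real savings depend on the model, token counts, and the provider's pricing.
ASSUMED_SAVINGS_PER_BULK_REQUEST_USD = 0.01


async def queue_batch_job(prompts: List[str]) -> str:
    """Hypothetical placeholder: submit prompts to your provider's batch API, return the job ID."""
    raise NotImplementedError("Connect this to your provider's batch endpoint.")


async def hybrid_pipeline(prompts: List[str]) -> dict:
    """
    Split prompts: urgent ones run via the real-time API,
    bulk ones are queued for batch processing.
    """
    # Heuristic for illustration: short prompts are treated as urgent.
    urgent = [p for p in prompts if len(p) < 100]
    bulk = [p for p in prompts if len(p) >= 100]
    # Run urgent requests concurrently now.
    urgent_results = await concurrent_llm_pipeline(urgent, max_concurrent=10) if urgent else None
    # Queue bulk requests for the batch API (24-hour completion window, 50% cheaper).
    batch_job_id = await queue_batch_job(bulk) if bulk else None
    return {
        "urgent": urgent_results,
        "batch_job_id": batch_job_id,
        "cost_savings": len(bulk) * ASSUMED_SAVINGS_PER_BULK_REQUEST_USD,
    }
```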
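The hybrid pipeline leaves queue_batch_job unimplemented. One piece of it can be sketched safely: OpenAI's Batch API takes a JSONL input file in which each line is one request object with custom_id, method, url, and body fields. The helper below (build_batch_jsonl, a hypothetical name) only builds that file's contents; uploading the file and creating the batch job are left out, so check the provider's Batch API documentation for those steps.

```python
import json
from typing import List


def build_batch_jsonl(prompts: List[str], model: str = "gpt-4") -> str:
    """Build Batch API input text: one JSON request object per line."""
    lines = []
    for i, prompt in enumerate(prompts):
        request = {
            "custom_id": f"request-{i}",  # lets you match outputs back to inputs
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
            },
        }
        lines.append(json.dumps(request))
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(build_batch_jsonl(["Summarize the attached report.", "Translate this paragraph."]))
```

Because batch results come back later and not necessarily in input order, the custom_id is what ties each output line to its prompt.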
Key Takeaways
- Concurrent pipelines can achieve 50–100× throughput gains by overlapping API calls instead of waiting on them one at a time; provider rate limits set the real ceiling.
- Semaphores cap concurrency: use asyncio.Semaphore() to bound how many requests are in flight and stay within API quotas.
- Use as_completed() for large batches: process results as they arrive instead of waiting for every task.
- Retries with exponential backoff increase reliability: for example, ~99.9% success vs. ~99% without retries when most failures are transient.
- Batch APIs trade latency for cost: a 24-hour completion window at a 50% discount is worthwhile for non-urgent work.
Frequently Asked Questions
How do I choose max_concurrent?
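Start with 10–20 and measure. If you hit rate-limit errors (HTTP 429 responses), reduce it. If the batch takes too long and you aren't seeing 429s, increase it. Check your provider's dashboard or documentation for your limits, which are usually expressed as requests and tokens per minute rather than as a concurrency cap.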
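A useful starting point comes from Little's law: requests in flight ≈ request rate × average latency. Given a requests-per-minute limit and a measured average latency, the sketch below estimates a concurrency cap that keeps you just under the limit. The suggest_max_concurrent name and the 10% headroom default are assumptions; tune them for your workload.

```python
import math


def suggest_max_concurrent(rpm_limit: int, avg_latency_s: float, headroom: float = 0.9) -> int:
    """Estimate a concurrency cap via Little's law: in_flight = rate * latency."""
    target_rate_per_s = rpm_limit / 60 * headroom  # aim slightly below the provider limit
    return max(1, math.floor(target_rate_per_s * avg_latency_s))


if __name__ == "__main__":
    # 600 requests/minute at ~2 s per request: 10 req/s * 0.9 * 2 s = 18 in flight.
    print(suggest_max_concurrent(rpm_limit=600, avg_latency_s=2.0))  # prints 18
```

If your provider also enforces a tokens-per-minute limit, that limit can bind first for long prompts, so treat the result as an upper bound and confirm it against observed 429 rates.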
What's the difference between gather() and as_completed()?
gather() waits for all tasks and returns results in their original order. It's simple, but you can't act on any result until the slowest task finishes. as_completed() yields results as tasks finish, so you can process each one immediately, which is better for streaming or real-time feedback. Total batch time is about the same either way.
Can I use async for local LLM inference?
Async alone doesn't speed up compute-bound work like local inference. Use loop.run_in_executor() (or asyncio.to_thread() on Python 3.9+) to run inference in a thread pool, keeping the event loop free for other I/O.
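A minimal sketch of that pattern, using a hypothetical blocking local_generate function in place of a real model call (time.sleep stands in for inference time). It uses asyncio.to_thread(); loop.run_in_executor(None, ...) does the same job with the default thread pool. A heartbeat task keeps ticking during inference, showing the event loop is not blocked.

```python
import asyncio
import time
from typing import Tuple


def local_generate(prompt: str) -> str:
    """Hypothetical blocking inference call; time.sleep stands in for model compute."""
    time.sleep(0.2)
    return f"completion for {prompt!r}"


async def heartbeat(stop: asyncio.Event) -> int:
    """Count ticks until stopped; ticks only accrue if the event loop stays responsive."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def main() -> Tuple[str, int]:
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    # Offload the blocking call to a worker thread (Python 3.9+).
    result = await asyncio.to_thread(local_generate, "Hello")
    stop.set()
    return result, await beat


if __name__ == "__main__":
    result, ticks = asyncio.run(main())
    print(result, ticks)
```

Threads help here because the blocking call releases the GIL, as time.sleep does and as many native inference libraries do during compute; pure-Python compute would need a process pool instead.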
How do I handle partial failures (some requests fail)?
Return results with error fields (as in the examples above), then filter: failed = [r for r in results if r.get("error")]. Retry failed requests later or raise an exception if the failure rate exceeds a threshold.
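The filter and threshold check can be packaged as a small helper. split_results, FailureRateExceeded, and the 5% default threshold are illustrative assumptions, not fixed recommendations; the input uses the same result-dict shape as the pipeline above, with an "error" key on failures.

```python
from typing import List, Tuple


class FailureRateExceeded(RuntimeError):
    """Raised when too many requests in a batch failed."""


def split_results(
    results: List[dict], max_failure_rate: float = 0.05
) -> Tuple[List[dict], List[dict]]:
    """Separate successes from failures; raise if the failure rate is too high."""
    failed = [r for r in results if r.get("error")]
    succeeded = [r for r in results if not r.get("error")]
    if results and len(failed) / len(results) > max_failure_rate:
        raise FailureRateExceeded(f"{len(failed)}/{len(results)} requests failed")
    return succeeded, failed


if __name__ == "__main__":
    batch = [{"prompt": "b", "response": None, "error": "Timeout"}]
    batch += [{"prompt": str(i), "response": "ok"} for i in range(39)]
    ok, to_retry = split_results(batch)  # 1/40 = 2.5% failed, under the 5% threshold
    print(len(ok), [r["prompt"] for r in to_retry])  # prints 39 ['b']
```

The returned failures can be fed back into the pipeline later, for example through a retry wrapper.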
What's the maximum throughput per machine?
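It depends on network bandwidth and your LLM provider's limits. A single machine can typically drive 100–1,000 req/s of I/O-bound requests, but in practice provider rate limits are often the tighter constraint. For higher throughput, use worker pools or horizontal scaling (see later articles).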