Worker Pools for LLM Tasks: Distribute and Scale
Worker pools are the bridge between single-threaded async pipelines and multi-machine distributed systems. A worker pool runs a fixed set of N independent workers (threads or processes) that consume tasks from a shared queue. This pattern spreads LLM workloads across multiple CPU cores, moves CPU-intensive post-processing (tokenization, embedding, parsing) off the event loop, and distributes load evenly across workers. The gain depends on the workload. CPU-bound stages can speed up by up to roughly the number of cores (about 8x on an 8-core machine). The network-bound API calls themselves are usually handled well by a single async event loop.
Why Worker Pools Matter for LLM Systems
Single-threaded async handles I/O-bound work (waiting for the network) very well. However, all of its Python code runs on one thread, so it uses at most one CPU core. On modern hardware with 4–128 cores, that leaves the rest idle. Worker pools exploit parallelism by running CPU-bound work on several cores at once. The LLM API calls themselves spend most of their time waiting on the network, so a single event loop already overlaps many of them. Extra cores pay off for the CPU work around those calls.
Additionally, some LLM tasks are CPU-bound and cannot run on the async event loop without blocking:
- Tokenization: Breaking prompts into tokens (1–10 ms per request).
- Embedding generation: Encoding text locally (100–1000 ms for large texts).
- Response parsing: Extracting JSON or other structured data from LLM outputs (10–100 ms).
- Post-processing: Filtering, ranking, or deduplicating results.
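These tasks should run on worker threads or processes so they do not block the event loop. Threads are enough to keep the loop responsive for blocking calls and for libraries that release the GIL. Pure-Python CPU work needs processes to actually use more cores.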
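The starvation effect is easy to observe. The following is a minimal sketch that assumes `time.sleep` as a stand-in for any blocking call, such as a slow parse or a synchronous database write. A heartbeat coroutine tries to tick every 10 ms. The sketch counts how many ticks get through while a 200 ms blocking call runs inline, compared with running it in the default thread pool via `asyncio.to_thread` (Python 3.9+). The names `blocking_work`, `heartbeat`, and `count_ticks` are illustrative.

```python
import asyncio
import time


def blocking_work(seconds: float) -> None:
    """Stand-in for blocking work such as a slow parse or a sync DB write."""
    time.sleep(seconds)


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Record a tick roughly every 10 ms whenever the loop lets it run."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def count_ticks(offload: bool, seconds: float = 0.2) -> int:
    ticks: list = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # give the heartbeat its first turn
    if offload:
        # Runs in the loop's default thread pool; the loop keeps scheduling heartbeat.
        await asyncio.to_thread(blocking_work, seconds)
    else:
        # Runs on the loop thread itself; nothing else runs until it returns.
        blocking_work(seconds)
    stop.set()
    await hb
    return len(ticks)


inline_ticks = asyncio.run(count_ticks(offload=False))
offloaded_ticks = asyncio.run(count_ticks(offload=True))
print(f"ticks while blocking inline: {inline_ticks}")
print(f"ticks while offloaded:       {offloaded_ticks}")
```

When the call runs inline, the heartbeat records only its first tick, because the loop cannot resume it until the blocking call returns. When the call is offloaded, the heartbeat keeps ticking throughout. For pure-Python CPU work, a thread still competes with the loop for the GIL, so a process pool is the better fit.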
Architecture: Producer-Consumer Pattern
A typical worker pool architecture has:
- Producer: Main async event loop that collects requests and enqueues them.
- Queue: A thread-safe queue (queue.Queue or multiprocessing.Queue) that holds pending tasks.
- Workers: 2–N independent threads/processes that dequeue tasks, process them, and enqueue results.
- Result queue: Collects outputs for the producer to retrieve.
┌─────────────┐
│   Input     │
│  Requests   │
└──────┬──────┘
       │
       ▼
┌──────────────────┐
│    Task Queue    │  (thread-safe, bounded)
│ [req1, req2 ...] │
└──────┬───────────┘
       │
   ┌───┴───────────────────┐
   │                       │
   ▼                       ▼
┌────────────┐        ┌────────────┐
│  Worker 1  │  ...   │  Worker N  │
│  (Thread/  │        │  (Thread/  │
│  Process)  │        │  Process)  │
└─────┬──────┘        └─────┬──────┘
      │                     │
      └──────────┬──────────┘
                 │
                 ▼
          ┌──────────────┐
          │ Result Queue │
          └──────────────┘
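The diagram maps onto a small amount of standard-library code. The following is a minimal sketch that assumes thread workers and makes one change to the diagram. Instead of a shared result queue, each task carries an asyncio future. The worker resolves that future with `loop.call_soon_threadsafe`, so the async producer can simply await its results. The names `start_workers` and `produce`, and the use of `str.upper` as the task function, are hypothetical and not part of any library.

```python
import asyncio
import queue
import threading
from typing import Any, Callable, List


def start_workers(task_q: queue.Queue, fn: Callable[[Any], Any], n: int) -> List[threading.Thread]:
    """Start n consumer threads. Each queued item is (payload, future, loop) or None."""

    def worker() -> None:
        while True:
            item = task_q.get()
            if item is None:  # sentinel: this worker should exit
                task_q.task_done()
                return
            payload, fut, loop = item
            try:
                result = fn(payload)
            except Exception as exc:
                # asyncio futures are not thread-safe, so resolve them on the loop thread.
                loop.call_soon_threadsafe(fut.set_exception, exc)
            else:
                loop.call_soon_threadsafe(fut.set_result, result)
            finally:
                task_q.task_done()

    threads = [threading.Thread(target=worker, daemon=True) for _ in range(n)]
    for t in threads:
        t.start()
    return threads


async def produce(task_q: queue.Queue, payloads: List[Any]) -> List[Any]:
    """Async producer: enqueue every payload, then await all results in order."""
    loop = asyncio.get_running_loop()
    futures = []
    for payload in payloads:
        fut = loop.create_future()
        # put() blocks while the bounded queue is full, so keep it off the loop thread.
        await asyncio.to_thread(task_q.put, (payload, fut, loop))
        futures.append(fut)
    return await asyncio.gather(*futures)


async def main() -> List[Any]:
    task_q: queue.Queue = queue.Queue(maxsize=100)  # bounded task queue
    threads = start_workers(task_q, str.upper, n=4)  # str.upper stands in for real work
    results = await produce(task_q, ["paris", "berlin", "rome"])
    for _ in threads:
        task_q.put(None)  # one sentinel per worker
    await asyncio.to_thread(task_q.join)  # wait until every sentinel is consumed
    return results


results = asyncio.run(main())
print(results)
```

A shared result queue, as drawn, also works. In that case, however, the async side has to poll the queue from a thread. Per-task futures avoid that polling, and `asyncio.gather` returns the results in submission order even though workers finish in any order.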
Thread Pool for I/O-Bound LLM Post-Processing
For I/O-bound tasks (database writes, cache updates), use concurrent.futures.ThreadPoolExecutor, which is lightweight and integrates with async:
import asyncio
import json
import re
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from typing import List

DB_PATH = "llm_results.db"


def init_db() -> None:
    """Create the results table once, before any worker writes to it."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS results (user_id TEXT, data TEXT)")
        conn.commit()
    finally:
        conn.close()


# Light CPU work: parse and validate an LLM response.
def parse_llm_response(response_text: str) -> dict:
    """
    Extract JSON from a markdown code block, falling back to the raw text.
    Example: "Here's the result:\n```json\n{...}\n```"
    """
    match = re.search(r"```json\n(.*?)\n```", response_text, re.DOTALL)
    payload = match.group(1) if match else response_text
    try:
        return json.loads(payload)
    except json.JSONDecodeError as exc:
        raise ValueError("No valid JSON found in response.") from exc


def save_to_database(user_id: str, result: dict) -> None:
    """Save result to SQLite (blocking I/O). Each call opens its own connection,
    because sqlite3 connections are bound to their creating thread by default."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "INSERT INTO results (user_id, data) VALUES (?, ?)",
            (user_id, json.dumps(result)),
        )
        conn.commit()
    finally:
        conn.close()


async def process_llm_batch(
    responses: List[str],
    user_ids: List[str],
    executor: ThreadPoolExecutor,
) -> dict:
    """
    Process LLM responses concurrently using a thread pool.
    Offloads parsing and database writes to workers.
    """
    loop = asyncio.get_running_loop()
    # Parsing is light CPU work: threads keep it off the event loop,
    # but because of the GIL they do not run it in parallel.
    parse_tasks = [
        loop.run_in_executor(executor, parse_llm_response, response)
        for response in responses
    ]
    parsed_results = await asyncio.gather(*parse_tasks)
    # Blocking database writes run on the pool while the loop stays free.
    save_tasks = [
        loop.run_in_executor(executor, save_to_database, user_id, result)
        for user_id, result in zip(user_ids, parsed_results)
    ]
    await asyncio.gather(*save_tasks)
    return {"processed": len(responses), "stored": len(parsed_results)}


# Example usage
async def main():
    init_db()
    with ThreadPoolExecutor(max_workers=4) as executor:
        responses = [
            'Result:\n```json\n{"answer": "Paris"}\n```',
            '{"answer": "Berlin"}',
        ]
        user_ids = ["user1", "user2"]
        result = await process_llm_batch(responses, user_ids, executor)
        print(f"Processed {result['processed']} responses")


asyncio.run(main())
In this example, parsing and database writes happen on 4 worker threads, leaving the main event loop free to accept new requests. Without workers, database writes (which can take 10–100 ms) would block all other tasks.
Process Pool for CPU-Intensive Tasks
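For truly CPU-bound work (local embeddings, heavy parsing), use concurrent.futures.ProcessPoolExecutor to run tasks in separate Python processes. Threads in one process share a single Global Interpreter Lock (GIL), so only one of them executes Python bytecode at a time. Each process has its own GIL, so CPU-bound work runs in parallel across cores: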
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import lru_cache
from typing import List


@lru_cache(maxsize=None)
def _load_model(model_name: str):
    """Load the model once per worker process and reuse it for later tasks."""
    from sentence_transformers import SentenceTransformer
    return SentenceTransformer(model_name)


def embed_text_locally(text: str, model_name: str = "all-MiniLM-L6-v2") -> list:
    """
    Generate an embedding locally (CPU-intensive).
    Uses the sentence-transformers library.
    """
    embedding = _load_model(model_name).encode(text)
    return embedding.tolist()


async def embed_llm_outputs(
    outputs: List[str],
    executor: ProcessPoolExecutor,
) -> dict:
    """
    Embed LLM outputs across multiple processes (true parallelism).
    Each worker process can run on a separate CPU core.
    """
    loop = asyncio.get_running_loop()
    embed_tasks = [
        loop.run_in_executor(executor, embed_text_locally, output)
        for output in outputs
    ]
    embeddings = await asyncio.gather(*embed_tasks)
    return {
        "embeddings": embeddings,
        "count": len(embeddings),
    }


# Example usage
if __name__ == "__main__":
    # The __main__ guard is required when workers start with "spawn" or
    # "forkserver" (e.g. Windows and macOS), because each worker re-imports this module.
    with ProcessPoolExecutor(max_workers=4) as executor:
        outputs = [
            "The capital of France is Paris.",
            "Quantum computing exploits superposition.",
        ]
        result = asyncio.run(embed_llm_outputs(outputs, executor))
        print(f"Generated {result['count']} embeddings")
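Process pools are expensive to start. Each worker is a separate Python interpreter, and under the spawn start method it re-imports your module before doing any work. Loading a model like the embedding model above adds more startup cost. Create the pool once and reuse it across many requests.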
Queue-Based Worker Pool for Scaling
For production systems processing thousands of requests per second, implement a persistent worker pool with a bounded queue:
import queue
import threading
from typing import Any, Callable, Optional


class WorkerPool:
    """
    Simple FIFO worker pool for task distribution.
    Workers pull from a task queue and push results to a result queue.
    """

    def __init__(self, task_fn: Callable[[Any], Any], num_workers: int = 4):
        self.num_workers = num_workers
        self.task_queue = queue.Queue(maxsize=1000)  # Bounded queue
        self.result_queue = queue.Queue()
        self.task_fn = task_fn
        self._stop = threading.Event()
        self.workers = []
        # Start worker threads.
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker_loop, daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker_loop(self) -> None:
        """Pull tasks, execute, push results, until shutdown is requested."""
        while not self._stop.is_set():
            try:
                # The timeout lets idle workers notice the stop event.
                task_id, task_data = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                result = self.task_fn(task_data)
                self.result_queue.put({"task_id": task_id, "result": result})
            except Exception as e:
                self.result_queue.put({"task_id": task_id, "error": str(e)})
            finally:
                self.task_queue.task_done()

    def submit(self, task_id: str, task_data: Any) -> None:
        """Submit a task; blocks while the bounded queue is full."""
        self.task_queue.put((task_id, task_data))

    def get_result(self, timeout: Optional[float] = None) -> dict:
        """Retrieve a result; raises queue.Empty if the timeout expires."""
        return self.result_queue.get(timeout=timeout)

    def shutdown(self, wait: bool = True) -> None:
        """Stop the workers. With wait=True, finish all queued tasks first.
        With wait=False, workers exit after the task they currently hold,
        and tasks still in the queue are left unprocessed."""
        if wait:
            self.task_queue.join()
        self._stop.set()
        for worker in self.workers:
            worker.join()


# Example: process LLM metadata in parallel
def process_metadata(metadata: dict) -> dict:
    """Worker function: extract and validate metadata."""
    tokens = metadata.get("tokens", 0)
    return {
        "model": metadata.get("model"),
        "tokens": tokens,
        "cost": tokens * 0.00002,  # $0.02 per 1K tokens ($20 per 1M), illustrative price
    }


if __name__ == "__main__":
    pool = WorkerPool(num_workers=4, task_fn=process_metadata)
    # Submit 100 tasks.
    for i in range(100):
        pool.submit(f"task_{i}", {"model": "gpt-4", "tokens": 500 + i})
    # Retrieve results as they complete (order not guaranteed).
    for _ in range(100):
        result = pool.get_result(timeout=5)
        if "error" in result:
            print(f"Task {result['task_id']} failed: {result['error']}")
        else:
            print(f"Task {result['task_id']}: cost ${result['result']['cost']:.4f}")
    pool.shutdown()
Monitoring Worker Pools
In production, monitor queue depths and worker utilization to detect bottlenecks:
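import time


def monitor_pool(pool: WorkerPool, interval_sec: float = 5, avg_task_ms: float = 100) -> None:
    """Print pool stats every interval_sec; loops forever, so run it in a daemon thread.
    avg_task_ms is an assumed average task duration, used only for the wait estimate."""
    while True:
        # qsize() is approximate while workers are active, which is fine for monitoring.
        queue_depth = pool.task_queue.qsize()
        result_depth = pool.result_queue.qsize()
        print(f"Task queue: {queue_depth}, Result queue: {result_depth}")
        # N workers each finish one task per avg_task_ms, so the backlog drains in about:
        est_wait_ms = queue_depth * avg_task_ms / pool.num_workers
        print(f"Queue wait time: {est_wait_ms:.0f} ms (estimated)")
        time.sleep(interval_sec)

# Usage: threading.Thread(target=monitor_pool, args=(pool,), daemon=True).start()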
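Because the task queue is bounded (maxsize=1000), its depth cannot grow forever. If it climbs steadily and then sits at the limit, producers are submitting tasks faster than workers can finish them, and submit() starts blocking. Increase num_workers, which helps for I/O-bound tasks (CPU-bound pure-Python tasks need processes rather than more threads), or scale horizontally across more machines.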
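Two small helpers can turn raw depth samples into something actionable. The following is a minimal sketch in which `estimate_wait_ms` and `BacklogDetector` are hypothetical names. The average task time is an explicit input that you would measure for your own workload. The detector flags a backlog only when depth rises across a whole window of samples, which filters out one-off bursts.

```python
from collections import deque


def estimate_wait_ms(queue_depth: int, num_workers: int, avg_task_ms: float) -> float:
    """Rough wait for a newly queued task: workers drain num_workers tasks per avg_task_ms."""
    return queue_depth * avg_task_ms / num_workers


class BacklogDetector:
    """Flags a backlog when queue depth strictly increases across a full window of samples."""

    def __init__(self, window: int = 5) -> None:
        self.samples = deque(maxlen=window)

    def observe(self, depth: int) -> bool:
        self.samples.append(depth)
        if len(self.samples) < self.samples.maxlen:
            return False  # not enough history yet
        values = list(self.samples)
        return all(later > earlier for earlier, later in zip(values, values[1:]))


detector = BacklogDetector(window=3)
flags = [detector.observe(depth) for depth in (10, 40, 90, 90, 150, 300)]
print(estimate_wait_ms(queue_depth=400, num_workers=4, avg_task_ms=100))  # 10000.0
print(flags)  # [False, False, True, False, False, True]
```

In a monitoring loop you would call `detector.observe(pool.task_queue.qsize())` on each sample and alert when it returns True.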
Key Takeaways
- Worker pools exploit multiple CPU cores: CPU-bound stages scale at best with the core count, so start with one worker process per core.
- ThreadPoolExecutor for I/O-bound work: Database writes, cache updates, lightweight post-processing.
- ProcessPoolExecutor for CPU-bound work: Embeddings, parsing, tokenization. Avoids GIL blocking.
- Use bounded queues to prevent memory exhaustion: Cap queue size to ~1000 tasks; reject new work if full.
- Monitor queue depth: Unbounded growth signals you need more workers or horizontal scaling.
Frequently Asked Questions
How many workers should I spawn?
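For CPU-bound work in a process pool, start with the CPU count (e.g., 8 cores = 8 workers). More processes than cores mostly adds context switching. Startup cost does not change that number, but it is a reason to create the pool once. Threads running pure-Python CPU work gain little from extra workers because of the GIL. For I/O-bound thread pools, you can go higher (20–100) since threads spend most of their time waiting. ThreadPoolExecutor's own default is min(32, os.cpu_count() + 4).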
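Those starting points can be captured in a small helper. The following sketch uses `suggested_workers`, a hypothetical function. Its I/O figure simply reuses ThreadPoolExecutor's default (min(32, os.cpu_count() + 4) since Python 3.8), which you can raise toward the 20–100 range for network-heavy work.

```python
import os


def suggested_workers(kind: str) -> int:
    """Starting points for pool sizes; tune with measurements."""
    # Logical CPUs; can exceed a container's CPU quota. cpu_count() may return None.
    cores = os.cpu_count() or 1
    if kind == "cpu":
        # Process pool: one worker per core; more mostly adds context switching.
        return cores
    if kind == "io":
        # Thread pool: mirrors ThreadPoolExecutor's default since Python 3.8.
        return min(32, cores + 4)
    raise ValueError(f"unknown kind: {kind!r}")


print("process pool size:", suggested_workers("cpu"))
print("thread pool size: ", suggested_workers("io"))
```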
What's the difference between ThreadPoolExecutor and ProcessPoolExecutor?
Threads share memory (fast, low overhead) but are limited by the GIL for CPU-bound work. Processes have separate memory (slower to start, and arguments and results must be pickled to cross process boundaries) but run truly in parallel on multiple cores. Use threads for I/O-bound work and processes for CPU-bound work.
Can I mix async and thread pools?
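Yes. loop.run_in_executor() bridges them by running a blocking call in an executor and returning an awaitable future. Since Python 3.9, asyncio.to_thread() is a shorthand that uses the loop's default thread pool. Keep the main event loop async and offload blocking work to threads.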
What if the queue fills up?
By default, put() on a full queue.Queue(maxsize=1000) blocks, indefinitely unless you pass a timeout. To fail fast instead, call put_nowait() or put(timeout=...), which raise queue.Full. Catch it and reject the request with an error, or increase the queue size. Rejecting early is better than queuing forever.
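The following is a minimal sketch of rejecting early, in which `try_submit` is a hypothetical helper. It relies only on the standard queue.Queue calls that raise queue.Full instead of waiting forever. A caller would turn a False result into an error response, for example an HTTP 429 or 503.

```python
import queue
from typing import Any, Optional


def try_submit(task_q: queue.Queue, task: Any, timeout: Optional[float] = None) -> bool:
    """Enqueue task, or return False if the bounded queue stays full.

    timeout=None rejects immediately; a number waits up to that many seconds.
    """
    try:
        if timeout is None:
            task_q.put_nowait(task)
        else:
            task_q.put(task, timeout=timeout)
        return True
    except queue.Full:
        return False


tasks: queue.Queue = queue.Queue(maxsize=2)
accepted = [try_submit(tasks, f"req{i}") for i in range(3)]
late = try_submit(tasks, "req3", timeout=0.05)
print(accepted, late)  # [True, True, False] False
```

A short timeout absorbs brief bursts without letting callers hang. Rejecting immediately (timeout=None) gives the fastest feedback under sustained overload.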
How do I implement graceful shutdown?
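For the concurrent.futures executors, shutdown(wait=True) blocks until every submitted task has finished. shutdown(wait=False) returns immediately, but it does not kill running tasks, and tasks still waiting in the queue keep running unless you also pass cancel_futures=True (Python 3.9+). For a hand-rolled pool built on queue.Queue, use task_done() and join() to wait until every queued task has been processed, then signal the workers to exit.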
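The effect of cancel_futures is easy to see with a single-worker executor. The following small sketch uses `time.sleep` as a stand-in for real work. The first task occupies the only worker, and the remaining tasks are still queued when shutdown is called.

```python
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
# One slow task occupies the only worker; the rest wait in the executor's queue.
futures = [executor.submit(time.sleep, 0.2) for _ in range(5)]

# Return immediately and cancel tasks that have not started (Python 3.9+).
# A task that is already running is NOT interrupted; it finishes normally.
executor.shutdown(wait=False, cancel_futures=True)

cancelled = sum(f.cancelled() for f in futures)
print(f"cancelled {cancelled} of {len(futures)} submitted tasks")
```

Typically four of the five futures are cancelled. If shutdown runs before the worker thread has picked up the first task, that task is cancelled as well.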
Further Reading
- concurrent.futures documentation: ThreadPoolExecutor and ProcessPoolExecutor API.
- asyncio loop.run_in_executor() documentation: the async/executor bridge.
- Python GIL Explained: why processes matter for CPU-bound work.
- Job Queues and Worker Patterns: architectural patterns for reliable job processing.