Batch API Strategies: Process LLM Requests at 50% Cost
Batch APIs let you submit many LLM requests as a single bulk job, have the provider process them asynchronously, and retrieve the results later, in exchange for a 50% discount on token costs. A batch architecture suits workloads where latency is negotiable: data labeling, report generation, content transformation, analytics processing. A data-labeling system that processes 10,000 documents per night can use the batch API (50% cost) instead of the real-time API (full cost), saving $500/month on a $1,000 labeling budget. Batch APIs belong in any cost-optimization strategy because they expose a latency-cost trade-off: if you can wait up to 24 hours for results, you save 50%. For organizations processing large volumes, batch APIs can be the largest cost lever after model routing and prompt compression.
Understanding Batch API Mechanics
A batch API works in three phases:
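- Submit: You package requests, each tagged with a unique custom_id, into one job. Anthropic accepts the requests as a list in a single API call; OpenAI expects an uploaded JSONL file (one JSON request per line).
- Process: The provider's backend processes the batch asynchronously, as capacity allows.
- Retrieve: You poll the batch job status; once processing has ended, you download the results, which arrive in no guaranteed order and are matched back to inputs by custom_id.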
Anthropic's Batch API accepts up to 100,000 requests (or 256 MB) per batch. Requests are charged at 50% of the normal rate (input and output tokens discounted equally). According to Anthropic's documentation, most batches finish in under an hour, and any request not processed within 24 hours expires rather than running late, so there is no completion guarantee beyond that window. Here is how it works:
```python
import json
import time
from datetime import datetime

import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment


# Step 1: Build the request list
def prepare_batch_requests(documents: list[str], feature: str) -> list[dict]:
    """
    Build one batch request per document.
    Also writes the requests to a local JSONL file as an audit log;
    the file itself is not uploaded.
    """
    requests = []
    for i, doc in enumerate(documents):
        request = {
            "custom_id": f"{feature}_{i}",
            "params": {
                "model": "claude-3-5-sonnet-20241022",
                "max_tokens": 100,
                "messages": [
                    {
                        "role": "user",
                        "content": f"Classify this document as technical, business, or legal: {doc}",
                    }
                ],
            },
        }
        requests.append(request)

    # Keep a local JSONL copy for auditing and replay
    filename = f"batch_requests_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"
    with open(filename, "w") as f:
        for request in requests:
            f.write(json.dumps(request) + "\n")
    print(f"Prepared {len(requests)} requests (logged to {filename})")
    return requests


# Step 2: Submit batch
def submit_batch(requests: list[dict]) -> str:
    """Submit the request list and return the batch ID."""
    # Anthropic takes the requests inline as a list, not as a file upload.
    # Older SDK releases exposed this under client.beta.messages.batches.
    response = client.messages.batches.create(requests=requests)
    batch_id = response.id
    print(f"Batch submitted: {batch_id}")
    print(f"Status: {response.processing_status}")
    return batch_id


# Step 3: Poll batch status
def wait_for_batch(batch_id: str, max_polls: int = 100):
    """Poll batch status every minute until processing has ended."""
    for i in range(max_polls):
        batch = client.messages.batches.retrieve(batch_id)
        print(f"Poll {i+1}: {batch.processing_status}")
        # Statuses are "in_progress", "canceling", and "ended"
        if batch.processing_status == "ended":
            print("Batch ended!")
            print(f"Request counts: {batch.request_counts}")
            return batch
        time.sleep(60)  # Poll every minute
    raise TimeoutError(f"Batch {batch_id} did not end after {max_polls} polls")


# Step 4: Retrieve results
def retrieve_batch_results(batch_id: str) -> list[dict]:
    """Download results of a batch whose processing has ended."""
    batch = client.messages.batches.retrieve(batch_id)
    if batch.processing_status != "ended":
        raise ValueError(f"Batch {batch_id} not complete: {batch.processing_status}")
    results = []
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            results.append({
                "custom_id": result.custom_id,
                "response": result.result.message.content[0].text,
            })
        else:
            # "errored", "canceled", or "expired"; only errored results carry an error
            error = getattr(result.result, "error", None)
            print(f"Request {result.custom_id} {result.result.type}: {error}")
    print(f"Retrieved {len(results)} results")
    return results


# End-to-end example
documents = [
    "The API rate limit is 100 requests per second.",
    "Our Q3 revenue target is $5M.",
    "According to contract section 2.3, payment is due within 30 days.",
    # ... 9,997 more documents
]
batch_requests = prepare_batch_requests(documents[:3], "classification")
batch_id = submit_batch(batch_requests)
completed_batch = wait_for_batch(batch_id)
results = retrieve_batch_results(batch_id)
for result in results[:2]:
    print(f"{result['custom_id']}: {result['response']}")
```
This pattern is straightforward: prepare requests, submit, wait, retrieve. The latency trade-off is clear: you may wait up to 24 hours for results, but you save 50% on costs. For non-urgent workloads, this is an obvious win.
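Polling once a minute with a fixed poll count can give up long before a slow batch finishes. One possible alternative, sketched here under assumptions, is a capped exponential backoff that spans a full 24-hour window. The helper name `poll_delays` and its default values are hypothetical and not part of any SDK.

```python
from typing import Iterator


def poll_delays(
    initial: float = 60.0,        # first wait, in seconds (assumed default)
    factor: float = 2.0,          # growth factor between polls
    cap: float = 1800.0,          # never wait longer than 30 minutes
    budget: float = 24 * 3600.0,  # total time we are willing to wait
) -> Iterator[float]:
    """Yield sleep intervals that grow geometrically up to `cap` and stop
    before their running total would exceed `budget`."""
    elapsed = 0.0
    delay = initial
    while elapsed + delay <= budget:
        yield delay
        elapsed += delay
        delay = min(delay * factor, cap)


delays = list(poll_delays())
print(delays[:6])
print(f"{len(delays)} polls over {sum(delays) / 3600:.1f} hours")
```

Inside a polling loop, each yielded value would be passed to `time.sleep` between status checks, so a fast batch is noticed within minutes while a slow one costs only a few dozen API calls.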
Cost Comparison: Real-Time vs Batch
Let's quantify the savings. Suppose you label 100,000 documents monthly, each requiring 50 input tokens and generating 20 output tokens (classification), at Claude 3.5 Sonnet list prices of $3 per million input tokens and $15 per million output tokens.
Real-time API:
- Input: 100,000 × 50 × $3 / 1M = $15
- Output: 100,000 × 20 × $15 / 1M = $30
- Total: $45/month
Batch API:
- Same tokens, but 50% discount: $45 × 0.5 = $22.50/month
Monthly savings: $22.50 (50% reduction).
Scale this up: at $3 per million tokens, a workload of 1 billion input tokens per month costs $3,000 in real time, so batch saves $1,500/month, or $18,000/year (output tokens, at $15 per million, add proportionally more). This is why batch APIs matter: the discount is proportional, so the savings grow linearly with volume.
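The arithmetic above can be wrapped in a small calculator for trying other volumes. This is a minimal sketch: the function name `monthly_cost` is hypothetical, and the default prices are the $3 and $15 per million tokens assumed in this section, so check current pricing before relying on the output.

```python
def monthly_cost(
    requests: int,
    input_tokens: int,                     # input tokens per request
    output_tokens: int,                    # output tokens per request
    input_price_per_mtok: float = 3.00,    # assumed USD per million input tokens
    output_price_per_mtok: float = 15.00,  # assumed USD per million output tokens
    batch_discount: float = 0.5,           # 50% batch discount
) -> dict[str, float]:
    """Compare real-time and batch cost for one month of traffic."""
    input_cost = requests * input_tokens * input_price_per_mtok / 1_000_000
    output_cost = requests * output_tokens * output_price_per_mtok / 1_000_000
    realtime = input_cost + output_cost
    batch = realtime * (1 - batch_discount)
    return {"realtime": realtime, "batch": batch, "savings": realtime - batch}


costs = monthly_cost(requests=100_000, input_tokens=50, output_tokens=20)
print(costs)
```

With the labeling workload from this section, the function reproduces the $45 real-time and $22.50 batch figures.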
Designing Batch Workflows
A production batch workflow combines batch APIs with scheduling and result post-processing:
```python
import json
import os
import time
from datetime import datetime

import anthropic
import schedule

client = anthropic.Anthropic()


class BatchLabelingSystem:
    """Manage daily batch labeling jobs."""

    def __init__(self, client: anthropic.Anthropic):
        self.client = client
        self.job_queue = []  # In-memory only: queued items are lost on restart

    def enqueue_documents(self, documents: list[str], label_type: str):
        """Add documents to nightly batch job."""
        self.job_queue.extend([
            {"text": doc, "type": label_type}
            for doc in documents
        ])
        print(f"Enqueued {len(documents)} for {label_type} labeling")

    def nightly_batch_job(self):
        """Run at midnight: submit queued docs as batch job."""
        if not self.job_queue:
            print("No documents to label")
            return
        # Prepare batch
        requests = []
        for i, item in enumerate(self.job_queue):
            request = {
                "custom_id": f"{item['type']}_{i}",
                "params": {
                    "model": "claude-3-5-haiku-20241022",  # Cheap model for batch
                    "max_tokens": 50,
                    "messages": [
                        {
                            "role": "user",
                            "content": f"Label as {item['type']}: {item['text']}",
                        }
                    ],
                },
            }
            requests.append(request)
        # Keep a local JSONL copy for auditing (the file is not uploaded)
        filename = f"batch_{datetime.now().strftime('%Y%m%d')}.jsonl"
        with open(filename, "w") as f:
            for req in requests:
                f.write(json.dumps(req) + "\n")
        # Submit: Anthropic takes the requests inline as a list
        response = self.client.messages.batches.create(requests=requests)
        batch_id = response.id
        print(f"Submitted batch: {batch_id} with {len(requests)} requests")
        # Store job for tracking
        with open("pending_batches.jsonl", "a") as f:
            f.write(json.dumps({
                "batch_id": batch_id,
                "submitted": datetime.now().isoformat(),
                "request_count": len(requests),
            }) + "\n")
        # Clear queue
        self.job_queue = []

    def check_batch_completions(self):
        """Run morning after: check if batches have ended."""
        if not os.path.exists("pending_batches.jsonl"):
            return  # Nothing has been submitted yet
        with open("pending_batches.jsonl", "r") as f:
            pending = [json.loads(line) for line in f if line.strip()]
        completed = []
        for job in pending:
            batch = self.client.messages.batches.retrieve(job["batch_id"])
            # Statuses are "in_progress", "canceling", and "ended"
            if batch.processing_status == "ended":
                print(f"Batch {job['batch_id']} ended!")
                # Retrieve and save results
                results = []
                for result in self.client.messages.batches.results(job["batch_id"]):
                    if result.result.type == "succeeded":
                        results.append({
                            "custom_id": result.custom_id,
                            "label": result.result.message.content[0].text,
                        })
                # Save to database or file
                with open(f"results_{job['batch_id']}.jsonl", "w") as f:
                    for r in results:
                        f.write(json.dumps(r) + "\n")
                completed.append(job)
        # Remove completed from pending
        remaining = [j for j in pending if j not in completed]
        with open("pending_batches.jsonl", "w") as f:
            for job in remaining:
                f.write(json.dumps(job) + "\n")


# Schedule batch operations
system = BatchLabelingSystem(client)
schedule.every().day.at("00:00").do(system.nightly_batch_job)
schedule.every().day.at("08:00").do(system.check_batch_completions)

# Enqueue documents during the day
system.enqueue_documents(["Doc 1", "Doc 2"], "technical")

# Run the scheduler loop; without it, the scheduled jobs never fire
while True:
    schedule.run_pending()
    time.sleep(60)
```
This workflow separates real-time queuing (fast) from batch processing (cheaper). Users submit documents during the day; they're labeled overnight at 50% cost. Results are available by morning.
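The workflow writes results to per-batch JSONL files, which is fine for a prototype. For the database storage that a production system would likely want, a minimal sketch using the standard library's `sqlite3` follows. The table name `labels`, its schema, and the helper `save_labels` are assumptions for illustration; the result dictionaries use the same `custom_id` and `label` keys as the workflow above.

```python
import sqlite3


def save_labels(conn: sqlite3.Connection, batch_id: str, results: list[dict]) -> int:
    """Upsert labeled results and return the number of rows stored for the batch."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS labels (
            batch_id  TEXT NOT NULL,
            custom_id TEXT NOT NULL,
            label     TEXT NOT NULL,
            PRIMARY KEY (batch_id, custom_id)
        )
        """
    )
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR REPLACE INTO labels (batch_id, custom_id, label) VALUES (?, ?, ?)",
            [(batch_id, r["custom_id"], r["label"]) for r in results],
        )
    row = conn.execute(
        "SELECT COUNT(*) FROM labels WHERE batch_id = ?", (batch_id,)
    ).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")  # use a file path for real persistence
stored = save_labels(conn, "batch_demo", [
    {"custom_id": "technical_0", "label": "technical"},
    {"custom_id": "technical_1", "label": "business"},
])
print(stored)
```

Keying rows on `(batch_id, custom_id)` makes the write idempotent, so rerunning the morning check on a batch that was already saved does not create duplicates.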
Hybrid: Real-Time + Batch for Different SLAs
Use real-time APIs for high-priority, low-latency requests and batch APIs for everything else:
```python
def process_labeling_request(
    text: str,
    priority: str,  # "urgent" or "batch"
) -> str:
    """Route based on priority.

    Reuses the `client` and `system` objects created earlier.
    """
    if priority == "urgent":
        # Real-time: immediate response
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=50,
            messages=[
                {
                    "role": "user",
                    "content": f"Label: {text}",
                }
            ],
        )
        return response.content[0].text
    if priority == "batch":
        # Queue for nightly batch (50% cost)
        system.enqueue_documents([text], "default")
        return "Queued for batch processing (available tomorrow morning)"
    raise ValueError(f"Unknown priority: {priority!r}")
```
This hybrid approach balances user experience (urgent requests get immediate answers) and cost (batch requests pay 50% less and can wait overnight).
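Callers do not always know which priority to pick. One way to decide, sketched below, is to derive the route from the deadline: anything that can tolerate the worst-case batch turnaround goes to batch. The helper `choose_route` and the 24-hour `BATCH_WINDOW` constant are assumptions for illustration; the returned strings match the `priority` values used above.

```python
from datetime import datetime, timedelta
from typing import Optional

BATCH_WINDOW = timedelta(hours=24)  # assumed worst-case batch turnaround


def choose_route(deadline: datetime, now: Optional[datetime] = None) -> str:
    """Return "batch" if the deadline leaves room for a full batch window,
    otherwise "urgent"."""
    now = now or datetime.now()
    return "batch" if deadline - now >= BATCH_WINDOW else "urgent"


now = datetime(2025, 1, 1, 9, 0)
print(choose_route(now + timedelta(hours=2), now))   # due this morning
print(choose_route(now + timedelta(hours=36), now))  # due tomorrow evening
```

A stricter variant could add a safety margin to `BATCH_WINDOW` to leave time for resubmitting expired requests.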
Key Takeaways
- Batch APIs offer a 50% cost discount for asynchronous processing (latency trade-off).
- Process non-urgent workloads (data labeling, report generation, content transformation) via batch APIs.
- A workload of 1 billion input tokens per month saves about $18,000/year by switching from real-time to batch (at $3 per million tokens).
- Design batch workflows with nightly submission, morning result retrieval, and database storage.
- Hybrid approach: real-time for urgent requests, batch for everything else.
Frequently Asked Questions
What is the latency guarantee for batch APIs?
Anthropic does not guarantee completion; it sets a 24-hour processing window. Most batches finish in under an hour, but requests still unprocessed after 24 hours expire and must be resubmitted. Plan for 24 hours conservatively (submit at midnight, retrieve results the next evening). If you need a guaranteed same-day result, batch is not suitable; use the real-time API.
Can I cancel a batch job?
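Yes. Anthropic's API lets you cancel a batch that is still in progress (client.messages.batches.cancel(batch_id) in the Python SDK); requests that finished before the cancellation took effect are still billed, and their results remain retrievable. Treat cancellation as a fallback rather than a plan: test with a small batch (10–100 requests) before submitting large ones (10,000+), because a flawed prompt in a large batch means paying for results you then have to reprocess.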
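The advice to test with a small batch first can be made reproducible by drawing a fixed random sample of the full request list. This is a minimal sketch; `pilot_sample` is a hypothetical helper, and the request dictionaries are placeholders carrying only a `custom_id`.

```python
import random


def pilot_sample(requests: list[dict], size: int = 50, seed: int = 0) -> list[dict]:
    """Return up to `size` requests chosen at random, reproducibly for a given seed."""
    if size >= len(requests):
        return list(requests)
    return random.Random(seed).sample(requests, size)


all_requests = [{"custom_id": f"doc_{i}"} for i in range(10_000)]
pilot = pilot_sample(all_requests)
print(len(pilot))
```

A random sample tends to surface prompt and formatting problems that the first few documents in a sorted list might hide.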
Should I use Haiku or Sonnet for batch jobs?
Use the smallest model that solves your problem, exactly as you would for real-time. For classification and extraction, Haiku is fine (and roughly 4× cheaper at list prices). For reasoning-heavy tasks, use Sonnet. The 50% batch discount applies to all models, so your model choice is independent of batch vs real-time.
How do I handle partial failures in a batch?
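Retrieve results and filter on the result type. With Anthropic, each result is succeeded, errored, canceled, or expired (there is no "failed" type). Resubmit expired or canceled requests in a new batch; inspect errored ones first, or rerun them through the real-time API, since an invalid request will fail again unchanged. Log failures with their custom_ids so you can trace and debug.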
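A minimal sketch of that triage step, assuming results have been parsed into dictionaries shaped like the lines of Anthropic's results file, with the documented result types `succeeded`, `errored`, `canceled`, and `expired`. The function name `triage_results` and the bucket names are hypothetical.

```python
RESUBMIT_TYPES = {"expired", "canceled"}  # never processed; safe to resubmit as-is


def triage_results(results: list[dict]) -> dict[str, list[str]]:
    """Sort custom_ids into done / resubmit / inspect buckets by result type."""
    buckets: dict[str, list[str]] = {"done": [], "resubmit": [], "inspect": []}
    for entry in results:
        kind = entry["result"]["type"]
        if kind == "succeeded":
            buckets["done"].append(entry["custom_id"])
        elif kind in RESUBMIT_TYPES:
            buckets["resubmit"].append(entry["custom_id"])
        else:
            # "errored" (or anything unexpected) needs a human look first
            buckets["inspect"].append(entry["custom_id"])
    return buckets


sample = [
    {"custom_id": "doc_0", "result": {"type": "succeeded"}},
    {"custom_id": "doc_1", "result": {"type": "errored"}},
    {"custom_id": "doc_2", "result": {"type": "expired"}},
    {"custom_id": "doc_3", "result": {"type": "canceled"}},
]
buckets = triage_results(sample)
print(buckets)
```

The `resubmit` list can feed the next nightly batch directly, while the `inspect` list is a natural input for logging and manual debugging.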
What is the maximum batch size?
Anthropic's batch API handles up to 100,000 requests (or 256 MB) per batch. If you have more, split into multiple batches. Running several batches at once carries no inherent latency penalty, though queue depth affects each batch's completion time.
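Splitting an oversized request list can be sketched as a small generator. The helper name `chunk_requests` is hypothetical, and the sketch checks only the request count, not the payload size in bytes.

```python
from typing import Iterator

MAX_REQUESTS_PER_BATCH = 100_000  # per-batch request limit stated above


def chunk_requests(
    requests: list[dict], size: int = MAX_REQUESTS_PER_BATCH
) -> Iterator[list[dict]]:
    """Yield consecutive slices of `requests`, each at most `size` long."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(requests), size):
        yield requests[start:start + size]


all_requests = [{"custom_id": f"doc_{i}"} for i in range(250_000)]
chunks = list(chunk_requests(all_requests))
print([len(chunk) for chunk in chunks])
```

Each chunk would be submitted as its own batch. Keeping `custom_id` values unique across the whole list, as here, makes it easy to merge the results afterwards.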
Further Reading
- Anthropic Batch API Documentation — Official batch API guide and reference.
- OpenAI Batch API — OpenAI's batch processing with 50% discount.
- Google Cloud Batch Processing — Batch processing patterns for cloud workloads.
- Asynchronous Task Queue Patterns — Celery and distributed task queue design (applicable to batch management).