Batch Processing Documents at Scale
Batch processing is the practice of submitting multiple documents for extraction asynchronously rather than processing them one by one in real time. When you have 10,000 invoices to process, submitting each one individually to an API and waiting for its response is slow, expensive, and prone to rate-limit errors. Batch processing lets you queue documents, submit them in bulk, and retrieve results later, which improves throughput and can reduce per-document costs by 20-50%.
I've managed extraction pipelines processing 50,000+ documents weekly. The difference between naive sequential processing and optimized batch processing determines whether you process documents in hours or days, and whether your costs are sustainable or prohibitive.
Batch Processing Architectures
Architecture 1: Sequential Batch Processing
The simplest approach is to group documents into batches and work through each batch in order, waiting for each result before sending the next request:
import base64
from pathlib import Path
from typing import List

import anthropic


def process_batch_sequential(image_paths: List[str], batch_size: int = 10) -> List[dict]:
    """
    Process documents in sequential batches.

    Pros: Simple, minimal memory overhead
    Cons: Slower than parallel processing
    """
    client = anthropic.Anthropic()
    results = []

    # Split into batches
    batches = [
        image_paths[i:i+batch_size]
        for i in range(0, len(image_paths), batch_size)
    ]

    for batch_idx, batch in enumerate(batches):
        print(f"Processing batch {batch_idx+1}/{len(batches)}")
        for image_path in batch:
            try:
                # Read and encode image
                image_data = Path(image_path).read_bytes()
                base64_image = base64.standard_b64encode(image_data).decode("utf-8")

                # Extract
                response = client.messages.create(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=1024,
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "image",
                                    "source": {
                                        "type": "base64",
                                        "media_type": "image/jpeg",
                                        "data": base64_image
                                    }
                                },
                                {
                                    "type": "text",
                                    "text": "Extract invoice data as JSON: {invoice_number, vendor_name, total_amount}"
                                }
                            ]
                        }
                    ]
                )
                results.append({
                    "document": image_path,
                    "status": "success",
                    "data": response.content[0].text
                })
            except Exception as e:
                # Record the failure and keep going so one bad file does not stop the batch
                results.append({
                    "document": image_path,
                    "status": "error",
                    "error": str(e)
                })
    return results


# Example usage (paths converted to str to match the function signature)
image_files = [str(p) for p in Path("/documents").glob("*.jpg")]
results = process_batch_sequential(image_files, batch_size=10)
success_count = sum(1 for r in results if r["status"] == "success")
print(f"Processed {len(results)} documents, {success_count} successful")
Architecture 2: Parallel Batch Processing
Submit multiple documents concurrently, respecting rate limits:
import asyncio
import base64
from pathlib import Path
from typing import List

import anthropic


async def extract_document_async(client: anthropic.AsyncAnthropic, image_path: str) -> dict:
    """Extract a single document asynchronously."""
    try:
        image_data = Path(image_path).read_bytes()
        base64_image = base64.standard_b64encode(image_data).decode("utf-8")

        # Await the async client; a synchronous call here would block the
        # event loop and the requests would run one after another
        response = await client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": "image/jpeg",
                                "data": base64_image
                            }
                        },
                        {
                            "type": "text",
                            "text": "Extract invoice data as JSON: {invoice_number, vendor_name, total_amount}"
                        }
                    ]
                }
            ]
        )
        return {
            "document": image_path,
            "status": "success",
            "data": response.content[0].text
        }
    except Exception as e:
        return {
            "document": image_path,
            "status": "error",
            "error": str(e)
        }


async def process_batch_parallel(image_paths: List[str], max_concurrent: int = 5) -> List[dict]:
    """
    Process documents in parallel with rate limiting.

    max_concurrent: max number of simultaneous requests
    """
    client = anthropic.AsyncAnthropic()

    # Create semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(max_concurrent)

    async def extract_with_semaphore(path):
        async with semaphore:
            return await extract_document_async(client, path)

    # Submit all documents; gather returns results in input order
    tasks = [extract_with_semaphore(path) for path in image_paths]
    results = await asyncio.gather(*tasks)
    return list(results)


# Example usage (in async context)
async def main():
    image_files = [str(p) for p in Path("/documents").glob("*.jpg")]
    results = await process_batch_parallel(image_files, max_concurrent=5)
    success_count = sum(1 for r in results if r["status"] == "success")
    print(f"Processed {len(results)} documents, {success_count} successful")

# asyncio.run(main())
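To see what the semaphore does without calling any API, the sketch below swaps the extraction call for a hypothetical stub (`fake_extract`, which only sleeps) and records the peak number of tasks in flight. The function names and timings are illustrative assumptions, not part of the pipeline above.

```python
import asyncio


async def run_with_limit(num_tasks: int, max_concurrent: int) -> int:
    """Run stub tasks under a semaphore and return the peak concurrency observed."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_extract(doc_id: int) -> int:
        # Hypothetical stand-in for a network request
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)
            in_flight -= 1
        return doc_id

    results = await asyncio.gather(*(fake_extract(i) for i in range(num_tasks)))
    # gather preserves input order even though tasks finish at different times
    assert list(results) == list(range(num_tasks))
    return peak


peak = asyncio.run(run_with_limit(num_tasks=20, max_concurrent=5))
print(f"Peak concurrent tasks: {peak}")
```

All 20 tasks are created up front, but at most `max_concurrent` of them are ever inside the `async with` block at once, which is what keeps the request rate bounded.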
Error Handling and Retry Logic
Robust batch processing must handle transient failures:
import base64
import time
from pathlib import Path
from typing import Callable

import anthropic


class RetryPolicy:
    """Retry logic with exponential backoff."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def with_retries(self, func: Callable, *args, **kwargs):
        """Execute function with retry logic."""
        for attempt in range(self.max_retries + 1):
            try:
                return func(*args, **kwargs)
            except Exception:
                # Retries every exception for simplicity; in production,
                # retry only transient errors such as rate limits and timeouts
                if attempt < self.max_retries:
                    delay = self.base_delay * (2 ** attempt)
                    print(f"Attempt {attempt+1} failed; retrying in {delay}s...")
                    time.sleep(delay)
                else:
                    raise


def extract_with_retries(image_path: str) -> dict:
    """Extract a document with automatic retries."""
    client = anthropic.Anthropic()
    retry_policy = RetryPolicy(max_retries=3, base_delay=1.0)

    def extract():
        image_data = Path(image_path).read_bytes()
        base64_image = base64.standard_b64encode(image_data).decode("utf-8")
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": "image/jpeg",
                                "data": base64_image
                            }
                        },
                        {
                            "type": "text",
                            "text": "Extract invoice data"
                        }
                    ]
                }
            ]
        )
        return response.content[0].text

    try:
        data = retry_policy.with_retries(extract)
        return {"status": "success", "data": data}
    except Exception as e:
        return {"status": "error", "error": str(e)}
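With the defaults, `RetryPolicy` waits `base_delay * 2 ** attempt` seconds between attempts: 1s, 2s, then 4s. When many workers fail at the same moment, identical delays make them all retry together, so a common refinement is to cap the delay and randomize it. The sketch below shows one such variant, often called full jitter. The function name `backoff_delays` and the `max_delay` cap are assumptions for illustration and are not part of the class above.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int = 3, base_delay: float = 1.0,
                   max_delay: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return one randomized delay (in seconds) per retry attempt."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # The upper bound doubles each attempt, capped at max_delay
        ceiling = min(max_delay, base_delay * (2 ** attempt))
        # Full jitter: pick uniformly between 0 and the ceiling
        delays.append(rng.uniform(0, ceiling))
    return delays


# A seeded generator makes the schedule reproducible for testing
print(backoff_delays(rng=random.Random(42)))
```

To use it, `with_retries` could sleep for the next value from this list instead of the fixed `delay` it computes today.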
Progress Tracking and Monitoring
Monitor batch processing jobs:
import json
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class BatchJob:
    job_id: str
    total_documents: int
    processed: int = 0
    successful: int = 0
    failed: int = 0
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    @property
    def progress_percentage(self) -> float:
        return (self.processed / self.total_documents * 100) if self.total_documents > 0 else 0

    @property
    def success_rate(self) -> float:
        return (self.successful / self.processed * 100) if self.processed > 0 else 0

    @property
    def elapsed_seconds(self) -> float:
        if not self.started_at:
            return 0
        end = self.completed_at or datetime.now()
        return (end - self.started_at).total_seconds()

    @property
    def estimated_remaining_seconds(self) -> float:
        if self.processed == 0:
            return 0
        # Average throughput so far, in documents per second
        rate = self.processed / self.elapsed_seconds if self.elapsed_seconds > 0 else 0
        remaining = self.total_documents - self.processed
        return remaining / rate if rate > 0 else 0


class BatchJobManager:
    """Manage batch jobs and track progress."""

    def __init__(self):
        self.jobs = {}

    def create_job(self, job_id: str, total_documents: int) -> BatchJob:
        """Create a new batch job."""
        job = BatchJob(
            job_id=job_id,
            total_documents=total_documents,
            started_at=datetime.now()
        )
        self.jobs[job_id] = job
        return job

    def update_progress(self, job_id: str, processed: int, successful: int, failed: int):
        """Update job progress."""
        job = self.jobs.get(job_id)
        if job:
            job.processed = processed
            job.successful = successful
            job.failed = failed

    def complete_job(self, job_id: str):
        """Mark job as complete."""
        job = self.jobs.get(job_id)
        if job:
            job.completed_at = datetime.now()

    def get_job_status(self, job_id: str) -> Optional[dict]:
        """Get job status summary, or None if the job is unknown."""
        job = self.jobs.get(job_id)
        if not job:
            return None
        return {
            "job_id": job.job_id,
            "progress": f"{job.processed}/{job.total_documents}",
            "progress_percentage": f"{job.progress_percentage:.1f}%",
            "success_rate": f"{job.success_rate:.1f}%",
            "elapsed_seconds": int(job.elapsed_seconds),
            "estimated_remaining_seconds": int(job.estimated_remaining_seconds),
            "completed": job.completed_at is not None
        }


# Example usage
manager = BatchJobManager()

# Create job for 1000 documents
job = manager.create_job("batch_001", 1000)

# Simulate processing in steps of 100, ending at 1000/1000
for i in range(100, 1001, 100):
    time.sleep(0.1)  # Simulate work
    successful = int(i * 0.98)
    manager.update_progress("batch_001", i, successful, i - successful)
    status = manager.get_job_status("batch_001")
    print(f"Progress: {status['progress_percentage']} | "
          f"Success: {status['success_rate']} | "
          f"ETA: {status['estimated_remaining_seconds']}s")

manager.complete_job("batch_001")
Cost Optimization
Reduce per-document extraction costs:
import json


def estimate_batch_cost(num_documents: int, avg_tokens_per_doc: int = 1500) -> dict:
    """Estimate costs for batch processing."""
    # Claude 3.5 Sonnet pricing (as of 2026); verify against current pricing
    input_cost_per_1m = 3.00    # $3 per 1M input tokens
    output_cost_per_1m = 15.00  # $15 per 1M output tokens

    # Assuming 80% input, 20% output token ratio
    avg_input_tokens = int(avg_tokens_per_doc * 0.8)
    avg_output_tokens = int(avg_tokens_per_doc * 0.2)

    total_input_tokens = num_documents * avg_input_tokens
    total_output_tokens = num_documents * avg_output_tokens

    input_cost = (total_input_tokens / 1_000_000) * input_cost_per_1m
    output_cost = (total_output_tokens / 1_000_000) * output_cost_per_1m
    total_cost = input_cost + output_cost
    per_doc_cost = total_cost / num_documents if num_documents > 0 else 0

    return {
        "total_documents": num_documents,
        "total_input_tokens": total_input_tokens,
        "total_output_tokens": total_output_tokens,
        "input_cost": f"${input_cost:.2f}",
        "output_cost": f"${output_cost:.2f}",
        "total_cost": f"${total_cost:.2f}",
        "cost_per_document": f"${per_doc_cost:.4f}"
    }


# Example: estimate cost for 10,000 documents
cost_estimate = estimate_batch_cost(10000)
print(json.dumps(cost_estimate, indent=2))
# Output (abridged): {"total_documents": 10000, ..., "total_cost": "$81.00", "cost_per_document": "$0.0081"}


def optimize_batch_cost(num_documents: int) -> dict:
    """Suggest optimizations for cost."""
    suggestions = []

    # Suggestion 1: Batch size optimization
    suggestions.append({
        "optimization": "Batch size",
        "current": "Processing sequentially",
        "recommendation": "Group into batches of 50-100 to improve throughput"
    })

    # Suggestion 2: Model selection
    suggestions.append({
        "optimization": "Model selection",
        "current": "Using Claude 3.5 Sonnet for all documents",
        "recommendation": "Use Claude 3.5 Haiku for simple invoices (substantially cheaper per token); reserve Sonnet for complex documents"
    })

    # Suggestion 3: Caching
    # Caching lowers the price of repeated prompt-prefix tokens; it does not
    # reduce the token count, and per-document images are not shared
    suggestions.append({
        "optimization": "Prompt caching",
        "current": "No caching",
        "recommendation": f"Cache the shared extraction instructions and schema so the repeated prompt prefix is billed at the cache-read rate across {num_documents} requests"
    })

    return {"cost_optimizations": suggestions}
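To make the model-selection suggestion concrete, the sketch below compares sending every document to one model against routing a share of simple documents to a cheaper one. The premium prices and the 1,200/300 token split come from the estimate above. The cheaper model's prices ($0.80 input and $4.00 output per 1M tokens) and the 70% share of simple documents are illustrative assumptions, so substitute current prices and your own document mix.

```python
def blended_cost(num_documents: int, simple_share: float,
                 input_tokens: int = 1200, output_tokens: int = 300) -> dict:
    """Compare an all-premium run with a premium/cheap split (prices in $ per 1M tokens)."""
    premium = {"input": 3.00, "output": 15.00}  # same figures as estimate_batch_cost
    cheap = {"input": 0.80, "output": 4.00}     # assumed price for the smaller model

    def per_doc(prices: dict) -> float:
        return (input_tokens * prices["input"] + output_tokens * prices["output"]) / 1_000_000

    simple_docs = round(num_documents * simple_share)
    complex_docs = num_documents - simple_docs

    all_premium = num_documents * per_doc(premium)
    routed = simple_docs * per_doc(cheap) + complex_docs * per_doc(premium)
    savings = (1 - routed / all_premium) * 100 if all_premium else 0.0

    return {
        "all_premium": round(all_premium, 2),
        "routed": round(routed, 2),
        "savings_pct": round(savings, 1),
    }


print(blended_cost(10_000, simple_share=0.7))
```

Under these assumptions the routed run comes to about $39 against $81 for the all-premium run. The saving depends heavily on how many documents are simple enough for the smaller model, so measure extraction accuracy on a sample before routing.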
Checkpointing and Resume
For long-running batch jobs, save progress so failed jobs can resume:
import pickle
from datetime import datetime
from pathlib import Path
from typing import Optional


class BatchCheckpoint:
    """Save and resume batch processing."""

    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def save_checkpoint(self, job_id: str, processed_files: list[str],
                        results: list[dict]):
        """Save checkpoint for a batch job."""
        checkpoint_file = self.checkpoint_dir / f"{job_id}.pkl"
        checkpoint_data = {
            "job_id": job_id,
            # Store paths as strings so they compare reliably on resume
            "processed_files": [str(f) for f in processed_files],
            "results": results,
            "timestamp": datetime.now().isoformat()
        }
        with open(checkpoint_file, "wb") as f:
            pickle.dump(checkpoint_data, f)
        print(f"Checkpoint saved: {checkpoint_file}")

    def load_checkpoint(self, job_id: str) -> Optional[dict]:
        """Load checkpoint for a batch job, or return None if none exists."""
        checkpoint_file = self.checkpoint_dir / f"{job_id}.pkl"
        if not checkpoint_file.exists():
            return None
        # Only unpickle checkpoint files that this job wrote itself
        with open(checkpoint_file, "rb") as f:
            checkpoint_data = pickle.load(f)
        print(f"Checkpoint loaded: {checkpoint_file}")
        return checkpoint_data

    def resume_batch(self, job_id: str, all_files: list[str]) -> tuple[list[str], list[dict]]:
        """Resume a batch job from checkpoint."""
        checkpoint = self.load_checkpoint(job_id)
        if checkpoint is None:
            return [str(f) for f in all_files], []

        # Remaining files = all files - already processed
        # (compare as strings; a Path never equals a str)
        processed_files = {str(f) for f in checkpoint["processed_files"]}
        remaining_files = [str(f) for f in all_files if str(f) not in processed_files]
        return remaining_files, checkpoint["results"]


# Example usage
checkpoint_manager = BatchCheckpoint()

# Resume a batch job; sorting keeps the processing order deterministic
remaining_files, previous_results = checkpoint_manager.resume_batch(
    "batch_001",
    all_files=sorted(str(p) for p in Path("/documents").glob("*.jpg"))
)
print(f"Resuming: {len(remaining_files)} remaining files")
print(f"Previous results: {len(previous_results)} documents")
Key Takeaways
- Batch processing handles multiple documents asynchronously, improving throughput and reducing per-document costs.
- Sequential batches are simple; parallel batches with semaphores are faster but require rate-limiting discipline.
- Robust batch processing includes retry logic with exponential backoff, progress tracking, and comprehensive error handling.
- Monitor job progress with metrics: processed count, success rate, elapsed time, estimated completion.
- Optimize costs by tuning batch size, selecting appropriate models (Haiku for simple docs, Sonnet for complex), and using prompt caching.
- Save checkpoints so failed jobs can resume without reprocessing completed documents.
Frequently Asked Questions
What batch size should I use?
For parallel processing, 10-50 concurrent documents is typical, depending on rate limits and available memory. For sequential batches, 50-500 documents per batch is common. Test with your API to find the sweet spot between throughput and error rate.
How do I handle rate limits?
Use exponential backoff and semaphores to limit concurrent requests. Most APIs have per-second and per-minute limits. Track your actual rate (requests per second) and stay 20-30% below the limit to avoid hitting ceilings.
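One way to stay under a request-rate limit is to pace submissions on the client. The sketch below is a minimal interval-based limiter, assuming a hypothetical limit of 50 requests per minute and a 25% safety margin. `PacedLimiter` is an illustrative helper, not part of any SDK, and real limits (including token-based ones) should come from your provider's documentation.

```python
import time


class PacedLimiter:
    """Space out calls so the average rate stays below a target (hypothetical helper)."""

    def __init__(self, limit_per_minute: float, safety_margin: float = 0.25,
                 clock=time.monotonic, sleep=time.sleep):
        target_rate = limit_per_minute * (1 - safety_margin)  # requests per minute
        self.min_interval = 60.0 / target_rate                # seconds between requests
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = clock()

    def wait(self) -> float:
        """Block until the next request may be sent; return the seconds slept."""
        now = self._clock()
        delay = max(0.0, self._next_allowed - now)
        if delay > 0:
            self._sleep(delay)
        # Schedule the next slot relative to when this request actually goes out
        self._next_allowed = max(now, self._next_allowed) + self.min_interval
        return delay


limiter = PacedLimiter(limit_per_minute=50)
print(f"Minimum spacing: {limiter.min_interval:.2f}s")
```

Call `limiter.wait()` immediately before each request. This version is single-threaded and synchronous; an async pipeline would need an equivalent built on `asyncio.sleep`, combined with the semaphore that caps in-flight requests.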
Should I process documents in a specific order?
Order does not matter for the extraction itself. If some documents are more urgent (e.g., large invoices), processing them first gets you the valuable results sooner. For batch jobs, process in a deterministic order, such as sorted file paths, so checkpoints work reliably.
How do I test batch processing before production?
Start with a small test batch (10-50 documents) to validate extraction, error handling, and cost estimates. Monitor success rates and refine your prompts. Use the same code for production; the only differences are batch size and concurrency.
What about memory usage for very large batches?
If processing 100,000+ documents, stream results to disk instead of holding all in memory. Write results incrementally to a database or CSV file. Checkpoint frequently (every 1,000 documents) to minimize data loss if the job fails.
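A minimal sketch of incremental result writing, assuming each result is a JSON-serializable dict identified by a `document` key as in the examples above. Every result is appended to a JSON Lines file as soon as it is available, and the same file doubles as a checkpoint: on restart, documents already recorded are skipped. The function names and the `process` callback are placeholders for your own extraction call.

```python
import json
from pathlib import Path
from typing import Callable, Iterable, Set


def load_done(results_path: Path) -> Set[str]:
    """Return the set of documents already recorded in the JSONL file."""
    if not results_path.exists():
        return set()
    with results_path.open("r", encoding="utf-8") as f:
        return {json.loads(line)["document"] for line in f if line.strip()}


def process_streaming(documents: Iterable[str], results_path: Path,
                      process: Callable[[str], dict]) -> int:
    """Process documents one at a time, appending each result to disk immediately."""
    done = load_done(results_path)
    written = 0
    with results_path.open("a", encoding="utf-8") as f:
        for doc in documents:
            if doc in done:
                continue  # already handled in an earlier run
            result = {"document": doc, **process(doc)}
            f.write(json.dumps(result) + "\n")
            f.flush()  # push the line out of Python's buffer right away
            written += 1
    return written
```

Memory use stays flat regardless of batch size because only the set of completed document names is held in memory. A crash in the middle of a write can leave a truncated final line, so a production version would need to tolerate or trim a malformed last record when loading.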