Batch Generation Workflows: Scaling Image Production

Generating hundreds or thousands of images for e-commerce catalogs, content platforms, or marketing campaigns takes more than calling a generation API in a loop. Batch generation workflows handle queuing, error recovery, cost tracking, and quality monitoring at scale. This article covers architectural patterns, queue management, and optimization strategies for production image pipelines.

Why Batch Workflows Matter

Individual generation calls are stateless, but production pipelines need:

  • Work queue persistence: jobs survive API failures and restarts.
  • Priority handling: some images are more urgent than others.
  • Cost tracking: spending is monitored across thousands of generations.
  • Error recovery: failed jobs are retried automatically.
  • Quality gates: images that fail QA checks are regenerated.
  • Progress visibility: you know what is done, what is in progress, and what is queued.

A batch workflow architecture decouples job submission, generation, and result handling, enabling scalability and resilience.

Simple Queue-Based Architecture

import json
import time
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path


class JobStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"


def utc_now():
    """Current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


class ImageGenerationQueue:
    def __init__(self, generate_fn, queue_file="generation_queue.jsonl", output_dir="output"):
        # generate_fn stands in for your image provider's SDK call. It is assumed
        # to take (prompt, negative_prompt, parameters) and return image bytes.
        self.generate_fn = generate_fn
        self.queue_file = queue_file
        self.output_dir = Path(output_dir)
        self.max_retries = 3

    def add_job(self, job_id, prompt, negative_prompt, parameters=None):
        """Add a generation job to the queue."""
        job = {
            "job_id": job_id,
            "prompt": prompt,
            "negative_prompt": negative_prompt,
            "parameters": parameters or {},
            "status": JobStatus.PENDING.value,
            "created_at": utc_now(),
            "started_at": None,
            "completed_at": None,
            "retry_count": 0,
            "error_message": None,
            "output_path": None
        }

        with open(self.queue_file, "a") as f:
            f.write(json.dumps(job) + "\n")

        return job

    def get_next_job(self):
        """Retrieve the next pending or retry job."""
        for job in self._load_all_jobs():
            if job["status"] in (JobStatus.PENDING.value, JobStatus.RETRY.value):
                return job
        return None

    def process_next(self):
        """Process one job. Returns False when no job is waiting."""
        job = self.get_next_job()
        if not job:
            print("No pending jobs")
            return False

        job_id = job["job_id"]
        print(f"Processing job: {job_id}")

        try:
            # Mark as in progress
            self._update_job_status(job_id, JobStatus.IN_PROGRESS.value)

            # Generate image
            image_bytes = self.generate_fn(
                job["prompt"], job["negative_prompt"], job["parameters"]
            )

            # Save output
            self.output_dir.mkdir(parents=True, exist_ok=True)
            output_path = self.output_dir / f"{job_id}.jpg"
            output_path.write_bytes(image_bytes)

            # Mark as completed
            self._update_job_status(
                job_id,
                JobStatus.COMPLETED.value,
                output_path=str(output_path)
            )
            print(f"Completed: {job_id}")

        except Exception as e:
            # Handle error with retry logic. The new retry count is persisted;
            # otherwise every reload would start from zero and retry forever.
            error_msg = str(e)
            retry_count = job["retry_count"] + 1

            if retry_count < self.max_retries:
                print(f"Failed (will retry): {job_id} - {error_msg}")
                status = JobStatus.RETRY.value
            else:
                print(f"Failed permanently: {job_id} - {error_msg}")
                status = JobStatus.FAILED.value
            self._update_job_status(job_id, status, error=error_msg, retry_count=retry_count)

        return True

    def process_batch(self, max_jobs=None):
        """Process pending jobs until the queue is empty or max_jobs attempts are made."""
        processed = 0
        while max_jobs is None or processed < max_jobs:
            if not self.process_next():
                break
            processed += 1

            # Add delay to respect rate limits
            time.sleep(1)

        print(f"Batch processing completed. Processed: {processed} jobs")

    def _update_job_status(self, job_id, status, output_path=None, error=None, retry_count=None):
        """Update job status in queue file."""
        jobs = self._load_all_jobs()

        for job in jobs:
            if job["job_id"] == job_id:
                job["status"] = status
                job["updated_at"] = utc_now()

                if status == JobStatus.IN_PROGRESS.value:
                    job["started_at"] = utc_now()
                elif status == JobStatus.COMPLETED.value:
                    job["completed_at"] = utc_now()
                    job["output_path"] = output_path

                # Record errors for both retry and permanent failure
                if error is not None:
                    job["error_message"] = error
                if retry_count is not None:
                    job["retry_count"] = retry_count

                break

        # Rewrite queue file
        with open(self.queue_file, "w") as f:
            for job in jobs:
                f.write(json.dumps(job) + "\n")

    def _load_all_jobs(self):
        """Load all jobs from queue file."""
        jobs = []
        if Path(self.queue_file).exists():
            with open(self.queue_file, "r") as f:
                for line in f:
                    if line.strip():  # skip blank lines
                        jobs.append(json.loads(line))
        return jobs

    def get_stats(self):
        """Get queue statistics."""
        jobs = self._load_all_jobs()
        stats = {
            "total": len(jobs),
            "pending": sum(1 for j in jobs if j["status"] == JobStatus.PENDING.value),
            "in_progress": sum(1 for j in jobs if j["status"] == JobStatus.IN_PROGRESS.value),
            "completed": sum(1 for j in jobs if j["status"] == JobStatus.COMPLETED.value),
            "failed": sum(1 for j in jobs if j["status"] == JobStatus.FAILED.value),
            "retry": sum(1 for j in jobs if j["status"] == JobStatus.RETRY.value)
        }
        return stats


# Usage
def generate_image(prompt, negative_prompt, parameters):
    """Stub for illustration; replace with your image provider's SDK call."""
    return b""  # placeholder bytes, not a real image


queue = ImageGenerationQueue(generate_image)

# Add jobs
queue.add_job("product_001", "professional product photo of headphones", "blurry, low quality")
queue.add_job("product_002", "professional product photo of smartwatch", "blurry, low quality")
queue.add_job("product_003", "professional product photo of laptop", "blurry, low quality")

# Process batch
queue.process_batch(max_jobs=10)

# Check status
stats = queue.get_stats()
print(f"Status: {stats}")

Advanced Queue Management with Priorities

import heapq
import itertools
import json
from datetime import datetime, timezone
from pathlib import Path

# JobStatus is the enum defined in the previous listing.


class PriorityImageQueue:
    def __init__(self, queue_file="priority_queue.jsonl"):
        self.queue_file = queue_file
        # The heap lives in memory only; after a restart it has to be
        # rebuilt from the queue file.
        self.priority_heap = []
        # Tie-breaker: keeps equal-priority jobs in insertion order and stops
        # heapq from ever comparing the job dicts themselves.
        self._counter = itertools.count()

    def add_job(self, job_id, prompt, negative_prompt, priority=5, parameters=None):
        """Add a job with priority (1=highest, 10=lowest)."""
        job = {
            "job_id": job_id,
            "prompt": prompt,
            "negative_prompt": negative_prompt,
            "priority": priority,  # 1-10, lower = higher priority
            "parameters": parameters or {},
            "status": JobStatus.PENDING.value,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "retry_count": 0
        }

        # Add to heap for priority ordering
        heapq.heappush(self.priority_heap, (priority, next(self._counter), job))

        # Persist to file
        with open(self.queue_file, "a") as f:
            f.write(json.dumps(job) + "\n")

    def get_next_job(self):
        """Get the highest-priority pending job."""
        while self.priority_heap:
            _, _, job = heapq.heappop(self.priority_heap)

            # Verify job still exists and is pending
            if self._job_status(job["job_id"]) == JobStatus.PENDING.value:
                return job

        return None

    def _job_status(self, job_id):
        """Return the most recently persisted status of a job, or None if unknown."""
        status = None
        if Path(self.queue_file).exists():
            with open(self.queue_file, "r") as f:
                for line in f:
                    if not line.strip():
                        continue
                    record = json.loads(line)
                    if record["job_id"] == job_id:
                        status = record["status"]  # later lines win
        return status


# Usage: Critical product images get priority
queue = PriorityImageQueue()
queue.add_job("featured_product_1", "...", "...", priority=1)  # Highest
queue.add_job("secondary_product_1", "...", "...", priority=5)  # Medium
queue.add_job("archive_product_1", "...", "...", priority=10)  # Lowest

Cost Tracking and Optimization

from datetime import datetime, timezone


class CostTrackingPipeline:
    def __init__(self, generate_fn):
        # generate_fn stands in for your image provider's SDK call. It is assumed
        # to take (prompt, model) and return the generated image.
        self.generate_fn = generate_fn
        self.cost_log = []

        # Typical pricing (2026 estimates, verify with your provider)
        self.pricing = {
            "sd3": 0.005,    # $0.005 per image
            "dalle3": 0.02,  # $0.02 per image
            "video": 0.10    # $0.10 per video
        }

    def _log_cost(self, job_id, model, cost, status):
        """Append one entry to the in-memory cost log."""
        self.cost_log.append({
            "job_id": job_id,
            "model": model,
            "cost": cost,
            "status": status,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })

    def generate_with_cost_tracking(self, job_id, prompt, model="sd3"):
        """Generate image and track cost."""
        if model not in self.pricing:
            raise ValueError(f"No pricing configured for model: {model}")

        try:
            result = self.generate_fn(prompt, model)
        except Exception:
            # Assumption: a failed attempt is billed at half price. Billing for
            # failed requests varies by provider, so adjust this to your terms.
            self._log_cost(job_id, model, self.pricing[model] * 0.5, "failed")
            raise

        cost = self.pricing[model]
        self._log_cost(job_id, model, cost, "completed")
        return result, cost

    def get_cost_summary(self):
        """Calculate total cost and generation count per model."""
        summary = {}
        total_cost = 0

        for entry in self.cost_log:
            model = entry["model"]
            cost = entry["cost"]

            if model not in summary:
                summary[model] = {"count": 0, "cost": 0}

            summary[model]["count"] += 1
            summary[model]["cost"] += cost
            total_cost += cost

        summary["total"] = total_cost
        return summary

    def optimize_prompt(self, prompt, target_model="sd3"):
        """Suggest prompt cleanups based on simple heuristics."""
        suggestions = []

        # len() counts characters, not tokens
        if len(prompt) > 150:
            suggestions.append("Prompt is over 150 characters; consider trimming it")

        if "ultra detailed" in prompt.lower() and "8k" in prompt.lower():
            suggestions.append("'ultra detailed' and '8k' overlap; consider keeping only one")

        return suggestions


# Usage
def generate_image(prompt, model):
    """Stub for illustration; replace with your image provider's SDK call."""
    return b""  # placeholder bytes, not a real image


pipeline = CostTrackingPipeline(generate_image)

# Generate and track
result, cost = pipeline.generate_with_cost_tracking(
    job_id="product_001",
    prompt="professional product photo of smartwatch",
    model="sd3"
)
print(f"Generation cost: ${cost}")

# Get summary
summary = pipeline.get_cost_summary()
print(f"Total spending: ${summary['total']:.2f}")

Quality Gates and Automated Retry

from pathlib import Path


class QualityGatedPipeline:
    def __init__(self, generate_fn, min_score=0.8):
        # generate_fn stands in for your image provider's SDK call. It is assumed
        # to take (prompt, negative_prompt) and return image bytes.
        self.generate_fn = generate_fn
        self.min_score = min_score

    def evaluate_quality(self, image_path, quality_criteria=None):
        """Evaluate if generated image meets quality standards."""
        criteria = quality_criteria or {
            "min_sharpness": 0.7,
            "color_saturation": (0.3, 0.95),
            "faces_present": False  # Set True to reject images with distorted faces
        }

        # Placeholder: In real implementation, use vision model or ML classifier
        # to evaluate sharpness, color, composition, etc.
        score = self._compute_quality_score(image_path, criteria)
        return score >= self.min_score, score

    def generate_with_quality_gate(self, job_id, prompt, negative_prompt, max_retries=3):
        """Generate until quality threshold is met or attempts run out."""
        Path("temp").mkdir(exist_ok=True)

        for attempt in range(max_retries):
            image_bytes = self.generate_fn(prompt, negative_prompt)

            # Save temp output so the evaluator has a file to read
            temp_path = f"temp/{job_id}_attempt{attempt}.jpg"
            Path(temp_path).write_bytes(image_bytes)

            # Evaluate quality
            passes, score = self.evaluate_quality(temp_path)
            print(f"Attempt {attempt + 1}: quality score {score:.2f}")

            if passes:
                print(f"Quality gate passed for {job_id}")
                return image_bytes, temp_path

        print(f"Quality gate failed after {max_retries} attempts for {job_id}")
        return None, None

    def _compute_quality_score(self, image_path, criteria):
        """Compute quality metric (placeholder)."""
        # Real implementation would use vision model
        return 0.85


# Usage
def generate_image(prompt, negative_prompt):
    """Stub for illustration; replace with your image provider's SDK call."""
    return b""  # placeholder bytes, not a real image


pipeline = QualityGatedPipeline(generate_image)
result, output_path = pipeline.generate_with_quality_gate(
    job_id="product_001",
    prompt="professional product photo",
    negative_prompt="blurry, low quality"
)
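
The quality score in the listing above is a placeholder. As one possible starting point for the sharpness part, a classical proxy is the variance of the Laplacian: blurry images have few strong edges, so the variance is low. The sketch below is an illustration rather than the method this article prescribes. It assumes the image has already been decoded into a 2D list of grayscale values (decoding is left to your imaging library), and the function name `laplacian_variance` is hypothetical. The raw value is not on a 0 to 1 scale, so it would need calibrating against known-good images before it can feed a threshold such as 0.8.

```python
def laplacian_variance(gray):
    """Variance of the 4-neighbour Laplacian over interior pixels.

    gray is a 2D list of grayscale intensities (rows of equal length).
    Higher values mean more edge contrast, which usually means a sharper image.
    """
    height = len(gray)
    width = len(gray[0]) if height else 0
    if height < 3 or width < 3:
        raise ValueError("image must be at least 3x3 pixels")

    responses = []
    for y in range(1, height - 1):
        for x in range(1, width - 1):
            # Discrete Laplacian: sum of the four neighbours minus 4x the centre
            lap = (gray[y - 1][x] + gray[y + 1][x]
                   + gray[y][x - 1] + gray[y][x + 1]
                   - 4 * gray[y][x])
            responses.append(lap)

    mean = sum(responses) / len(responses)
    return sum((r - mean) ** 2 for r in responses) / len(responses)


# A uniform image has no edges; an image with a hard vertical edge does.
flat = [[128] * 5 for _ in range(5)]
edge = [[0, 0, 255, 255, 255] for _ in range(5)]
print(laplacian_variance(flat), laplacian_variance(edge))
```

For full-size images a vectorized implementation from an imaging or array library will be far faster than this nested loop.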

Key Takeaways

  • Use queue-based architecture for persistent, recoverable batch processing.
  • Implement status tracking (pending, in_progress, completed, failed, retry) for observability.
  • Add automatic retry logic with exponential backoff for transient failures.
  • Prioritize jobs and track cost to optimize spending on large campaigns.
  • Implement quality gates to automatically retry images that fail QA standards.
  • Monitor progress and statistics in real time for transparency and debugging.

Frequently Asked Questions

How do I handle rate limiting in batch generation?

Add delays between requests (1–2 seconds minimum) and back off exponentially on failures, especially rate-limit responses such as HTTP 429. Rate limits vary by provider and plan (e.g., 10 images/minute), so check your provider's documentation. For high-volume needs, request rate limit increases or use dedicated endpoints.
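
Exponential backoff can be sketched as a small wrapper around any generation call. This is a minimal illustration, not a provider-specific recipe: `retry_with_backoff` and its parameters are hypothetical names, and the `sleep` argument exists only so the delay can be swapped out in tests.

```python
import random
import time


def retry_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Call fn() until it succeeds, waiting longer after each failure."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # The delay ceiling doubles each attempt (1s, 2s, 4s, ...) up to max_delay
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            # Jitter: sleep a random amount up to the ceiling so that parallel
            # workers do not all retry at the same instant
            sleep(random.uniform(0, ceiling))
```

This sketch retries on every exception. In practice you would likely retry only errors that are plausibly transient (timeouts, rate-limit responses) and fail fast on the rest, such as invalid prompts or authentication errors.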

What if my queue process crashes midway?

That's why you persist the queue to disk. On restart, load the queue from the file and resume processing. Jobs marked "in_progress" that didn't complete should be reset to "pending" or "retry" depending on your policy.
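
A recovery pass at startup might look like the sketch below. It assumes the JSONL queue format used earlier in this article and a single worker process; with several workers, an "in_progress" job may still be running legitimately, so you would also need a timeout or heartbeat. The function name `recover_stale_jobs` is hypothetical.

```python
import json
from pathlib import Path


def recover_stale_jobs(queue_file="generation_queue.jsonl"):
    """Reset jobs left 'in_progress' by a crashed worker; return how many were reset."""
    path = Path(queue_file)
    if not path.exists():
        return 0

    jobs = [json.loads(line) for line in path.read_text().splitlines() if line.strip()]

    reset = 0
    for job in jobs:
        if job["status"] == "in_progress":
            job["status"] = "pending"
            job["started_at"] = None
            reset += 1

    if reset:
        # Write to a temporary file, then swap it in, so a crash during the
        # write cannot leave a truncated queue file behind
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text("".join(json.dumps(job) + "\n" for job in jobs))
        tmp_path.replace(path)

    return reset
```

Call it once before processing starts, for example `recover_stale_jobs("generation_queue.jsonl")`, and log the returned count so unexpected crashes are visible.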

Should I process jobs sequentially or in parallel?

Sequential is simpler and avoids overwhelming APIs. For massive scale (thousands of images), use parallel workers with a shared queue backend (Redis, database) rather than file-based queuing. Start sequential, then parallelize once you hit bottlenecks.
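
What a shared backend has to provide is an atomic "claim" step, so that two workers never pick up the same job. The sketch below illustrates the idea with SQLite from the Python standard library, standing in for whichever database or Redis setup you actually use. The table layout and the function names (`init_queue`, `add_job`, `claim_next_job`) are assumptions made for this example.

```python
import sqlite3


def init_queue(conn):
    """Create the jobs table if it does not exist yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs ("
        "job_id TEXT PRIMARY KEY, "
        "prompt TEXT NOT NULL, "
        "status TEXT NOT NULL DEFAULT 'pending')"
    )
    conn.commit()


def add_job(conn, job_id, prompt):
    with conn:  # commits on success, rolls back on error
        conn.execute("INSERT INTO jobs (job_id, prompt) VALUES (?, ?)", (job_id, prompt))


def claim_next_job(conn):
    """Move the oldest pending job to in_progress and return its id, or None if empty."""
    while True:
        with conn:
            row = conn.execute(
                "SELECT job_id FROM jobs WHERE status = 'pending' ORDER BY rowid LIMIT 1"
            ).fetchone()
            if row is None:
                return None
            # The status guard makes the claim safe: if another worker got
            # there first, no row matches and rowcount is 0
            claimed = conn.execute(
                "UPDATE jobs SET status = 'in_progress' "
                "WHERE job_id = ? AND status = 'pending'",
                (row[0],),
            )
            if claimed.rowcount == 1:
                return row[0]
        # Lost the race for that job; loop and try the next pending one
```

Each worker opens its own connection to the same database file and calls `claim_next_job` in a loop. The same guarded-update pattern carries over to server databases, where row locking features can make it more efficient.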

How do I estimate total cost for a campaign?

Track cost per generation, multiply by expected volume, and add 10–20% buffer for retries. Monitor costs in real time and pause if spending exceeds budget. Consider batch discounts if your provider offers them.
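
The arithmetic is simple enough to keep as a helper. The function below is a sketch with hypothetical names, and the figures in the example (5,000 images at $0.005 each with a 15% retry buffer) are illustrative inputs, not quoted prices.

```python
def estimate_campaign_cost(image_count, cost_per_image, retry_buffer=0.15):
    """Estimated spend: volume x unit price, padded by a retry buffer (0.15 = 15%)."""
    if image_count < 0 or cost_per_image < 0 or retry_buffer < 0:
        raise ValueError("inputs must be non-negative")
    return image_count * cost_per_image * (1 + retry_buffer)


def over_budget(spent_so_far, budget):
    """True once spending has reached the budget and the batch should pause."""
    return spent_so_far >= budget


# 5,000 images x $0.005 = $25.00 base; a 15% buffer adds $3.75
estimate = estimate_campaign_cost(5000, 0.005, retry_buffer=0.15)
print(f"Estimated campaign cost: ${estimate:.2f}")
```

Checking `over_budget` against the running total from your cost log before each generation gives you the pause-on-overspend behavior described above.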

What should my quality gate threshold be?

This depends on your use case. For e-commerce product images, aim high (>0.85). For general content generation, a lower bar (>0.70) is usually acceptable. Start with a strict threshold, then relax it based on customer feedback.

Further Reading