Building an End-to-End Generation Pipeline
Building a production synthetic data pipeline requires orchestrating generation, validation, deduplication, PII removal, and quality monitoring into a reliable, scalable system. A well-engineered pipeline can generate 100,000 examples in a matter of hours (throughput depends mainly on API rate limits and concurrency), automatically filter low-quality outputs, and export clean, shareable datasets. This article provides a complete implementation guide with code you can adapt for your use case.
Pipeline Architecture Overview
A production pipeline follows this sequence:
```
Configuration → Generation → Parsing → Validation → Deduplication →
PII Removal → Quality Metrics → Export

Quality Metrics ──→ Feedback Loop (adjust prompts) ──→ Configuration
```
Each stage drops examples that fail its checks and passes the survivors downstream. For small datasets (around 10K examples), batches can run sequentially; for large ones (100K+), generate batches in parallel to cut wall-clock time.
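The progressive filtering can be sketched as a chain of stage functions, each taking a list of examples and returning the survivors, with a count recorded after every stage. The stage names and the two toy filters below are hypothetical stand-ins for the real stages, not part of the implementation that follows.

```python
from typing import Callable, Dict, List, Tuple

Stage = Callable[[List[dict]], List[dict]]


def run_stages(examples: List[dict], stages: List[Tuple[str, Stage]]) -> Tuple[List[dict], Dict[str, int]]:
    """Apply each stage in order and record how many examples survive it."""
    counts = {"input": len(examples)}
    for name, stage in stages:
        examples = stage(examples)
        counts[name] = len(examples)
    return examples, counts


# Hypothetical toy stage: keep examples with a non-empty description
def validate(items: List[dict]) -> List[dict]:
    return [x for x in items if isinstance(x.get("description"), str) and x["description"].strip()]


# Hypothetical toy stage: exact-match deduplication on normalized text
def dedup_exact(items: List[dict]) -> List[dict]:
    seen, kept = set(), []
    for x in items:
        key = x["description"].strip().lower()
        if key not in seen:
            seen.add(key)
            kept.append(x)
    return kept


data = [{"description": "App crashes on login"}, {"description": "app crashes on login "},
        {"description": ""}, {"description": "Refund not received"}]
final, counts = run_stages(data, [("validated", validate), ("deduplicated", dedup_exact)])
print(counts)  # {'input': 4, 'validated': 3, 'deduplicated': 2}
```

Recording a count per stage is what makes the per-stage pass rates in the monitoring section possible.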
Complete Pipeline Implementation
```python
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import anthropic
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Three backticks, built at runtime; used to strip Markdown fences from model output
CODE_FENCE = "`" * 3


@dataclass
class PipelineConfig:
    """Configuration for the synthetic data generation pipeline."""
    model: str = "claude-3-5-sonnet-20241022"
    target_examples: int = 10000
    batch_size: int = 50
    temperature: float = 0.85
    max_tokens: int = 8192  # must fit a whole batch, or the JSON array is cut off
    max_retries: int = 3
    validation_schema: Optional[Dict] = None
    text_fields: Optional[List[str]] = None
    dedup_threshold: float = 0.87  # cosine similarity above which examples count as duplicates
    generation_prompt: str = ""
    output_dir: str = "./synthetic_data"


class SyntheticDataPipeline:
    """
    Complete pipeline for generating, validating, and filtering synthetic data.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
        self._embedder = None  # sentence-embedding model, loaded on first use
        self._kept_embeddings: List[np.ndarray] = []  # embeddings of every kept example
        self.stats = {
            "generated": 0,
            "parsed": 0,
            "validated": 0,
            "deduplicated": 0,
            "pii_cleaned": 0,
            "final": 0,
            "failures": [],
        }

    # ========== STAGE 1: GENERATION ==========
    def generate_batch(self, batch_num: int) -> List[Any]:
        """
        Generate a batch of examples using Claude.
        Returns: parsed JSON objects, or [raw_text] if the response is not valid JSON
        """
        prompt = self.config.generation_prompt.format(
            batch_num=batch_num,
            batch_size=self.config.batch_size,
        )
        for attempt in range(self.config.max_retries):
            try:
                message = self.client.messages.create(
                    model=self.config.model,
                    max_tokens=self.config.max_tokens,
                    temperature=self.config.temperature,
                    messages=[{"role": "user", "content": prompt}],
                )
            except anthropic.APIError as e:
                logger.warning(f"Batch {batch_num}, attempt {attempt + 1}: {e}")
                if attempt == self.config.max_retries - 1:
                    self.stats["failures"].append(f"Batch {batch_num}: Max retries exceeded")
                    return []
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
                continue

            # Extract the response text
            response_text = "".join(b.text for b in message.content if b.type == "text")
            logger.info(f"Batch {batch_num}: Generated {len(response_text)} chars")
            if message.stop_reason == "max_tokens":
                logger.warning(f"Batch {batch_num}: hit max_tokens; output is likely truncated")

            # Models sometimes wrap JSON in a Markdown fence despite instructions; strip it
            text = response_text.strip()
            if text.startswith(CODE_FENCE):
                text = text.split("\n", 1)[1] if "\n" in text else ""
                text = text.rsplit(CODE_FENCE, 1)[0]

            # Parse as a JSON array; on failure, pass the raw text on so that
            # process_batch records it as generated but unparseable
            try:
                parsed = json.loads(text)
            except json.JSONDecodeError:
                return [text]
            return parsed if isinstance(parsed, list) else [parsed]
        return []

    # ========== STAGE 2: PARSING & VALIDATION ==========
    def validate_schema(self, example: Any) -> Tuple[bool, str]:
        """
        Validate that example matches schema.
        Returns: (is_valid, error_message)
        """
        if not isinstance(example, dict):
            return False, "Not a dictionary"
        schema = self.config.validation_schema
        if schema is None:
            return True, ""  # No schema defined, skip
        # Check required fields
        for field in schema.get("required", []):
            if field not in example:
                return False, f"Missing required field: {field}"
        # Check field types
        for field, field_spec in schema.get("properties", {}).items():
            if field not in example:
                continue
            value = example[field]
            expected_type = field_spec.get("type", "string")
            if expected_type == "string" and not isinstance(value, str):
                return False, f"Field {field}: expected string, got {type(value).__name__}"
            # bool is a subclass of int in Python, so exclude it explicitly
            if expected_type == "number" and (isinstance(value, bool) or not isinstance(value, (int, float))):
                return False, f"Field {field}: expected number, got {type(value).__name__}"
        return True, ""

    def validate_constraints(self, example: Dict) -> Tuple[bool, List[str]]:
        """
        Validate business logic constraints.
        Returns: (is_valid, list_of_violations)
        """
        violations = []
        # Placeholder/template language, matched as whole words so "test" does not flag "latest"
        suspicious = ["test", "placeholder", "lorem ipsum", "as above"]
        for field in self.config.text_fields or []:
            if field not in example:
                continue
            text = str(example[field])
            # Excessive repetition (poor quality indicator)
            words = text.split()
            if len(words) > 10 and len(set(words)) / len(words) < 0.6:
                violations.append(f"{field}: High word repetition")
            lowered = text.lower()
            if any(re.search(rf"\b{re.escape(s)}\b", lowered) for s in suspicious):
                violations.append(f"{field}: Suspicious template language")
        return len(violations) == 0, violations

    def process_batch(self, raw_batch: List[Any]) -> List[Dict]:
        """
        Parse, validate, and filter a batch.
        Returns: List of valid examples
        """
        valid_examples = []
        for raw in raw_batch:
            # Parse: items are already dicts or still raw JSON strings
            try:
                example = raw if isinstance(raw, dict) else json.loads(raw)
            except (json.JSONDecodeError, TypeError):
                logger.debug("Skipping unparseable item")
                continue
            self.stats["parsed"] += 1
            # Schema validation
            is_schema_valid, schema_error = self.validate_schema(example)
            if not is_schema_valid:
                logger.debug(f"Schema failure: {schema_error}")
                continue
            # Constraint validation
            is_constraint_valid, violations = self.validate_constraints(example)
            if not is_constraint_valid:
                logger.debug(f"Constraint failure: {violations}")
                continue
            valid_examples.append(example)
            self.stats["validated"] += 1
        return valid_examples

    # ========== STAGE 3: DEDUPLICATION ==========
    def _get_embedder(self):
        """Load the sentence-embedding model once and reuse it for every batch."""
        if self._embedder is None:
            from sentence_transformers import SentenceTransformer
            self._embedder = SentenceTransformer("all-MiniLM-L6-v2")
        return self._embedder

    def deduplicate(self, examples: List[Dict]) -> List[Dict]:
        """
        Remove near-duplicates by cosine similarity of sentence embeddings, comparing each
        example against everything kept so far, including earlier batches.
        Embedding-based deduplication is covered in more depth in article 6.
        """
        if not examples or not self.config.text_fields:
            return examples
        # Get embeddings for primary text field
        primary_field = self.config.text_fields[0]
        texts = [str(ex.get(primary_field, "")) for ex in examples]
        # Normalized embeddings make the dot product equal to cosine similarity
        embeddings = self._get_embedder().encode(
            texts, convert_to_numpy=True, normalize_embeddings=True
        )
        kept = []
        for example, emb in zip(examples, embeddings):
            if self._kept_embeddings:
                similarities = np.stack(self._kept_embeddings) @ emb
                if float(similarities.max()) > self.config.dedup_threshold:
                    continue  # too similar to an earlier example
            kept.append(example)
            self._kept_embeddings.append(emb)
        removed = len(examples) - len(kept)
        self.stats["deduplicated"] += removed  # accumulate across batches
        logger.info(f"Deduplication: {len(examples)} -> {len(kept)} (removed {removed})")
        return kept

    # ========== STAGE 4: PII REMOVAL ==========
    def remove_pii(self, examples: List[Dict]) -> List[Dict]:
        """
        Mask PII in text fields with [ENTITY_TYPE] placeholders.
        """
        from presidio_analyzer import AnalyzerEngine
        analyzer = AnalyzerEngine()
        cleaned = []
        for example in examples:
            cleaned_example = example.copy()
            had_pii = False
            for field in self.config.text_fields or []:
                if field not in example:
                    continue
                text = str(example[field])
                # Detect PII
                findings = analyzer.analyze(text=text, language="en")
                if not findings:
                    continue
                had_pii = True
                # Merge overlapping detections so masking one span cannot corrupt another
                spans = []  # [start, end, entity_type]
                for f in sorted(findings, key=lambda x: (x.start, -x.end)):
                    if spans and f.start < spans[-1][1]:
                        spans[-1][1] = max(spans[-1][1], f.end)
                    else:
                        spans.append([f.start, f.end, f.entity_type])
                # Replace from the end backwards so earlier offsets stay valid
                masked_text = text
                for start, end, entity_type in reversed(spans):
                    masked_text = masked_text[:start] + f"[{entity_type}]" + masked_text[end:]
                cleaned_example[field] = masked_text
            cleaned.append(cleaned_example)
            if had_pii:
                self.stats["pii_cleaned"] += 1  # count examples that actually contained PII
        return cleaned

    # ========== MAIN PIPELINE ==========
    def run(self) -> Tuple[List[Dict], Dict]:
        """
        Execute the full pipeline.
        Returns: (final_dataset, statistics)
        """
        all_examples = []
        target = self.config.target_examples
        num_batches = (target + self.config.batch_size - 1) // self.config.batch_size  # ceiling
        # Filtering discards examples, so allow extra batches (assumption: up to 2x planned)
        max_batches = 2 * num_batches
        logger.info(f"Starting pipeline: {target} target examples ({num_batches} batches planned)")
        for batch_num in range(max_batches):
            logger.info(f"=== Batch {batch_num + 1} (planned {num_batches}, max {max_batches}) ===")
            # Generate
            raw_batch = self.generate_batch(batch_num)
            self.stats["generated"] += len(raw_batch)
            # Validate
            valid_batch = self.process_batch(raw_batch)
            logger.info(f"Validation: {len(raw_batch)} -> {len(valid_batch)}")
            # Deduplicate against this batch and all earlier ones
            dedup_batch = self.deduplicate(valid_batch)
            all_examples.extend(dedup_batch)
            # Stop once the target is reached
            if len(all_examples) >= target:
                all_examples = all_examples[:target]
                break
        else:
            logger.warning(f"Stopped after {max_batches} batches with {len(all_examples)}/{target} examples")
        # PII removal
        logger.info(f"Removing PII from {len(all_examples)} examples...")
        final_dataset = self.remove_pii(all_examples)
        self.stats["final"] = len(final_dataset)
        logger.info("Pipeline complete!")
        logger.info(f"Final dataset size: {self.stats['final']}")
        return final_dataset, self.stats

    # ========== EXPORT ==========
    def export(self, dataset: List[Dict], filename: str = "synthetic_dataset.jsonl") -> str:
        """
        Export dataset to JSONL format (one JSON object per line).
        """
        os.makedirs(self.config.output_dir, exist_ok=True)
        filepath = os.path.join(self.config.output_dir, filename)
        with open(filepath, "w", encoding="utf-8") as f:
            for example in dataset:
                f.write(json.dumps(example, ensure_ascii=False) + "\n")
        logger.info(f"Exported {len(dataset)} examples to {filepath}")
        return filepath


# ========== USAGE EXAMPLE ==========
if __name__ == "__main__":
    # Define configuration
    generation_prompt = """Generate {batch_size} customer support tickets as a JSON array.
Each ticket must be a valid JSON object with these fields:
- ticket_id: string (format TICKET-XXXXX)
- severity: string (one of: Low, Medium, High, Critical)
- description: string (50-150 words)
- category: string (one of: Bug, Feature Request, Billing)
Ensure diversity across severity levels and categories.
No profanity, no placeholder language.
Return ONLY the JSON array, no markdown or explanation."""

    schema = {
        "type": "object",
        "required": ["ticket_id", "severity", "description", "category"],
        "properties": {
            "ticket_id": {"type": "string"},
            "severity": {"type": "string"},
            "description": {"type": "string"},
            "category": {"type": "string"},
        },
    }

    config = PipelineConfig(
        target_examples=100,  # Start small for testing
        batch_size=10,
        temperature=0.85,
        validation_schema=schema,
        text_fields=["description"],
        generation_prompt=generation_prompt,
    )

    # Run pipeline
    pipeline = SyntheticDataPipeline(config)
    dataset, stats = pipeline.run()

    # Export
    pipeline.export(dataset)

    # Print statistics
    print("\n=== Pipeline Statistics ===")
    for key, value in stats.items():
        if key != "failures":
            print(f"{key}: {value}")
    if stats["failures"]:
        print("\nFailures:")
        for failure in stats["failures"][:5]:
            print(f"  - {failure}")
```
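The hand-rolled `validate_schema` above checks only required fields and basic types, so a ticket with severity "Urgent" would pass even though the prompt restricts severity to four values. A minimal stdlib-only sketch that also honors JSON Schema-style `enum` and `pattern` keywords follows; the extended `TICKET_SCHEMA` and the `TICKET-[A-Z0-9]{5}` pattern are assumptions about what `TICKET-XXXXX` means. For full JSON Schema support, a dedicated validator such as the `jsonschema` package is the more robust choice.

```python
import re
from typing import Any, Dict, Tuple

# Hypothetical extended schema: the article's schema plus enum and pattern keywords
TICKET_SCHEMA = {
    "required": ["ticket_id", "severity", "description", "category"],
    "properties": {
        "ticket_id": {"type": "string", "pattern": r"^TICKET-[A-Z0-9]{5}$"},
        "severity": {"type": "string", "enum": ["Low", "Medium", "High", "Critical"]},
        "description": {"type": "string"},
        "category": {"type": "string", "enum": ["Bug", "Feature Request", "Billing"]},
    },
}


def validate_with_enums(example: Any, schema: Dict) -> Tuple[bool, str]:
    """Check required fields, string type, enum membership, and regex patterns."""
    if not isinstance(example, dict):
        return False, "Not a dictionary"
    for field in schema.get("required", []):
        if field not in example:
            return False, f"Missing required field: {field}"
    for field, spec in schema.get("properties", {}).items():
        if field not in example:
            continue
        value = example[field]
        if spec.get("type") == "string" and not isinstance(value, str):
            return False, f"Field {field}: expected string, got {type(value).__name__}"
        if "enum" in spec and value not in spec["enum"]:
            return False, f"Field {field}: {value!r} not in {spec['enum']}"
        if "pattern" in spec and not re.search(spec["pattern"], str(value)):
            return False, f"Field {field}: {value!r} does not match {spec['pattern']}"
    return True, ""


ok = {"ticket_id": "TICKET-00421", "severity": "High", "description": "Export fails", "category": "Bug"}
bad = dict(ok, severity="Urgent")
print(validate_with_enums(ok, TICKET_SCHEMA))   # (True, '')
print(validate_with_enums(bad, TICKET_SCHEMA))  # (False, "Field severity: 'Urgent' not in ['Low', 'Medium', 'High', 'Critical']")
```

Enforcing the same allowed values that the prompt lists turns prompt drift (for example, invented severity levels) into visible validation failures instead of silently accepted data.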
Key Design Decisions
Batching Strategy
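Generating 100,000 examples in one API call is not feasible: a single response is capped by the model's output-token limit, and one failure would lose all the work. Instead, generate in batches of 50–100, validate each batch, and continue iteratively. Size each batch so the complete JSON array fits within `max_tokens`; a truncated response is invalid JSON, and the whole batch is lost. Batching also enables parallel processing and early error detection.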
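The token-budget constraint can be turned into a quick calculation of the largest safe batch size. This is a sketch: the per-example token count and the safety margin are assumptions to replace with figures measured from your own outputs (for example, from the `output_tokens` field of the usage data the API returns). Descriptions of 50–150 words come to very roughly 65–200 tokens at a rule-of-thumb 1.3 tokens per word, so about 230 tokens per ticket including JSON keys is assumed below.

```python
def max_batch_size(max_tokens: int, tokens_per_example: int, overhead_tokens: int = 50,
                   safety_margin: float = 0.8) -> int:
    """Largest batch whose JSON array should fit within max_tokens.

    tokens_per_example: average output tokens per example (assumption: measure your own)
    overhead_tokens:    array brackets, any stray preamble, etc.
    safety_margin:      fraction of the budget to use, leaving room for long outputs
    """
    usable = int(max_tokens * safety_margin) - overhead_tokens
    return max(usable // tokens_per_example, 0)


print(max_batch_size(max_tokens=4096, tokens_per_example=230))  # 14
print(max_batch_size(max_tokens=8192, tokens_per_example=230))  # 28
```

Under these assumptions, a 50-ticket batch would not fit in a 4,096-token response, which is why the batch size and `max_tokens` should be chosen together.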
Error Handling
Each stage skips invalid examples rather than failing the entire pipeline. The run continues, logging generation errors and recording exhausted retries in `stats["failures"]` for later review. Critical failures, such as every example failing validation for 3 consecutive batches, should pause the pipeline for manual investigation; the listing above does not implement this check.
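The pause rule can be sketched as a small circuit breaker that counts consecutive batches in which nothing passed validation. The class name and the choice to raise an exception are illustrative assumptions; a real pipeline might instead alert an operator, or checkpoint and exit.

```python
class ValidationCircuitBreaker:
    """Trip after `limit` consecutive batches in which no example passed validation."""

    def __init__(self, limit: int = 3):
        self.limit = limit
        self.consecutive_failures = 0

    def record(self, num_valid: int) -> None:
        """Call once per batch; raises RuntimeError when the breaker trips."""
        if num_valid == 0:
            # Empty batches (e.g. generation failures) also count toward the streak
            self.consecutive_failures += 1
        else:
            self.consecutive_failures = 0  # any passing example resets the streak
        if self.consecutive_failures >= self.limit:
            raise RuntimeError(
                f"{self.consecutive_failures} consecutive batches failed validation; "
                "pausing for manual investigation"
            )


breaker = ValidationCircuitBreaker(limit=3)
for valid_count in [4, 0, 0, 2, 0, 0]:
    breaker.record(valid_count)  # the streak resets at 2, so this never trips
print(breaker.consecutive_failures)  # 2
```

In `run()`, the natural place to call it would be `breaker.record(len(valid_batch))` immediately after `process_batch`.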
Deduplication Timing
Deduplication happens before PII removal. This avoids the overhead of removing PII from examples that will be discarded as duplicates anyway.
Monitoring and Observability
Track key metrics:
```python
from typing import Dict


def print_pipeline_metrics(stats: Dict):
    """Print human-readable pipeline metrics."""
    total_generated = stats["generated"]
    final_count = stats["final"]
    if total_generated == 0:
        print("No examples generated.")
        return
    pass_rate = 100 * final_count / total_generated
    print(f"Overall pass rate: {pass_rate:.1f}%")
    print(f"  Generated: {total_generated}")
    print(f"  Validated: {stats['validated']} ({100 * stats['validated'] / total_generated:.1f}%)")
    print(f"  Dedup loss: {stats['deduplicated']} ({100 * stats['deduplicated'] / max(stats['validated'], 1):.1f}% of validated)")
    print(f"  PII cleaned: {stats['pii_cleaned']}")
    print(f"  Final: {final_count}")
    # Rough cost estimate. Claude 3.5 Sonnet list pricing: $3 per million input tokens and
    # $15 per million output tokens (check current pricing). Output tokens dominate
    # generation, so this counts output only and ignores the small per-batch prompt.
    est_output_tokens = total_generated * 150  # rough: ~150 output tokens per example
    est_cost = est_output_tokens * 15 / 1_000_000
    print(f"Estimated cost: ${est_cost:.2f}")
```
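The estimate above guesses 150 tokens per example. A more accurate figure comes from the token counts the Anthropic API returns with every response (`message.usage.input_tokens` and `message.usage.output_tokens`), summed across batches. A minimal sketch follows; the `TokenUsage` class is hypothetical, and its default prices are Claude 3.5 Sonnet's published list prices ($3 input, $15 output per million tokens), which should be checked against current pricing.

```python
from dataclasses import dataclass


@dataclass
class TokenUsage:
    """Running token totals; feed it the usage numbers from each API response."""
    input_tokens: int = 0
    output_tokens: int = 0

    def add(self, input_tokens: int, output_tokens: int) -> None:
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens

    def cost_usd(self, input_per_mtok: float = 3.0, output_per_mtok: float = 15.0) -> float:
        """Cost in dollars given per-million-token prices."""
        return (self.input_tokens * input_per_mtok + self.output_tokens * output_per_mtok) / 1_000_000


usage = TokenUsage()
# In generate_batch, after each call: usage.add(message.usage.input_tokens, message.usage.output_tokens)
usage.add(input_tokens=200, output_tokens=2_500)  # illustrative numbers for one batch
usage.add(input_tokens=200, output_tokens=2_400)
print(f"${usage.cost_usd():.4f}")  # $0.0747
```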
Key Takeaways
- Pipeline stages filter progressively: generation → validation → deduplication → PII removal.
- Batch in chunks of 50–100 to enable parallelization and early error detection.
- Log comprehensive statistics at each stage for observability.
- Deduplication before PII removal saves wasted processing.
- Export to JSONL format for streaming consumption by downstream ML systems.
- Monitor pass rates; if <50% pass validation, refinement is needed upstream (prompt adjustment).
Frequently Asked Questions
Can I parallelize batch generation?
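Yes. Use a job queue (Celery, Ray) or async calls to generate multiple batches concurrently. Be mindful of API rate limits: they vary by provider and account tier and typically cap both requests and tokens per minute, so check your account's limits and bound concurrency accordingly.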
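Concurrent generation with a concurrency cap can be sketched with `asyncio` and a semaphore. `generate_one` is a hypothetical stand-in for an async API call (the Anthropic SDK provides an async client, `AsyncAnthropic`, whose `messages.create` can be awaited), and the concurrency values are arbitrary; derive yours from your actual rate limits.

```python
import asyncio
from typing import Awaitable, Callable, List


async def generate_concurrently(
    generate_one: Callable[[int], Awaitable[List[dict]]],
    num_batches: int,
    max_concurrency: int = 4,
) -> List[List[dict]]:
    """Run generate_one(batch_num) for every batch, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(batch_num: int) -> List[dict]:
        async with semaphore:  # waits while max_concurrency calls are in flight
            return await generate_one(batch_num)

    # gather preserves input order, so results[i] belongs to batch i
    return await asyncio.gather(*(guarded(i) for i in range(num_batches)))


# Hypothetical stub standing in for an API call
async def fake_generate(batch_num: int) -> List[dict]:
    await asyncio.sleep(0.01)
    return [{"batch": batch_num}]


results = asyncio.run(generate_concurrently(fake_generate, num_batches=10, max_concurrency=3))
print(len(results), results[0])  # 10 [{'batch': 0}]
```

A semaphore caps in-flight requests but not requests per minute; for strict per-minute limits, combine it with retries and backoff on rate-limit errors.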
What's a good pass rate to target?
70–85% through the full pipeline is typical. Below 50%, your prompts need refinement or your validation rules are too strict; above 95%, your validation may be too lenient.
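These rules of thumb can be encoded as an automatic end-of-run check. This sketch simply applies the thresholds above (below 50%: investigate prompts and validation; above 95%: suspect lenient validation); the function name and messages are illustrative.

```python
def assess_pass_rate(generated: int, final: int) -> str:
    """Classify an end-to-end pass rate using the rule-of-thumb bands above."""
    if generated == 0:
        return "no data"
    rate = final / generated
    if rate < 0.50:
        return "too low: refine prompts or relax validation"
    if rate > 0.95:
        return "suspiciously high: validation may be too lenient"
    return "within bounds"


print(assess_pass_rate(1000, 780))  # within bounds
print(assess_pass_rate(1000, 420))  # too low: refine prompts or relax validation
```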
Should I version my datasets?
Yes. Save each pipeline run with timestamp, config, and statistics. This enables rollback and analysis if a later batch introduces systematic issues.
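One way to version runs is to write a manifest into a per-run directory that records the timestamp, the full config, and the statistics; the exported dataset can go in the same directory. A minimal sketch, assuming the config is a dataclass like `PipelineConfig`; the `ExampleConfig` class, the `save_run_manifest` helper, and the directory layout are hypothetical.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from datetime import datetime, timezone


@dataclass
class ExampleConfig:  # hypothetical stand-in for PipelineConfig
    model: str = "claude-3-5-sonnet-20241022"
    temperature: float = 0.85
    batch_size: int = 10


def save_run_manifest(config, stats: dict, output_dir: str) -> str:
    """Write config and stats to <output_dir>/run_<timestamp>/manifest.json; return its path."""
    run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S_%fZ")  # UTC, microsecond precision
    run_dir = os.path.join(output_dir, f"run_{run_id}")
    os.makedirs(run_dir, exist_ok=True)
    manifest = {"run_id": run_id, "config": asdict(config), "stats": stats}
    path = os.path.join(run_dir, "manifest.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    return path


with tempfile.TemporaryDirectory() as tmp:
    path = save_run_manifest(ExampleConfig(), {"generated": 120, "final": 100}, tmp)
    with open(path, encoding="utf-8") as f:
        print(json.load(f)["stats"])  # {'generated': 120, 'final': 100}
```

Storing the full config, including the generation prompt, is what makes a later regression traceable to a specific prompt or threshold change.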