Scaling dataset preparation: Automation and tools
Manual dataset preparation works for hundreds of examples but breaks at thousands. For teams handling 10,000–1,000,000+ examples, automation is essential. This article covers scalable architectures: batch processing, orchestration frameworks, cloud storage, version control, and open-source tools that make dataset preparation reproducible and maintainable.
From Manual to Automated Pipelines
As dataset size grows, manual steps become bottlenecks:
- Manual annotation: 10,000 examples at $1 per label = $10,000 and 2–4 weeks.
- Manual cleaning: Checking 10,000 examples manually = 40+ hours.
- Manual QA: Spot-checking 500 examples = 2+ hours per iteration.
Automated pipelines replace manual work with code:
| Task | Manual | Automated |
|---|---|---|
| Deduplication | 5 hours, $0 | 2 minutes, $0.10 |
| Format validation | 3 hours, $0 | 30 seconds, $0 |
| PII removal | 8 hours, $0 | 5 minutes, $0.50 |
| Class balancing | 2 hours, $0 | 1 minute, $0 |
| Train/val/test split | 1 hour, $0 | 30 seconds, $0 |
For 10,000 examples: manual ≈ 19 hours; automated ≈ 10 minutes + $0.60 compute. Once the pipeline has been written, automation pays for itself on the first run.
Architecture 1: Python Scripts + Cloud Storage
For small to medium teams (< 100K examples), a Python-based pipeline with cloud storage works well:
import json
import os
from pathlib import Path
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DatasetPipeline:
    """Turn raw JSONL files into cleaned, balanced train/val/test splits.

    Stages (see ``run``): load -> clean (dedup + PII redaction) -> split ->
    balance the training split -> save.

    Args:
        input_dir: Directory whose ``*.jsonl`` files hold one JSON object per line.
        output_dir: Directory for ``train.jsonl``/``val.jsonl``/``test.jsonl``;
            created if it does not exist.
        config: Optional settings dict. The key ``"seed"`` (default
            ``DEFAULT_SEED``) seeds shuffling and oversampling so reruns on the
            same input produce the same output; ``{"seed": None}`` opts back
            into OS-entropy randomness.
    """

    DEFAULT_SEED = 42

    def __init__(self, input_dir, output_dir, config=None):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.config = config or {}

    def _rng(self, seed=None):
        """Return a private RNG seeded by *seed*, else config["seed"], else DEFAULT_SEED."""
        import random

        if seed is None:
            seed = self.config.get("seed", self.DEFAULT_SEED)
        return random.Random(seed)

    def load_raw(self):
        """Load every JSON object from the ``*.jsonl`` files in ``input_dir``.

        Files are read in sorted name order so the result is reproducible
        across filesystems; blank lines are skipped.
        """
        logger.info("Loading raw data...")
        examples = []
        for source_file in sorted(self.input_dir.glob("*.jsonl")):
            with open(source_file, encoding="utf-8") as f:
                for line in f:
                    if line.strip():
                        examples.append(json.loads(line))
        logger.info(f"Loaded {len(examples)} examples")
        return examples

    @staticmethod
    def _deduplicate(examples):
        """Drop repeated (instruction, response) pairs, keeping the first occurrence."""
        seen = set()
        unique = []
        for ex in examples:
            key = json.dumps([ex.get("instruction"), ex.get("response")], sort_keys=True)
            if key not in seen:
                seen.add(key)
                unique.append(ex)
        return unique

    @staticmethod
    def _redact_spans(text, spans):
        """Replace (start, end, label) character spans in *text* with ``<label>``.

        Overlapping spans are merged into the earliest-starting one, so every
        character covered by any span is redacted exactly once.
        """
        pieces = []
        cursor = 0
        for start, end, label in sorted(spans, key=lambda s: (s[0], -s[1])):
            if start < cursor:
                # Overlaps a span already redacted: just extend its coverage.
                cursor = max(cursor, end)
                continue
            pieces.append(text[cursor:start])
            pieces.append(f"<{label}>")
            cursor = end
        pieces.append(text[cursor:])
        return "".join(pieces)

    def clean(self, examples, analyzer=None):
        """Deduplicate examples and redact PII in their text fields.

        Args:
            examples: List of example dicts (not modified).
            analyzer: Optional Presidio ``AnalyzerEngine`` (anything with a
                compatible ``analyze(text=..., language=...)`` returning results
                with ``start``/``end``/``entity_type``). Created on demand if
                omitted, which requires ``presidio_analyzer``.

        Returns:
            New list of unique examples in which PII spans found in
            "instruction" and "response" are replaced by ``<ENTITY_TYPE>``.
        """
        logger.info("Cleaning...")
        unique = self._deduplicate(examples)
        logger.info(f"Removed {len(examples) - len(unique)} duplicates")

        if analyzer is None:
            from presidio_analyzer import AnalyzerEngine

            analyzer = AnalyzerEngine()

        cleaned = []
        pii_count = 0
        for ex in unique:
            redacted = dict(ex)
            found = False
            for field in ("instruction", "response"):
                text = ex.get(field)
                if not isinstance(text, str) or not text:
                    continue
                results = analyzer.analyze(text=text, language="en")
                if results:
                    found = True
                    redacted[field] = self._redact_spans(
                        text, [(r.start, r.end, r.entity_type) for r in results]
                    )
            if found:
                pii_count += 1
            cleaned.append(redacted)
        logger.info(f"Redacted PII in {pii_count} examples")
        return cleaned

    def balance(self, examples, seed=None):
        """Oversample every "category" class up to the size of the largest one.

        Examples without a "category" key are grouped as "unknown". Apply this
        to the training split only: oversampling before splitting puts copies
        of the same example into several splits and leaks evaluation data into
        training.

        Args:
            examples: List of example dicts (not modified).
            seed: Optional RNG seed overriding config["seed"].

        Returns:
            New list: each class's original examples followed by random
            duplicates of them. Empty input returns an empty list.
        """
        from collections import defaultdict

        logger.info("Balancing...")
        if not examples:
            return []
        rng = self._rng(seed)
        by_class = defaultdict(list)
        for ex in examples:
            by_class[ex.get("category", "unknown")].append(ex)
        max_count = max(len(group) for group in by_class.values())
        balanced = []
        for group in by_class.values():
            balanced.extend(group)
            balanced.extend(rng.choices(group, k=max_count - len(group)))
        logger.info(f"Balanced dataset: {len(examples)} -> {len(balanced)}")
        return balanced

    def split(self, examples, train_ratio=0.7, val_ratio=0.15, seed=None):
        """Shuffle a copy of *examples* and cut it into train/val/test.

        Args:
            examples: Sequence of examples (not modified).
            train_ratio: Fraction for the training split.
            val_ratio: Fraction for validation; the remainder goes to test.
            seed: Optional RNG seed overriding config["seed"].

        Returns:
            (train, val, test) lists.

        Raises:
            ValueError: If a ratio is negative or the two sum to more than 1.
        """
        if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
            raise ValueError(
                f"invalid split ratios: train={train_ratio}, val={val_ratio}"
            )
        logger.info("Splitting...")
        shuffled = list(examples)
        self._rng(seed).shuffle(shuffled)
        n = len(shuffled)
        train_end = int(n * train_ratio)
        val_end = train_end + int(n * val_ratio)
        return shuffled[:train_end], shuffled[train_end:val_end], shuffled[val_end:]

    def save(self, train, val, test):
        """Write each split to ``output_dir/<name>.jsonl`` as UTF-8 JSON Lines."""
        logger.info("Saving...")
        for name, data in (("train", train), ("val", val), ("test", test)):
            path = self.output_dir / f"{name}.jsonl"
            with open(path, "w", encoding="utf-8") as f:
                f.writelines(json.dumps(ex, ensure_ascii=False) + "\n" for ex in data)
            logger.info(f"Saved {len(data)} examples to {path}")

    def run(self):
        """Execute the full pipeline: load -> clean -> split -> balance train -> save.

        Balancing runs after splitting and only on the training split, so
        oversampled copies never appear in validation or test data.
        """
        logger.info("Starting pipeline...")
        examples = self.clean(self.load_raw())
        train, val, test = self.split(examples)
        train = self.balance(train)
        self.save(train, val, test)
        logger.info("Pipeline complete!")
# Usage (guarded so `import dataset_pipeline`, e.g. from an Airflow task,
# does not run the whole pipeline as an import side effect)
if __name__ == "__main__":
    pipeline = DatasetPipeline(input_dir="raw_data", output_dir="processed_data")
    pipeline.run()
Cloud storage integration (AWS S3):
import boto3
class S3Pipeline(DatasetPipeline):
    """DatasetPipeline that also uploads the finished splits to S3.

    Args:
        input_dir: Local directory of raw ``*.jsonl`` files (as for DatasetPipeline).
        output_dir: Local directory for the processed splits.
        s3_bucket: Destination bucket name.
        aws_profile: Optional AWS named profile for the boto3 session.
        s3_prefix: Key prefix for uploads (default "datasets"). A versioned
            prefix such as "datasets/v1.0.0" keeps a rerun from overwriting a
            previously published dataset.
        config: Optional settings dict passed through to DatasetPipeline.
    """

    def __init__(self, input_dir, output_dir, s3_bucket, aws_profile=None,
                 *, s3_prefix="datasets", config=None):
        super().__init__(input_dir, output_dir, config=config)
        session = boto3.Session(profile_name=aws_profile)
        self.s3 = session.client("s3")
        self.bucket = s3_bucket
        self.s3_prefix = s3_prefix.strip("/")

    def _s3_key(self, filename):
        """Join s3_prefix and filename, with no leading slash when the prefix is empty."""
        return f"{self.s3_prefix}/{filename}" if self.s3_prefix else filename

    def upload_to_s3(self, file_path, s3_key):
        """Upload processed data to S3."""
        logger.info(f"Uploading {file_path} to s3://{self.bucket}/{s3_key}")
        self.s3.upload_file(str(file_path), self.bucket, s3_key)

    def download_from_s3(self, s3_key, local_path):
        """Download raw data from S3, creating the local parent directory if needed."""
        local_path = Path(local_path)
        local_path.parent.mkdir(parents=True, exist_ok=True)
        logger.info(f"Downloading s3://{self.bucket}/{s3_key} to {local_path}")
        self.s3.download_file(self.bucket, s3_key, str(local_path))

    def run(self):
        """Execute the pipeline, then upload train/val/test.jsonl under s3_prefix."""
        super().run()
        for file_type in ("train", "val", "test"):
            local_path = self.output_dir / f"{file_type}.jsonl"
            self.upload_to_s3(local_path, self._s3_key(f"{file_type}.jsonl"))
# Usage (guarded so importing this module does not trigger a pipeline run and upload)
if __name__ == "__main__":
    pipeline = S3Pipeline(
        input_dir="raw_data",
        output_dir="processed_data",
        s3_bucket="my-ml-datasets",
        aws_profile="default"
    )
    pipeline.run()
Architecture 2: dbt + SQL for Large Datasets
For very large datasets (> 100K examples) stored in a data warehouse, use dbt (data build tool) to run version-controlled SQL transformations inside the warehouse. The examples below use BigQuery SQL syntax:
-- dbt models/raw/raw_support_tickets.sql
-- Pull raw data from source.
-- The cutoff is a dbt var (default 2025-01-01) so each environment can override it:
--   dbt run --vars '{tickets_start_date: "2025-06-01"}'
SELECT
    ticket_id,
    customer_message AS instruction,
    agent_response AS response,
    created_at,
    customer_rating
FROM {{ source('support', 'tickets') }}
WHERE created_at >= '{{ var("tickets_start_date", "2025-01-01") }}'
-- dbt models/staging/stg_support_tickets.sql
-- Clean and format
-- NOTE: `indexes` is a dbt-postgres config; on BigQuery use cluster_by=['ticket_id'] instead.
{{ config(
    materialized='table',
    indexes=[{'columns': ['ticket_id'], 'type': 'hash'}]
) }}
WITH raw AS (
    SELECT * FROM {{ ref('raw_support_tickets') }}
),
trimmed AS (
    -- Trim first, so the length filters and the dedup key below see the
    -- cleaned text (a window in the same SELECT would see the untrimmed columns).
    SELECT
        ticket_id,
        TRIM(instruction) AS instruction,
        TRIM(response) AS response,
        created_at,
        customer_rating
    FROM raw
),
cleaned AS (
    SELECT
        *,
        -- Keep the earliest copy of each (instruction, response) pair;
        -- ticket_id breaks created_at ties so the surviving row is deterministic.
        ROW_NUMBER() OVER (
            PARTITION BY instruction, response
            ORDER BY created_at, ticket_id
        ) AS rn
    FROM trimmed
    WHERE LENGTH(instruction) > 5
      AND LENGTH(response) > 5
      AND customer_rating >= 4
)
SELECT * FROM cleaned
WHERE rn = 1  -- Remove exact duplicates (after trimming whitespace)
-- dbt models/mart/finetuning_dataset.sql
-- Final dataset for fine-tuning
{{ config(materialized='view') }}
WITH staging AS (
    SELECT * FROM {{ ref('stg_support_tickets') }}
),
anonymized AS (
    SELECT
        ticket_id,
        REGEXP_REPLACE(instruction, r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]') AS instruction,
        REGEXP_REPLACE(response, r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]') AS response,
        created_at
    FROM staging
),
sampled AS (
    -- Order by a hash of ticket_id instead of RAND(): this model is a view,
    -- so RAND() would draw a different 10K sample every time it is queried.
    SELECT
        *,
        ROW_NUMBER() OVER (
            ORDER BY FARM_FINGERPRINT(CAST(ticket_id AS STRING)), ticket_id
        ) AS sample_id
    FROM anonymized
)
-- The limit must be applied outside the CTE: a window alias cannot be
-- referenced in the WHERE clause of the SELECT that defines it.
SELECT * FROM sampled
WHERE sample_id <= 10000  -- Limit to 10K examples
Run the dbt pipeline:
# Install the packages declared in packages.yml
dbt deps
# Build every model in dependency order
dbt run
# Execute the data tests defined on models and sources
dbt test
# Generate the documentation site
dbt docs generate
Architecture 3: Apache Airflow for Complex Workflows
For teams with complex, multi-step workflows, use Apache Airflow to orchestrate tasks:
import json
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Each task may run on a different worker, so intermediate files must live on
# storage every worker can see, not a worker-local /tmp.
WORK_DIR = Path("/shared/dataset_prep")  # placeholder: volume mounted on all workers
# DatasetPipeline globs local *.jsonl files with pathlib, so it cannot read an
# s3:// URL directly; sync raw data into this directory first.
RAW_DIR = WORK_DIR / "raw"
S3_BUCKET = "my-ml-datasets"

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dataset_preparation_pipeline',
    default_args=default_args,
    schedule='@weekly',  # Run weekly (`schedule_interval` is deprecated since Airflow 2.4)
    start_date=datetime(2026, 1, 1),
    catchup=False,  # don't backfill every missed week since start_date
)


def _run_dir(context):
    """Per-run working directory, so reruns of one date overwrite only their own files."""
    return WORK_DIR / context["ds"]


def load_and_clean(**context):
    """Task 1: Load raw JSONL, clean it, and write cleaned.jsonl for the next task."""
    from dataset_pipeline import DatasetPipeline

    cleaned_dir = _run_dir(context) / "cleaned"
    pipeline = DatasetPipeline(input_dir=RAW_DIR, output_dir=cleaned_dir)
    cleaned = pipeline.clean(pipeline.load_raw())
    with open(cleaned_dir / "cleaned.jsonl", "w", encoding="utf-8") as f:
        for ex in cleaned:
            f.write(json.dumps(ex, ensure_ascii=False) + "\n")
    context['task_instance'].xcom_push(key='cleaned_path', value=str(cleaned_dir))


def balance_and_split(**context):
    """Task 2: Split the cleaned data, balance only the training split, and save."""
    from dataset_pipeline import DatasetPipeline

    cleaned_path = context['task_instance'].xcom_pull(task_ids='load_clean', key='cleaned_path')
    split_dir = _run_dir(context) / "split"
    pipeline = DatasetPipeline(input_dir=cleaned_path, output_dir=split_dir)
    # Split before balancing so oversampled copies cannot leak into val/test.
    train, val, test = pipeline.split(pipeline.load_raw())
    pipeline.save(pipeline.balance(train), val, test)
    context['task_instance'].xcom_push(key='split_path', value=str(split_dir))


def upload_to_s3(**context):
    """Task 3: Upload train/val/test.jsonl to S3 under a per-run-date prefix."""
    split_path = Path(context['task_instance'].xcom_pull(task_ids='balance_split', key='split_path'))
    hook = S3Hook()  # uses the default `aws_default` connection
    for name in ("train", "val", "test"):
        hook.load_file(
            filename=str(split_path / f"{name}.jsonl"),
            key=f"datasets/{context['ds']}/{name}.jsonl",
            bucket_name=S3_BUCKET,
            replace=True,  # rerunning a date overwrites its own upload
        )


# Define DAG tasks
task_load_clean = PythonOperator(
    task_id='load_clean',
    python_callable=load_and_clean,
    dag=dag,
)
task_balance_split = PythonOperator(
    task_id='balance_split',
    python_callable=balance_and_split,
    dag=dag,
)
task_upload = PythonOperator(
    task_id='upload',
    python_callable=upload_to_s3,
    dag=dag,
)

# Define dependencies
task_load_clean >> task_balance_split >> task_upload
Open-Source Tools for Automation
DVC (Data Version Control): Track dataset versions and reproduce pipelines.
# Initialize DVC (inside a Git repository; otherwise use `dvc init --no-scm`)
dvc init
# Track dataset
dvc add raw_data.jsonl
# Define the pipeline stage. This writes a dvc.yaml equivalent to:
# stages:
#   clean:
#     cmd: python clean.py
#     deps:
#       - clean.py
#       - raw_data.jsonl
#     outs:
#       - cleaned_data.jsonl
dvc stage add -n clean -d clean.py -d raw_data.jsonl -o cleaned_data.jsonl python clean.py
# Run pipeline (re-executes only stages whose dependencies changed)
dvc repro
Label Studio: Open-source annotation platform with API integration.
import os

import requests

# Label Studio's REST API authenticates with a personal access token
# (shown under Account & Settings in the Label Studio UI).
LABEL_STUDIO_URL = "http://label-studio:8080"
HEADERS = {"Authorization": f"Token {os.environ['LABEL_STUDIO_API_KEY']}"}
PROJECT_ID = 1  # placeholder: ID of an existing Label Studio project

# Create labeling task (every task must belong to a project)
task_data = {
    "instruction": "Classify this text",
    "text": "Customer feedback here..."
}
response = requests.post(
    f"{LABEL_STUDIO_URL}/api/tasks/",
    json={"data": task_data, "project": PROJECT_ID},
    headers=HEADERS,
    timeout=30,
)
response.raise_for_status()
task_id = response.json()["id"]

# Retrieve labeled results for the task just created
# (the list stays empty until an annotator submits a label).
labeled = requests.get(
    f"{LABEL_STUDIO_URL}/api/tasks/{task_id}/annotations/",
    headers=HEADERS,
    timeout=30,
)
labeled.raise_for_status()
annotations = labeled.json()
Great Expectations: Data validation framework.
import great_expectations as gx
import pandas as pd

# Ephemeral context: nothing is persisted, so reruns don't collide with
# previously registered suites or data sources (Great Expectations 1.x API).
context = gx.get_context(mode="ephemeral")

# Create expectation suite
suite = context.suites.add(gx.ExpectationSuite(name="dataset_suite"))

# Define expectations
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(min_value=100, max_value=10000)
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="instruction")
)

# Validate dataset (JSON Lines: one example per line -> one row per example)
df = pd.read_json("dataset.jsonl", lines=True)
batch = (
    context.data_sources.add_pandas(name="pandas")
    .add_dataframe_asset(name="dataset")
    .add_batch_definition_whole_dataframe("whole_dataset")
    .get_batch(batch_parameters={"dataframe": df})
)
results = batch.validate(suite)
print(results)
Best Practices for Scaling
1. Idempotency: Pipelines should be safe to run multiple times. Use deterministic splitting and versioned data sources.
def idempotent_split(examples, seed=42, version=1, train_ratio=0.7, val_ratio=0.15):
    """Assign each example to "train", "val", or "test" deterministically.

    The assignment is a pure function of (seed, version, example content).
    Re-running on the same data always gives the same result, regardless of
    input order or which other examples are present. Changing ``seed`` or
    ``version`` produces a fresh, independent assignment.

    Args:
        examples: Iterable of JSON-serializable dicts.
        seed: Salt mixed into the hash.
        version: Dataset version, also mixed into the hash.
        train_ratio: Expected fraction of examples assigned to "train".
        val_ratio: Expected fraction assigned to "val"; the rest go to "test".

    Yields:
        (split_name, example) tuples, in input order.

    Raises:
        ValueError: (on first iteration) if a ratio is negative or they sum past 1.
    """
    import hashlib

    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError(f"invalid split ratios: train={train_ratio}, val={val_ratio}")
    val_cutoff = train_ratio + val_ratio
    for ex in examples:
        # Create a stable hash per example, salted by seed and version.
        content = json.dumps(ex, sort_keys=True)
        digest = hashlib.sha256(f"{seed}:{version}:{content}".encode("utf-8")).digest()
        # Top 53 bits -> float exactly representable in [0, 1), never 1.0.
        bucket = (int.from_bytes(digest[:8], "big") >> 11) / 2**53
        if bucket < train_ratio:
            yield "train", ex
        elif bucket < val_cutoff:
            yield "val", ex
        else:
            yield "test", ex
2. Versioning: Track dataset versions with metadata.
# Describe this dataset release; keys are written in this order.
dataset_metadata = dict(
    version="1.0.0",
    created_at="2026-06-02T10:00:00Z",
    examples_count=10000,
    train_size=7000,
    val_size=1500,
    test_size=1500,
    sources=["production_logs", "public_data", "synthetic"],
    pipeline_commit="abc123def456",
    data_warehouse="s3://ml-datasets/v1.0.0",
)
serialized_metadata = json.dumps(dataset_metadata, indent=2)
with open("dataset_metadata.json", "w") as metadata_file:
    metadata_file.write(serialized_metadata)
3. Monitoring: Log metrics and alert on anomalies.
import logging

logger = logging.getLogger(__name__)


def monitor_dataset_health(examples, max_empty_ratio=0.01):
    """Log dataset health metrics and warn when empty fields exceed a threshold.

    Args:
        examples: Sequence of example dicts; a missing or falsy "instruction"
            or "response" counts as empty.
        max_empty_ratio: Fraction of empty instructions or responses above
            which a warning is logged (default 1%).

    Returns:
        Dict with "total", "empty_instructions", and "empty_responses" counts.
    """
    total = len(examples)
    empty_instructions = sum(1 for ex in examples if not ex.get("instruction"))
    empty_responses = sum(1 for ex in examples if not ex.get("response"))

    logger.info("Dataset health check:")
    logger.info(" Total examples: %d", total)
    if total == 0:
        # Ratios are undefined; an empty dataset is itself an anomaly.
        logger.warning("Dataset is empty!")
    else:
        for label, count in (("instructions", empty_instructions),
                             ("responses", empty_responses)):
            ratio = count / total
            logger.info(" Empty %s: %d (%.2f%%)", label, count, 100 * ratio)
            # Alert if anomaly detected
            if ratio > max_empty_ratio:
                logger.warning("Too many empty %s!", label)

    return {
        "total": total,
        "empty_instructions": empty_instructions,
        "empty_responses": empty_responses
    }
Key Takeaways
- Automate dataset preparation at scale: Python scripts for < 100K examples, dbt for SQL-based pipelines, Airflow for complex workflows.
- Use cloud storage (S3, GCS) with versioned prefixes or object versioning, so every dataset release stays retrievable and reproducible.
- Implement idempotent pipelines: safe to run multiple times.
- Track dataset versions and metadata.
- Monitor pipeline health and alert on anomalies.
- Open-source tools (DVC, Label Studio, Great Expectations) accelerate automation.
Frequently Asked Questions
Should I automate from the start, or wait until data scales?
Start simple (Python scripts), then graduate to dbt or Airflow once you approach 100K+ examples or need scheduled, multi-step workflows. Automation pays for itself as soon as the manual effort it replaces exceeds 2–3 hours.
How do I handle incremental datasets (new data added weekly)?
Use incremental loading:
SELECT * FROM raw_table
WHERE updated_at >= (SELECT MAX(updated_at) FROM processed_table)
   -- First run: processed_table is empty, MAX() is NULL and the comparison
   -- above matches nothing, so fall back to loading everything.
   OR (SELECT MAX(updated_at) FROM processed_table) IS NULL
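This way, pipelines only process new/changed examples, provided `updated_at` is bumped on every change. Because `>=` re-reads rows at the watermark timestamp, write the results with a merge/upsert keyed on the example ID rather than a plain append.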
What if my pipeline fails mid-run?
Ensure idempotency: re-running should be safe. For Airflow, use task dependencies and sensors to check upstream completion before running.
How do I test pipelines before production?
Use a staging environment: run the full pipeline on a small subset first, validate output, then run on production data.
How do I track data lineage (which examples came from which source)?
Add `source` and `source_id` fields to each example:
{"instruction": "...", "response": "...", "source": "production_logs", "source_id": "ticket_123"}