
Scaling dataset preparation: Automation and tools

Manual dataset preparation works for hundreds of examples but breaks at thousands. For teams handling 10,000–1,000,000+ examples, automation is essential. This article covers scalable architectures: batch processing, orchestration frameworks, cloud storage, version control, and open-source tools that make dataset preparation reproducible and maintainable.

From Manual to Automated Pipelines

As dataset size grows, manual steps become bottlenecks:

  • Manual annotation: 10,000 examples at $1 per label = $10,000 and 2–4 weeks.
  • Manual cleaning: Checking 10,000 examples manually = 40+ hours.
  • Manual QA: Spot-checking 500 examples = 2+ hours per iteration.

Automated pipelines replace manual work with code:

Task                  | Manual        | Automated
Deduplication         | 5 hours, $0   | 2 minutes, $0.10
Format validation     | 3 hours, $0   | 30 seconds, $0
PII removal           | 8 hours, $0   | 5 minutes, $0.50
Class balancing       | 2 hours, $0   | 1 minute, $0
Train/val/test split  | 1 hour, $0    | 30 seconds, $0

For 10,000 examples, the manual steps total 19 hours; the automated steps total about 9 minutes plus $0.60 in compute. Once the pipeline is written, every rerun costs minutes instead of hours.
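The totals can be checked by summing the per-task estimates from the table. The sketch below only restates those figures; the `TASKS` name and the derived speedup ratio are illustrative and not part of the original comparison.

```python
# Per-task estimates copied from the table above:
# (manual hours, automated minutes, automated compute cost in USD)
TASKS = {
    "Deduplication": (5, 2.0, 0.10),
    "Format validation": (3, 0.5, 0.00),
    "PII removal": (8, 5.0, 0.50),
    "Class balancing": (2, 1.0, 0.00),
    "Train/val/test split": (1, 0.5, 0.00),
}

manual_hours = sum(hours for hours, _, _ in TASKS.values())
automated_minutes = sum(minutes for _, minutes, _ in TASKS.values())
compute_cost = sum(cost for _, _, cost in TASKS.values())

# Ratio of manual time to automated time, both expressed in minutes
speedup = manual_hours * 60 / automated_minutes

print(f"Manual: {manual_hours} hours")
print(f"Automated: {automated_minutes:.0f} minutes, ${compute_cost:.2f} compute")
print(f"Speedup: roughly {speedup:.0f}x")
```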

Architecture 1: Python Scripts + Cloud Storage

For small to medium teams (< 100K examples), a Python-based pipeline with cloud storage works well:

import json
import logging
import random
from collections import defaultdict
from pathlib import Path

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DatasetPipeline:
    def __init__(self, input_dir, output_dir, config=None):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.config = config or {}

    def load_raw(self):
        """Load raw data from all sources."""
        logger.info("Loading raw data...")
        examples = []

        # Sort files so the load order is the same on every run
        for source_file in sorted(self.input_dir.glob("*.jsonl")):
            with open(source_file, encoding="utf-8") as f:
                for line in f:
                    if line.strip():  # Skip blank lines
                        examples.append(json.loads(line))

        logger.info(f"Loaded {len(examples)} examples")
        return examples

    def clean(self, examples):
        """Apply all cleaning transformations."""
        logger.info("Cleaning...")

        # Deduplicate on the (instruction, response) pair
        seen = set()
        unique = []
        for ex in examples:
            key = json.dumps([ex.get("instruction"), ex.get("response")], sort_keys=True)
            if key not in seen:
                seen.add(key)
                unique.append(ex)

        logger.info(f"Removed {len(examples) - len(unique)} duplicates")

        # Remove PII: drop examples whose response contains detected PII.
        # Imported here so the other steps run without presidio installed.
        from presidio_analyzer import AnalyzerEngine
        analyzer = AnalyzerEngine()

        kept = []
        for ex in unique:
            response = ex.get("response")
            if response and analyzer.analyze(text=response, language="en"):
                continue
            kept.append(ex)

        logger.info(f"Dropped {len(unique) - len(kept)} examples with PII in the response")

        return kept

    def balance(self, examples):
        """Balance by class by oversampling smaller classes."""
        logger.info("Balancing...")

        by_class = defaultdict(list)
        for ex in examples:
            class_label = ex.get("category", "unknown")
            by_class[class_label].append(ex)

        if not by_class:
            return []

        max_count = max(len(v) for v in by_class.values())
        balanced = []

        # Note: oversampling before splitting can put copies of one example
        # in both train and test; consider balancing only the training split.
        for class_label, class_examples in by_class.items():
            if len(class_examples) < max_count:
                class_examples.extend(
                    random.choices(class_examples, k=max_count - len(class_examples))
                )
            balanced.extend(class_examples)

        logger.info(f"Balanced dataset: {len(examples)} -> {len(balanced)}")
        return balanced

    def split(self, examples, train_ratio=0.7, val_ratio=0.15, seed=42):
        """Split into train/val/test."""
        logger.info("Splitting...")

        # Shuffle a copy with a fixed seed so reruns give the same split
        examples = list(examples)
        random.Random(seed).shuffle(examples)

        n = len(examples)
        train_end = int(n * train_ratio)
        val_end = train_end + int(n * val_ratio)

        train = examples[:train_end]
        val = examples[train_end:val_end]
        test = examples[val_end:]

        return train, val, test

    def save(self, train, val, test):
        """Save split datasets."""
        logger.info("Saving...")

        for name, data in [("train", train), ("val", val), ("test", test)]:
            path = self.output_dir / f"{name}.jsonl"
            with open(path, "w", encoding="utf-8") as f:
                for ex in data:
                    f.write(json.dumps(ex) + "\n")
            logger.info(f"Saved {len(data)} examples to {path}")

    def run(self):
        """Execute full pipeline."""
        logger.info("Starting pipeline...")

        examples = self.load_raw()
        examples = self.clean(examples)
        examples = self.balance(examples)
        train, val, test = self.split(examples)
        self.save(train, val, test)

        logger.info("Pipeline complete!")

# Usage
pipeline = DatasetPipeline(input_dir="raw_data", output_dir="processed_data")
pipeline.run()

Cloud storage integration (AWS S3):

import boto3

class S3Pipeline(DatasetPipeline):
    def __init__(self, input_dir, output_dir, s3_bucket, aws_profile=None):
        super().__init__(input_dir, output_dir)

        session = boto3.Session(profile_name=aws_profile)
        self.s3 = session.client("s3")
        self.bucket = s3_bucket

    def upload_to_s3(self, file_path, s3_key):
        """Upload processed data to S3."""
        logger.info(f"Uploading {file_path} to s3://{self.bucket}/{s3_key}")
        self.s3.upload_file(str(file_path), self.bucket, s3_key)

    def download_from_s3(self, s3_key, local_path):
        """Download raw data from S3."""
        logger.info(f"Downloading s3://{self.bucket}/{s3_key} to {local_path}")
        self.s3.download_file(self.bucket, s3_key, local_path)

    def run(self):
        """Execute pipeline and upload results."""
        super().run()

        # Upload processed files to S3
        for file_type in ["train", "val", "test"]:
            local_path = self.output_dir / f"{file_type}.jsonl"
            s3_key = f"datasets/{file_type}.jsonl"
            self.upload_to_s3(local_path, s3_key)

# Usage
pipeline = S3Pipeline(
    input_dir="raw_data",
    output_dir="processed_data",
    s3_bucket="my-ml-datasets",
    aws_profile="default"
)
pipeline.run()

Architecture 2: dbt + SQL for Large Datasets

For very large datasets (> 100K examples) stored in a data warehouse, use dbt (data build tool) to orchestrate transformations:

-- dbt models/raw/raw_support_tickets.sql
-- Pull raw data from source

SELECT
    ticket_id,
    customer_message AS instruction,
    agent_response AS response,
    created_at,
    customer_rating
FROM {{ source('support', 'tickets') }}
WHERE created_at >= '2025-01-01'


-- dbt models/staging/stg_support_tickets.sql
-- Clean and format
-- The indexes config applies to adapters that support it, such as Postgres

{{ config(
    materialized='table',
    indexes=[{'columns': ['ticket_id'], 'type': 'hash'}]
) }}

WITH raw AS (
    SELECT * FROM {{ ref('raw_support_tickets') }}
),

cleaned AS (
    SELECT
        ticket_id,
        TRIM(instruction) AS instruction,
        TRIM(response) AS response,
        created_at,
        customer_rating,
        ROW_NUMBER() OVER (PARTITION BY instruction, response ORDER BY created_at) AS rn
    FROM raw
    WHERE LENGTH(instruction) > 5
      AND LENGTH(response) > 5
      AND customer_rating >= 4
)

SELECT * FROM cleaned
WHERE rn = 1 -- Remove exact duplicates


-- dbt models/mart/finetuning_dataset.sql
-- Final dataset for fine-tuning
-- The r'...' literals and RAND() below follow BigQuery syntax; adapt them to your warehouse.
-- Because this model is a view, the random sample changes on every query;
-- materialize it as a table if the dataset must stay fixed.

{{ config(materialized='view') }}

WITH staging AS (
    SELECT * FROM {{ ref('stg_support_tickets') }}
),

anonymized AS (
    SELECT
        ticket_id,
        REGEXP_REPLACE(instruction, r'[a-zA-Z][a-zA-Z0-9._%+-]*@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]') AS instruction,
        REGEXP_REPLACE(response, r'[a-zA-Z][a-zA-Z0-9._%+-]*@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]') AS response,
        created_at
    FROM staging
),

sampled AS (
    SELECT
        *,
        ROW_NUMBER() OVER (ORDER BY RAND()) AS sample_id
    FROM anonymized
)

-- A window-function alias cannot be filtered in the SELECT that defines it,
-- so the limit is applied one level out.
SELECT * FROM sampled
WHERE sample_id <= 10000 -- Limit to 10K examples

Run dbt pipeline:

dbt deps  # Install dependencies
dbt run # Execute all models
dbt test # Run data tests
dbt docs generate # Generate documentation

Architecture 3: Apache Airflow for Complex Workflows

For teams with complex, multi-step workflows, use Apache Airflow to orchestrate tasks:

import json
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dataset_preparation_pipeline',
    default_args=default_args,
    schedule='@weekly',  # Run weekly (use schedule_interval on Airflow < 2.4)
    start_date=datetime(2026, 1, 1),
)

def load_and_clean(**context):
    """Task 1: Load and clean data."""
    # Assumes the DatasetPipeline class above is saved as dataset_pipeline.py
    from dataset_pipeline import DatasetPipeline
    # DatasetPipeline reads local files, so raw data must already be
    # downloaded from S3 into /tmp/raw before this task runs
    pipeline = DatasetPipeline(input_dir="/tmp/raw", output_dir="/tmp/cleaned")
    cleaned = pipeline.clean(pipeline.load_raw())
    with open(Path("/tmp/cleaned") / "cleaned.jsonl", "w", encoding="utf-8") as f:
        for ex in cleaned:
            f.write(json.dumps(ex) + "\n")
    # Passing local paths through XCom only works if tasks share a filesystem
    context['task_instance'].xcom_push(key='cleaned_path', value="/tmp/cleaned")

def balance_and_split(**context):
    """Task 2: Balance and split."""
    from dataset_pipeline import DatasetPipeline
    cleaned_path = context['task_instance'].xcom_pull(task_ids='load_clean', key='cleaned_path')
    pipeline = DatasetPipeline(input_dir=cleaned_path, output_dir="/tmp/split")
    examples = pipeline.balance(pipeline.load_raw())
    train, val, test = pipeline.split(examples)
    pipeline.save(train, val, test)
    context['task_instance'].xcom_push(key='split_path', value="/tmp/split")

def upload_to_s3(**context):
    """Task 3: Upload to S3."""
    import boto3
    split_path = context['task_instance'].xcom_pull(task_ids='balance_split', key='split_path')
    s3 = boto3.client("s3")
    # Bucket and key layout reuse the S3Pipeline example; adjust to your setup
    for name in ["train", "val", "test"]:
        s3.upload_file(f"{split_path}/{name}.jsonl", "my-ml-datasets", f"datasets/{name}.jsonl")

# Define DAG tasks
task_load_clean = PythonOperator(
    task_id='load_clean',
    python_callable=load_and_clean,
    dag=dag,
)

task_balance_split = PythonOperator(
    task_id='balance_split',
    python_callable=balance_and_split,
    dag=dag,
)

task_upload = PythonOperator(
    task_id='upload',
    python_callable=upload_to_s3,
    dag=dag,
)

# Define dependencies
task_load_clean >> task_balance_split >> task_upload

Open-Source Tools for Automation

DVC (Data Version Control): Track dataset versions and reproduce pipelines.

# Initialize DVC
dvc init

# Track dataset
dvc add raw_data.jsonl

# Define pipeline (dvc.yaml)
# stages:
#   clean:
#     cmd: python clean.py
#     deps:
#       - raw_data.jsonl
#     outs:
#       - cleaned_data.jsonl

# Run pipeline
dvc repro

Label Studio: Open-source annotation platform with API integration.

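import os
import requests

LABEL_STUDIO_URL = "http://label-studio:8080"
# Label Studio's API requires authentication. Reading a token from this
# environment variable is an assumption; the header scheme may differ by version.
HEADERS = {"Authorization": f"Token {os.environ['LABEL_STUDIO_API_KEY']}"}
PROJECT_ID = 1  # Placeholder: ID of an existing project

# Create labeling task
task_data = {
    "instruction": "Classify this text",
    "text": "Customer feedback here..."
}

response = requests.post(
    f"{LABEL_STUDIO_URL}/api/tasks",
    headers=HEADERS,
    json={"data": task_data, "project": PROJECT_ID},
)
response.raise_for_status()
task_id = response.json()["id"]

# Retrieve labeled results
labeled = requests.get(f"{LABEL_STUDIO_URL}/api/tasks/{task_id}/annotations", headers=HEADERS)
labeled.raise_for_status()
print(labeled.json())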

Great Expectations: Data validation framework.

import great_expectations as ge
import pandas as pd

# Uses the legacy pandas-dataset API (Great Expectations 0.x).
# Releases from 1.0 onward replace it with a context-based workflow.
df = pd.read_json("dataset.jsonl", lines=True)
dataset = ge.from_pandas(df)

# Define expectations (each call is recorded on the dataset)
dataset.expect_table_row_count_to_be_between(min_value=100, max_value=10000)
dataset.expect_column_values_to_not_be_null(column="instruction")

# Validate dataset against the recorded expectations
results = dataset.validate()

print(results)

Best Practices for Scaling

1. Idempotency: Pipelines should be safe to run multiple times. Use deterministic splitting and versioned data sources.

import hashlib
import json

def idempotent_split(examples, seed=42, version=1):
    """Split deterministically based on seed and version."""
    for ex in examples:
        # Create stable hash per example; the seed salts the hash
        content = json.dumps(ex, sort_keys=True)
        ex_hash = int(hashlib.md5(f"{seed}:{content}".encode()).hexdigest(), 16)

        # Use hash to decide split: 70% train, 15% val, 15% test
        split = (ex_hash + version) % 100
        if split < 70:
            yield "train", ex
        elif split < 85:
            yield "val", ex
        else:
            yield "test", ex

2. Versioning: Track dataset versions with metadata.

import json

dataset_metadata = {
    "version": "1.0.0",
    "created_at": "2026-06-02T10:00:00Z",
    "examples_count": 10000,
    "train_size": 7000,
    "val_size": 1500,
    "test_size": 1500,
    "sources": ["production_logs", "public_data", "synthetic"],
    "pipeline_commit": "abc123def456",
    "data_warehouse": "s3://ml-datasets/v1.0.0"
}

with open("dataset_metadata.json", "w") as f:
    json.dump(dataset_metadata, f, indent=2)
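Hand-written counts drift out of date, so the metadata can instead be derived from the split files themselves. The following is a minimal sketch under some assumptions: the splits live in one directory as `train.jsonl`, `val.jsonl`, and `test.jsonl` (the layout used by the pipeline earlier in this article), and the helper names `file_sha256`, `count_examples`, and `build_metadata` are hypothetical. The per-file SHA-256 digest is an addition that lets you confirm later that a file still matches the version it was recorded under.

```python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

def file_sha256(path):
    """Return the SHA-256 hex digest of a file, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()

def count_examples(path):
    """Count non-blank lines in a JSONL file."""
    with open(path, encoding="utf-8") as f:
        return sum(1 for line in f if line.strip())

def build_metadata(data_dir, version, sources):
    """Derive split sizes and checksums from the files in data_dir."""
    data_dir = Path(data_dir)
    splits = {}
    for name in ("train", "val", "test"):
        path = data_dir / f"{name}.jsonl"
        splits[name] = {"size": count_examples(path), "sha256": file_sha256(path)}
    return {
        "version": version,
        "created_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
        "examples_count": sum(s["size"] for s in splits.values()),
        "splits": splits,
        "sources": sources,
    }

# Example usage (assumes processed_data/ holds the three split files):
# metadata = build_metadata("processed_data", "1.0.0", ["production_logs"])
# Path("dataset_metadata.json").write_text(json.dumps(metadata, indent=2))
```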

3. Monitoring: Log metrics and alert on anomalies.

import logging

logger = logging.getLogger(__name__)

def monitor_dataset_health(examples):
    """Track dataset health metrics."""

    total = len(examples)
    empty_instructions = sum(1 for ex in examples if not ex.get("instruction"))
    empty_responses = sum(1 for ex in examples if not ex.get("response"))

    # Guard against division by zero on an empty dataset
    if total == 0:
        logger.warning("Dataset is empty!")
        return {"total": 0, "empty_instructions": 0, "empty_responses": 0}

    logger.info("Dataset health check:")
    logger.info(f" Total examples: {total}")
    logger.info(f" Empty instructions: {empty_instructions} ({100*empty_instructions/total:.2f}%)")
    logger.info(f" Empty responses: {empty_responses} ({100*empty_responses/total:.2f}%)")

    # Alert if more than 1% of instructions are empty
    if empty_instructions / total > 0.01:
        logger.warning("Too many empty instructions!")

    return {
        "total": total,
        "empty_instructions": empty_instructions,
        "empty_responses": empty_responses
    }

Key Takeaways

  • Automate dataset preparation at scale: Python scripts for < 100K examples, dbt for SQL-based pipelines, Airflow for complex workflows.
  • Use cloud storage (S3, GCS) for versioning and reproducibility.
  • Implement idempotent pipelines: safe to run multiple times.
  • Track dataset versions and metadata.
  • Monitor pipeline health and alert on anomalies.
  • Open-source tools (DVC, Label Studio, Great Expectations) accelerate automation.

Frequently Asked Questions

Should I automate from the start, or wait until data scales?

Start simple (Python scripts), then graduate to dbt or Airflow as you approach 100K examples or your workflow grows to many interdependent steps. Automation pays for itself once the manual effort per iteration exceeds 2–3 hours.

How do I handle incremental datasets (new data added weekly)?

Use incremental loading:

SELECT * FROM raw_table
WHERE updated_at >= (SELECT MAX(updated_at) FROM processed_table)

This way, pipelines only process new/changed examples.
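For file-based pipelines without a warehouse, the same idea can be sketched in Python with a stored "watermark" timestamp. This is an illustration under assumptions: each example carries an `updated_at` field holding an ISO 8601 UTC timestamp in a consistent format, and the function names and the state-file layout are hypothetical. Unlike the SQL above, the comparison here is strict (`>`), which avoids reprocessing the newest row but would miss a late-arriving row that shares the watermark's exact timestamp.

```python
import json
from pathlib import Path

def load_watermark(state_path):
    """Return the newest updated_at already processed, or "" on the first run."""
    path = Path(state_path)
    if not path.exists():
        return ""
    return json.loads(path.read_text())["max_updated_at"]

def select_new(examples, watermark):
    """Keep only examples updated after the watermark.

    ISO 8601 UTC timestamps in one consistent format sort correctly as strings.
    """
    return [ex for ex in examples if ex["updated_at"] > watermark]

def save_watermark(state_path, processed, previous):
    """Persist the newest timestamp seen so the next run starts from there."""
    newest = max((ex["updated_at"] for ex in processed), default=previous)
    Path(state_path).write_text(json.dumps({"max_updated_at": max(newest, previous)}))

# Example usage (paths and the processing step are placeholders):
# watermark = load_watermark("pipeline_state.json")
# new_examples = select_new(all_examples, watermark)
# ... run cleaning on new_examples only ...
# save_watermark("pipeline_state.json", new_examples, watermark)
```

Saving the watermark only after processing succeeds keeps the step safe to rerun: a failed run leaves the old watermark in place, so the same rows are picked up again.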

What if my pipeline fails mid-run?

Ensure idempotency: re-running should be safe. For Airflow, use task dependencies and sensors to check upstream completion before running.
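One way to make a rerun safe at the file level is to write each output atomically, so a crash never leaves a half-written split behind. The sketch below is one possible approach rather than a prescribed one, and `write_jsonl_atomic` is a hypothetical helper: it writes to a temporary file in the target directory and renames it into place only after every example has been written.

```python
import json
import os
import tempfile
from pathlib import Path

def write_jsonl_atomic(path, examples):
    """Write examples to path as JSONL, replacing any old file only on success."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    # The temp file lives in the same directory so the final rename
    # stays on one filesystem, where os.replace is atomic.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            for ex in examples:
                f.write(json.dumps(ex) + "\n")
        os.replace(tmp_name, path)
    except BaseException:
        # On any failure, remove the partial temp file and keep the old output
        os.unlink(tmp_name)
        raise
```

If the process fails partway through, the previous version of the file is still intact, and the next run simply overwrites it once it completes.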

How do I test pipelines before production?

Use a staging environment: run the full pipeline on a small subset first, validate output, then run on production data.
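A dry run of this kind might look like the sketch below: draw a small reproducible sample, run it through the pipeline, then check a few invariants on the output. The function names, the default sample size of 200, and the specific checks (non-empty splits, required fields present, no train examples repeated in val or test) are illustrative assumptions, not a complete validation suite.

```python
import json
import random

def sample_subset(examples, n=200, seed=0):
    """Draw a reproducible random sample for a staging run."""
    rng = random.Random(seed)
    return rng.sample(examples, min(n, len(examples)))

def check_splits(train, val, test, required=("instruction", "response")):
    """Return a list of human-readable problems; an empty list means all checks passed."""
    problems = []

    for name, split in (("train", train), ("val", val), ("test", test)):
        if not split:
            problems.append(f"{name} is empty")
        for ex in split:
            missing = [field for field in required if not ex.get(field)]
            if missing:
                problems.append(f"{name} has an example missing {missing}")
                break  # One report per split is enough for a smoke test

    # Examples shared between train and the held-out splits indicate leakage
    train_keys = {json.dumps(ex, sort_keys=True) for ex in train}
    leaked = sum(1 for ex in val + test if json.dumps(ex, sort_keys=True) in train_keys)
    if leaked:
        problems.append(f"{leaked} val/test examples also appear in train")

    return problems

# Example usage with a pipeline's split step (pipeline is a placeholder object):
# subset = sample_subset(all_examples)
# train, val, test = pipeline.split(subset)
# assert check_splits(train, val, test) == []
```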

How do I track data lineage (which examples came from which source)?

Add a source field to each example:

{"instruction": "...", "response": "...", "source": "production_logs", "source_id": "ticket_123"}
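These fields can be attached at load time so no example enters the pipeline untagged. The sketch below assumes one JSONL file per source, with the file name (minus extension) serving as the source name; `load_with_lineage` and the fallback `source_id` format of file name plus line number are hypothetical choices. Existing `source` and `source_id` values are left untouched.

```python
import json
from pathlib import Path

def load_with_lineage(input_dir):
    """Load all JSONL files in input_dir, tagging each example with its origin."""
    examples = []
    for source_file in sorted(Path(input_dir).glob("*.jsonl")):
        with open(source_file, encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                if not line.strip():
                    continue  # Skip blank lines
                ex = json.loads(line)
                # setdefault keeps lineage fields that upstream systems already set
                ex.setdefault("source", source_file.stem)
                ex.setdefault("source_id", f"{source_file.name}:{line_no}")
                examples.append(ex)
    return examples

# Example usage (raw_data/ is a placeholder directory):
# examples = load_with_lineage("raw_data")
```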

Further Reading