Incremental Sync: Avoiding Full ETL Reloads
Incremental sync is the practice of extracting only the data that is new or modified since the last successful run, then loading it into the target without duplicating existing records. As an illustration, a daily full reload of a 5-million-document corpus re-extracts every document, whereas incremental sync extracts only the roughly 1,000 documents added that day, cutting pipeline runtime in this example from 8 hours to 5 minutes. Incremental sync requires three elements: (1) a checkpoint (tracking the last successful state), (2) duplicate detection (avoiding inserting the same record twice), and (3) idempotent loading (safe to retry the same batch without side effects). Without these, pipelines either waste compute reprocessing data or silently create duplicates, corrupting your target dataset. Gartner estimates that poor incremental sync strategies cost enterprises 40% of their ETL infrastructure budget in wasted computation.
Checkpoint Management: The Foundation of Incremental Sync
A checkpoint is a saved state marking "we successfully processed data up to this point." Checkpoints live in a durable location (database, S3, or file system) and survive pipeline restarts. The checkpoint answers: "What was the last record we successfully ingested?"
Checkpoint types:
- Timestamp checkpoint: Last record was updated at 2026-06-02T14:30:00Z.
- ID checkpoint: Last record had id = 50000.
- Offset checkpoint: Last record was at offset 50000 in a list.
- Cursor checkpoint: Last record had cursor = abc123def456.
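As a rough illustration, each checkpoint type turns into a different filter on the next extraction request. The parameter names below (`cursor`, `updated_after`, `after_id`, `offset`) and the `last_offset` key are hypothetical; a real source API will define its own.

```python
from typing import Any, Dict


def build_query_params(checkpoint: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a stored checkpoint into request parameters.

    All parameter names here are illustrative, not taken from a real API.
    """
    if checkpoint.get("last_cursor") is not None:
        # Cursor checkpoint: opaque token issued by the source
        return {"cursor": checkpoint["last_cursor"]}
    if checkpoint.get("last_timestamp") is not None:
        # Timestamp checkpoint: ask only for records updated afterwards
        return {"updated_after": checkpoint["last_timestamp"]}
    if checkpoint.get("last_id") is not None:
        # ID checkpoint: assumes IDs increase monotonically
        return {"after_id": checkpoint["last_id"]}
    if checkpoint.get("last_offset") is not None:
        # Offset checkpoint: continue with the next position in the list
        return {"offset": checkpoint["last_offset"] + 1}
    return {}  # no checkpoint yet: extract everything


print(build_query_params({"last_timestamp": "2026-06-02T14:30:00Z"}))
print(build_query_params({"last_offset": 50000}))
```

Timestamp and ID checkpoints only work when the source can filter and sort on that field; cursor checkpoints depend on the source keeping its tokens valid between runs.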
Here is a robust checkpoint manager for unstructured ETL:
import json
import sqlite3
from datetime import datetime, timezone
from typing import Any, Dict


class CheckpointManager:
    """Manage incremental sync checkpoints durably."""

    def __init__(self, db_path: str = "etl_checkpoints.db"):
        self.db = sqlite3.connect(db_path)
        self._init_db()

    def _init_db(self):
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS checkpoints (
                pipeline_id TEXT PRIMARY KEY,
                last_id TEXT,
                last_timestamp TEXT,
                last_cursor TEXT,
                record_count INTEGER,
                run_duration_seconds REAL,
                status TEXT,
                updated_at TIMESTAMP
            )
        """)
        self.db.commit()

    def load(self, pipeline_id: str) -> Dict[str, Any]:
        """Load the last checkpoint for a pipeline."""
        cursor = self.db.cursor()
        result = cursor.execute(
            "SELECT last_id, last_timestamp, last_cursor, record_count, status "
            "FROM checkpoints WHERE pipeline_id = ?",
            (pipeline_id,)
        ).fetchone()
        if result:
            last_id, last_timestamp, last_cursor, record_count, status = result
            return {
                "last_id": last_id,
                "last_timestamp": last_timestamp,
                "last_cursor": last_cursor,
                "record_count": record_count,
                "status": status
            }
        else:
            # First run: start from the beginning
            return {
                "last_id": None,
                "last_timestamp": None,
                "last_cursor": None,
                "record_count": 0,
                "status": "new"
            }

    def save(self, pipeline_id: str, checkpoint: Dict[str, Any], duration_seconds: float):
        """Save a checkpoint after successful run."""
        self.db.execute("""
            INSERT OR REPLACE INTO checkpoints
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            pipeline_id,
            checkpoint.get("last_id"),
            checkpoint.get("last_timestamp"),
            checkpoint.get("last_cursor"),
            checkpoint.get("record_count", 0),
            duration_seconds,
            "success",
            # datetime.utcnow() is deprecated since Python 3.12
            datetime.now(timezone.utc).isoformat()
        ))
        self.db.commit()

    def mark_failed(self, pipeline_id: str):
        """Mark a run as failed without updating checkpoint."""
        cursor = self.db.cursor()
        cursor.execute(
            "UPDATE checkpoints SET status = ? WHERE pipeline_id = ?",
            ("failed", pipeline_id)
        )
        self.db.commit()


# Usage
manager = CheckpointManager()
checkpoint = manager.load("documents_etl")
if checkpoint["status"] == "new":
    print("First run, starting from beginning")
else:
    # After a failed run the stored position is still the last successful one,
    # so both "success" and "failed" resume from it
    print(f"Resuming from last_timestamp: {checkpoint['last_timestamp']}")
    # Query API with updated_after = checkpoint['last_timestamp']
Critical principle: only update the checkpoint after successful processing of a batch. If extraction, transformation, or loading fails at any point, don't update the checkpoint; the next run will retry.
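The principle can be sketched in isolation as follows. This is a simplified stand-in, not the checkpoint manager above: an in-memory dict plays the role of durable storage, and `run_once`, `failing_load`, and the sample records are all hypothetical.

```python
from typing import Callable, Dict, List


def run_once(
    store: Dict[str, str],
    extract: Callable[[str], List[dict]],
    load: Callable[[List[dict]], None],
) -> bool:
    """Run one sync; advance the checkpoint only if every step succeeds."""
    since = store.get("last_timestamp", "")
    try:
        records = extract(since)
        load(records)
    except Exception as exc:
        # Checkpoint deliberately left untouched so the next run retries
        print(f"run failed, checkpoint stays at {since!r}: {exc}")
        return False
    if records:
        store["last_timestamp"] = max(r["updated_at"] for r in records)
    return True


def failing_load(records: List[dict]) -> None:
    raise ConnectionError("target unavailable")


source = [
    {"id": 1, "updated_at": "2026-06-01T00:00:00Z"},
    {"id": 2, "updated_at": "2026-06-02T00:00:00Z"},
]


def extract(since: str) -> List[dict]:
    # ISO-8601 strings in one format compare correctly as plain strings
    return [r for r in source if r["updated_at"] > since]


store: Dict[str, str] = {}
loaded: List[dict] = []
first_ok = run_once(store, extract, failing_load)    # fails, nothing saved
second_ok = run_once(store, extract, loaded.extend)  # retry sees both records
print(first_ok, second_ok, store)
```

Because the failed run never wrote a checkpoint, the retry starts from the same position and picks up every record the first attempt would have loaded.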
Duplicate Detection: Idempotency Keys
Duplicate detection prevents inserting the same record twice. Use an idempotency key (a unique identifier per record) to detect re-ingested data. Most records have a natural key: document ID, email address, social media post ID.
Here is a pattern using SQLite to track seen idempotency keys:
import hashlib
import json
import sqlite3
import time
from datetime import datetime, timezone
from typing import Any, Dict


class DuplicateDetector:
    """Track ingested records to prevent duplicates."""

    def __init__(self, db_path: str = "etl_duplicates.db"):
        self.db = sqlite3.connect(db_path)
        self._init_db()

    def _init_db(self):
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS seen_records (
                idempotency_key TEXT PRIMARY KEY,
                first_seen TIMESTAMP,
                record_hash TEXT
            )
        """)
        self.db.commit()

    def compute_record_hash(self, record: Dict[str, Any]) -> str:
        """Compute a hash of record content for change detection."""
        content = json.dumps(record, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def is_duplicate(self, idempotency_key: str, record: Dict[str, Any]) -> bool:
        """Check if a record was already ingested."""
        new_hash = self.compute_record_hash(record)
        cursor = self.db.cursor()
        result = cursor.execute(
            "SELECT record_hash FROM seen_records WHERE idempotency_key = ?",
            (idempotency_key,)
        ).fetchone()
        if not result:
            # New record, not a duplicate
            return False
        old_hash = result[0]
        if new_hash == old_hash:
            # Same content, definitely a duplicate
            return True
        else:
            # Content changed, this is an update, not a duplicate
            return False

    def mark_seen(self, idempotency_key: str, record: Dict[str, Any]):
        """Record that we've seen this record."""
        record_hash = self.compute_record_hash(record)
        self.db.execute(
            "INSERT OR REPLACE INTO seen_records VALUES (?, ?, ?)",
            (idempotency_key, datetime.now(timezone.utc).isoformat(), record_hash)
        )
        self.db.commit()


# Usage in ETL pipeline
# fetch_api_records() and load_to_database() are assumed to be defined elsewhere;
# records are assumed to arrive sorted by updated_at, oldest first.
detector = DuplicateDetector()
manager = CheckpointManager()
checkpoint = manager.load("documents_etl")
start = time.monotonic()
records = fetch_api_records(since_timestamp=checkpoint["last_timestamp"])
new_records = []
for record in records:
    idempotency_key = record["id"]
    if detector.is_duplicate(idempotency_key, record):
        print(f"Skipping duplicate: {idempotency_key}")
        continue
    new_records.append(record)
print(f"Loading {len(new_records)} new/updated records")
load_to_database(new_records)
# Mark records as seen only after the load succeeds; marking them earlier
# would make a retry skip records that never reached the target
for record in new_records:
    detector.mark_seen(record["id"], record)
# Only update checkpoint if all loads succeed and something was fetched
if records:
    manager.save("documents_etl", {
        "last_id": records[-1]["id"],
        "last_timestamp": records[-1]["updated_at"],
        "record_count": len(new_records)
    }, duration_seconds=time.monotonic() - start)
This approach detects three cases:
- New record: First time seeing this idempotency key → insert.
- Duplicate: Same idempotency key + same content hash → skip.
- Update: Same idempotency key + different content hash → update (or delete-and-insert).
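The three cases can be written as a single decision function. The sketch below uses an in-memory dict in place of a database table, and the names `content_hash` and `classify` are illustrative only.

```python
import hashlib
import json
from typing import Any, Dict


def content_hash(record: Dict[str, Any]) -> str:
    """Stable hash of a record's content."""
    return hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()


def classify(seen: Dict[str, str], key: str, record: Dict[str, Any]) -> str:
    """Return 'insert', 'skip', or 'update' for a record."""
    previous = seen.get(key)
    if previous is None:
        return "insert"  # key never seen before
    # Key already seen: compare content hashes
    return "skip" if previous == content_hash(record) else "update"


seen: Dict[str, str] = {}
doc = {"id": "doc_1", "content": "hello"}
first = classify(seen, doc["id"], doc)
seen[doc["id"]] = content_hash(doc)  # recorded once the load has succeeded
second = classify(seen, doc["id"], doc)
edited = {"id": "doc_1", "content": "hello, world"}
third = classify(seen, edited["id"], edited)
print(first, second, third)  # insert skip update
```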
Idempotent Loading: Making Retries Safe
Idempotent loading means that running the same load operation twice produces the same result as running it once. Without idempotence, a network error during a load forces a bad choice: skip the batch (losing data) or retry it and insert duplicates (corrupting data).
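Pattern: Use an upsert for database loads. In SQLite this is INSERT OR REPLACE, which deletes any existing row with the same key and inserts the new one; PostgreSQL offers INSERT ... ON CONFLICT, and several other engines offer MERGE: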
from datetime import datetime, timezone
from typing import Dict, List


def load_to_database_idempotent(records: List[Dict], table: str, connection):
    """Load records using upsert for idempotence."""
    cursor = connection.cursor()
    for record in records:
        # Upsert: if a row with this id exists, replace it; else insert.
        # `table` is interpolated into the SQL text, so it must come from
        # trusted configuration, never from user input.
        cursor.execute(f"""
            INSERT OR REPLACE INTO {table}
            (id, content, updated_at)
            VALUES (?, ?, ?)
        """, (
            record["id"],
            record["content"],
            datetime.now(timezone.utc).isoformat()
        ))
    connection.commit()
    print(f"✓ Loaded {len(records)} records idempotently")
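To see why the upsert matters, the following small experiment (a sketch with made-up table names, run against an in-memory SQLite database) loads the same batch twice into a keyed table using INSERT OR REPLACE and into an unkeyed table using plain INSERT.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id TEXT PRIMARY KEY, content TEXT)")
conn.execute("CREATE TABLE documents_append (id TEXT, content TEXT)")  # no key

batch = [("doc_1", "first"), ("doc_2", "second")]

for _ in range(2):  # simulate a retry of the same batch
    conn.executemany("INSERT OR REPLACE INTO documents VALUES (?, ?)", batch)
    conn.executemany("INSERT INTO documents_append VALUES (?, ?)", batch)
conn.commit()

upsert_count = conn.execute("SELECT COUNT(*) FROM documents").fetchone()[0]
append_count = conn.execute("SELECT COUNT(*) FROM documents_append").fetchone()[0]
print(upsert_count, append_count)  # 2 4
```

The keyed table ends with two rows however many times the batch is replayed, while the append-only table grows with every retry.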
For vector databases (Pinecone, Weaviate), use upsert endpoints which replace existing embeddings:
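import os

# Assumes the Pinecone Python client v3 or later; older clients were
# initialized with pinecone.init(...) instead of a Pinecone object
from pinecone import Pinecone

# The environment variable name is an assumption; supply the key however you manage secrets
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index("documents")
# "..." stands for the remaining vector components, elided here
vectors_to_upsert = [
    ("doc_123", [0.1, 0.2, 0.3, ...], {"source": "pdf_1.pdf"}),
    ("doc_124", [0.2, 0.3, 0.4, ...], {"source": "pdf_2.pdf"}),
]
# Upsert is idempotent: running twice produces same result
index.upsert(vectors=vectors_to_upsert)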
Handling Failed Checkpoints and Partial Runs
If a pipeline fails mid-run (e.g., after processing 500K of 1M records), you need a strategy to recover:
- Resume from last successful checkpoint: The next run retries from last_timestamp, potentially reprocessing some records (the detector catches duplicates).
- Implement watermarking: Track the "watermark" (highest ID processed, even if not saved) separately from the checkpoint. On failure, resume from watermark, not checkpoint.
- Idempotent batch processing: Load data in small batches (1,000 records) with checkpoints between batches. If batch 10 fails, retry batch 10 without reprocessing batches 1–9.
Here is a batched, resilient pattern:
import time


def etl_with_batched_checkpoints(source_endpoint: str, batch_size: int = 1000):
    """ETL with checkpoints between batches for resilience."""
    manager = CheckpointManager()
    checkpoint = manager.load("documents_etl")
    start = time.monotonic()
    # fetch_api_records(), transform() and load_to_database() are assumed to be
    # defined elsewhere; pass source_endpoint to the fetcher if it needs one
    all_records = fetch_api_records(since_timestamp=checkpoint["last_timestamp"])
    # Per-batch checkpoints are only valid if records are processed oldest first
    all_records = sorted(all_records, key=lambda r: r["updated_at"])
    processed = 0
    # Process in batches with checkpoints
    for batch_num, i in enumerate(range(0, len(all_records), batch_size)):
        batch = all_records[i:i + batch_size]
        try:
            # Transform and load this batch
            transformed = [transform(r) for r in batch]
            load_to_database(transformed)
            processed += len(batch)
            # Checkpoint after successful batch
            manager.save("documents_etl", {
                "last_id": batch[-1]["id"],
                "last_timestamp": batch[-1]["updated_at"],
                "record_count": processed
            }, duration_seconds=time.monotonic() - start)
            print(f"✓ Batch {batch_num}: checkpoint saved")
        except Exception as e:
            manager.mark_failed("documents_etl")
            print(f"✗ Batch {batch_num} failed: {e}")
            raise


etl_with_batched_checkpoints("https://api.example.com/documents")
Key Takeaways
- Checkpoints track progress so pipelines resume without full reloads.
- Duplicate detection using idempotency keys prevents inserting the same record twice.
- Idempotent loading (upserts) makes pipelines safe to retry; running twice = running once.
- Batch processing with per-batch checkpoints recovers gracefully from mid-run failures.
- Only update checkpoints after fully successful processing; on failure, leave checkpoint untouched.
Frequently Asked Questions
What if I don't have a natural idempotency key?
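Create a synthetic one from the record content using a stable digest such as SHA-256: idempotency_key = sha256(source + content). Avoid Python's built-in hash() for this, because by default its string hashes are randomized per process and so differ between runs. A content-derived key detects exact duplicates but misses semantic duplicates (e.g., two articles with slightly different wording but the same meaning). For sophisticated deduplication, use embeddings (next article).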
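A minimal sketch of such a synthetic key, assuming SHA-256 over the source name and the content; the function name and the separator character are illustrative choices.

```python
import hashlib


def synthetic_key(source: str, content: str) -> str:
    """Build a stable idempotency key from source and content."""
    # A separator keeps ("ab", "c") and ("a", "bc") from producing the same key
    payload = f"{source}\x1f{content}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


key_a = synthetic_key("pdf_1.pdf", "Quarterly report")
key_b = synthetic_key("pdf_1.pdf", "Quarterly report")
key_c = synthetic_key("pdf_1.pdf", "Quarterly report, revised")
print(key_a == key_b, key_a == key_c)  # True False
```

Note that a content-derived key changes whenever the content changes, so an edited record looks like a new record rather than an update.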
Can I use file modification timestamps as a checkpoint?
Yes, for file-based ETL. Track the last modification timestamp of successfully processed files. On the next run, list files with modified_time > last_checkpoint. Beware: if a file is re-uploaded with the same content, the timestamp changes and triggers reprocessing (use content hashing to detect this).
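One way to combine the two checks is sketched below: the modification time narrows the candidates, and a content hash filters out files whose bytes have not changed. The function name and the in-memory `seen` dict are assumptions for illustration; the demo pins modification times with os.utime so the outcome does not depend on the clock.

```python
import hashlib
import os
import tempfile
from pathlib import Path
from typing import Dict


def changed_files(root: Path, last_mtime: float, seen_hashes: Dict[str, str]) -> Dict[str, str]:
    """Map file name -> content hash for files that are newer AND changed."""
    changed: Dict[str, str] = {}
    for path in sorted(root.iterdir()):
        if not path.is_file() or path.stat().st_mtime <= last_mtime:
            continue  # not modified since the checkpoint
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        if seen_hashes.get(path.name) != digest:
            changed[path.name] = digest
    return changed


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    doc = root / "a.txt"
    doc.write_text("hello")
    os.utime(doc, (1_000, 1_000))    # pin mtime to a known value
    seen: Dict[str, str] = {}
    first = changed_files(root, 0.0, seen)
    seen.update(first)               # record hashes after processing succeeds
    os.utime(doc, (2_000, 2_000))    # "re-upload": newer mtime, same content
    second = changed_files(root, 1_000.0, seen)
    doc.write_text("hello again")    # real change
    os.utime(doc, (3_000, 3_000))
    third = changed_files(root, 2_000.0, seen)

print(list(first), list(second), list(third))  # ['a.txt'] [] ['a.txt']
```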
Should I checkpoint per pipeline or per stage?
Checkpoint per stage (extractor, transformer, loader). This lets you retry failed stages without re-extracting from the source. Example: ETL crashes in the embedding stage; you retry embeddings without re-fetching from the API.
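The idea can be sketched with a tiny stage runner that records each stage's output and skips stages already completed on a retry. Everything here is hypothetical: the `completed` dict stands in for durable storage, and the embed function is a placeholder that fails on its first call.

```python
from typing import Callable, Dict, List, Tuple

Stage = Tuple[str, Callable[[list], list]]


def run_stages(stages: List[Stage], completed: Dict[str, list], initial: list) -> list:
    """Run stages in order, reusing saved output of stages already completed."""
    data = initial
    for name, fn in stages:
        if name in completed:
            data = completed[name]  # stage already done: reuse its output
            continue
        data = fn(data)             # may raise; earlier stages stay recorded
        completed[name] = data      # per-stage checkpoint
    return data


calls = {"extract": 0, "embed": 0}


def extract(_: list) -> list:
    calls["extract"] += 1
    return ["doc a", "doc b"]


def embed(docs: list) -> list:
    calls["embed"] += 1
    if calls["embed"] == 1:
        raise RuntimeError("embedding service timeout")
    return [(d, len(d)) for d in docs]  # stand-in for a real embedding


completed: Dict[str, list] = {}
stages: List[Stage] = [("extract", extract), ("embed", embed)]
try:
    run_stages(stages, completed, [])
except RuntimeError:
    pass                                      # first run dies in the embed stage
result = run_stages(stages, completed, [])    # retry skips extraction
print(calls)  # {'extract': 1, 'embed': 2}
```

The trade-off is storage: each stage's intermediate output has to be persisted somewhere durable for the retry to reuse it.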
What if the source system deletes old records?
Use the "watermark" strategy: track both the checkpoint (last fully processed) and the watermark (highest ID seen). If the source deletes records, the watermark lets you detect gaps and log warnings. Consider archiving deleted records separately.
How do I test incremental sync logic?
Unit test the checkpoint, duplicate detector, and loader independently. Integration test with a small dataset, manually inject duplicates, and verify the pipeline skips them. Chaos test by killing the pipeline mid-run and verifying it resumes correctly.
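A chaos-style test can be approximated without killing a real process by injecting a failure between batches. The sketch below uses a deliberately tiny ID-checkpointed sync written only for this test; `sync`, its `crash_after_batches` parameter, and the sample data are all hypothetical.

```python
from typing import Dict, List


def sync(source: List[dict], target: Dict[int, dict], state: Dict[str, int],
         batch_size: int = 2, crash_after_batches: int = -1) -> None:
    """Tiny ID-checkpointed sync used only to exercise resume behaviour."""
    pending = sorted(
        (r for r in source if r["id"] > state.get("last_id", 0)),
        key=lambda r: r["id"],
    )
    for n, start in enumerate(range(0, len(pending), batch_size)):
        if n == crash_after_batches:
            raise RuntimeError("simulated crash")
        batch = pending[start:start + batch_size]
        for record in batch:
            target[record["id"]] = record   # upsert keyed by id
        state["last_id"] = batch[-1]["id"]  # checkpoint after each batch


source = [{"id": i, "content": f"doc {i}"} for i in range(1, 6)]
target: Dict[int, dict] = {}
state: Dict[str, int] = {}

try:
    sync(source, target, state, crash_after_batches=1)  # dies after batch 0
except RuntimeError:
    pass
after_crash = (len(target), state["last_id"])
sync(source, target, state)                             # resume from checkpoint

assert after_crash == (2, 2)
assert sorted(target) == [1, 2, 3, 4, 5]
assert state["last_id"] == 5
```

The same shape works with pytest or unittest: run once with an injected failure, assert on the partial state, run again, and assert that the target holds every record exactly once.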