Change Data Capture (CDC) Explained: Tracking Modifications

Change Data Capture (CDC) is a technique that identifies and extracts only the data that has changed since the last ingestion, enabling efficient incremental synchronization without full reloads. Instead of extracting all 10 million customer records daily, CDC identifies the 50,000 records that were created or modified in the past 24 hours and syncs only those. This reduces compute cost, network bandwidth, and load on source systems. CDC is the backbone of real-time data pipelines: without it, syncing databases at scale is prohibitively expensive. According to a 2025 survey by Fivetran, 62% of enterprises lack a systematic CDC strategy, causing data staleness (average lag 18+ hours) and wasted infrastructure costs. CDC comes in three flavors: timestamp-based (easiest), query-based (common in APIs), and log-based (fastest but requires source access).

Why CDC Is Essential for Unstructured Data Pipelines

In a naive ETL pipeline without CDC, you run a full extraction every cycle: download all PDFs from S3, extract text, and load into the vector database. If you have 1 million PDFs and 1,000 are added daily, you waste 99.9% of your computation re-processing unchanged files. CDC detects that only 1,000 files are new (by checking modification timestamps) and processes only those, cutting daily compute by roughly 1,000x (1,000,000 / 1,000).

For APIs, CDC uses the updated_after or since_id pattern: request only records modified after your last checkpoint. For databases, CDC reads transaction logs to identify inserted, updated, and deleted rows. For cloud storage, CDC lists objects and compares modification timestamps to your last run.

The benefit is compounding: as your dataset grows, the ratio of new data to total data shrinks, and CDC saves increasingly more work.
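
The cloud-storage variant described above can be sketched as a simple filter over a bucket listing. This is a minimal illustration, not a storage client: ObjectInfo and select_changed are hypothetical names, and the listing is hard-coded where a real pipeline would call its storage provider's list API.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class ObjectInfo:
    """Hypothetical stand-in for one entry in a bucket listing."""
    key: str
    last_modified: datetime


def select_changed(objects, last_run):
    """Return only the objects modified strictly after the previous run."""
    return [o for o in objects if o.last_modified > last_run]


# Hard-coded listing; a real pipeline would fetch this from object storage
listing = [
    ObjectInfo("reports/a.pdf", datetime(2026, 5, 30, 8, 0, tzinfo=timezone.utc)),
    ObjectInfo("reports/b.pdf", datetime(2026, 6, 1, 9, 30, tzinfo=timezone.utc)),
    ObjectInfo("reports/c.pdf", datetime(2026, 6, 1, 14, 0, tzinfo=timezone.utc)),
]
last_run = datetime(2026, 6, 1, 0, 0, tzinfo=timezone.utc)

changed = select_changed(listing, last_run)
print([o.key for o in changed])  # ['reports/b.pdf', 'reports/c.pdf']
```

Only the two files modified after last_run would be re-parsed and re-embedded; the unchanged file is skipped.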

Timestamp-Based CDC (Simplest)

Timestamp-based CDC is the easiest to implement: track the maximum updated_at timestamp from your last run, then query for all records with updated_at > last_timestamp. This works for any source with a timestamp field: databases, APIs, cloud files.

Here is an example that stores one checkpoint per source in SQLite:

import sqlite3
from datetime import datetime, timedelta, timezone
import requests
import json


class TimestampCDC:
    """Track incremental changes using modified timestamps."""

    def __init__(self, checkpoint_db: str):
        self.db = sqlite3.connect(checkpoint_db)
        self._init_checkpoint_table()

    def _init_checkpoint_table(self):
        """Create table to store last ingestion timestamp per source."""
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS cdc_checkpoints (
                source_name TEXT PRIMARY KEY,
                last_sync_timestamp TEXT,
                record_count INTEGER,
                last_updated TIMESTAMP
            )
        """)
        self.db.commit()

    def get_last_checkpoint(self, source_name: str) -> str:
        """Retrieve the last sync timestamp for a source."""
        cursor = self.db.cursor()
        result = cursor.execute(
            "SELECT last_sync_timestamp FROM cdc_checkpoints WHERE source_name = ?",
            (source_name,)
        ).fetchone()

        if result:
            return result[0]
        # First run: default to 7 days ago (older records are not backfilled)
        return (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()

    def sync_api_incremental(self, source_name: str, api_endpoint: str, api_key: str):
        """Fetch only records modified since last checkpoint."""
        last_checkpoint = self.get_last_checkpoint(source_name)
        print(f"Syncing {source_name} since {last_checkpoint}")

        # Query API with updated_after filter
        params = {
            "updated_after": last_checkpoint,
            "limit": 100,
            "api_key": api_key
        }

        all_records = []
        has_more = True
        cursor = None

        while has_more:
            if cursor:
                params["cursor"] = cursor

            response = requests.get(api_endpoint, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            records = data.get("data", [])
            all_records.extend(records)
            print(f"✓ Fetched {len(records)} records")

            cursor = data.get("pagination", {}).get("next_cursor")
            has_more = cursor is not None and len(records) > 0

        if all_records:
            # Advance the checkpoint to the newest updated_at seen (not wall-clock
            # time), so records written while the sync was running are not skipped
            max_updated_at = max(r.get("updated_at", last_checkpoint) for r in all_records)
            self.db.execute(
                "INSERT OR REPLACE INTO cdc_checkpoints VALUES (?, ?, ?, ?)",
                (source_name, max_updated_at, len(all_records),
                 datetime.now(timezone.utc).isoformat())
            )
            self.db.commit()
            print(f"✓ Updated checkpoint for {source_name} to {max_updated_at}")

        return all_records


# Usage
cdc = TimestampCDC("etl_checkpoints.db")
records = cdc.sync_api_incremental(
    source_name="customer_api",
    api_endpoint="https://api.example.com/v2/customers",
    api_key="your_api_key"
)

print(f"Synced {len(records)} records")

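This approach is reliable but has limitations. If a record is updated without its updated_at timestamp changing (for example, by a bulk update that bypasses the application layer), CDC misses it. Clock skew between servers can also cause missed records. Mitigate skew with a grace period: query with updated_after = last_checkpoint - 5 minutes. The overlap re-fetches some records, so load them with an idempotent upsert keyed on the record ID.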
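
The grace-period idea can be sketched as follows, assuming checkpoints are stored as ISO 8601 strings. The names query_start and upsert are hypothetical helpers, and a plain dict stands in for the target store.

```python
from datetime import datetime, timedelta

GRACE_PERIOD = timedelta(minutes=5)


def query_start(last_checkpoint: str, grace: timedelta = GRACE_PERIOD) -> str:
    """Rewind an ISO 8601 checkpoint by the grace period."""
    return (datetime.fromisoformat(last_checkpoint) - grace).isoformat()


def upsert(target: dict, records: list) -> None:
    """Idempotent load: a re-fetched record overwrites itself by id."""
    for record in records:
        target[record["id"]] = record


start = query_start("2026-06-01T12:00:00+00:00")
print(start)  # 2026-06-01T11:55:00+00:00

# The same record arriving in two overlapping windows is stored once
target = {}
upsert(target, [{"id": 7, "name": "Ada"}])
upsert(target, [{"id": 7, "name": "Ada"}])
print(len(target))  # 1
```

Because the window overlaps, duplicates are expected; the upsert is what keeps them harmless.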

Query-Based CDC (API Filtering)

Query-based CDC uses API filters to request only modified records. Many APIs support this natively:

  • Timestamp filter: GET /records?updated_after=2026-06-01T12:00:00Z
  • Sequence ID filter: GET /records?since_id=12345 (returns records with id > 12345)
  • Event streams: GET /events?type=created,updated&after_cursor=abc123
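
As a rough sketch of the sequence-ID filter, the loop below pages forward from the last ID seen. SOURCE and fetch_page are hypothetical stand-ins for a real API call. Note that an ID-based filter only reveals new records; updates to existing records need a timestamp filter or an event stream.

```python
# Hypothetical in-memory source standing in for a real API, ordered by id
SOURCE = [{"id": i, "name": f"record-{i}"} for i in range(1, 8)]


def fetch_page(since_id: int, limit: int = 3) -> list:
    """Stand-in for GET /records?since_id=<since_id>&limit=<limit>."""
    return [r for r in SOURCE if r["id"] > since_id][:limit]


def sync_since(last_id: int):
    """Page forward from last_id until the source returns no more records."""
    new_records = []
    while True:
        page = fetch_page(last_id)
        if not page:
            return new_records, last_id
        new_records.extend(page)
        last_id = page[-1]["id"]  # advance the checkpoint to the last id seen


records, checkpoint = sync_since(last_id=4)
print([r["id"] for r in records], checkpoint)  # [5, 6, 7] 7
```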

For databases without native CDC support, you can simulate it:

import json


def sync_database_incremental(connection, table: str, checkpoint_file: str):
    """Sync only new and updated rows from a database.

    Assumes a DB-API connection that uses "?" placeholders and returns rows
    addressable by column name (for sqlite3, set row_factory = sqlite3.Row).
    """

    # Load checkpoint
    try:
        with open(checkpoint_file, "r") as f:
            checkpoint = json.load(f)
        last_id = checkpoint.get("last_id", 0)
        last_updated_at = checkpoint.get("last_updated_at", "2000-01-01")
    except FileNotFoundError:
        last_id = 0
        last_updated_at = "2000-01-01"

    # Page through changes in (updated_at, id) order so that updates to older
    # rows are caught, not only rows with a higher id. The table name cannot be
    # bound as a parameter, so it must come from trusted code, never user input.
    cursor = connection.cursor()
    query = f"""
        SELECT * FROM {table}
        WHERE updated_at > ? OR (updated_at = ? AND id > ?)
        ORDER BY updated_at, id
        LIMIT 10000
    """

    cursor.execute(query, (last_updated_at, last_updated_at, last_id))
    rows = cursor.fetchall()

    if not rows:
        print(f"No changes in {table}")
        return []

    # Update checkpoint: the last row is the newest change in this batch
    max_row = rows[-1]
    new_checkpoint = {
        "last_id": max_row["id"],
        "last_updated_at": max_row["updated_at"]
    }

    with open(checkpoint_file, "w") as f:
        json.dump(new_checkpoint, f)

    print(f"✓ Synced {len(rows)} changed rows")
    return rows

Log-Based CDC (Most Reliable)

Log-based CDC reads the source database's transaction log (PostgreSQL WAL, MySQL binlog, MongoDB oplog) to capture every insert, update, and delete. This is the most reliable method but requires source system access and expertise.
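
To make the consumer side concrete, here is a minimal sketch of applying change events to a target. It assumes a simplified event shape loosely modeled on Debezium's change-event envelope (an op code of c, r, u, or d plus before and after row images); real events carry more metadata, and apply_change is a hypothetical helper with a dict standing in for the target table.

```python
def apply_change(target: dict, event: dict) -> None:
    """Apply one change event to an in-memory target keyed by primary key."""
    op = event["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update
        row = event["after"]
        target[row["id"]] = row
    elif op == "d":  # delete: only the "before" image is populated
        target.pop(event["before"]["id"], None)
    else:
        raise ValueError(f"Unknown operation: {op!r}")


events = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "Ada"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "Bob"}},
    {"op": "u", "before": {"id": 1, "name": "Ada"}, "after": {"id": 1, "name": "Ada L."}},
    {"op": "d", "before": {"id": 2, "name": "Bob"}, "after": None},
]

target = {}
for event in events:
    apply_change(target, event)

print(target)  # {1: {'id': 1, 'name': 'Ada L.'}}
```

Unlike the polling approaches, the delete arrives as an explicit event, so the target never keeps a stale row.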

Popular log-based CDC tools:

| Tool | Source | Real-time | Cost |
| --- | --- | --- | --- |
| Debezium | PostgreSQL, MySQL, MongoDB, Cassandra | Yes, event streaming | Open-source |
| AWS DMS | Most major databases, on AWS or elsewhere | Yes, via ongoing replication (CDC) tasks | Managed service |
| Fivetran | 300+ sources (SaaS) | Yes, optimized per source | $0.1–1 per 100K rows |
| Stitch | Databases, APIs, cloud apps | Depends on connector | $100–1k/month |

For production, consider Debezium if you can self-host open-source infrastructure, or a managed service (Fivetran, Stitch) if you want to avoid the operational burden.

Comparison: Timestamp vs. Query vs. Log-Based CDC

| Method | Latency | Reliability | Complexity | Cost |
| --- | --- | --- | --- | --- |
| Timestamp | 5–60 min | Good (with grace period) | Low | Minimal |
| Query-based | 1–10 min | Good (API-dependent) | Low–Medium | Minimal |
| Log-based | <1 sec | Excellent | High | Medium–High |

Start with timestamp-based CDC for simplicity. Upgrade to log-based as latency requirements tighten.

Handling Deletes in CDC

Deletes are tricky: if a record is deleted at the source, your incremental query won't see it, and the old version remains in your target. Solutions:

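  1. Soft deletes: Set a deleted = true flag instead of removing rows. As long as the update also bumps updated_at, the incremental query picks it up; downstream consumers then filter with WHERE deleted = false.
  2. Deletion markers: Include a deleted_at timestamp; any record with deleted_at > last_checkpoint was deleted since the last sync.
  3. Full snapshots: Periodically (for example, weekly) run a full sync and remove target records that no longer exist at the source.
  4. Event streams: Subscribe to delete events from the source (requires log-based CDC or a source that publishes delete events).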
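
The full-snapshot option reduces to a set difference between the IDs at the source and the IDs in the target. A minimal sketch, with hypothetical helper names and a dict standing in for the target store:

```python
def find_deleted_ids(source_ids, target_ids) -> set:
    """IDs present in the target but no longer present at the source."""
    return set(target_ids) - set(source_ids)


def apply_deletes(target: dict, source_ids) -> set:
    """Remove records from the target that have vanished from the source."""
    deleted = find_deleted_ids(source_ids, target.keys())
    for record_id in deleted:
        del target[record_id]
    return deleted


target = {1: {"name": "Ada"}, 2: {"name": "Bob"}, 3: {"name": "Cy"}}
source_ids = [1, 3, 4]  # id 2 was deleted at the source; id 4 is new

deleted = apply_deletes(target, source_ids)
print(deleted, sorted(target))  # {2} [1, 3]
```

Only the ID column needs to be pulled for this check, which is far cheaper than re-extracting full records. The new ID 4 is left for the regular incremental sync to pick up.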

Key Takeaways

  • CDC detects only changed data since the last sync, often reducing compute cost by 10–100x or more for large datasets.
  • Timestamp-based CDC is simplest: track max(updated_at) and query updated_at > last_checkpoint.
  • Query-based CDC uses API filters like ?updated_after= or ?since_id=.
  • Log-based CDC reads transaction logs (PostgreSQL WAL, MySQL binlog) for real-time, reliable capture.
  • Handle deletes with soft deletes, deletion markers, or periodic full snapshots.

Frequently Asked Questions

What if the source system doesn't have an updated_at field?

Ask the data owner to add one; timestamp-based CDC depends on it. If that is not possible, pull all the data, compute a hash of each record, and compare it with the hash stored from the previous run to detect changes. This is expensive but workable for small datasets.
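
A minimal sketch of the hash-comparison fallback, assuming each record is a JSON-serializable dict with an id key. The helper names are hypothetical, and previous_hashes would be persisted between runs in a real pipeline.

```python
import hashlib
import json


def record_hash(record: dict) -> str:
    """Hash a canonical JSON form so key order does not affect the result."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def detect_changes(records, previous_hashes: dict, key: str = "id"):
    """Return (new or changed records, current hash per record key)."""
    changed, current = [], {}
    for record in records:
        digest = record_hash(record)
        current[record[key]] = digest
        if previous_hashes.get(record[key]) != digest:
            changed.append(record)
    return changed, current


run1 = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Bob"}]
changed1, hashes = detect_changes(run1, previous_hashes={})

run2 = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Robert"}]
changed2, hashes = detect_changes(run2, previous_hashes=hashes)
print([r["id"] for r in changed2])  # [2]
```

The extraction still reads every record, but only the changed ones flow into the downstream stages.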

What happens if my checkpoint is lost or corrupted?

Rebuild it from scratch: re-sync the full dataset by resetting last_checkpoint to the beginning of time (a 7-day lookback recovers only the last week of changes). This may be expensive but is safe. Store checkpoints in a durable system (database, not just files) and back up regularly.

How do I handle multiple transformations applied after CDC?

Track checkpoints per transformation stage, not globally. Example: API Extractor → CDC checkpoint → Parser → Parser checkpoint → Embeddings → Embeddings checkpoint. This lets you retry individual stages without re-extracting from the source.
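
One way to sketch per-stage checkpoints is a small table keyed by pipeline and stage. The class, table, and stage names below are hypothetical, and the in-memory SQLite database is for illustration only; a real deployment would use a durable store.

```python
import sqlite3


class StageCheckpoints:
    """Store one checkpoint position per (pipeline, stage) pair."""

    def __init__(self, db_path: str = ":memory:"):
        self.db = sqlite3.connect(db_path)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS stage_checkpoints (
                pipeline TEXT,
                stage TEXT,
                position TEXT,
                PRIMARY KEY (pipeline, stage)
            )
        """)

    def get(self, pipeline: str, stage: str, default=None):
        row = self.db.execute(
            "SELECT position FROM stage_checkpoints WHERE pipeline = ? AND stage = ?",
            (pipeline, stage),
        ).fetchone()
        return row[0] if row else default

    def set(self, pipeline: str, stage: str, position: str) -> None:
        self.db.execute(
            "INSERT OR REPLACE INTO stage_checkpoints VALUES (?, ?, ?)",
            (pipeline, stage, position),
        )
        self.db.commit()


checkpoints = StageCheckpoints()
checkpoints.set("customers", "extract", "2026-06-01T12:00:00Z")
checkpoints.set("customers", "parse", "2026-06-01T11:30:00Z")

# The embeddings stage has not run yet, so it starts from its default
print(checkpoints.get("customers", "parse"))       # 2026-06-01T11:30:00Z
print(checkpoints.get("customers", "embeddings"))  # None
```

If the parser fails, only its checkpoint stays behind, and a retry resumes from there without touching the extractor.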

Can CDC detect column-level changes or just row-level?

Timestamp- and query-based CDC detect row-level changes: "row X was updated" but not "field Y changed from A to B". To track column-level changes, capture the before-and-after values of each row (e.g., {"id": 1, "before": {"name": "old_name"}, "after": {"name": "new_name"}}), then compare them field by field. Log-based tools such as Debezium can emit both row images in each change event.
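
Given before and after images of a row, the column-level comparison is a short dictionary diff. This is a sketch with a hypothetical helper name; it treats a column missing from one side as None.

```python
def diff_columns(before: dict, after: dict) -> dict:
    """Map each changed column to its (old, new) pair of values."""
    changes = {}
    for column in before.keys() | after.keys():
        old, new = before.get(column), after.get(column)
        if old != new:
            changes[column] = (old, new)
    return changes


before = {"id": 1, "name": "old_name", "email": "a@example.com"}
after = {"id": 1, "name": "new_name", "email": "a@example.com"}

print(diff_columns(before, after))  # {'name': ('old_name', 'new_name')}
```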

Is CDC suitable for real-time pipelines (latency <1 sec)?

Timestamp- and query-based CDC rely on polling, so their latency is bounded by the polling interval, typically minutes. Log-based CDC can achieve <1 second latency with proper infrastructure (Kafka event streams, Debezium connectors). For true real-time, use log-based CDC or message queues.

Further Reading