Monitoring ETL Pipeline Health and Data Freshness
Monitoring ETL pipelines is critical for production AI systems: if your embedding pipeline stalls, RAG models retrieve stale data; if data quality drops, models degrade silently. Yet 58% of data teams lack systematic monitoring (Gartner, 2026), discovering pipeline failures hours or days late. Effective ETL monitoring tracks four pillars: (1) pipeline health (job duration, failures, retry rates), (2) data freshness (lag between source and target), (3) data quality (schema compliance, null rates, distribution shifts), and (4) resource usage (compute cost, storage growth). This article teaches you to instrument pipelines, define SLOs (Service-Level Objectives), and set alerts that catch problems before they impact users.
Pipeline Health Metrics
Essential pipeline metrics:
| Metric | Why it matters | Alert threshold |
|---|---|---|
| Job duration | Increasing duration signals slowdown (API lag, data growth, resource issues). | >120% of baseline |
| Failure rate | Failures in more than ~5% of a week's runs indicate systemic problems; alerting at 1% catches them earlier. | >1% of runs |
| Retry count | High retries signal flaky APIs or resource contention. | >3 retries per run |
| Freshness lag | Time between source update and target ingestion; aim for <1 hour for AI systems. | >2 hours |
| Record count | Anomalies (0 records, 10x normal) indicate upstream failures. | ±50% of baseline |
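The thresholds in the table can be checked together once per run. The sketch below makes several assumptions. `RunStats`, `Baseline`, and `evaluate_thresholds` are hypothetical names. The baseline is taken to be something like the median of recent successful runs. The failure rate is computed over a recent window of runs that the caller supplies.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RunStats:
    """Observed values for one pipeline run (hypothetical structure)."""
    duration_s: float
    retries: int
    freshness_lag_s: float
    record_count: int


@dataclass
class Baseline:
    """Reference values, e.g. medians over recent successful runs (assumption)."""
    duration_s: float
    record_count: float


def evaluate_thresholds(run: RunStats, baseline: Baseline,
                        runs_total: int, runs_failed: int) -> List[str]:
    """Return one message per alert threshold from the table that this run breaches.

    runs_total/runs_failed describe a recent window of runs (e.g. the last week).
    """
    alerts = []
    if run.duration_s > 1.2 * baseline.duration_s:
        alerts.append(f"duration {run.duration_s:.0f}s > 120% of baseline {baseline.duration_s:.0f}s")
    if runs_total and runs_failed / runs_total > 0.01:
        alerts.append(f"failure rate {runs_failed / runs_total:.1%} > 1% of runs")
    if run.retries > 3:
        alerts.append(f"{run.retries} retries > 3 per run")
    if run.freshness_lag_s > 2 * 3600:
        alerts.append(f"freshness lag {run.freshness_lag_s / 3600:.1f}h > 2h")
    if abs(run.record_count - baseline.record_count) > 0.5 * baseline.record_count:
        alerts.append(f"record count {run.record_count} outside ±50% of baseline")
    return alerts


# Usage: a slow run that also loaded far fewer records than usual
run = RunStats(duration_s=1500, retries=1, freshness_lag_s=1800, record_count=4000)
baseline = Baseline(duration_s=1000, record_count=10000)
for message in evaluate_thresholds(run, baseline, runs_total=200, runs_failed=1):
    print(message)
```

Relative thresholds such as 120% and ±50% are only as good as the baseline. A median over recent successful runs is less sensitive to one-off outliers than a mean.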
Here is an instrumentation setup using the Prometheus Python client (prometheus_client) to collect metrics. Grafana then builds dashboards on top of these metrics:
```python
import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server


class ETLMetrics:
    """Track ETL pipeline metrics for Prometheus."""

    def __init__(self):
        # Counters (monotonically increasing)
        self.records_extracted = Counter(
            'etl_records_extracted_total',
            'Total records extracted',
            ['source']
        )
        self.records_loaded = Counter(
            'etl_records_loaded_total',
            'Total records loaded',
            ['target']
        )
        self.extraction_errors = Counter(
            'etl_extraction_errors_total',
            'Extraction errors',
            ['source', 'error_type']
        )
        # Gauges (can go up or down)
        self.pipeline_lag_seconds = Gauge(
            'etl_pipeline_lag_seconds',
            'Seconds behind source',
            ['pipeline_id']
        )
        self.pending_records = Gauge(
            'etl_pending_records',
            'Records waiting to be processed',
            ['source']
        )
        # Histograms (distribution of values); the +Inf bucket is added automatically
        self.extraction_duration_seconds = Histogram(
            'etl_extraction_duration_seconds',
            'Time to extract records',
            ['source'],
            buckets=[10, 30, 60, 300, 900]
        )
        self.transformation_duration_seconds = Histogram(
            'etl_transformation_duration_seconds',
            'Time to transform records',
            buckets=[5, 15, 60, 300]
        )

    def record_extraction(self, source: str, record_count: int, duration_seconds: float):
        """Record extraction metrics."""
        self.records_extracted.labels(source=source).inc(record_count)
        self.extraction_duration_seconds.labels(source=source).observe(duration_seconds)

    def record_error(self, source: str, error_type: str):
        """Record errors."""
        self.extraction_errors.labels(source=source, error_type=error_type).inc()

    def record_lag(self, pipeline_id: str, lag_seconds: float):
        """Record pipeline lag (time since last successful run)."""
        self.pipeline_lag_seconds.labels(pipeline_id=pipeline_id).set(lag_seconds)


# Usage in ETL pipeline
metrics = ETLMetrics()  # create once per process: metric names must be unique per registry
start_http_server(8000)  # serve /metrics for Prometheus to scrape; port 8000 is an example
# (short-lived batch jobs can push to a Pushgateway with push_to_gateway instead)


def extract_documents(source: str):
    """Extract with metrics."""
    start = time.monotonic()  # monotonic clock: unaffected by system clock changes
    try:
        documents = fetch_documents(source)  # your source-specific extraction logic
        duration = time.monotonic() - start
        metrics.record_extraction(source, len(documents), duration)
        return documents
    except Exception as e:
        metrics.record_error(source, type(e).__name__)
        raise
```
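The table's "Retry count" row assumes each run reports how many retries it needed, but the metrics class above has no retry metric. One way to get that number is to wrap flaky calls such as API fetches. The sketch below retries with exponential backoff plus jitter and returns the retry count alongside the result. `call_with_retries` and its `on_retry` hook are hypothetical names. In a real pipeline, `on_retry` could increment a Prometheus `Counter`.

```python
import random
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay_s: float = 1.0,
    on_retry: Callable[[int, Exception], None] = lambda attempt, exc: None,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[T, int]:
    """Call fn, retrying failures with exponential backoff plus jitter.

    Returns (result, retries_used) so each run can report its retry count.
    `on_retry` is a hook, e.g. for incrementing a retry counter metric.
    `sleep` is injectable so tests can skip real waiting.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn(), attempt  # attempt == number of retries used so far
        except Exception as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error unchanged
            on_retry(attempt + 1, exc)
            # Backoff doubles each time (1s, 2s, 4s, ...) with up to 10% jitter
            delay = base_delay_s * (2 ** attempt)
            sleep(delay + random.uniform(0, 0.1 * delay))
    raise AssertionError("unreachable")  # loop always returns or raises
```

With the default `max_attempts=5`, a run can use up to four retries. A run that needs all four crosses the table's ">3 retries per run" threshold even if it eventually succeeds, which is the flakiness signal that row is meant to surface. Choose `max_attempts` and the alert threshold together.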
Data Freshness SLOs
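A freshness SLO sets the maximum acceptable time since the pipeline last completed successfully. The class below tracks that lag and reports whether the SLO is met: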
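```python
from datetime import datetime, timezone
from typing import Optional


class DataFreshnessSLO:
    """Track data freshness SLOs."""

    def __init__(self, pipeline_id: str, max_lag_seconds: int):
        self.pipeline_id = pipeline_id
        self.max_lag_seconds = max_lag_seconds
        self.last_successful_run: Optional[datetime] = None

    def check_freshness(self) -> dict:
        """Check if data meets freshness SLO."""
        now = datetime.now(timezone.utc)
        if self.last_successful_run is None:
            return {
                "status": "unknown",
                "lag_seconds": None,
                "slo_met": False
            }
        lag = (now - self.last_successful_run).total_seconds()
        slo_met = lag <= self.max_lag_seconds
        return {
            "status": "healthy" if slo_met else "stale",
            "lag_seconds": lag,
            "slo_met": slo_met,
            "max_lag_seconds": self.max_lag_seconds
        }

    def record_success(self, run_time: Optional[datetime] = None):
        """Record successful pipeline run (use timezone-aware UTC timestamps)."""
        self.last_successful_run = run_time or datetime.now(timezone.utc)


# Usage
slo = DataFreshnessSLO(
    pipeline_id="embedding_pipeline",
    max_lag_seconds=3600  # Must refresh at least hourly
)


# In the Airflow DAG: record each successful run.
# Airflow tasks and callbacks can run in separate processes, so in production
# persist last_successful_run (e.g., in a database or a Prometheus gauge)
# instead of keeping it in memory.
def on_success_callback(context):
    slo.record_success()


# Check freshness on its own schedule (e.g., every 15 minutes). Checking only in
# the success callback would always report ~0s lag, and a stalled pipeline never
# fires that callback at all.
def check_freshness_job():
    freshness_check = slo.check_freshness()
    print(f"Freshness status: {freshness_check}")
    # Send alert if SLO violated (send_alert is your own notification helper)
    if not freshness_check["slo_met"]:
        lag = freshness_check["lag_seconds"]
        lag_text = "no successful run recorded" if lag is None else f"{lag:.0f}s lag"
        send_alert(
            f"Pipeline {slo.pipeline_id} is stale: "
            f"{lag_text} (max {slo.max_lag_seconds}s)"
        )
```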
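The SLO class above approximates freshness as time since the last successful run. The table, however, defines freshness lag as the time between a source update and its ingestion into the target. If both sides carry an update timestamp, lag can be measured directly by comparing high-water marks. That is an assumption, and the `updated_at` field name is hypothetical. In practice each high-water mark would come from a query such as `SELECT MAX(updated_at)` on each side; lists of dicts keep this sketch self-contained.

```python
from datetime import datetime, timezone
from typing import Iterable, Optional


def high_water_mark(rows: Iterable[dict], field: str = "updated_at") -> Optional[datetime]:
    """Newest non-null timestamp in `field`, or None if there is none."""
    stamps = [row[field] for row in rows if row.get(field) is not None]
    return max(stamps) if stamps else None


def freshness_lag_seconds(source_rows: Iterable[dict],
                          target_rows: Iterable[dict]) -> Optional[float]:
    """How far (in seconds) the target's newest record trails the source's newest record.

    Returns None if the source has no timestamps, inf if the target has none,
    and 0.0 once the target has caught up.
    """
    source_hw = high_water_mark(source_rows)
    if source_hw is None:
        return None
    target_hw = high_water_mark(target_rows)
    if target_hw is None:
        return float("inf")  # nothing ingested yet
    return max(0.0, (source_hw - target_hw).total_seconds())


# Usage: the source already has a 15:00 update, the target only reached 13:00.
src = [{"updated_at": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)},
       {"updated_at": datetime(2024, 1, 1, 15, 0, tzinfo=timezone.utc)}]
tgt = [{"updated_at": datetime(2024, 1, 1, 13, 0, tzinfo=timezone.utc)}]
print(freshness_lag_seconds(src, tgt))  # 7200.0, i.e. 2 hours behind
```

The result matches the "Seconds behind source" meaning of the `etl_pipeline_lag_seconds` gauge, so it could be reported through `metrics.record_lag(...)`.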
Data Quality Monitoring
Detect data quality regressions by comparing each batch's metrics (record count, null rate, field coverage, schema violations) against a statistical baseline:
```python
import json
from typing import Optional

import numpy as np


class DataQualityMonitor:
    """Detect data quality anomalies against a per-pipeline statistical baseline."""

    def __init__(self, history_file: str = "quality_history.json", alpha: float = 0.1,
                 z_threshold: float = 3.0, min_runs: int = 5):
        self.history_file = history_file
        self.alpha = alpha              # weight of the newest run in the moving average
        self.z_threshold = z_threshold  # anomaly if this many standard deviations away
        self.min_runs = min_runs        # runs to observe before trusting the baseline
        self.baselines = self._load_baselines()

    def _load_baselines(self) -> dict:
        """Load historical quality metrics."""
        try:
            with open(self.history_file) as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def compute_quality_metrics(self, records: list,
                                expected_fields: Optional[set] = None) -> dict:
        """Compute data quality metrics for a batch."""
        if not records:
            return {"status": "empty"}
        # Null rate (None or empty string counts as null)
        total_fields = sum(len(r) for r in records)
        null_count = sum(
            sum(1 for v in r.values() if v is None or v == "")
            for r in records
        )
        null_rate = null_count / total_fields if total_fields > 0 else 0.0
        # Expected fields: use the declared schema if given, else the first record's keys
        expected_fields = set(expected_fields or records[0].keys())
        # Field coverage (share of expected fields present in each record)
        if expected_fields:
            mean_coverage = float(np.mean([
                len(set(r.keys()) & expected_fields) / len(expected_fields)
                for r in records
            ]))
        else:
            mean_coverage = 1.0
        # Schema compliance (records whose field set differs from the expected one)
        schema_violations = [
            i for i, r in enumerate(records) if set(r.keys()) != expected_fields
        ]
        violation_rate = len(schema_violations) / len(records)
        return {
            "record_count": len(records),
            "null_rate": null_rate,
            "field_coverage": mean_coverage,
            "schema_violations": violation_rate
        }

    def check_anomalies(self, pipeline_id: str, metrics: dict) -> list:
        """Detect anomalies using a z-score against an exponentially weighted baseline."""
        anomalies = []
        baseline = self.baselines.setdefault(pipeline_id, {})
        for metric, value in metrics.items():
            if not isinstance(value, (int, float)):
                continue  # skip non-numeric entries such as {"status": "empty"}
            stats = baseline.get(metric)
            if stats is None:
                # First run: establish the baseline
                baseline[metric] = {"mean": float(value), "var": 0.0, "runs": 1}
                continue
            mean, std = stats["mean"], stats["var"] ** 0.5
            if stats["runs"] >= self.min_runs and std > 0:
                z_score = abs(value - mean) / std
                if z_score > self.z_threshold:  # e.g. 3 standard deviations away
                    anomalies.append({
                        "metric": metric,
                        "value": value,
                        "baseline": mean,
                        "z_score": z_score
                    })
            # Update exponentially weighted mean AND variance, so std does not stay 0
            diff = value - mean
            incr = self.alpha * diff
            stats["mean"] = mean + incr
            stats["var"] = (1 - self.alpha) * (stats["var"] + diff * incr)
            stats["runs"] += 1
        self._save_baselines()
        return anomalies

    def _save_baselines(self):
        with open(self.history_file, 'w') as f:
            json.dump(self.baselines, f)


# Usage in ETL (Airflow task; send_alert is your own notification helper)
monitor = DataQualityMonitor()


def monitor_data_quality(**context):
    # XCom suits small batches; for large ones, pass a storage path instead
    records = context['ti'].xcom_pull(task_ids='load')
    metrics = monitor.compute_quality_metrics(records)
    if metrics.get("status") == "empty":
        send_alert("Data quality regression: 'load' returned no records")
        return metrics
    anomalies = monitor.check_anomalies('embedding_pipeline', metrics)
    for anomaly in anomalies:
        print(f"⚠ Anomaly detected: {anomaly}")
        send_alert(f"Data quality regression: {anomaly}")
    return metrics
```
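The schema-compliance metric above compares only field names, so it catches missing or extra fields but not values of the wrong type. If you can declare the expected schema up front, a per-field type check is a small extension. The field names and types below are hypothetical, and `schema_violations` and `violation_rate` are illustrative helpers, not part of any library:

```python
from typing import Any, Dict, List

# Hypothetical schema for a document-chunk record; adjust to your pipeline.
EXPECTED_SCHEMA: Dict[str, Any] = {
    "doc_id": str,
    "text": str,
    "token_count": int,
    "score": (int, float),  # a tuple means "any of these types"
}


def schema_violations(record: Dict[str, Any],
                      schema: Dict[str, Any] = EXPECTED_SCHEMA) -> List[str]:
    """List human-readable schema problems for one record (empty list = compliant).

    None values are not type-checked here; the null-rate metric covers them.
    """
    problems = []
    for field, expected in schema.items():
        if field not in record:
            problems.append(f"missing field '{field}'")
        elif record[field] is not None and not isinstance(record[field], expected):
            problems.append(f"field '{field}' has type {type(record[field]).__name__}")
    for field in record.keys() - schema.keys():
        problems.append(f"unexpected field '{field}'")
    return problems


def violation_rate(records: List[dict],
                   schema: Dict[str, Any] = EXPECTED_SCHEMA) -> float:
    """Fraction of records with at least one schema problem."""
    if not records:
        return 0.0
    bad = sum(1 for r in records if schema_violations(r, schema))
    return bad / len(records)
```

`violation_rate` is a drop-in candidate for the `schema_violations` metric in `compute_quality_metrics` when a declared schema exists.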
Key Monitoring Dashboards
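Here is an abridged Grafana dashboard definition (JSON) whose panels query the Prometheus metrics defined above. A complete Grafana dashboard also needs `datasource` and `gridPos` fields. The `alert` entry is shorthand: in practice, define it as a Grafana alert rule or a Prometheus alerting rule.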
```json
{
  "dashboard": {
    "title": "ETL Pipeline Health",
    "panels": [
      {
        "title": "Extraction Duration, p95 (minutes)",
        "type": "timeseries",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum by (le) (rate(etl_extraction_duration_seconds_bucket{source='api'}[1h]))) / 60"
          }
        ]
      },
      {
        "title": "Records Extracted per Hour by Source",
        "type": "timeseries",
        "targets": [
          {
            "expr": "sum by (source) (increase(etl_records_extracted_total[1h]))"
          }
        ]
      },
      {
        "title": "Pipeline Lag (minutes)",
        "type": "timeseries",
        "targets": [
          {
            "expr": "etl_pipeline_lag_seconds / 60"
          }
        ],
        "alert": {
          "condition": "lag > 60",
          "message": "Pipeline lag exceeds 1 hour"
        }
      },
      {
        "title": "Error Rate (errors/s)",
        "type": "timeseries",
        "targets": [
          {
            "expr": "sum by (source, error_type) (rate(etl_extraction_errors_total[5m]))"
          }
        ]
      }
    ]
  }
}
```
Alerting Strategy
Avoid "alert fatigue" by not paging on isolated, transient failures:
```python
from collections import defaultdict
from typing import Optional


class AlertingStrategy:
    """Smart alerting to reduce noise: ignore isolated transient failures."""

    def __init__(self, flaky_threshold: int = 3):
        self.flaky_threshold = flaky_threshold  # Alert on the 3rd consecutive failure
        # Track failures per pipeline so one pipeline's failures don't trigger another's alert
        self.consecutive_failures = defaultdict(int)

    def should_alert(self, pipeline_id: str, pipeline_status: str) -> bool:
        """Decide whether to send alert."""
        if pipeline_status == "success":
            self.consecutive_failures[pipeline_id] = 0
            return False
        if pipeline_status == "failure":
            self.consecutive_failures[pipeline_id] += 1
            # Alert after 3 consecutive failures, not on first failure
            return self.consecutive_failures[pipeline_id] >= self.flaky_threshold
        return False  # other statuses (e.g., "running") never alert


# Usage (send_alert is your own notification helper)
alerting = AlertingStrategy()


def on_run_complete(pipeline_id: str, status: str, error: Optional[Exception] = None):
    if alerting.should_alert(pipeline_id, status):
        send_alert(
            f"Pipeline {pipeline_id} has failed "
            f"{alerting.consecutive_failures[pipeline_id]} consecutive times. "
            f"Error: {error}"
        )
```
Key Takeaways
- Track four pillars: pipeline health (duration, failures), data freshness (lag), data quality (nulls, schema), and resource usage.
- Define SLOs (Service-Level Objectives) for freshness; alert when violated (e.g., data >2 hours stale).
- Monitor data quality using statistical baselines; detect anomalies (null rate, schema violations) via z-score.
- Use Prometheus for metrics collection and Grafana for dashboards.
- Implement smart alerting to reduce noise: alert after 3 consecutive failures, not on the first transient failure.
Frequently Asked Questions
How often should I run ETL pipeline health checks?
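Run freshness and health checks on their own schedule (e.g., every 15 minutes) for production pipelines, so a stalled pipeline is caught even when it never reports back. Also check after each pipeline run, which is effectively instant via orchestrator callbacks. Set stricter SLOs for critical pipelines (hourly freshness) than for batch pipelines (daily freshness is usually fine).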
What's a good target for pipeline SLO?
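For AI systems, aim for <1 hour lag (for example, when models or retrieval indexes are refreshed hourly to track data drift). For analytics, <24 hours is typical. Base the SLO on business impact: if data that is 6 hours stale materially hurts your product, set the SLO to 2 hours, which leaves time to detect and fix a problem before it reaches users.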
How do I alert without spamming the team?
Suppress repeats instead of muting alerts: once a pipeline crosses its alert threshold, send one alert, then re-alert only periodically while the issue persists (e.g., every 2 hours, or with exponential backoff that doubles the interval each time). Route by severity: critical (SLO violated) → page on-call; warning (high error rate) → Slack; info (slow run) → logs only.
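The re-alert interval and severity routing described in this answer can be sketched as follows. This version uses a fixed 2-hour cooldown per pipeline and severity rather than a doubling interval. `AlertThrottle` is a hypothetical helper, not a library class, and the channel names are placeholders for your paging, chat, and logging integrations.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

# Placeholder channel names; map these to your on-call pager, Slack, and logs.
ROUTES = {"critical": "page_on_call", "warning": "slack", "info": "log"}


class AlertThrottle:
    """Suppress repeat alerts for the same (pipeline, severity) within a cooldown."""

    def __init__(self, cooldown: timedelta = timedelta(hours=2)):
        self.cooldown = cooldown
        self.last_sent: Dict[Tuple[str, str], datetime] = {}

    def route(self, pipeline_id: str, severity: str, now: datetime) -> Optional[str]:
        """Return the channel to notify, or None if the alert should be suppressed."""
        key = (pipeline_id, severity)
        last = self.last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            return None  # already alerted for this issue within the cooldown
        self.last_sent[key] = now
        return ROUTES.get(severity, "log")  # unknown severities fall back to logs

    def resolve(self, pipeline_id: str) -> None:
        """Clear state when the pipeline recovers so the next incident alerts immediately."""
        for key in [k for k in self.last_sent if k[0] == pipeline_id]:
            del self.last_sent[key]
```

Passing `now` explicitly keeps the class deterministic and easy to test. Callers would normally pass `datetime.now(timezone.utc)`.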
Can I monitor multiple pipelines on one dashboard?
Yes. Use Prometheus labels (e.g., pipeline_id="embedding", pipeline_id="chunking") and Grafana template variables to filter a single dashboard to one pipeline or aggregate across all of them.
How do I detect model degradation via ETL metrics?
Monitor data distribution shifts: track percentiles (min, p50, p95, max) of key fields over time, and alert if any percentile moves more than 20% relative to its baseline. Also monitor embedding quality: sample embeddings and compute nearest-neighbor coherence (do semantically similar texts land near each other?).
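A standard-library sketch of the percentile check follows. It assumes that "shift >20%" means the relative change of a tracked percentile against a stored reference profile. The function names are hypothetical, and the 20% default comes from the answer above.

```python
import statistics
from typing import Dict, List


def percentile_profile(values: List[float]) -> Dict[str, float]:
    """min, p50, p95 and max of one numeric field (needs at least two values)."""
    # quantiles(n=100) returns the 99 cut points p1..p99: index 49 is p50, index 94 is p95
    cuts = statistics.quantiles(values, n=100, method="inclusive")
    return {"min": min(values), "p50": cuts[49], "p95": cuts[94], "max": max(values)}


def shifted_percentiles(baseline: Dict[str, float], current: Dict[str, float],
                        threshold: float = 0.20) -> Dict[str, float]:
    """Percentiles whose relative change versus the baseline exceeds `threshold`."""
    shifted = {}
    for name, base in baseline.items():
        if base == 0:
            continue  # relative change is undefined at zero; check these separately
        change = abs(current[name] - base) / abs(base)
        if change > threshold:
            shifted[name] = change
    return shifted


# Usage: compare today's (hypothetical) token counts against a stored reference profile
reference = percentile_profile(list(range(1, 102)))
today = percentile_profile([1.5 * v for v in range(1, 102)])
print(shifted_percentiles(reference, today))  # every percentile moved by 50%
```

The min and max are single observations, so they react to individual outliers. Alerting mainly on p50 and p95 tends to be less noisy.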