Production ETL: Error Handling and Fault Tolerance
Production ETL pipelines fail: APIs go down, files arrive corrupted, databases fill up, and network calls time out. A naive pipeline crashes and loses its partial results; a production pipeline handles failures gracefully, retries intelligently, quarantines problematic data, and alerts operators before data-freshness SLOs are violated. Fault-tolerant ETL combines several patterns: (1) circuit breakers (stop hammering failing APIs), (2) dead-letter queues (quarantine poison data), (3) graceful degradation (partial results are better than none), (4) error categorization (retry only transient failures), and (5) chaos testing (inject failures to verify the handling works). According to a 2026 DataOps survey, 73% of pipeline failures are due to poor error handling, and teams that implement systematic error handling reduce outages by 60%.
Circuit Breaker Pattern: Preventing Cascading Failures
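A circuit breaker stops calling a failing service so that one broken dependency doesn't cascade into the rest of the pipeline. It has three states: Closed (normal operation; failures are counted), Open (the failure threshold was reached; calls are rejected immediately without touching the service), and Half-Open (after a reset timeout, the next call is let through as a trial to test whether the service has recovered).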
```python
import time
from enum import Enum

import requests


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Service failing, stop calling
    HALF_OPEN = "half_open"  # Testing recovery


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Prevent cascading failures by stopping calls to failing services."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Call a function, managing circuit state."""
        if self.state == CircuitState.OPEN:
            # After reset_timeout, let one trial call through (HALF_OPEN)
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = CircuitState.HALF_OPEN
                print("🔶 Circuit HALF_OPEN, testing recovery...")
            else:
                # Still open: fail fast without touching the service
                raise CircuitOpenError(f"Circuit breaker is OPEN for {func.__name__}")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed trial call, or too many failures, (re)opens the circuit
            if (self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN
                print(f"🔴 Circuit OPEN after {self.failure_count} failures")
            raise

        # Success: reset the count so only consecutive failures trip the breaker
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            print("🟢 Circuit CLOSED, recovered")
        self.failure_count = 0
        return result


# Usage: protect API calls
breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30)


def fetch_api_data(url: str):
    """Fetch with circuit breaker protection."""
    def _fetch():
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return response.json()

    try:
        return breaker.call(_fetch)
    except CircuitOpenError as e:
        print(f"Circuit breaker rejected call: {e}")
    except requests.exceptions.RequestException as e:
        print(f"Request failed (counted toward the breaker): {e}")
    return None  # Fall back here (e.g., cached/stale data) instead of crashing
```
When the circuit opens, return stale data (cache), skip the step, or alert an operator instead of crashing the pipeline.
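One way to implement the "return stale data" option is a small wrapper that remembers the last successful result per key and serves it whenever the protected call raises, including when an open circuit rejects the call. This is a minimal sketch under stated assumptions: `StaleCacheFallback` is a hypothetical helper, and its in-memory dict is a stand-in for whatever persistent cache (disk, Redis, a table) a real pipeline would use.

```python
import time
from typing import Any, Callable, Dict, Tuple


class StaleCacheFallback:
    """Serve the last good result for a key when a fetch fails (hypothetical helper)."""

    def __init__(self) -> None:
        # key -> (result, unix timestamp of the successful fetch)
        self._cache: Dict[str, Tuple[Any, float]] = {}

    def get(self, key: str, fetch: Callable[[], Any]) -> Tuple[Any, bool]:
        """Return (result, is_stale). Raises only if the fetch fails and nothing is cached."""
        try:
            result = fetch()
        except Exception as e:
            if key in self._cache:
                result, fetched_at = self._cache[key]
                age = time.time() - fetched_at
                print(f"⚠ Serving stale data for {key!r} ({age:.0f}s old): {e}")
                return result, True
            raise  # No fallback available: let the pipeline decide
        # Fresh result: remember it for future failures
        self._cache[key] = (result, time.time())
        return result, False
```

Inside `fetch_api_data`, the call would become something like `data, is_stale = cache.get(url, lambda: breaker.call(_fetch))`. Returning the `is_stale` flag lets downstream steps log or alert that they ran on old data rather than silently treating it as fresh.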
Dead-Letter Queue Pattern: Quarantine Poison Data
Data that can't be processed (corrupt JSON, invalid schema, oversized files) is quarantined to a "dead-letter queue" for manual inspection, preventing poisoned records from cascading through the pipeline.
```python
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


class DeadLetterQueue:
    """Quarantine records that fail processing."""

    def __init__(self, dlq_dir: str = "./dead_letter_queue"):
        self.dlq_dir = Path(dlq_dir)
        self.dlq_dir.mkdir(parents=True, exist_ok=True)

    def send_to_dlq(self, record, pipeline_stage: str, error: Exception) -> Path:
        """Store a failed record with error details."""
        dlq_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "pipeline_stage": pipeline_stage,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "record": record,
        }
        # Use the record ID in the filename if available (poison data may not even be a dict)
        record_id = record.get("id", "unknown") if isinstance(record, dict) else "unknown"
        filename = f"{record_id}_{time.time_ns()}.json"
        filepath = self.dlq_dir / filename
        with open(filepath, "w") as f:
            # default=str keeps non-JSON-serializable values from crashing the DLQ itself
            json.dump(dlq_entry, f, indent=2, default=str)
        print(f"📦 Sent to DLQ: {filepath}")
        return filepath

    def process_dlq(self, max_records: int = 100):
        """List DLQ entries for review (manual/semi-automated)."""
        dlq_files = sorted(self.dlq_dir.glob("*.json"))[:max_records]
        print(f"📋 Found {len(dlq_files)} DLQ entries:")
        for filepath in dlq_files:
            with open(filepath) as f:
                entry = json.load(f)
            print(f"  - {filepath.name}")
            print(f"    Stage: {entry['pipeline_stage']}")
            print(f"    Error: {entry['error_message']}")


# Usage in ETL pipeline
dlq = DeadLetterQueue()


def transform_with_dlq(record: dict) -> Optional[dict]:
    """Transform with DLQ fallback."""
    try:
        # Attempt transformation
        if "content" not in record:
            raise ValueError("Missing 'content' field")
        # Clean and process
        cleaned = record["content"].strip()
        return {"id": record["id"], "content": cleaned}
    except Exception as e:
        # Send to DLQ instead of crashing
        dlq.send_to_dlq(record, pipeline_stage="transform", error=e)
        return None  # Skip this record, continue with the next
```
Process the DLQ manually (or semi-automatically with a separate "retry" pipeline) to fix data issues.
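The semi-automated "retry" pipeline can be as simple as replaying each quarantined record through the fixed transform and deleting the DLQ file only when the retry succeeds. This sketch assumes the JSON entry layout written by `send_to_dlq` (a `record` field per file) and a hypothetical `transform` callable that raises on failure; anything that still fails stays in the queue for a human to inspect.

```python
import json
from pathlib import Path
from typing import Any, Callable, List


def replay_dlq(dlq_dir: str, transform: Callable[[Any], Any]) -> List[Any]:
    """Re-run `transform` on each DLQ entry; remove entries that now succeed."""
    recovered = []
    for filepath in sorted(Path(dlq_dir).glob("*.json")):
        with open(filepath) as f:
            entry = json.load(f)
        try:
            result = transform(entry["record"])
        except Exception as e:
            # Still failing: leave the file in place for manual inspection
            print(f"✗ Still failing: {filepath.name}: {e}")
            continue
        recovered.append(result)
        filepath.unlink()  # Delete only after a successful retry
        print(f"✓ Recovered: {filepath.name}")
    return recovered
```

The recovered records are returned rather than loaded directly, so the caller can push them through the normal load step and keep a single write path into the warehouse.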
Graceful Degradation: Partial Results > Total Failure
If extraction partially fails (API returns 500 after 30K records), load the 30K instead of rejecting all:
```python
import time

import requests


def extract_with_graceful_degradation(api_url: str, max_attempts: int = 3):
    """Extract with graceful degradation on failure."""
    all_records = []
    cursor = None
    attempt = 0

    while True:
        try:
            response = requests.get(
                api_url,
                params={"cursor": cursor, "limit": 1000},
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()
            records = data.get("data", [])
            all_records.extend(records)
            cursor = data.get("pagination", {}).get("next_cursor")

            if not cursor or len(records) == 0:
                # Successfully fetched all data
                print(f"✓ Extraction complete: {len(all_records)} records")
                return all_records
            attempt = 0  # Reset attempts after each successful page

        except requests.exceptions.Timeout:
            attempt += 1
            print(f"Timeout, attempt {attempt}/{max_attempts}")
            if attempt >= max_attempts:
                if all_records:
                    # Return partial results instead of failing completely
                    print(f"⚠ Returning {len(all_records)} partial records (extraction interrupted)")
                    return all_records
                raise  # Nothing extracted at all: surface the failure
            time.sleep(2 ** attempt)  # Exponential backoff: 2s, 4s, ...

        except Exception as e:
            # Non-timeout errors (e.g., HTTP 500): stop and keep what we have
            print(f"Extraction failed: {e}")
            if all_records:
                print(f"⚠ Returning {len(all_records)} partial records")
                return all_records
            raise


# Usage (load_to_database, send_alert, and expected_count are placeholders
# for your own loader, alerting hook, and expected record count)
records = extract_with_graceful_degradation("https://api.example.com/documents")
load_to_database(records)

# Log that we have partial data
if len(records) < expected_count:
    send_alert(f"Partial extraction: {len(records)}/{expected_count} records")
```
Comprehensive Error Categorization
Distinguish retryable (transient) from permanent errors:
```python
import time

import requests


class RetryableError(Exception):
    """Transient error, safe to retry."""


class PermanentError(Exception):
    """Permanent error, retrying won't help."""


# HTTP statuses worth retrying: rate limit and transient server-side failures
RETRYABLE_STATUS = {429, 500, 502, 503, 504}

# Fallback message patterns for errors that carry no type or status code
RETRYABLE_PATTERNS = [
    "Connection refused",
    "Timeout",
    "timed out",
    "temporarily unavailable",
    "ECONNRESET",
    "EAGAIN",
]
PERMANENT_PATTERNS = [
    "Unauthorized",
    "Forbidden",
    "Not Found",
    "Invalid",
    "schema violation",
]


def categorize_error(error: Exception) -> str:
    """Classify an error as 'retryable' or 'permanent'."""
    # Network-level failures (timeouts, refused/reset connections) are transient
    if isinstance(error, (requests.exceptions.Timeout,
                          requests.exceptions.ConnectionError)):
        return "retryable"
    # HTTP errors: decide by status code (429/5xx retryable, other 4xx permanent)
    if isinstance(error, requests.exceptions.HTTPError) and error.response is not None:
        if error.response.status_code in RETRYABLE_STATUS:
            return "retryable"
        return "permanent"
    # Otherwise fall back to matching the error message
    error_str = str(error)
    if any(pattern in error_str for pattern in RETRYABLE_PATTERNS):
        return "retryable"
    if any(pattern in error_str for pattern in PERMANENT_PATTERNS):
        return "permanent"
    # Unknown: log it and treat as permanent so it surfaces instead of looping
    print(f"⚠ Unknown error, treating as permanent: {error_str}")
    return "permanent"


def extract_with_smart_retry(api_url: str, max_retries: int = 5):
    """Extract with smart retry (only retries transient errors)."""
    for attempt in range(max_retries):
        try:
            response = requests.get(api_url, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            if categorize_error(e) == "permanent":
                print(f"Permanent error, not retrying: {e}")
                raise PermanentError(str(e)) from e
            if attempt == max_retries - 1:
                raise RetryableError(f"Still failing after {max_retries} attempts: {e}") from e
            wait_time = (2 ** attempt) * 5  # 5s, 10s, 20s, 40s, ...
            print(f"Transient error, retrying in {wait_time}s...")
            time.sleep(wait_time)
```
Testing ETL Error Handling: Chaos Testing
Test error handling with injected failures:
```python
import time
from unittest.mock import MagicMock, patch

import requests

# Assumes CircuitBreaker, CircuitState, and extract_with_graceful_degradation
# from the sections above are defined in the same module.


def test_etl_with_injected_failures():
    """Test ETL resilience by injecting failures."""
    # Simulate the API failing with a 500 on the second page
    call_count = 0

    def mock_get(*args, **kwargs):
        nonlocal call_count
        call_count += 1
        if call_count == 2:
            raise requests.exceptions.HTTPError("API returned 500")
        # Fake a successful response object with a next-page cursor
        response = MagicMock()
        response.json.return_value = {
            "data": [{"id": call_count, "content": "test"}],
            "pagination": {"next_cursor": f"page-{call_count + 1}"},
        }
        return response

    with patch("requests.get", side_effect=mock_get):
        records = extract_with_graceful_degradation("https://api.example.com/documents")

    assert len(records) == 1, "Should return the first page despite the error"
    print("✓ Test passed: graceful degradation works")


def test_circuit_breaker():
    """Test circuit breaker transitions."""
    breaker = CircuitBreaker(failure_threshold=2, reset_timeout=1)

    # Cause failures
    def failing_func():
        raise Exception("Service down")

    for _ in range(2):
        try:
            breaker.call(failing_func)
        except Exception:
            pass

    # Circuit should now be open
    assert breaker.state == CircuitState.OPEN

    # New calls are rejected without reaching the service
    rejected = False
    try:
        breaker.call(failing_func)
    except Exception as e:
        rejected = "OPEN" in str(e)
    assert rejected, "Open circuit should reject the call"

    # Wait for reset timeout
    time.sleep(1.1)

    # Circuit transitions to HALF_OPEN on the next call, then CLOSED on success
    def working_func():
        return "success"

    result = breaker.call(working_func)
    assert result == "success"
    assert breaker.state == CircuitState.CLOSED
    print("✓ Test passed: circuit breaker works")


# Run tests
test_etl_with_injected_failures()
test_circuit_breaker()
```
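The tests above inject a single scripted failure. For broader chaos testing, a wrapper can make any dependency fail randomly at a configurable rate, and seeding the random generator keeps a failing run reproducible. `chaos_wrap` and its parameters are illustrative names for this sketch, not part of `unittest` or any chaos-testing library.

```python
import random
from typing import Any, Callable, Optional


def chaos_wrap(func: Callable[..., Any], failure_rate: float,
               seed: Optional[int] = None,
               exc: type = ConnectionError) -> Callable[..., Any]:
    """Return a version of `func` that raises `exc` with probability `failure_rate`."""
    rng = random.Random(seed)  # Dedicated seeded RNG so failures are reproducible

    def wrapper(*args, **kwargs):
        if rng.random() < failure_rate:
            raise exc(f"Chaos: injected failure in {func.__name__}")
        return func(*args, **kwargs)

    return wrapper
```

For example, `patch("requests.get", chaos_wrap(requests.get, failure_rate=0.2, seed=7))` makes roughly one in five HTTP calls fail while the pipeline runs, which exercises the retry, circuit breaker, and DLQ paths together. A dedicated `random.Random` instance is used so the injected failures don't depend on (or disturb) the global random state.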
Key Takeaways
- Use circuit breakers to stop calling failing services, preventing cascading failures.
- Implement dead-letter queues to quarantine poison data, allowing the pipeline to continue.
- Practice graceful degradation: partial results beat total failure.
- Categorize errors (retryable vs. permanent) to avoid wasting retries on impossible cases.
- Test error handling with chaos injection; test circuit breakers, timeouts, and recovery.
Frequently Asked Questions
How many retries is reasonable?
For transient errors (timeouts, 503): 3–5 retries. For rate limits (429): 1–2 retries with long backoff. For permanent errors (404, auth): 0 retries. Use exponential backoff: 1s, 2s, 4s, 8s, 16s.
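These guidelines can be encoded as a small lookup so retry limits live in one place instead of being scattered across extract functions. The exact numbers below (5 for transient errors, 2 for rate limits) are one pick within the ranges above, and `max_retries_for` is a hypothetical helper keyed on HTTP status codes.

```python
from typing import Optional

# Retry budget per error category (one choice within the ranges above)
RETRY_POLICY = {
    "transient": 5,    # timeouts, 500/502/503/504
    "rate_limit": 2,   # 429: few retries, long backoff
    "permanent": 0,    # 401/403/404 and other client errors
}

TRANSIENT_STATUS = {500, 502, 503, 504}


def max_retries_for(status_code: Optional[int] = None) -> int:
    """Retries allowed for an error response; None means no response (timeout/network)."""
    if status_code is None:
        return RETRY_POLICY["transient"]
    if status_code == 429:
        return RETRY_POLICY["rate_limit"]
    if status_code in TRANSIENT_STATUS:
        return RETRY_POLICY["transient"]
    return RETRY_POLICY["permanent"]
```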
Should I retry immediately or wait?
Wait. Use exponential backoff to give the source system time to recover. Immediate retry (hammering) makes things worse. Minimum wait: 1 second. Maximum wait: 5–15 minutes for severe outages.
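A capped exponential schedule follows directly from these numbers: start at the 1-second minimum, double on each attempt, and never exceed the maximum. This sketch uses 15 minutes (900 s) as the cap, the upper end of the range above; `backoff_delay` and `retry_with_backoff` are illustrative names, and the `sleep` parameter is injectable only so the loop can be tested without waiting.

```python
import time
from typing import Any, Callable


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 900.0) -> float:
    """Seconds to wait before retry `attempt` (0-based): base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))


def retry_with_backoff(func: Callable[[], Any], max_retries: int = 5,
                       sleep: Callable[[float], None] = time.sleep) -> Any:
    """Call func, waiting 1s, 2s, 4s, ... between failed attempts."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # Out of retries: surface the last error
            sleep(backoff_delay(attempt))
```

As written, this retries every exception; in a real pipeline it would sit behind an error-categorization check so permanent errors (404, auth) fail immediately instead of waiting through the whole schedule.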
What if the source API has no rate-limit headers?
Implement conservative rate limiting: 1 request per second, or even 1 per 10 seconds. Use a token bucket or sliding window to enforce it. It's better to be slow than to get rate-limited or blocked.
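A token bucket, one of the two options mentioned, refills `rate` tokens per second up to `capacity` and makes each request spend one token, sleeping when the bucket is empty. `TokenBucket` is an illustrative class for this sketch, not a library API; the injectable `clock`/`sleep` parameters exist only so it can be tested without real waiting, and it is not thread-safe.

```python
import time
from typing import Callable


class TokenBucket:
    """Client-side rate limiter: `rate` requests/second on average, bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float = 1.0,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # Start full so the first request goes out immediately
        self.clock = clock
        self.sleep = sleep
        self.last_refill = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, up to capacity
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now

    def acquire(self) -> None:
        """Block until a token is available, then consume it."""
        self._refill()
        while self.tokens < 1:
            # Sleep just long enough for one full token to accumulate
            self.sleep((1 - self.tokens) / self.rate)
            self._refill()
        self.tokens -= 1
```

Usage: create `limiter = TokenBucket(rate=0.1)` for 1 request per 10 seconds, then call `limiter.acquire()` immediately before each `requests.get(...)`. A `capacity` above 1 allows short bursts while keeping the long-run average at `rate`.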
How do I clean up old DLQ entries?
Archive DLQ entries older than 30 days to cold storage (S3 Glacier). Keep recent entries (30 days) for quick access. Set a cron job to periodically clean up and report on DLQ health (how many entries, top error types).
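The cron job's two tasks can be sketched as a pair of functions: a health report that counts entries and top error types, and an archiver that moves entries older than 30 days. This assumes the one-JSON-file-per-entry layout with an `error_type` field used by `DeadLetterQueue`, and uses file modification time as the entry's age. Moving to a local `archive_dir` is a stand-in for S3 Glacier; uploading there would go through your cloud SDK, which is not shown.

```python
import json
import shutil
import time
from collections import Counter
from pathlib import Path


def dlq_health_report(dlq_dir: str, top_n: int = 5) -> dict:
    """Summarize the DLQ: total entries and most common error types."""
    error_types = Counter()
    files = list(Path(dlq_dir).glob("*.json"))
    for filepath in files:
        with open(filepath) as f:
            error_types[json.load(f).get("error_type", "unknown")] += 1
    return {"total_entries": len(files), "top_error_types": error_types.most_common(top_n)}


def archive_old_entries(dlq_dir: str, archive_dir: str, max_age_days: int = 30) -> int:
    """Move DLQ files older than max_age_days (by mtime) into archive_dir; return count."""
    cutoff = time.time() - max_age_days * 86400
    archive = Path(archive_dir)
    archive.mkdir(parents=True, exist_ok=True)
    moved = 0
    for filepath in Path(dlq_dir).glob("*.json"):
        if filepath.stat().st_mtime < cutoff:
            shutil.move(str(filepath), str(archive / filepath.name))
            moved += 1
    return moved
```

A daily cron entry could run both, archiving first and then sending the report to the team's alerting channel, so a sudden spike in one error type is visible before the DLQ grows unmanageable.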
Should I pause the entire pipeline if one stage fails?
No. Use partial results where possible. If a component is down, skip it (degrade), log the failure, and alert. Example: if the embedding API is down, ingest documents without embeddings and backfill the embeddings later.
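The embedding example can be sketched as an ingest step that treats the embedding call as optional: every document is stored, and the ones that could not be embedded are flagged for a later backfill job. `embed` is a hypothetical callable standing in for whatever embedding client the pipeline uses, and `needs_embedding` is an assumed field name; stopping after the first failure is a crude stand-in for a circuit breaker.

```python
from typing import Callable, Dict, List, Optional


def ingest_with_optional_embeddings(
    docs: List[Dict], embed: Callable[[str], List[float]]
) -> List[Dict]:
    """Ingest every document; if embedding fails, store it without one and flag it."""
    ingested = []
    embedding_down = False
    for doc in docs:
        vector: Optional[List[float]] = None
        if not embedding_down:
            try:
                vector = embed(doc["content"])
            except Exception as e:
                # Degrade: stop calling the embedding API for the rest of this run
                embedding_down = True
                print(f"⚠ Embedding unavailable, ingesting without embeddings: {e}")
        # needs_embedding marks documents for a later backfill job
        ingested.append({**doc, "embedding": vector, "needs_embedding": vector is None})
    return ingested
```

The backfill job then only has to select documents where `needs_embedding` is true once the API is healthy again.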