
Production ETL: Error Handling and Fault Tolerance

Production ETL pipelines fail: APIs go down, files arrive corrupted, databases fill up, and network calls time out. A naive pipeline crashes and loses partial results; a production pipeline handles failures gracefully, retries intelligently, quarantines problematic data, and alerts operators before data-freshness SLOs are violated. Fault-tolerant ETL combines several patterns: (1) circuit breakers, which stop the pipeline from hammering a failing API; (2) dead-letter queues, which quarantine poison data; (3) graceful degradation, because partial results are better than none; (4) error categorization, which separates retryable failures from permanent ones; and (5) chaos testing, which injects failures to verify the rest. According to a 2026 DataOps survey, 73% of pipeline failures are due to poor error handling; teams that implement systematic error handling reduce outages by 60%.

Circuit Breaker Pattern: Preventing Cascading Failures

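A circuit breaker stops calling a failing service, preventing cascading failures. It has three states: Closed (normal operation, calls pass through), Open (the service is failing, so calls are rejected immediately), and Half-Open (after a reset timeout, a trial call tests whether the service has recovered).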

import time
from enum import Enum

import requests


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Service failing, stop calling
    HALF_OPEN = "half_open"  # Testing recovery


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Prevent cascading failures by stopping calls to failing services."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Call a function, managing circuit state."""
        if self.state == CircuitState.OPEN:
            # Check if we should transition to HALF_OPEN.
            # time.monotonic() is immune to wall-clock adjustments.
            if time.monotonic() - self.last_failure_time > self.reset_timeout:
                self.state = CircuitState.HALF_OPEN
                print("🔶 Circuit HALF_OPEN, testing recovery...")
            else:
                # Still open, reject the call without touching the service
                raise CircuitOpenError(f"Circuit breaker is OPEN for {func.__name__}")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()

            # A failed trial call in HALF_OPEN re-opens the circuit at once;
            # in CLOSED, open once consecutive failures reach the threshold.
            if (self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN
                print(f"🔴 Circuit OPEN after {self.failure_count} failures")
            raise

        # Success: close the circuit if we were probing, and reset the
        # consecutive-failure count in every state
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            print("🟢 Circuit CLOSED, recovered")
        self.failure_count = 0
        return result


# Usage: protect API calls
breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30)


def fetch_api_data(url: str):
    """Fetch with circuit breaker protection."""
    def _fetch():
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return response.json()

    try:
        return breaker.call(_fetch)
    except CircuitOpenError as e:
        print(f"Circuit breaker rejected call: {e}")
    except requests.RequestException as e:
        print(f"Request failed: {e}")
    return None  # Caller falls back to cached/stale data instead of crashing

When the circuit opens, return stale data (cache), skip the step, or alert an operator instead of crashing the pipeline.
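The "return stale data" option can be sketched with a small last-known-good cache. This is a minimal illustration rather than a prescribed design: `StaleCache`, `fetch_with_fallback`, and `max_age_seconds` are hypothetical names, and `fetch` stands for any callable that may raise, such as a circuit-breaker-protected request.

```python
import time
from typing import Any, Callable, Optional, Tuple


class StaleCache:
    """Keeps the last successful result per key, with the time it was stored."""

    def __init__(self) -> None:
        self._entries: dict = {}

    def put(self, key: str, value: Any) -> None:
        self._entries[key] = (time.monotonic(), value)

    def get(self, key: str, max_age_seconds: float) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if time.monotonic() - stored_at > max_age_seconds:
            return None  # Too old to serve, even as a fallback
        return value


def fetch_with_fallback(
    key: str,
    fetch: Callable[[], Any],
    cache: StaleCache,
    max_age_seconds: float = 3600.0,
) -> Tuple[Any, bool]:
    """Return (value, is_stale). Serves the cached value if `fetch` raises."""
    try:
        value = fetch()
    except Exception as exc:
        cached = cache.get(key, max_age_seconds)
        if cached is None:
            raise  # No usable fallback: let the caller skip the step or alert
        print(f"Serving stale data for {key!r}: {exc}")
        return cached, True
    cache.put(key, value)
    return value, False
```

Returning the `is_stale` flag alongside the value lets downstream stages record that the data may be out of date instead of silently treating it as fresh.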

Dead-Letter Queue Pattern: Quarantine Poison Data

Data that can't be processed (corrupt JSON, invalid schema, oversized files) is quarantined to a "dead-letter queue" for manual inspection, preventing poisoned records from cascading through the pipeline.

import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


class DeadLetterQueue:
    """Quarantine records that fail processing."""

    def __init__(self, dlq_dir: str = "./dead_letter_queue"):
        self.dlq_dir = Path(dlq_dir)
        self.dlq_dir.mkdir(parents=True, exist_ok=True)

    def send_to_dlq(self, record: dict, pipeline_stage: str, error: Exception):
        """Store a failed record with error details."""
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        now = datetime.now(timezone.utc)

        dlq_entry = {
            "timestamp": now.isoformat(),
            "pipeline_stage": pipeline_stage,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "record": record
        }

        # Save to file (use record ID as filename if available)
        filename = f"{record.get('id', 'unknown')}_{now.timestamp()}.json"
        filepath = self.dlq_dir / filename

        with open(filepath, 'w') as f:
            # default=str stops non-JSON values (dates, Decimals) from failing the write
            json.dump(dlq_entry, f, indent=2, default=str)

        print(f"📦 Sent to DLQ: {filepath}")

    def process_dlq(self, max_records: int = 100):
        """List DLQ records for review (manual/semi-automated)."""
        dlq_files = sorted(self.dlq_dir.glob("*.json"))[:max_records]

        print(f"📋 Found {len(dlq_files)} DLQ entries:")

        for filepath in dlq_files:
            with open(filepath) as f:
                entry = json.load(f)

            print(f"  - {filepath.name}")
            print(f"    Stage: {entry['pipeline_stage']}")
            print(f"    Error: {entry['error_message']}")


# Usage in ETL pipeline
dlq = DeadLetterQueue()


def transform_with_dlq(record: dict) -> Optional[dict]:
    """Transform with DLQ fallback."""
    try:
        # Attempt transformation
        if 'content' not in record:
            raise ValueError("Missing 'content' field")

        # Clean and process
        cleaned = record['content'].strip()
        return {"id": record['id'], "content": cleaned}

    except Exception as e:
        # Send to DLQ instead of crashing
        dlq.send_to_dlq(record, pipeline_stage='transform', error=e)
        return None  # Skip this record, continue with next

Process the DLQ manually (or semi-automatically with a separate "retry" pipeline) to fix data issues.
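A semi-automatic retry pass might look like the sketch below. It assumes each DLQ file is a JSON object with a `record` key, as in the entries written above; `retry_dlq` and the `resolved` subdirectory are hypothetical names chosen for illustration.

```python
import json
from pathlib import Path
from typing import Callable, Optional


def retry_dlq(dlq_dir: str, transform: Callable[[dict], Optional[dict]]) -> dict:
    """Re-run `transform` on every quarantined record.

    Entries that now succeed are moved to <dlq_dir>/resolved; entries that
    still fail (exception or None result) stay where they are.
    """
    root = Path(dlq_dir)
    resolved_dir = root / "resolved"
    resolved_dir.mkdir(parents=True, exist_ok=True)

    recovered = []
    still_failing = 0
    # glob("*.json") is not recursive, so files already in resolved/ are skipped
    for path in sorted(root.glob("*.json")):
        entry = json.loads(path.read_text())
        try:
            result = transform(entry["record"])
        except Exception:
            result = None
        if result is None:
            still_failing += 1
            continue
        recovered.append(result)
        path.rename(resolved_dir / path.name)

    return {"recovered": recovered, "still_failing": still_failing}
```

Moving resolved entries aside, instead of deleting them, keeps an audit trail of what was quarantined and later repaired.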

Graceful Degradation: Partial Results > Total Failure

If extraction partially fails (API returns 500 after 30K records), load the 30K instead of rejecting all:

import time

import requests


def extract_with_graceful_degradation(api_url: str, max_attempts: int = 3):
    """Extract with graceful degradation on failure."""

    all_records = []
    cursor = None
    attempt = 0

    while attempt < max_attempts:
        try:
            response = requests.get(
                api_url,
                params={'cursor': cursor, 'limit': 1000},
                timeout=30
            )
            response.raise_for_status()

            data = response.json()
            records = data.get('data', [])
            all_records.extend(records)

            cursor = data.get('pagination', {}).get('next_cursor')
            if not cursor or len(records) == 0:
                # Successfully fetched all data
                print(f"✓ Extraction complete: {len(all_records)} records")
                return all_records

            attempt = 0  # Reset attempts on success

        except requests.exceptions.Timeout:
            attempt += 1
            print(f"Timeout, attempt {attempt}/{max_attempts}")

            if attempt >= max_attempts:
                if all_records:
                    # Return partial results instead of failing completely
                    print(f"⚠ Returning {len(all_records)} partial records (extraction interrupted)")
                    return all_records
                raise  # Nothing was fetched: surface the failure instead of returning None

            time.sleep(2 ** attempt)  # Exponential backoff

        except Exception as e:
            print(f"Extraction failed: {e}")
            if all_records:
                print(f"⚠ Returning {len(all_records)} partial records")
                return all_records
            else:
                raise


# Usage (load_to_database, send_alert, and expected_count are placeholders for
# your own loader, alerting hook, and expected record count)
records = extract_with_graceful_degradation("https://api.example.com/documents")
load_to_database(records)

# Alert if we only have partial data
if len(records) < expected_count:
    send_alert(f"Partial extraction: {len(records)}/{expected_count} records")

Comprehensive Error Categorization

Distinguish retryable (transient) from permanent errors:

import time

import requests


class RetryableError(Exception):
    """Transient error, safe to retry."""


class PermanentError(Exception):
    """Permanent error, retry won't help."""


# Retryable: network, timeout, rate limit, temporary resource shortage
RETRYABLE_PATTERNS = [
    "connection refused",
    "timeout",
    "timed out",
    "429",  # Too Many Requests
    "503",  # Service Unavailable
    "temporarily unavailable",
    "econnreset",
    "eagain",
]

# Permanent: auth, not found, invalid, schema mismatch
PERMANENT_PATTERNS = [
    "unauthorized",
    "401",
    "403",
    "404",
    "not found",
    "invalid",
    "schema violation",
    "forbidden",
]


def categorize_error(error: Exception) -> str:
    """Classify error for retry strategy: returns "retryable" or "permanent"."""
    # Matching on message text is fragile; compare in lowercase so that
    # "Read timed out" and "Timeout" are treated alike.
    error_str = str(error).lower()

    if any(pattern in error_str for pattern in RETRYABLE_PATTERNS):
        return "retryable"
    if any(pattern in error_str for pattern in PERMANENT_PATTERNS):
        return "permanent"

    # Unknown: log and treat as permanent
    print(f"⚠ Unknown error, treating as permanent: {error}")
    return "permanent"


def extract_with_smart_retry(api_url: str, max_retries: int = 5):
    """Extract with smart retry (only retries transient errors)."""

    for attempt in range(max_retries):
        try:
            response = requests.get(api_url, timeout=10)
            response.raise_for_status()
            return response.json()

        except Exception as e:
            if categorize_error(e) == "permanent":
                print(f"Permanent error, not retrying: {e}")
                raise PermanentError(str(e)) from e

            if attempt == max_retries - 1:
                # Out of retries: fail loudly instead of returning None
                raise RetryableError(f"Still failing after {max_retries} attempts: {e}") from e

            wait_time = (2 ** attempt) * 5
            print(f"Transient error, retrying in {wait_time}s...")
            time.sleep(wait_time)

Testing ETL Error Handling: Chaos Testing

Test error handling with injected failures:

import time
from unittest.mock import MagicMock, patch


def test_etl_with_injected_failures():
    """Test ETL resilience by injecting failures."""

    # The first page succeeds and points to a second page, so the extractor
    # makes a second call, which fails as if the API had returned 500.
    first_page = MagicMock()
    first_page.json.return_value = {
        "data": [{"id": 1, "content": "test"}],
        "pagination": {"next_cursor": "page-2"},
    }

    call_count = 0

    def mock_api(*args, **kwargs):
        nonlocal call_count
        call_count += 1
        if call_count == 2:
            raise Exception("API returned 500")
        return first_page

    with patch('requests.get', side_effect=mock_api):
        records = extract_with_graceful_degradation("https://api.example.com/documents")

    assert call_count == 2, "Second page should have been requested"
    assert len(records) == 1, "Should return partial results despite error"

    print("✓ Test passed: graceful degradation works")


def test_circuit_breaker():
    """Test circuit breaker transitions."""

    breaker = CircuitBreaker(failure_threshold=2, reset_timeout=1)

    # Cause failures
    def failing_func():
        raise Exception("Service down")

    for _ in range(2):
        try:
            breaker.call(failing_func)
        except Exception:
            pass

    # Circuit should now be open
    assert breaker.state == CircuitState.OPEN

    # Reject new calls
    try:
        breaker.call(failing_func)
    except Exception as e:
        assert "OPEN" in str(e)
    else:
        assert False, "Should reject call"

    # Wait for reset timeout
    time.sleep(1.1)

    # Circuit transitions to HALF_OPEN on next call
    def working_func():
        return "success"

    result = breaker.call(working_func)
    assert result == "success"
    assert breaker.state == CircuitState.CLOSED

    print("✓ Test passed: circuit breaker works")


# Run tests
test_etl_with_injected_failures()
test_circuit_breaker()

Key Takeaways

  • Use circuit breakers to stop calling failing services, preventing cascading failures.
  • Implement dead-letter queues to quarantine poison data, allowing the pipeline to continue.
  • Practice graceful degradation: partial results beat total failure.
  • Categorize errors (retryable vs. permanent) to avoid wasting retries on impossible cases.
  • Test error handling with chaos injection; test circuit breakers, timeouts, and recovery.

Frequently Asked Questions

How many retries is reasonable?

For transient errors (timeouts, 503): 3–5 retries. For rate limits (429): 1–2 retries with long backoff. For permanent errors (404, auth): 0 retries. Use exponential backoff: 1s, 2s, 4s, 8s, 16s.
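These retry budgets can be written down as a small lookup, sketched here under a few assumptions: `MAX_RETRIES`, `classify_status`, and `should_retry` are hypothetical names, the upper end of each range is used as the budget, and a status of `None` stands for a request that timed out before any response arrived.

```python
from typing import Optional

# Maximum retries per error class (upper bounds of the ranges above)
MAX_RETRIES = {"transient": 5, "rate_limit": 2, "permanent": 0}


def classify_status(status_code: Optional[int]) -> str:
    """Map an HTTP status code (None means the request timed out) to a class."""
    if status_code is None or status_code == 503:
        return "transient"
    if status_code == 429:
        return "rate_limit"
    return "permanent"  # 404, auth failures, and anything unrecognized


def should_retry(status_code: Optional[int], retries_so_far: int) -> bool:
    """True if another retry is still within the budget for this error class."""
    return retries_so_far < MAX_RETRIES[classify_status(status_code)]
```

Treating unrecognized statuses as permanent is the conservative choice here; a pipeline that sees other transient codes from its source would extend `classify_status` accordingly.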

Should I retry immediately or wait?

Wait. Use exponential backoff to give the source system time to recover. Immediate retry (hammering) makes things worse. Minimum wait: 1 second. Maximum wait: 5–15 minutes for severe outages.
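One way to compute the wait is exponential backoff clamped between the minimum and maximum above. The sketch below also adds random jitter, which is not mentioned in the answer but is a common addition so that many workers do not all retry at the same instant. `backoff_delay` and its parameters are hypothetical names; the 900-second cap corresponds to the 15-minute upper bound.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    minimum: float = 1.0,
    cap: float = 900.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based).

    The ceiling doubles each attempt (1s, 2s, 4s, ...) up to `cap`; the
    returned delay is drawn between `minimum` and that ceiling.
    """
    ceiling = min(cap, minimum * (2 ** attempt))
    return minimum + (ceiling - minimum) * rng()


# Example: print the ceiling-based schedule without jitter (rng always 1.0)
if __name__ == "__main__":
    for attempt in range(5):
        print(attempt, backoff_delay(attempt, rng=lambda: 1.0))
```

Passing `rng` as a parameter keeps the function deterministic in tests while defaulting to real randomness in production.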

What if the source API has no rate-limit headers?

Implement conservative rate limiting: 1 request per second, or even 1 per 10 seconds. Use a token bucket or sliding window to enforce it. Better to be slow than get rate-limited and blocked.
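A token bucket for this purpose can be quite small. The following is a single-threaded sketch, not a hardened implementation: `TokenBucket` and its parameters are hypothetical names, and the injectable `clock` and `sleep` exist only to make the class testable without real waiting.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate` requests per second on average, with bursts up to `capacity`."""

    def __init__(
        self,
        rate: float,
        capacity: int = 1,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)  # Start full
        self._clock = clock
        self._sleep = sleep
        self._last = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, up to capacity
        now = self._clock()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    def acquire(self) -> float:
        """Block until one token is available; return the seconds waited."""
        self._refill()
        waited = 0.0
        if self._tokens < 1:
            waited = (1 - self._tokens) / self.rate
            self._sleep(waited)
            self._refill()
        self._tokens -= 1
        return waited


# Usage: at most one request per second
# bucket = TokenBucket(rate=1.0)
# for url in urls:
#     bucket.acquire()
#     fetch(url)
```

For one request every 10 seconds, `TokenBucket(rate=0.1)` gives the more conservative limit mentioned above.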

How do I clean up old DLQ entries?

Archive DLQ entries older than 30 days to cold storage (for example, S3 Glacier) and keep the most recent 30 days readily accessible. Schedule a cron job to run the cleanup and to report on DLQ health (number of entries, top error types).
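The job that cron runs could look like this sketch. It makes several assumptions: a local archive directory stands in for cold storage (the upload step is left out), entry age is taken from file modification time, each entry is a JSON object with an `error_type` field, and `archive_old_entries` and `dlq_health` are hypothetical names.

```python
import json
import shutil
import time
from collections import Counter
from pathlib import Path
from typing import Optional


def archive_old_entries(dlq_dir, archive_dir, max_age_days: int = 30,
                        now: Optional[float] = None) -> int:
    """Move DLQ files older than `max_age_days` (by mtime) into `archive_dir`.

    Returns the number of files moved.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    archive = Path(archive_dir)
    archive.mkdir(parents=True, exist_ok=True)

    moved = 0
    # Materialize the listing first so moving files does not disturb iteration
    for path in list(Path(dlq_dir).glob("*.json")):
        if path.stat().st_mtime < cutoff:
            shutil.move(str(path), str(archive / path.name))
            moved += 1
    return moved


def dlq_health(dlq_dir, top_n: int = 3) -> dict:
    """Summarize the live DLQ: entry count and most common error types."""
    counts = Counter()
    total = 0
    for path in Path(dlq_dir).glob("*.json"):
        entry = json.loads(path.read_text())
        counts[entry.get("error_type", "unknown")] += 1
        total += 1
    return {"entries": total, "top_errors": counts.most_common(top_n)}
```

A sudden rise in `entries`, or a new error type at the top of `top_errors`, is a useful alerting signal in its own right.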

Should I pause the entire pipeline if one stage fails?

No. Use partial results if possible. If a component is down, skip it (degrade), log, and alert. Example: if embedding API is down, ingest documents without embeddings; embeddings can be added later.
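The embedding example can be sketched as follows. The names `ingest_document`, `backfill_embeddings`, and the `needs_embedding` flag are hypothetical, and `embed` stands for whatever client function calls the embedding API.

```python
from typing import Callable, List


def ingest_document(doc: dict, embed: Callable[[str], List[float]]) -> dict:
    """Ingest a document; if embedding fails, keep it and flag it for later."""
    row = {
        "id": doc["id"],
        "content": doc["content"],
        "embedding": None,
        "needs_embedding": False,
    }
    try:
        row["embedding"] = embed(doc["content"])
    except Exception as exc:
        # Degrade: store the document anyway and record what is missing
        row["needs_embedding"] = True
        print(f"Embedding skipped for document {doc['id']}: {exc}")
    return row


def backfill_embeddings(rows: List[dict], embed: Callable[[str], List[float]]) -> int:
    """Later pass: fill in embeddings for rows flagged during an outage."""
    filled = 0
    for row in rows:
        if row["needs_embedding"]:
            row["embedding"] = embed(row["content"])
            row["needs_embedding"] = False
            filled += 1
    return filled
```

The explicit flag matters: without it, a missing embedding is indistinguishable from a document that was never meant to have one, and the backfill has nothing to query on.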

Further Reading