
API Data Ingestion: Real-Time ETL Fundamentals

API data ingestion is the first phase of any unstructured ETL pipeline: pulling data from remote sources (social media, news APIs, SaaS platforms, cloud storage) into local storage for processing. Unlike batch file ingestion, API ingestion has to deal with rate limiting (server-side request quotas), pagination (results split across multiple requests), authentication (API keys, OAuth tokens), and network failures (timeouts, 5xx errors). A robust API ingestion layer handles these gracefully: it retries transient failures and tracks which data has already been fetched to avoid duplicate work. According to a 2026 survey by Gartner, 67% of data engineering teams report API data integration as a top complexity driver, yet only 41% have systematic retry and pagination strategies.

Why API Ingestion Is Different from File-Based Ingestion

File-based ingestion reads local or cloud-storage files (S3 buckets, GCS folders): you typically control the data format, and because the files persist you can reprocess them whenever needed. API ingestion must contend with remote rate limits: if you request 10,000 records from Twitter's API and hit the rate limit after 1,000, your pipeline stalls until the quota resets. API responses vanish unless you store them. APIs also change versions and deprecate endpoints, so your ingestion code must adapt. And while files can sit in object storage indefinitely, the data behind an API may disappear (deleted social media posts, retention windows).

The mental model for API ingestion is "extract as much as you can, handle partial failures, and track progress." Unlike file ingestion, where you can simply rerun the whole batch, API ingestion often requires resuming from the last successfully fetched record.

Core Challenges in API Data Ingestion

Rate limiting is the primary bottleneck: many public APIs limit requests to roughly 100–1,000 per minute (Twitter v2 allows 450 requests per 15 minutes for basic tier). Exceeding the limit triggers HTTP 429 (Too Many Requests) responses. Your ingestion code must detect 429 responses and back off: wait for the duration given in the Retry-After header when the server sends one, and otherwise back off exponentially (1s, 2s, 4s, ...) before retrying.
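When you handle 429 responses yourself rather than delegating to urllib3's Retry, a common pattern is to honor Retry-After if present and otherwise back off exponentially with random "full jitter" so many clients don't retry in lockstep. This is a minimal sketch; the function name `backoff_delay` and the `base`/`cap` defaults are illustrative assumptions, not values any particular API prescribes.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 300.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after:
        # Retry-After may be delta-seconds ("120") or an HTTP-date.
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            try:
                when = parsedate_to_datetime(retry_after)
            except (TypeError, ValueError):
                when = None  # unparseable header: fall through to backoff
            if when is not None:
                if when.tzinfo is None:
                    when = when.replace(tzinfo=timezone.utc)
                return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
    # No usable header: exponential backoff with full jitter, capped at `cap`.
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

In a hand-rolled loop you might call `time.sleep(backoff_delay(attempt, resp.headers.get("Retry-After")))` whenever `resp.status_code == 429`, incrementing `attempt` each time and giving up after a fixed number of tries.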

Pagination splits large result sets across multiple requests. APIs use different pagination strategies: offset-based (/users?offset=0&limit=100), cursor-based (/users?cursor=abc123&limit=100), or keyset-based (/posts?since_id=12345&limit=100). Cursor-based pagination is generally the most reliable for real-time data, because offset pagination can skip or repeat records when data changes between requests.
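To see why offset pagination can miss records, here is a small in-memory simulation. It models a cursor as "the last id seen" (keyset style); that is an assumption, since real APIs usually return an opaque cursor string, but the effect is the same because the position is anchored to a record rather than to a row count.

```python
from typing import List, Tuple


def fetch_offset(rows: List[int], offset: int, limit: int) -> List[int]:
    """Offset pagination: skip `offset` rows, return the next `limit`."""
    return rows[offset:offset + limit]


def fetch_after(rows: List[int], last_id: int, limit: int) -> List[int]:
    """Cursor/keyset pagination: return rows with id > last_id."""
    return [r for r in rows if r > last_id][:limit]


def simulate() -> Tuple[List[int], List[int]]:
    # Offset: fetch page 1, then record 2 is deleted before page 2 is requested.
    rows = list(range(1, 11))
    page1 = fetch_offset(rows, 0, 5)
    rows.remove(2)
    page2 = fetch_offset(rows, 5, 5)  # every later row shifted left by one
    offset_seen = page1 + page2

    # Cursor: same deletion, but page 2 starts after the last id we saw.
    rows = list(range(1, 11))
    page1 = fetch_after(rows, 0, 5)
    rows.remove(2)
    page2 = fetch_after(rows, page1[-1], 5)
    cursor_seen = page1 + page2
    return offset_seen, cursor_seen


offset_seen, cursor_seen = simulate()
print("offset missed:", sorted(set(range(1, 11)) - set(offset_seen)))  # [6]
print("cursor missed:", sorted(set(range(1, 11)) - set(cursor_seen)))  # []
```

Record 6 is never returned by the offset version: after the deletion it slides into position 5, which page 1 already covered.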

Authentication varies: API keys (header or query parameter), OAuth 2.0 (requiring token refresh), mTLS (mutual TLS certificates). Secrets must never be hardcoded; use environment variables or a secrets manager.

Schema drift occurs when the API changes its response format between versions. A field may be renamed (user_name to username) or removed entirely. Your pipeline must handle unexpected or missing fields gracefully, logging warnings rather than crashing.
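One way to absorb schema drift is a normalization step between the raw response and the rest of the pipeline. This sketch assumes a hypothetical alias table (`FIELD_ALIASES`) and field contract (`EXPECTED_FIELDS`); it maps renamed fields, parks unknown ones under an `extra` key instead of dropping them, and fills missing fields with `None`, logging a warning in each case.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger("ingest")

# Hypothetical contract: old field name -> the name your pipeline uses.
FIELD_ALIASES = {"user_name": "username"}
EXPECTED_FIELDS = {"id", "username", "created_at"}


def normalize_record(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map renamed fields, keep unknown ones under "extra", fill missing with None."""
    record: Dict[str, Any] = {}
    extra: Dict[str, Any] = {}
    for key, value in raw.items():
        canonical = FIELD_ALIASES.get(key, key)
        if canonical in EXPECTED_FIELDS:
            record[canonical] = value
        else:
            logger.warning("unexpected field %r (kept under 'extra')", key)
            extra[key] = value
    for missing in sorted(EXPECTED_FIELDS.difference(record)):
        logger.warning("missing field %r", missing)
        record[missing] = None
    if extra:
        record["extra"] = extra
    return record
```

Keeping unknown fields rather than discarding them means a later pipeline version can promote them to first-class columns without re-fetching.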

Building a Resilient API Ingestion Layer

Here is an API ingestion class that combines retry logic, rate-limit handling, and cursor-based pagination:

```python
import json
import os
import time
from datetime import datetime, timezone
from typing import Any, Dict, List

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class APIIngestionClient:
    """Resilient API client with retry, rate-limit, and pagination support."""

    def __init__(self, base_url: str, api_key: str, max_retries: int = 5):
        self.base_url = base_url
        self.api_key = api_key
        self.max_retries = max_retries
        self.session = self._setup_session()

    def _setup_session(self) -> requests.Session:
        """Configure a session that retries transient failures."""
        session = requests.Session()
        retry_strategy = Retry(
            total=self.max_retries,
            status_forcelist=[429, 500, 502, 503, 504],
            # `allowed_methods` replaces the deprecated `method_whitelist`
            # (removed in urllib3 2.0). Only GET is retried: POST is not idempotent.
            allowed_methods=["GET"],
            # Exponential backoff: the wait roughly doubles on each retry.
            # Retry-After headers on 429/503 responses are honored by default.
            backoff_factor=1,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def paginate_cursor_based(self, endpoint: str, params: Dict[str, Any]) -> List[Dict]:
        """Fetch paginated results using cursor-based pagination."""
        results: List[Dict] = []
        cursor = None
        timeouts = 0
        params = {**params, "api_key": self.api_key}  # don't mutate the caller's dict

        while True:
            if cursor:
                params["cursor"] = cursor

            try:
                response = self.session.get(
                    f"{self.base_url}/{endpoint}", params=params, timeout=30
                )
                response.raise_for_status()
            except requests.exceptions.Timeout:
                # Bounded retry so a dead endpoint can't loop forever.
                timeouts += 1
                if timeouts > self.max_retries:
                    raise
                print(f"✗ Timeout on {endpoint}, retrying in 5s...")
                time.sleep(5)
                continue
            except requests.exceptions.RequestException as e:
                print(f"✗ Request failed: {e}")
                raise

            # Extract results and next cursor
            data = response.json()
            batch = data.get("data", [])
            results.extend(batch)
            print(f"✓ Fetched {len(batch)} records (total: {len(results)})")

            cursor = data.get("pagination", {}).get("next_cursor")
            if not cursor or not batch:
                break

            # Respect rate limits: pause until the window resets when few calls remain.
            # Assumes X-Rate-Limit-Reset is a Unix timestamp in seconds.
            remaining = response.headers.get("X-Rate-Limit-Remaining")
            if remaining is not None and int(remaining) < 10:
                reset_time = int(response.headers.get("X-Rate-Limit-Reset", 0))
                sleep_time = max(reset_time - time.time(), 0) + 1
                print(f"⏱ Rate limit low, sleeping {sleep_time:.0f}s")
                time.sleep(sleep_time)

        return results

    def checkpoint_results(self, results: List[Dict], checkpoint_file: str) -> None:
        """Save fetched results to a checkpoint file."""
        with open(checkpoint_file, "w") as f:
            json.dump({
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "count": len(results),
                "data": results,
            }, f, indent=2)
        print(f"✓ Saved {len(results)} records to {checkpoint_file}")


# Usage example: the key comes from the environment, not from source code
client = APIIngestionClient(
    base_url="https://api.example.com/v2",
    api_key=os.environ["API_KEY"],
)

results = client.paginate_cursor_based(
    endpoint="users/search",
    params={"query": "ai researcher", "limit": 100},
)

client.checkpoint_results(results, "ingested_data.json")
```

This example demonstrates four essential techniques:

  1. Retry strategy: urllib3's Retry with exponential backoff automatically retries transient failures (5xx, 429).
  2. Rate limit awareness: Reading X-Rate-Limit-Remaining headers and sleeping before hitting the ceiling.
  3. Cursor-based pagination: Continuing until the API returns no next_cursor, avoiding the offset-skew problem.
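  4. Checkpointing: Saving fetched records to a file. This example writes the file only once, after the last page; to resume after a crash, also persist the latest cursor after every page.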
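The class above writes its checkpoint only after pagination finishes, so a crash mid-run loses everything. A sketch of per-page checkpointing that stores the cursor alongside the data might look like this. `fetch_page`, the checkpoint layout, and the JSON Lines output file are assumptions for illustration; in practice `fetch_page` would wrap a `session.get` call like the one in `paginate_cursor_based`.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical page fetcher: takes a cursor (None for the first page) and
# returns (records, next_cursor).
FetchPage = Callable[[Optional[str]], Tuple[List[Dict[str, Any]], Optional[str]]]


def load_checkpoint(path: str) -> Dict[str, Any]:
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {"cursor": None, "done": False, "count": 0}


def save_checkpoint(path: str, state: Dict[str, Any]) -> None:
    # Write to a temp file, then atomically replace, so a crash mid-write
    # never leaves a truncated checkpoint behind.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
    os.replace(tmp, path)


def ingest_resumable(fetch_page: FetchPage, checkpoint: str, out_path: str) -> int:
    state = load_checkpoint(checkpoint)
    while not state["done"]:
        records, next_cursor = fetch_page(state["cursor"])
        # Append this page (JSON Lines) before advancing the saved cursor.
        with open(out_path, "a") as out:
            for rec in records:
                out.write(json.dumps(rec) + "\n")
        state["count"] += len(records)
        state["cursor"] = next_cursor
        state["done"] = not next_cursor or not records
        save_checkpoint(checkpoint, state)
    return state["count"]
```

Rerunning after a failure picks up at the saved cursor. Because records are appended before the checkpoint is saved, a crash between the two steps re-fetches that page on restart, so delivery is at-least-once; deduplicate by record ID downstream.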

Handling Authentication Securely

Never hardcode API keys in source code. Use environment variables:

```python
import os

api_key = os.getenv("API_KEY")
if not api_key:
    raise ValueError("API_KEY not set in environment")
```

For OAuth 2.0 (used by GitHub, Google, Twitter v2), use a library like requests-oauthlib:

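```python
import os

from requests_oauthlib import OAuth2Session

# Client credentials come from the environment (variable names are placeholders).
# oauthlib refuses http:// callback URLs unless OAUTHLIB_INSECURE_TRANSPORT=1 is set;
# use that only for local testing and HTTPS in production.
oauth = OAuth2Session(
    client_id=os.environ["OAUTH_CLIENT_ID"],
    redirect_uri="http://localhost:8080/callback",
)

# Get the authorization URL and send the user there
authorization_url, state = oauth.authorization_url("https://provider.com/oauth/authorize")
print(f"Visit this URL to authorize: {authorization_url}")

# After the user grants permission, exchange the code in the callback URL for a token
token = oauth.fetch_token(
    "https://provider.com/oauth/token",
    client_secret=os.environ["OAUTH_CLIENT_SECRET"],
    authorization_response=input("Paste the full callback URL: "),
)

# Use token in requests
response = oauth.get("https://api.provider.com/me")
```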

For long-running pipelines, refresh tokens automatically before they expire:

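```python
import os
import time

# oauthlib records an absolute expiry time in token["expires_at"];
# "expires_in" is the lifetime at issue time and never counts down.
if oauth.token.get("expires_at", 0) - time.time() < 300:  # refresh if expiring within 5 minutes
    oauth.refresh_token(
        "https://provider.com/oauth/token",
        # Many providers also require client credentials on refresh.
        client_id=os.environ["OAUTH_CLIENT_ID"],
        client_secret=os.environ["OAUTH_CLIENT_SECRET"],
    )
```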
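A refreshed token only helps if it survives a restart. requests-oauthlib's `OAuth2Session` accepts a `token_updater` callback that it calls with the new token after an automatic refresh (enabled via `auto_refresh_url`). This stdlib sketch shows a pair of helpers you could plug into it; the file location `TOKEN_PATH` is a hypothetical choice, and a secrets manager is the better home for tokens in shared environments.

```python
import json
import os
from typing import Any, Dict, Optional

TOKEN_PATH = "oauth_token.json"  # hypothetical location


def save_token(token: Dict[str, Any], path: str = TOKEN_PATH) -> None:
    """Persist the token; usable as OAuth2Session's token_updater callback."""
    # Create the file with owner-only permissions (0o600) on POSIX systems.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        json.dump(token, f)


def load_token(path: str = TOKEN_PATH) -> Optional[Dict[str, Any]]:
    """Return the saved token, or None if none has been saved yet."""
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return None
```

Wiring might look like `OAuth2Session(client_id, token=load_token(), auto_refresh_url="https://provider.com/oauth/token", auto_refresh_kwargs={"client_id": ..., "client_secret": ...}, token_updater=save_token)`. Note that the library refreshes when oauthlib reports the token as already expired, so the explicit 5-minute check above is still useful if you want headroom.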

Comparing API Pagination Strategies

| Strategy | Best for | Example | Pros | Cons |
|---|---|---|---|---|
| Offset-based | Small datasets | ?offset=100&limit=50 | Simple, intuitive | Slow for large offsets; vulnerable to data changes |
| Cursor-based | Real-time, large datasets | ?cursor=abc123&limit=50 | Stable, efficient, resilient | Requires API support; opaque cursors |
| Keyset-based | Streaming pipelines | ?since_id=12345&limit=50 | Resumable, efficient | Requires an ordering field; less intuitive |
| Timestamp-based | Change data capture | ?updated_after=2026-06-01T00:00Z | Captures only new data | Requires server-side timestamp tracking |
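Keyset pagination from the table can be sketched as a generator. The key property is that the only state you need to resume is the largest id seen so far. `fetch_page` is a hypothetical wrapper around an endpoint such as /posts?since_id=...&limit=..., assumed here to return records with an integer `id`, sorted ascending, all with id greater than `since_id`; some real APIs (Twitter's `since_id`, for instance) return newest first, which would require tracking the maximum differently.

```python
from typing import Any, Callable, Dict, Iterator, List

# Hypothetical fetcher: (since_id, limit) -> records with id > since_id, ascending.
FetchFn = Callable[[int, int], List[Dict[str, Any]]]


def keyset_paginate(fetch_page: FetchFn, since_id: int = 0,
                    limit: int = 100) -> Iterator[Dict[str, Any]]:
    while True:
        batch = fetch_page(since_id, limit)
        if not batch:
            return
        yield from batch
        # The largest id seen becomes the next lower bound; persisting this
        # single integer is enough to resume later.
        since_id = max(rec["id"] for rec in batch)
        if len(batch) < limit:
            return  # short page: nothing left
```

Taking `max()` over the batch rather than trusting the last element keeps the bound correct even if a page arrives slightly out of order.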

Key Takeaways

  • API ingestion requires handling rate limits, pagination, authentication, and transient failures — unlike simple file downloads.
  • Build retry logic with exponential backoff and respect API rate-limit headers to avoid throttling.
  • Use cursor-based pagination for real-time data; use timestamp-based pagination for incremental sync (see next articles).
  • Checkpoint progress to a file so failed pipelines can resume without reprocessing already-fetched data.
  • Never hardcode API keys; use environment variables or secrets managers.

Frequently Asked Questions

How do I know if an API uses cursor or offset pagination?

Read the API documentation first. If the docs show ?cursor= or next_cursor in examples, use cursor pagination. If they show ?offset= or ?page=, use offset pagination. Some APIs support both; cursor-based is generally preferable for reliability.

What should I do if an API doesn't provide rate-limit headers?

Throttle conservatively: send requests at a fixed rate (e.g., 1 request per second) instead of hitting the API as fast as possible, and still back off on any 429 response. Use the time.sleep() pattern shown above. Ask the API provider whether it exposes rate-limit headers; many modern APIs do.
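A fixed-rate throttle only needs to remember when the next request is allowed. This is a minimal sketch; the class name and the injectable `clock`/`sleep` parameters (added so the behavior can be tested without real waiting) are illustrative choices, not part of any library.

```python
import time
from typing import Callable, Optional


class FixedRateThrottle:
    """Allow at most one call every `interval` seconds (e.g. 1.0 for 1 req/s)."""

    def __init__(self, interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.interval = interval
        self.clock = clock
        self.sleep = sleep
        self._next_allowed: Optional[float] = None

    def wait(self) -> None:
        """Block until the next request is allowed, then reserve the slot after it."""
        now = self.clock()
        if self._next_allowed is not None and now < self._next_allowed:
            self.sleep(self._next_allowed - now)
            now = self._next_allowed
        self._next_allowed = now + self.interval
```

Call `throttle.wait()` immediately before each `session.get(...)`. Using `time.monotonic` rather than `time.time` keeps the spacing correct even if the system clock is adjusted mid-run.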

How do I handle API schema changes in production?

Access fields defensively, with data.get("new_field") instead of data["new_field"], so a missing or renamed field returns None instead of raising a KeyError, and log fields you don't recognize rather than crashing. Send alerts when expected fields are missing so someone can review the change. Use JSON Schema validation to enforce the expected structure and reject malformed responses early.
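The jsonschema package implements full JSON Schema validation (for example `jsonschema.validate(instance, schema)`). If you want to start without a dependency, a minimal sketch that checks required fields and types and routes failures to a quarantine list could look like this; the `REQUIRED` contract is a hypothetical example.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical contract: field name -> accepted Python type(s) after json.loads.
REQUIRED: Dict[str, Tuple[type, ...]] = {
    "id": (int, str),
    "created_at": (str,),
    "text": (str,),
}


def validate(record: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the record passes."""
    problems = []
    for field, types in REQUIRED.items():
        if field not in record:
            problems.append(f"missing field: {field}")
        elif not isinstance(record[field], types):
            problems.append(f"bad type for {field}: {type(record[field]).__name__}")
    return problems


def split_valid(records: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Separate passing records from ones to quarantine for manual review."""
    good, quarantined = [], []
    for rec in records:
        (quarantined if validate(rec) else good).append(rec)
    return good, quarantined
```

Quarantining instead of dropping keeps the evidence you need when deciding whether a change is a bug or a new API version.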

Should I store API responses raw or transformed?

Store raw responses in your data lake (e.g., S3) first, then transform them in a separate pipeline stage. This lets you re-transform when your pipeline logic changes without re-fetching from the API. Transform in memory without persisting the raw data only if the API imposes strict data-retention rules (e.g., content such as tweets that may be deleted and must not be kept).
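A raw landing zone can be as simple as one JSON Lines file per source per day, with each response wrapped in an envelope that records when it was fetched. This sketch writes to the local filesystem as a stand-in; with S3 you would upload the same bytes (for example via boto3's `put_object`). The `<root>/<source>/dt=YYYY-MM-DD/responses.jsonl` layout is an assumption, not a standard.

```python
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def write_raw(payload: Dict[str, Any], root: str, source: str,
              fetched_at: Optional[datetime] = None) -> str:
    """Append one raw API response to a date-partitioned JSON Lines file."""
    fetched_at = fetched_at or datetime.now(timezone.utc)
    # Assumed layout: <root>/<source>/dt=YYYY-MM-DD/responses.jsonl
    directory = os.path.join(root, source, f"dt={fetched_at:%Y-%m-%d}")
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, "responses.jsonl")
    envelope = {"fetched_at": fetched_at.isoformat(), "payload": payload}
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(envelope) + "\n")
    return path
```

Keeping the payload untouched inside the envelope means any later transform can be replayed against exactly what the API returned.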

How do I detect and prevent duplicate ingestion?

Track ingested record IDs in a checkpoint file or database table. Before ingesting a batch, query the table to find max id or updated_at, then request ?since_id=... or ?updated_after=... from the API. This is called incremental sync and is covered in detail in a later article.
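A minimal version of that pattern, using SQLite from the standard library, keys the table on the record ID so re-ingesting an overlapping batch overwrites rather than duplicates. The table name and columns are hypothetical; `INSERT OR REPLACE` is used because it works across SQLite versions.

```python
import json
import sqlite3
from typing import Any, Dict, Iterable


def init_db(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS records (id INTEGER PRIMARY KEY, body TEXT NOT NULL)"
    )


def high_water_mark(conn: sqlite3.Connection) -> int:
    """Largest ingested id, or 0 if nothing has been ingested yet."""
    row = conn.execute("SELECT MAX(id) FROM records").fetchone()
    return row[0] or 0


def upsert(conn: sqlite3.Connection, records: Iterable[Dict[str, Any]]) -> None:
    """Insert a batch; an existing id is replaced, never duplicated."""
    with conn:  # one transaction per batch
        conn.executemany(
            "INSERT OR REPLACE INTO records (id, body) VALUES (?, ?)",
            ((r["id"], json.dumps(r)) for r in records),
        )
```

Before each run, `since_id = high_water_mark(conn)` gives the value to send as ?since_id=; overlapping pages are harmless because of the primary key.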

Further Reading