File-Based ETL for Documents: Complete Guide
File-based ETL pipelines ingest documents from cloud storage (S3, GCS, Azure Blob), extract content (text, tables, metadata), and load the results into a database or vector store. Unlike API ingestion, which pulls data from a remote server, file-based pipelines read from storage where data persists and can be reprocessed. The challenge is scale: a single enterprise might generate 10,000 new PDFs daily, and the pipeline must handle variable file sizes (10 KB memos to 500 MB research archives), corrupt files, and heterogeneous formats (scanned images vs. text-searchable PDFs). One estimate, attributed to McKinsey, is that enterprises store an average of 2.3 exabytes of documents and that 85% remain unprocessed for lack of extraction infrastructure. A modern file-based ETL system uses cloud-native services (AWS Textract, Google Document AI) or open-source libraries (PyPDF2, now maintained under the name pypdf, and Tesseract OCR) to extract text at scale.
How File-Based ETL Differs from API Ingestion
File-based ETL operates on persistent, replayable data: if extraction fails on a PDF, you can retry later because the file is still in storage, whereas an API response may no longer be available. Object storage is also far more permissive than a typical API quota (S3, for example, documents at least 5,500 GET requests per second per prefix), so processing 1,000 files in parallel is rarely limited by the storage layer. However, files present different challenges: they live in cloud storage that requires authentication and permission management, extraction is CPU-intensive (OCR on a 50-page scanned PDF can take from tens of seconds to minutes, depending on resolution and hardware), and failures are varied (corrupted files, unsupported formats, out-of-memory errors).
The canonical file-based ETL workflow is:
- List objects: Scan S3 or GCS for files matching a pattern (e.g., all PDFs uploaded in the last 24 hours).
- Filter: Skip files already processed (using modification timestamps or a tracking database).
- Download: Stream files to local disk or memory, handling partial downloads.
- Extract: Parse files and extract text, metadata, and structure.
- Transform: Clean, chunk, and enrich extracted content.
- Load: Store results in a database or vector index.
- Mark complete: Record successful processing to avoid reprocessing.
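The Transform step in the list above can be sketched in a few lines. This is a minimal illustration, assuming fixed-size character windows with overlap; the function name `chunk_text` and the default sizes are hypothetical choices, not something the workflow prescribes.

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into overlapping character windows (hypothetical helper)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    # Clean: collapse the runs of whitespace that PDF extraction tends to leave
    cleaned = " ".join(text.split())

    # Chunk: advance by (chunk_size - overlap) so neighbouring chunks share context
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(cleaned), step):
        chunks.append(cleaned[start:start + chunk_size])
        if start + chunk_size >= len(cleaned):
            break  # The last window already reached the end of the text
    return chunks
```

Overlap keeps a sentence that straddles a boundary visible in both neighbouring chunks, at the cost of storing some text twice. Splitting on sentence or page boundaries is a common refinement.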
Extracting Text from PDFs and Images
Here is an example using PyPDF2 for text-based PDFs and Tesseract (through pytesseract and pdf2image) for scanned pages and images:
import logging

import PyPDF2
import pytesseract
from PIL import Image

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pages that yield fewer characters than this are treated as scanned images.
MIN_TEXT_CHARS = 50


class DocumentExtractor:
    """Extract text from PDFs and images with fallback to OCR."""

    def extract_pdf_text(self, pdf_path: str) -> str:
        """Extract text from a PDF. Use OCR for scanned pages."""
        parts = []
        try:
            with open(pdf_path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                for page_num, page in enumerate(reader.pages):
                    # Guard against an empty result on image-only pages
                    page_text = page.extract_text() or ""
                    # If a page has almost no text, it's likely a scanned image
                    if len(page_text.strip()) < MIN_TEXT_CHARS:
                        logger.info(f"Page {page_num + 1} is likely scanned, using OCR")
                        # Keep the original text if OCR fails or finds nothing
                        page_text = self._ocr_pdf_page(pdf_path, page_num) or page_text
                    parts.append(f"\n--- Page {page_num + 1} ---\n{page_text}")
        except Exception as e:
            logger.error(f"Failed to extract PDF {pdf_path}: {e}")
            raise
        return "".join(parts)

    def _ocr_pdf_page(self, pdf_path: str, page_num: int) -> str:
        """Convert one PDF page (0-based page_num) to an image and run OCR."""
        # Requires: pip install pdf2image pytesseract
        # On Linux: apt-get install tesseract-ocr poppler-utils
        # On macOS: brew install tesseract poppler
        try:
            from pdf2image import convert_from_path

            # pdf2image uses 1-based page numbers
            images = convert_from_path(
                pdf_path, first_page=page_num + 1, last_page=page_num + 1
            )
            if images:
                return pytesseract.image_to_string(images[0])
        except Exception as e:
            logger.warning(f"OCR failed for page {page_num + 1}: {e}")
        return ""

    def extract_image_text(self, image_path: str) -> str:
        """Extract text from an image using OCR."""
        try:
            # The context manager closes the file handle after OCR
            with Image.open(image_path) as image:
                return pytesseract.image_to_string(image)
        except Exception as e:
            logger.error(f"Failed to OCR {image_path}: {e}")
            return ""

    def extract_metadata(self, pdf_path: str) -> dict:
        """Extract metadata from a PDF."""
        metadata = {}
        try:
            with open(pdf_path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                if reader.metadata:
                    metadata["title"] = reader.metadata.get("/Title", "")
                    metadata["author"] = reader.metadata.get("/Author", "")
                    metadata["created"] = reader.metadata.get("/CreationDate", "")
                # The page count is available even when the info dictionary is missing
                metadata["pages"] = len(reader.pages)
        except Exception as e:
            logger.warning(f"Could not extract metadata from {pdf_path}: {e}")
        return metadata


# Usage
extractor = DocumentExtractor()
text = extractor.extract_pdf_text("research_paper.pdf")
metadata = extractor.extract_metadata("research_paper.pdf")
print(f"Extracted {len(text)} characters")
print(f"Metadata: {metadata}")
This example shows four key techniques:
- Detect scanned pages: If a PDF page extracts little text, it's likely a scan. Fall back to OCR.
- Error handling: Wrap extraction in try-except to handle corrupted files gracefully.
- Metadata extraction: Parse PDF metadata (title, author, page count) for filtering and organization.
- Logging: Record errors and warnings to diagnose issues at scale.
Processing Files from Cloud Storage at Scale
For production pipelines, download files from cloud storage, process them, and track completion using a database. Here is a pattern using AWS S3:
import json
import os
import sqlite3
import tempfile
from datetime import datetime, timezone

import boto3

SUPPORTED_EXTENSIONS = (".pdf", ".jpg", ".jpeg", ".png")
MAX_TEXT_CHARS = 50_000  # Cap on stored text per document; adjust for your store


class S3FileETL:
    """ETL pipeline that processes files from S3 with checkpoint tracking."""

    def __init__(self, bucket: str, prefix: str, output_prefix: str = "extracted/"):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.prefix = prefix
        # Write results outside the input prefix so they are never re-ingested as inputs
        self.output_prefix = output_prefix
        self.db = sqlite3.connect("etl_progress.db")
        self._init_db()

    def _init_db(self):
        """Create a table to track processed files."""
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS processed_files (
                key TEXT PRIMARY KEY,
                processed_at TIMESTAMP,
                status TEXT,
                error_message TEXT
            )
        """)
        self.db.commit()

    def _mark(self, key: str, status: str, error: str = None):
        """Record the outcome for one file."""
        self.db.execute(
            "INSERT OR REPLACE INTO processed_files "
            "(key, processed_at, status, error_message) VALUES (?, ?, ?, ?)",
            (key, datetime.now(timezone.utc).isoformat(), status, error),
        )
        self.db.commit()

    def list_unprocessed_files(self) -> list:
        """List files in S3 that haven't been processed yet."""
        paginator = self.s3.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=self.bucket, Prefix=self.prefix)
        unprocessed = []
        cursor = self.db.cursor()
        for page in pages:
            for obj in page.get("Contents", []):
                key = obj["Key"]
                # Skip folder placeholders and formats the extractor cannot read
                if not key.lower().endswith(SUPPORTED_EXTENSIONS):
                    continue
                # Check if already processed; failed files are retried
                result = cursor.execute(
                    "SELECT status FROM processed_files WHERE key = ?",
                    (key,)
                ).fetchone()
                if not result or result[0] == "failed":
                    unprocessed.append({
                        "key": key,
                        "size": obj["Size"],
                        "modified": obj["LastModified"]
                    })
        return unprocessed

    def process_files(self, extractor):
        """Download, extract, and track completion."""
        unprocessed = self.list_unprocessed_files()
        print(f"Found {len(unprocessed)} unprocessed files")
        for file_info in unprocessed:
            key = file_info["key"]
            try:
                # A fresh temp directory per file avoids name collisions and cleans up
                with tempfile.TemporaryDirectory() as tmp_dir:
                    local_path = os.path.join(tmp_dir, os.path.basename(key))
                    # Download from S3
                    self.s3.download_file(self.bucket, key, local_path)
                    # Extract text and metadata
                    if key.lower().endswith(".pdf"):
                        text = extractor.extract_pdf_text(local_path)
                        metadata = extractor.extract_metadata(local_path)
                    else:
                        text = extractor.extract_image_text(local_path)
                        metadata = {}
                # Save extracted data under the output prefix
                output_key = f"{self.output_prefix}{os.path.splitext(key)[0]}.json"
                self.s3.put_object(
                    Bucket=self.bucket,
                    Key=output_key,
                    Body=json.dumps({
                        "source": key,
                        "text": text[:MAX_TEXT_CHARS],
                        "truncated": len(text) > MAX_TEXT_CHARS,
                        "metadata": metadata
                    }),
                    ContentType="application/json"
                )
                # Mark as processed
                self._mark(key, "success")
                print(f"✓ Processed {key}")
            except Exception as e:
                self._mark(key, "failed", str(e))
                print(f"✗ Failed {key}: {e}")


# Usage
etl = S3FileETL(bucket="my-documents", prefix="documents/2026-06/")
extractor = DocumentExtractor()
etl.process_files(extractor)
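This pattern uses SQLite to track which files have been processed, so a rerun skips files that already succeeded and picks up only new or previously failed ones. A local SQLite file suits a single worker; several workers on different machines need a shared tracking store.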
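The tracking table above is keyed only on the object key, so a file that is overwritten under the same key would be skipped. One way to catch that, sketched here under assumptions, is to store a version marker next to the key. The sketch assumes each listing entry carries an `ETag` value (S3 listings include one); the table name `file_versions` and the three function names are hypothetical.

```python
import sqlite3


def init_tracking(conn: sqlite3.Connection) -> None:
    """Create a tracking table that remembers which version of a key was processed."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS file_versions ("
        "key TEXT PRIMARY KEY, etag TEXT, status TEXT)"
    )
    conn.commit()


def needs_processing(conn: sqlite3.Connection, key: str, etag: str) -> bool:
    """True if the key is new, its content changed, or the last attempt failed."""
    row = conn.execute(
        "SELECT etag, status FROM file_versions WHERE key = ?", (key,)
    ).fetchone()
    if row is None:
        return True
    stored_etag, status = row
    return stored_etag != etag or status != "success"


def record_result(conn: sqlite3.Connection, key: str, etag: str, status: str) -> None:
    """Store the outcome for this exact version of the file."""
    conn.execute(
        "INSERT OR REPLACE INTO file_versions (key, etag, status) VALUES (?, ?, ?)",
        (key, etag, status),
    )
    conn.commit()


# Usage with an in-memory database
conn = sqlite3.connect(":memory:")
init_tracking(conn)
record_result(conn, "documents/report.pdf", "etag-v1", "success")
print(needs_processing(conn, "documents/report.pdf", "etag-v1"))  # Same version
print(needs_processing(conn, "documents/report.pdf", "etag-v2"))  # Overwritten file
```

Comparing the `LastModified` timestamp instead of the ETag would work the same way; either value only needs to change when the object does.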
Handling Errors Gracefully at Scale
File-based pipelines encounter diverse errors: corrupted PDFs, unsupported formats, out-of-memory on large files, network timeouts during download. Key strategies:
- Fail individual files, not the whole pipeline: Wrap each file in a try-except, log the error, and continue.
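- Stop on widespread failure (a circuit breaker): If the failure rate crosses a threshold, pause the pipeline and alert an operator, since the cause is probably systemic rather than a few bad files.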
- Categorize errors: Distinguish between retryable (network timeouts) and permanent failures (unsupported format).
- Store failed files separately: Move problematic files to a quarantine bucket (S3 prefix) for manual review.
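The strategies above can be combined in one small loop. The following is a sketch under assumptions: the choice of which exception types count as retryable is a policy decision (built-in network-style exceptions stand in here), and `classify_error`, `FailureBreaker`, and `run_batch` are hypothetical names, as are the default thresholds.

```python
from dataclasses import dataclass

# Assumption: these built-in types stand in for transient, network-level failures.
RETRYABLE = (TimeoutError, ConnectionError)


def classify_error(exc: Exception) -> str:
    """Label an exception as 'retryable' or 'permanent'."""
    return "retryable" if isinstance(exc, RETRYABLE) else "permanent"


@dataclass
class FailureBreaker:
    """Trips once the observed failure rate exceeds a threshold."""
    max_failure_rate: float = 0.5
    min_samples: int = 20  # Don't judge the rate on a handful of files
    successes: int = 0
    failures: int = 0

    def record(self, ok: bool) -> None:
        if ok:
            self.successes += 1
        else:
            self.failures += 1

    @property
    def tripped(self) -> bool:
        total = self.successes + self.failures
        if total < self.min_samples:
            return False
        return self.failures / total > self.max_failure_rate


def run_batch(keys, process_one, breaker: FailureBreaker) -> dict:
    """Process keys one at a time; one bad file never stops the others."""
    results = {"done": [], "retry": [], "quarantine": []}
    for key in keys:
        if breaker.tripped:
            break  # Leave the remaining keys untouched for an operator to review
        try:
            process_one(key)
        except Exception as exc:
            breaker.record(ok=False)
            bucket = "retry" if classify_error(exc) == "retryable" else "quarantine"
            results[bucket].append(key)
        else:
            breaker.record(ok=True)
            results["done"].append(key)
    return results
```

Keys in `retry` can be attempted again on the next run, while keys in `quarantine` are candidates for moving to a separate prefix. When `breaker.tripped` is true after the loop, the caller should alert instead of continuing.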
Key Takeaways
- File-based ETL is ideal for documents stored in cloud storage; files are persistent and replayable unlike API responses.
- Extract text from PDFs using PyPDF2 (text-searchable) and fall back to Tesseract OCR (scanned pages).
- Track processed files in a checkpoint database to avoid reprocessing and enable resumable pipelines.
- Wrap extraction in error handling and quarantine failed files for manual review.
- Use cloud-native services (AWS Textract, Google Document AI) for production scale; open-source libraries for low-volume prototypes.
Frequently Asked Questions
What's the difference between PyPDF2 and pdfplumber for PDF extraction?
PyPDF2 is lightweight and good for text extraction from text-searchable PDFs. pdfplumber is better for tables and precise layout analysis (detecting columns, rows, bounding boxes). For AI pipelines, PyPDF2 is usually sufficient; use pdfplumber if you need structured table extraction.
Should I use Tesseract OCR or cloud services like AWS Textract?
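Tesseract is free, open-source, and adequate for simple, cleanly scanned documents. AWS Textract is billed per page (on the order of $0.0015 per page for plain text detection and $0.015 per page for table analysis at the time of writing; check current pricing) and handles complex layouts, handwriting, and tables better. For high-volume production (>100K pages/month), Textract's accuracy and API integration can justify the cost.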
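To make the cost side concrete, here is a small worked calculation using the $0.015 per page figure quoted above at the 100K pages/month threshold. The helper name `monthly_ocr_cost` is hypothetical, and the price is an input to replace with the provider's current rate.

```python
from decimal import Decimal


def monthly_ocr_cost(pages_per_month: int, price_per_page: str) -> Decimal:
    """Per-page billing: cost = pages x price. Decimal avoids float rounding."""
    return Decimal(pages_per_month) * Decimal(price_per_page)


cost = monthly_ocr_cost(100_000, "0.015")
print(f"${cost:,.2f} per month")
```

At that volume the bill is $1,500 per month, which is the number to weigh against the engineering time needed to run and tune Tesseract yourself.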
How do I detect and skip corrupted PDF files?
Try opening the PDF with PyPDF2 and reading its pages; if either step raises an exception, mark the file as corrupted. Some corrupt PDFs can be repaired with open-source tools like qpdf. If repair fails, quarantine the file and send an alert for manual inspection.
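Before paying for a full parse, a cheap structural pre-check can weed out obviously broken files. The sketch below relies on two facts about the PDF format: a file starts with `%PDF-` and carries an `%%EOF` marker near its end. The function name `looks_like_pdf` and the 1024-byte tail window are assumptions. This is a pre-filter only: it catches truncated downloads and mislabeled files but may also reject slightly malformed files that a lenient parser would accept, and passing it doesn't guarantee the file will parse.

```python
import os


def looks_like_pdf(path: str, tail_bytes: int = 1024) -> bool:
    """Cheap check: '%PDF-' header at the start and '%%EOF' near the end."""
    size = os.path.getsize(path)
    with open(path, "rb") as f:
        # The header identifies the file type regardless of its extension
        if f.read(5) != b"%PDF-":
            return False
        # A missing end-of-file marker usually means a truncated upload or download
        f.seek(max(0, size - tail_bytes))
        return b"%%EOF" in f.read()
```

Files that fail the check can go straight to quarantine; files that pass still need the full parse inside a try-except.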
Can I parallelize PDF extraction across multiple workers?
Yes. Use Apache Spark or Dask to distribute file processing across machines. Each worker downloads a file, extracts text, and writes results to cloud storage. Be careful with memory: some PDFs are enormous. Use streaming or chunking to avoid loading entire files into memory.
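Before reaching for a cluster, the same fan-out shape can be tried on one machine. This sketch swaps in Python's standard-library thread pool as a stand-in; it is not Spark or Dask. The `worker` callable is hypothetical and represents "download one file, extract it, write the result".

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_in_parallel(keys, worker, max_workers: int = 8):
    """Run worker(key) for every key; return (results, errors) keyed by file key."""
    results, errors = {}, {}
    # max_workers also bounds how many files are held in memory at once
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, key): key for key in keys}
        for future in as_completed(futures):
            key = futures[future]
            try:
                results[key] = future.result()
            except Exception as exc:
                # One failing file is recorded and does not cancel the rest
                errors[key] = str(exc)
    return results, errors
```

Threads suit the download-heavy part of the work. For CPU-bound pure-Python extraction, `ProcessPoolExecutor` from the same module offers the same `submit` interface, provided `worker` is a picklable top-level function.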
How do I handle PDF files that are password-protected?
Use PyPDF2 with decrypt(): reader.decrypt("password") before extracting. Store passwords in a secrets manager, not hardcoded. If a PDF is encrypted and you don't have the password, quarantine it and alert an operator.