End-to-End Document Extraction Pipeline: Building Your System
An end-to-end document extraction pipeline integrates all components covered in this series: ingestion, extraction, validation, confidence scoring, human review, and downstream integration. Building one means orchestrating these pieces so that each document flows from upload through extraction and review into the final database without manual intervention, except for items flagged for human review. This is what production document AI looks like.
I've built several complete pipelines, and the lesson is: the magic isn't in any single component, but in how you connect them. A simple extraction model connected to a robust validation pipeline outperforms a sophisticated model without proper error handling and human review.
Architecture Overview
A production pipeline has these layers:
Ingestion → Pre-processing → Extraction → Validation → Confidence Scoring → Review Queue → Approval → Integration
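The arrow diagram fixes an order of stages. As a minimal sketch (the `Stage` enum and `advance()` helper are hypothetical names, not part of the pipeline code that follows), that order can be made explicit so each document's current stage can be stored, and a document that stopped midway can be resumed or audited. Auto-approved documents skip the review queue, matching the routing rule used later:

```python
from enum import Enum


class Stage(Enum):
    """Pipeline stages in the order shown in the diagram (hypothetical)."""
    INGESTION = 1
    PREPROCESSING = 2
    EXTRACTION = 3
    VALIDATION = 4
    CONFIDENCE_SCORING = 5
    REVIEW_QUEUE = 6
    APPROVAL = 7
    INTEGRATION = 8


ORDER = list(Stage)  # Enum iteration follows definition order


def advance(current: Stage, needs_review: bool = False) -> Stage:
    """Return the stage a document moves to after `current`.

    Documents that don't need review jump from CONFIDENCE_SCORING
    straight to APPROVAL.
    """
    if current is Stage.INTEGRATION:
        raise ValueError("Document has already been integrated")
    nxt = ORDER[ORDER.index(current) + 1]
    if nxt is Stage.REVIEW_QUEUE and not needs_review:
        return Stage.APPROVAL
    return nxt
```

Persisting the returned stage next to each document's metadata is what lets a restarted worker pick up where it left off instead of re-extracting everything.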
Let's build a complete system:
import base64
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import anthropic  # pip install anthropic

# === LAYER 1: INGESTION ===
@dataclass
class DocumentMetadata:
    """Metadata for an ingested document."""
    document_id: str
    original_filename: str
    file_path: str
    file_size: int
    file_hash: str
    ingested_at: str
    document_type: str  # "invoice", "receipt", etc.


class DocumentIngestionQueue:
    """In-memory queue for incoming documents."""

    def __init__(self, watch_dir: str):
        self.watch_dir = Path(watch_dir)
        self.watch_dir.mkdir(parents=True, exist_ok=True)
        self.queue: list[DocumentMetadata] = []
        # Hashes of every document ever ingested, so duplicates are still
        # caught after earlier batches have been dequeued
        self.seen_hashes: set[str] = set()

    def ingest_document(self, file_path: str, document_type: str = "invoice") -> DocumentMetadata:
        """Ingest a document from the file system."""
        path = Path(file_path)

        # Compute file hash for deduplication
        file_hash = self._compute_file_hash(path)
        if self._is_duplicate(file_hash):
            raise ValueError(f"Duplicate document: {file_hash}")

        metadata = DocumentMetadata(
            document_id=self._generate_doc_id(file_hash),
            original_filename=path.name,
            file_path=str(path),
            file_size=path.stat().st_size,
            file_hash=file_hash,
            ingested_at=datetime.now().isoformat(),
            document_type=document_type,
        )
        self.seen_hashes.add(file_hash)
        self.queue.append(metadata)
        return metadata

    def _compute_file_hash(self, file_path: Path) -> str:
        """Compute the SHA-256 hash of a file, reading it in 64 KiB chunks."""
        sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                sha256.update(chunk)
        return sha256.hexdigest()

    def _is_duplicate(self, file_hash: str) -> bool:
        """Check whether this content has already been ingested."""
        return file_hash in self.seen_hashes

    def _generate_doc_id(self, file_hash: str) -> str:
        """Derive a stable document ID from the content hash.

        A millisecond timestamp can collide when files are ingested in a
        tight loop; the hash is unique per distinct file content.
        """
        return f"doc_{file_hash[:16]}"

    def get_next_batch(self, batch_size: int = 10) -> list[DocumentMetadata]:
        """Dequeue the next batch of documents to process."""
        batch = self.queue[:batch_size]
        self.queue = self.queue[batch_size:]
        return batch

# === LAYER 2: PRE-PROCESSING ===
def preprocess_document(file_path: str) -> str:
    """
    Pre-process a document image. This minimal version only boosts contrast;
    deskewing, denoising, etc. would slot in here.
    Returns the path to the pre-processed image.
    """
    from PIL import Image, ImageEnhance  # pip install pillow

    with Image.open(file_path) as img:
        # Simple enhancement: increase contrast by 50%
        enhanced = ImageEnhance.Contrast(img.convert("RGB")).enhance(1.5)

    # Save next to the original, e.g. invoice.jpg -> invoice_preprocessed.jpg
    src = Path(file_path)
    output_path = src.with_name(f"{src.stem}_preprocessed{src.suffix}")
    enhanced.save(output_path)
    return str(output_path)

# === LAYER 3: EXTRACTION ===
@dataclass
class ExtractionResult:
    """Result of document extraction."""
    document_id: str
    extracted_data: dict
    raw_response: str
    extraction_model: str
    extraction_time_seconds: float
    extracted_at: str


def extract_from_document(metadata: DocumentMetadata) -> ExtractionResult:
    """Extract structured invoice data from a document image with Claude."""
    client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
    # Pinned model snapshot; check Anthropic's model documentation for current IDs
    model = "claude-3-5-sonnet-20241022"
    start_time = datetime.now()

    # Read the image (call preprocess_document() first if scans are poor)
    image_path = Path(metadata.file_path)
    base64_image = base64.standard_b64encode(image_path.read_bytes()).decode("utf-8")
    # Declare the actual media type rather than always assuming JPEG
    media_types = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png",
                   ".gif": "image/gif", ".webp": "image/webp"}
    media_type = media_types.get(image_path.suffix.lower(), "image/jpeg")

    # Extraction prompt
    extraction_prompt = """Extract invoice data from this document as JSON:
{
  "invoice_number": "string",
  "invoice_date": "YYYY-MM-DD",
  "vendor_name": "string",
  "total_amount": number,
  "currency": "USD|EUR|GBP",
  "line_items": [{"description": "string", "quantity": number, "unit_price": number, "total": number}]
}
If a field is missing, use null. Return ONLY valid JSON."""

    # Call Claude with the image followed by the instructions
    response = client.messages.create(
        model=model,
        max_tokens=1024,  # raise for invoices with many line items
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {"type": "base64", "media_type": media_type, "data": base64_image},
                    },
                    {"type": "text", "text": extraction_prompt},
                ],
            }
        ],
    )
    if response.stop_reason == "max_tokens":
        raise ValueError("Response was truncated at max_tokens; the JSON is likely incomplete")

    raw_text = response.content[0].text
    # Models occasionally wrap JSON in a ```json fence despite the instructions
    json_text = raw_text.strip()
    if json_text.startswith("```"):
        json_text = json_text.strip("`").removeprefix("json").strip()
    extracted_data = json.loads(json_text)  # raises json.JSONDecodeError if malformed

    elapsed = (datetime.now() - start_time).total_seconds()
    return ExtractionResult(
        document_id=metadata.document_id,
        extracted_data=extracted_data,
        raw_response=raw_text,
        extraction_model=model,
        extraction_time_seconds=elapsed,
        extracted_at=datetime.now().isoformat(),
    )

# === LAYER 4: VALIDATION ===
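def validate_extraction(result: ExtractionResult, schema: dict) -> tuple[bool, list[str]]:
    """Validate extracted data against a JSON Schema plus business rules."""
    from jsonschema import Draft7Validator  # pip install jsonschema

    data = result.extracted_data
    # Report every schema violation, not just the first one
    errors = [f"Schema validation: {e.message}" for e in Draft7Validator(schema).iter_errors(data)]
    if not isinstance(data, dict):
        return False, errors  # the business rules below need a JSON object

    # Custom validation rules; type checks guard against values the schema rejected
    total = data.get("total_amount")
    if isinstance(total, (int, float)) and total < 0:
        errors.append("Total amount cannot be negative")

    line_items = data.get("line_items") or []
    line_totals = [item.get("total") for item in line_items if isinstance(item, dict)]
    if (isinstance(total, (int, float)) and line_totals
            and all(isinstance(t, (int, float)) for t in line_totals)):
        # Invoices with tax, shipping, or discounts differ legitimately;
        # adapt this rule to your documents
        if abs(sum(line_totals) - total) > 0.01:
            errors.append("Total amount doesn't match sum of line items")

    return len(errors) == 0, errors
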
# === LAYER 5: CONFIDENCE SCORING ===
def compute_confidence(result: ExtractionResult, validation_passed: bool) -> float:
    """Heuristic score (not a calibrated probability): 0.5 base,
    up to +0.3 for completeness, +0.2 if validation passed."""
    base_confidence = 0.5

    # Completeness: fraction of required fields that are present (non-null)
    required_fields = ["invoice_number", "vendor_name", "total_amount"]
    present = sum(
        1 for field in required_fields
        if result.extracted_data.get(field) is not None
    )
    base_confidence += (present / len(required_fields)) * 0.3

    # Validation: did it pass schema and business rules?
    if validation_passed:
        base_confidence += 0.2

    return min(1.0, max(0.0, base_confidence))

# === LAYER 6: REVIEW QUEUE ===
@dataclass
class ReviewTask:
    """Task for human review."""
    document_id: str
    extraction_result: ExtractionResult
    confidence_score: float
    validation_errors: list[str]
    requires_review: bool
    created_at: str


class ReviewQueueManager:
    """Route extractions to auto-approval or the human review queue."""

    def __init__(self, review_threshold: float = 0.80):
        self.review_threshold = review_threshold
        self.auto_approved: list[ReviewTask] = []
        self.pending_review: list[ReviewTask] = []
        self.rejected: list[ReviewTask] = []  # for tasks a reviewer rejects

    def route_document(self, result: ExtractionResult, confidence: float,
                       validation_passed: bool, errors: list[str]) -> str:
        """Auto-approve only if validation passed and confidence meets the threshold."""
        task = ReviewTask(
            document_id=result.document_id,
            extraction_result=result,
            confidence_score=confidence,
            validation_errors=errors,
            requires_review=not validation_passed or confidence < self.review_threshold,
            created_at=datetime.now().isoformat(),
        )
        if task.requires_review:
            self.pending_review.append(task)
            return "PENDING_REVIEW"
        self.auto_approved.append(task)
        return "AUTO_APPROVED"

# === LAYER 7: APPROVAL & INTEGRATION ===
def integrate_to_database(extraction_result: ExtractionResult, database_connection=None):
    """Write an approved extraction to the database (stubbed with prints here).

    In production, use a real database client (PostgreSQL, MongoDB, DynamoDB, etc.).
    """
    data = extraction_result.extracted_data
    print(f"Integrating document {extraction_result.document_id} to database")

    # Insert vendor if new
    vendor_name = data.get("vendor_name")
    if vendor_name:
        print(f"  - Inserting vendor: {vendor_name}")

    # Insert invoice
    print(f"  - Inserting invoice: {data.get('invoice_number')}")

    # Insert line items ("or []" also covers an explicit null from the model)
    for item in data.get("line_items") or []:
        print(f"  - Item: {item.get('description')} ({item.get('quantity')} × {item.get('unit_price')})")

# === COMPLETE PIPELINE ===
def run_document_extraction_pipeline(watch_dir: str, batch_size: int = 10):
    """Run the complete extraction pipeline over the JPEG files in watch_dir."""
    print("Starting Document Extraction Pipeline")
    print("=" * 60)

    # Setup
    ingestion_queue = DocumentIngestionQueue(watch_dir)
    review_queue_mgr = ReviewQueueManager()

    # Invoice schema: optional fields may be null, matching the prompt's
    # "if a field is missing, use null" instruction
    invoice_schema = {
        "type": "object",
        "properties": {
            "invoice_number": {"type": "string"},
            "invoice_date": {"type": ["string", "null"]},
            "vendor_name": {"type": "string"},
            "total_amount": {"type": "number", "minimum": 0},
            "currency": {"type": ["string", "null"]},
            "line_items": {"type": ["array", "null"]},
        },
        "required": ["invoice_number", "vendor_name", "total_amount"],
    }

    # Get documents (sorted for a deterministic processing order)
    document_files = sorted(Path(watch_dir).glob("*.jpg"))[:batch_size]
    failed = 0

    # Process each document
    for doc_file in document_files:
        print(f"\nProcessing: {doc_file.name}")
        print("-" * 60)
        try:
            # 1. Ingest
            metadata = ingestion_queue.ingest_document(str(doc_file), "invoice")
            print(f"  [INGEST] Document ID: {metadata.document_id}")

            # 2. Extract
            extraction = extract_from_document(metadata)
            print(f"  [EXTRACT] Time: {extraction.extraction_time_seconds:.2f}s")
            print(f"    - Vendor: {extraction.extracted_data.get('vendor_name')}")
            print(f"    - Amount: {extraction.extracted_data.get('total_amount')}")

            # 3. Validate
            is_valid, errors = validate_extraction(extraction, invoice_schema)
            print(f"  [VALIDATE] Valid: {is_valid}")
            for error in errors:
                print(f"    - Error: {error}")

            # 4. Score
            confidence = compute_confidence(extraction, is_valid)
            print(f"  [SCORE] Confidence: {confidence:.2%}")

            # 5. Route
            route = review_queue_mgr.route_document(extraction, confidence, is_valid, errors)
            print(f"  [ROUTE] Decision: {route}")

            # 6. Integrate (if auto-approved)
            if route == "AUTO_APPROVED":
                print("  [INTEGRATE] Writing to database...")
                # integrate_to_database(extraction, None)  # swap in a real DB connection
                print("    - Success!")
        except Exception as e:
            # Keep going: one bad document should not stop the batch
            failed += 1
            print(f"  [ERROR] {e}")

    # Summary
    processed = len(document_files)
    auto = len(review_queue_mgr.auto_approved)
    print("\n" + "=" * 60)
    print("PIPELINE SUMMARY")
    print(f"  - Documents processed: {processed}")
    print(f"  - Auto-approved: {auto}")
    print(f"  - Pending review: {len(review_queue_mgr.pending_review)}")
    print(f"  - Failed: {failed}")
    # Guard against division by zero when the directory has no matching files
    rate = f"{auto / processed:.1%}" if processed else "n/a"
    print(f"  - Auto-approval rate: {rate}")

# === EXAMPLE USAGE ===
if __name__ == "__main__":
    # Create sample documents directory
    sample_dir = "./sample_invoices"
    Path(sample_dir).mkdir(exist_ok=True)

    # Run pipeline (in real usage, this would process actual documents)
    # run_document_extraction_pipeline(sample_dir, batch_size=10)
    print("Pipeline code ready. Provide document directory path to run.")
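The listing stops once a document lands in `pending_review`. A minimal sketch of the reviewer side, assuming the reviewer either rejects a document or submits a corrected copy of its fields (`ReviewOutcome` and `apply_review` are hypothetical names, not part of the code above): recording which fields the human changed gives you the raw data for a human-verified error rate.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ReviewOutcome:
    """A human reviewer's decision for one document (hypothetical type)."""
    document_id: str
    approved: bool
    final_data: dict
    corrected_fields: list[str] = field(default_factory=list)


def apply_review(document_id: str, extracted: dict, corrected: Optional[dict]) -> ReviewOutcome:
    """Turn a reviewer's action into an outcome.

    corrected=None means the reviewer rejected the document (e.g. to request
    a rescan); otherwise it is the reviewer's corrected copy of the fields.
    """
    if corrected is None:
        return ReviewOutcome(document_id, approved=False, final_data=extracted)
    # Fields whose values differ between the model's and the reviewer's versions
    changed = sorted(
        key for key in extracted.keys() | corrected.keys()
        if extracted.get(key) != corrected.get(key)
    )
    return ReviewOutcome(document_id, approved=True, final_data=corrected,
                         corrected_fields=changed)
```

An approved outcome can then follow the same integration path as an auto-approved one, while a non-empty `corrected_fields` marks the extraction as an error for accuracy tracking.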
Production Considerations
1. Error Recovery and Monitoring
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("extraction_pipeline.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)


def pipeline_with_logging(watch_dir: str):
    """Run the pipeline with file and console logging."""
    logger.info("Pipeline started for %s", watch_dir)
    try:
        run_document_extraction_pipeline(watch_dir)
        logger.info("Pipeline finished")
    except Exception:
        # logger.exception logs at ERROR level and includes the traceback
        logger.exception("Pipeline failed")
        raise  # let the scheduler/supervisor see the failure and retry
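Pipeline-level logging tells you that something failed, but not which stage failed or how slow each stage is. One option, sketched here with a hypothetical `log_stage` helper built only on the standard `logging`, `time`, and `contextlib` modules, is to wrap each stage so every log line carries the document ID, the stage name, and its duration:

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("extraction_pipeline")


@contextmanager
def log_stage(document_id: str, stage: str, timings: dict):
    """Log the start and end of one pipeline stage and record its duration."""
    start = time.perf_counter()
    logger.info("doc=%s stage=%s status=started", document_id, stage)
    try:
        yield
    except Exception:
        # Includes the traceback; the exception still propagates to the caller
        logger.exception("doc=%s stage=%s status=failed", document_id, stage)
        raise
    finally:
        timings[stage] = time.perf_counter() - start
    logger.info("doc=%s stage=%s status=done seconds=%.3f", document_id, stage, timings[stage])
```

In the pipeline loop this would wrap each step, e.g. `with log_stage(metadata.document_id, "extract", timings): extraction = extract_from_document(metadata)`. Using `key=value` pairs keeps the lines easy to grep or to parse in a log aggregator.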
2. Database Integration
import sqlite3


def integrate_to_sqlite(extraction: ExtractionResult, db_path: str = "extraction.db"):
    """Write an extraction to a SQLite database."""
    conn = sqlite3.connect(db_path)
    try:
        # "with conn" commits on success and rolls back if an exception escapes
        with conn:
            # Create table if needed
            conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    vendor_name TEXT,
                    invoice_number TEXT,
                    invoice_date TEXT,
                    total_amount REAL,
                    extracted_at TEXT
                )
            """)
            # Insert, or overwrite so that reprocessing a document is idempotent
            data = extraction.extracted_data
            conn.execute("""
                INSERT OR REPLACE INTO documents
                    (id, vendor_name, invoice_number, invoice_date, total_amount, extracted_at)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                extraction.document_id,
                data.get("vendor_name"),
                data.get("invoice_number"),
                data.get("invoice_date"),
                data.get("total_amount"),
                extraction.extracted_at,
            ))
    finally:
        conn.close()  # the "with conn" block does not close the connection
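The SQLite example stores only invoice headers. A sketch of also persisting line items, assuming a hypothetical `line_items` child table keyed to `documents.id` (the schema is illustrative, not prescribed). Note that SQLite only enforces the `REFERENCES` clause when `PRAGMA foreign_keys = ON` is set on the connection:

```python
import sqlite3
from typing import Optional


def save_line_items(conn: sqlite3.Connection, document_id: str,
                    line_items: Optional[list]) -> int:
    """Replace the stored line items for one document; returns rows written."""
    with conn:  # one transaction: delete the old rows, insert the new ones
        conn.execute("""
            CREATE TABLE IF NOT EXISTS line_items (
                document_id TEXT NOT NULL REFERENCES documents(id),
                position INTEGER NOT NULL,
                description TEXT,
                quantity REAL,
                unit_price REAL,
                total REAL,
                PRIMARY KEY (document_id, position)
            )
        """)
        conn.execute("DELETE FROM line_items WHERE document_id = ?", (document_id,))
        rows = [
            (document_id, i, item.get("description"), item.get("quantity"),
             item.get("unit_price"), item.get("total"))
            for i, item in enumerate(line_items or [])
        ]
        conn.executemany("INSERT INTO line_items VALUES (?, ?, ?, ?, ?, ?)", rows)
    return len(rows)
```

Deleting and re-inserting inside one transaction keeps the line items consistent with the header row when a document is reprocessed, mirroring the `INSERT OR REPLACE` on `documents`.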
3. Scaling Considerations
For production pipelines processing 10,000+ documents:
- Use message queues (RabbitMQ, Kafka) for ingestion
- Distribute processing across multiple workers
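- Use prompt caching for the extraction prompt that every request repeats, to cut input-token costs (providers typically require a minimum prompt length before caching applies)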
- Archive completed documents to cold storage
- Monitor API rate limits and adjust concurrency
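Before reaching for a message queue, the last two points (multiple workers, respecting rate limits) can be combined in-process. A minimal sketch using the standard library's `ThreadPoolExecutor`: a fixed `max_workers` caps concurrency, and a hypothetical `with_retries` wrapper retries with exponential backoff and jitter. In real use you would retry only rate-limit and transient errors (the Anthropic Python SDK also retries some failed requests itself, configurable via `max_retries`); here any exception is retried to keep the sketch self-contained.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def with_retries(fn: Callable[[], Any], max_attempts: int = 4, base_delay: float = 1.0) -> Any:
    """Call fn(), retrying with exponential backoff plus jitter on failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # base_delay, 2x, 4x, ... plus up to 10% jitter so workers don't retry in lockstep
            delay = base_delay * (2 ** attempt)
            time.sleep(delay + random.uniform(0, delay * 0.1))


def process_all(items: list[str], work: Callable[[str], Any], max_workers: int = 4,
                max_attempts: int = 4, base_delay: float = 1.0) -> dict[str, Any]:
    """Run work(item) for each item with at most max_workers calls in flight.

    Returns {item: result}, or {item: exception} for items that never succeeded.
    """
    results: dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            item: pool.submit(with_retries, lambda item=item: work(item), max_attempts, base_delay)
            for item in items
        }
        for item, future in futures.items():
            try:
                results[item] = future.result()
            except Exception as exc:
                results[item] = exc
    return results
```

Threads suit this workload because each worker spends most of its time waiting on the API; lowering `max_workers` is the simplest lever when you start hitting rate limits.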
Key Takeaways
- An end-to-end pipeline integrates ingestion, extraction, validation, confidence scoring, review, and integration.
- Route high-confidence documents to auto-approval and database integration; flag low-confidence for human review.
- Validate against schemas; compute confidence from multiple signals (completeness and validation, plus image quality where you can measure it).
- Log everything; build comprehensive monitoring and error recovery.
- Scale using message queues, distributed workers, and batch processing for production deployment.
Frequently Asked Questions
How do I handle documents that fail extraction?
Log the failure with the document ID, store the original image, and either: (1) resubmit with a refined prompt, (2) route to human review, or (3) reject and request a rescan. Track failure rates by document type to identify systemic issues.
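Tracking failure rates by document type needs only a pair of counters. A sketch with a hypothetical `FailureTracker` class; the 10% threshold and 20-attempt minimum are illustrative assumptions, not recommendations:

```python
from collections import Counter


class FailureTracker:
    """Count attempts and failures per document type (hypothetical helper)."""

    def __init__(self):
        self.attempts = Counter()
        self.failures = Counter()
        self.reasons: dict[str, Counter] = {}  # document_type -> Counter of reasons

    def record(self, document_type: str, failed: bool, reason: str = "") -> None:
        self.attempts[document_type] += 1
        if failed:
            self.failures[document_type] += 1
            self.reasons.setdefault(document_type, Counter())[reason or "unknown"] += 1

    def failure_rate(self, document_type: str) -> float:
        attempts = self.attempts[document_type]
        return self.failures[document_type] / attempts if attempts else 0.0

    def systemic_issues(self, threshold: float = 0.10, min_attempts: int = 20) -> list[str]:
        """Document types whose failure rate exceeds threshold on enough volume."""
        return sorted(
            t for t in self.attempts
            if self.attempts[t] >= min_attempts and self.failure_rate(t) > threshold
        )
```

Grouping failures by reason (JSON parse errors, truncation, schema violations) as well as by type usually points straight at the fix: a prompt change, a higher `max_tokens`, or a rescan request.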
What database should I use?
For structured extraction (invoices, forms), use relational databases (PostgreSQL, MySQL). For flexible/variable schemas, use NoSQL (MongoDB, DynamoDB). For high-volume search, use specialized storage (Elasticsearch for full-text). Choose based on your downstream query patterns.
How do I handle confidential data?
Encrypt data in transit (HTTPS) and at rest (database encryption). Consider using local/self-hosted models for sensitive documents. Implement access controls and audit trails. For HIPAA/GDPR compliance, consult legal/security teams.
Can I run this on-premises?
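Yes, with self-hosted models. Claude is served as a cloud API, so for fully on-premises processing you would replace it with a self-hosted open-weight model. Because the pipeline sends images, that model must be vision-capable (for example, Llama 3.2 Vision or Mistral's Pixtral). Trade-off: convenience vs. privacy/compliance. For on-premises, invest in GPU infrastructure.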
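Swapping Claude for a self-hosted model is easier if the pipeline depends on a small interface rather than on one vendor SDK. A sketch using `typing.Protocol`; `ExtractionBackend`, `FakeBackend`, and `extract_with` are hypothetical names, and a real self-hosted backend would wrap whatever inference server you deploy:

```python
import json
from typing import Protocol


class ExtractionBackend(Protocol):
    """Anything that turns an image plus a prompt into raw model text."""
    name: str

    def complete(self, image_bytes: bytes, media_type: str, prompt: str) -> str: ...


class FakeBackend:
    """Stand-in backend for tests; returns a canned JSON response."""
    name = "fake-local-model"

    def __init__(self, response: str):
        self.response = response

    def complete(self, image_bytes: bytes, media_type: str, prompt: str) -> str:
        return self.response


def extract_with(backend: ExtractionBackend, image_bytes: bytes,
                 media_type: str, prompt: str) -> dict:
    """Pipeline code depends only on the protocol, not on a specific SDK."""
    return json.loads(backend.complete(image_bytes, media_type, prompt))
```

Recording `backend.name` as the extraction model also lets you compare accuracy between the cloud and self-hosted backends on the same documents.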
How do I measure pipeline performance?
Track: (1) throughput (documents/hour), (2) accuracy (human-verified error rate), (3) cost (per-document), (4) latency (end-to-end time), (5) auto-approval rate, (6) failure rate. Set targets and monitor continuously.
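Most of these metrics fall out of per-document records the pipeline already produces. A sketch, assuming a hypothetical `DocRecord` per processed document (field names are illustrative). Only human-reviewed documents have ground truth, so the error rate here covers those alone; estimating accuracy on auto-approved documents requires spot-checking a sample of them.

```python
from dataclasses import dataclass
from statistics import median


@dataclass
class DocRecord:
    latency_seconds: float          # end-to-end time for this document
    cost_usd: float                 # API or compute cost attributed to it
    status: str                     # "auto_approved", "reviewed", or "failed"
    had_corrections: bool = False   # only meaningful when status == "reviewed"


def summarize(records: list[DocRecord], wall_clock_hours: float) -> dict:
    """Aggregate pipeline metrics over one run."""
    n = len(records)
    if n == 0 or wall_clock_hours <= 0:
        raise ValueError("need at least one record and a positive duration")
    reviewed = [r for r in records if r.status == "reviewed"]
    return {
        "throughput_docs_per_hour": n / wall_clock_hours,
        "auto_approval_rate": sum(r.status == "auto_approved" for r in records) / n,
        "failure_rate": sum(r.status == "failed" for r in records) / n,
        "review_error_rate": (sum(r.had_corrections for r in reviewed) / len(reviewed)
                              if reviewed else None),
        "cost_per_doc_usd": sum(r.cost_usd for r in records) / n,
        "median_latency_seconds": median(r.latency_seconds for r in records),
    }
```

Median latency is used rather than the mean because a few slow retries can otherwise dominate the number.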