
Building Production Pipelines with Typed LLM Outputs

Production LLM systems rarely invoke a model only once. They chain multiple calls (classification, reasoning, refinement, grading), and each step depends on the output of the previous one. Defining a typed schema for every step removes most of the fragility of passing free-form text between calls: malformed output is caught at the step that produced it instead of breaking a parser further down the chain. This guide shows how to architect end-to-end pipelines where every intermediate value is schema-validated, logged, and monitorable.

Anatomy of a Production LLM Pipeline

A typical production pipeline has these layers:

  1. Input validation: Ensure user input conforms to expected types.
  2. Step 1 – Classification: Route the input to the right sub-task.
  3. Step 2 – Extraction/Reasoning: Generate structured intermediate results.
  4. Step 3 – Refinement: Polish or fact-check the output.
  5. Step 4 – Formatting: Convert the result into the format the caller expects.
  6. Observability: Log all intermediates for debugging and cost analysis.

Every step returns JSON that is validated against a schema, and the next step consumes only the validated, typed object.
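The first layer, input validation, is not shown in the example pipeline that follows. A minimal sketch, assuming a hypothetical TicketInput model and an arbitrary 5,000-character limit, rejects empty or oversized tickets before any model call is made:

```python
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field, ValidationError


class TicketInput(BaseModel):
    """Hypothetical schema for the raw ticket a user submits."""

    # Strip surrounding whitespace before the length checks run, so a
    # ticket consisting only of spaces fails min_length.
    model_config = ConfigDict(str_strip_whitespace=True)

    text: str = Field(..., min_length=1, max_length=5000)  # 5000 is an assumed limit


def validate_ticket(raw: str) -> Optional[TicketInput]:
    """Return the validated ticket, or None if it should be rejected before any LLM call."""
    try:
        return TicketInput(text=raw)
    except ValidationError:
        return None
```

A caller would run validate_ticket(user_text) first and pass ticket.text into the pipeline only when the result is not None, so invalid input never costs an API call.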

Example: Support Ticket Resolution Pipeline

This pipeline classifies a support ticket, suggests a solution, and rates the solution:

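from enum import Enum
from typing import Literal
import logging

from openai import OpenAI
from pydantic import BaseModel, ConfigDict, Field

client = OpenAI()
logger = logging.getLogger(__name__)

# ============ Schemas ============
# Strict json_schema mode requires every property to be listed as required and
# additionalProperties to be false. extra="forbid" makes model_json_schema()
# emit "additionalProperties": false, and no field below has a default.
# Strict mode accepts only a subset of JSON Schema keywords, and that subset has
# changed over time; if the API rejects one (for example maxLength), drop it from
# the schema you send and rely on Pydantic, which still enforces it on validation.

class TicketCategory(str, Enum):
    BUG = "bug"
    FEATURE = "feature"
    SUPPORT = "support"
    BILLING = "billing"

class TicketClassification(BaseModel):
    """Step 1 output: classify the ticket."""
    model_config = ConfigDict(extra="forbid")

    category: TicketCategory
    priority: Literal["low", "medium", "high", "critical"]  # emitted as a JSON Schema enum
    confidence: float = Field(..., ge=0, le=1)
    summary: str = Field(..., max_length=200)

class ResolutionStep(BaseModel):
    """Step 2 output: suggest a resolution."""
    model_config = ConfigDict(extra="forbid")

    suggested_action: str = Field(..., max_length=500)
    estimated_time: str = Field(..., description='e.g. "15 mins", "1 hour", "1-2 days"')
    requires_escalation: bool
    # Required because strict mode allows no optional fields; empty string when not escalating.
    escalation_reason: str = Field(..., max_length=200)

class SolutionRating(BaseModel):
    """Step 3 output: rate the solution."""
    model_config = ConfigDict(extra="forbid")

    clarity_score: int = Field(..., ge=1, le=5)
    feasibility_score: int = Field(..., ge=1, le=5)
    customer_satisfaction_estimate: int = Field(..., ge=1, le=100)
    needs_revision: bool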

# ============ Pipeline Steps ============

# Structured Outputs (response_format type "json_schema") needs a model that
# supports it, such as gpt-4o-2024-08-06 or later; gpt-4-turbo does not.
MODEL = "gpt-4o-2024-08-06"

def classify_ticket(ticket_text: str) -> TicketClassification:
    """Step 1: Classify the ticket."""
    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {
                "role": "user",
                "content": f"Classify this support ticket:\n\n{ticket_text}",
            }
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": "TicketClassification",
                "schema": TicketClassification.model_json_schema(),
                "strict": True,
            },
        },
    )

    # Validate locally as well; raises pydantic.ValidationError on bad output.
    result = TicketClassification.model_validate_json(
        response.choices[0].message.content
    )
    logger.info("Classified ticket as %s (priority: %s)", result.category.value, result.priority)
    return result

def suggest_resolution(
    ticket_text: str,
    classification: TicketClassification,
) -> ResolutionStep:
    """Step 2: Generate resolution based on classification."""
    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {
                "role": "user",
                "content": (
                    f"Ticket: {ticket_text}\n\n"
                    f"Category: {classification.category.value}\n"
                    f"Priority: {classification.priority}\n\n"
                    "Suggest a resolution."
                ),
            }
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": "ResolutionStep",
                "schema": ResolutionStep.model_json_schema(),
                "strict": True,
            },
        },
    )

    result = ResolutionStep.model_validate_json(
        response.choices[0].message.content
    )
    logger.info("Suggested action: %s...", result.suggested_action[:50])
    return result

def rate_solution(
    ticket_text: str,
    resolution: ResolutionStep,
) -> SolutionRating:
    """Step 3: Rate the proposed solution."""
    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {
                "role": "user",
                "content": (
                    f"Original ticket: {ticket_text}\n\n"
                    f"Proposed solution: {resolution.suggested_action}\n\n"
                    "Rate this solution on clarity, feasibility, and customer satisfaction."
                ),
            }
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": "SolutionRating",
                "schema": SolutionRating.model_json_schema(),
                "strict": True,
            },
        },
    )

    result = SolutionRating.model_validate_json(
        response.choices[0].message.content
    )
    logger.info(
        "Solution rated: clarity=%d, feasibility=%d",
        result.clarity_score,
        result.feasibility_score,
    )
    return result

# ============ Pipeline Orchestration ============

class PipelineResult(BaseModel):
    """Final output: all steps combined."""
    ticket_text: str
    classification: TicketClassification
    resolution: ResolutionStep
    rating: SolutionRating
    success: bool

def process_support_ticket(ticket_text: str) -> PipelineResult:
    """Orchestrate the full pipeline."""
    try:
        logger.info("Processing ticket: %s...", ticket_text[:50])

        # Step 1
        classification = classify_ticket(ticket_text)

        # Step 2
        resolution = suggest_resolution(ticket_text, classification)

        # Step 3
        rating = rate_solution(ticket_text, resolution)

        result = PipelineResult(
            ticket_text=ticket_text,
            classification=classification,
            resolution=resolution,
            rating=rating,
            success=True,
        )

        logger.info("Pipeline completed successfully")
        return result

    except Exception as e:
        logger.error("Pipeline failed: %s", e)
        raise

# ============ Usage ============

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)  # show the pipeline's info-level logs

    ticket = "The app keeps crashing when I try to upload a photo. It worked yesterday."
    result = process_support_ticket(ticket)

    print(f"Category: {result.classification.category.value}")
    print(f"Priority: {result.classification.priority}")
    print(f"Action: {result.resolution.suggested_action}")
    print(f"Solution quality: {result.rating.clarity_score + result.rating.feasibility_score}/10")
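Strict json_schema mode expects every object in the schema to set additionalProperties to false and to list all of its properties under required; a Pydantic model with a defaulted field or without extra="forbid" does not meet that, and the request is rejected. A small pre-flight check can catch this in a unit test. The sketch below uses a hypothetical strict_schema_problems helper that walks a schema dict; it only follows properties, $defs and items, which covers the schemas in this guide but not every JSON Schema construct (anyOf is not handled):

```python
from typing import Any, Dict, List


def strict_schema_problems(schema: Dict[str, Any], path: str = "$") -> List[str]:
    """Hypothetical helper: list places where an object schema would break
    strict mode (extra keys allowed, or properties missing from required)."""
    problems: List[str] = []
    if schema.get("type") == "object" or "properties" in schema:
        props = schema.get("properties", {})
        if schema.get("additionalProperties") is not False:
            problems.append(f"{path}: additionalProperties is not false")
        for name in sorted(set(props) - set(schema.get("required", []))):
            problems.append(f"{path}.{name}: not listed in required")
        for name, sub in props.items():
            problems.extend(strict_schema_problems(sub, f"{path}.{name}"))
    # Recurse into shared definitions ($ref targets) and array item schemas.
    for name, sub in schema.get("$defs", {}).items():
        problems.extend(strict_schema_problems(sub, f"{path}.$defs.{name}"))
    if isinstance(schema.get("items"), dict):
        problems.extend(strict_schema_problems(schema["items"], f"{path}[]"))
    return problems
```

A test such as assert strict_schema_problems(TicketClassification.model_json_schema()) == [] then fails locally, with a readable path, instead of at request time.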

Observability and Logging

Log all intermediates for debugging, cost tracking, and continuous improvement:

import json
import time
import uuid
from datetime import datetime, timezone

class PipelineLogger:
    """Structured logging for pipeline steps."""

    def __init__(self, pipeline_id: str):
        self.pipeline_id = pipeline_id
        self.steps = []
        self._start = time.perf_counter()  # monotonic clock for latency math

    def log_step(self, step_name: str, input_data: dict, output: BaseModel, latency_ms: float):
        """Record a pipeline step."""
        self.steps.append({
            "step": step_name,
            "input": input_data,
            "output": output.model_dump(mode="json"),  # JSON-safe: enums become strings
            "latency_ms": latency_ms,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    def get_report(self) -> dict:
        """Generate a full pipeline report."""
        return {
            "pipeline_id": self.pipeline_id,
            "total_latency_ms": (time.perf_counter() - self._start) * 1000,
            "step_count": len(self.steps),
            "steps": self.steps,
        }

    def save_to_file(self, filename: str):
        """Persist the report."""
        with open(filename, "w") as f:
            json.dump(self.get_report(), f, indent=2)

# Usage in pipeline
def process_support_ticket_with_logging(ticket_text: str) -> PipelineResult:
    """Pipeline with structured observability."""
    # A random ID keeps report filenames unique across concurrent runs.
    run_log = PipelineLogger(pipeline_id=f"ticket-{uuid.uuid4().hex[:12]}")
    step = "classify"  # name of the step in progress, used in error reports

    try:
        # Step 1
        start = time.perf_counter()
        classification = classify_ticket(ticket_text)
        run_log.log_step(step, {"ticket": ticket_text[:100]}, classification,
                         (time.perf_counter() - start) * 1000)

        # Step 2
        step = "resolve"
        start = time.perf_counter()
        resolution = suggest_resolution(ticket_text, classification)
        run_log.log_step(step, {"classification": classification.model_dump(mode="json")},
                         resolution, (time.perf_counter() - start) * 1000)

        # Step 3
        step = "rate"
        start = time.perf_counter()
        rating = rate_solution(ticket_text, resolution)
        run_log.log_step(step, {"resolution": resolution.model_dump(mode="json")},
                         rating, (time.perf_counter() - start) * 1000)

        # Save one report per run instead of overwriting a shared file
        run_log.save_to_file(f"{run_log.pipeline_id}.json")
        logger.info("Pipeline report saved for %s", run_log.pipeline_id)

        return PipelineResult(
            ticket_text=ticket_text,
            classification=classification,
            resolution=resolution,
            rating=rating,
            success=True,
        )
    except Exception as e:
        logger.error("Pipeline failed at step %r: %s", step, e)
        run_log.save_to_file(f"{run_log.pipeline_id}-error.json")
        raise
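PipelineLogger records latency but not cost. Chat Completions responses carry a usage object with prompt_tokens and completion_tokens, so one option is to have each step also report those counts and convert them to dollars. The sketch below assumes that arrangement; CostTracker is a hypothetical helper, and the prices in PRICES_PER_1M are placeholders, not real rates, so substitute the current figures from your provider's pricing page:

```python
from dataclasses import dataclass, field

# Placeholder prices in USD per 1M tokens as (input, output). NOT real rates.
PRICES_PER_1M = {
    "example-small-model": (0.50, 1.50),
    "example-large-model": (5.00, 15.00),
}


@dataclass
class CostTracker:
    """Hypothetical accumulator for per-step token usage and estimated cost."""

    steps: dict = field(default_factory=dict)

    def record(self, step: str, model: str, prompt_tokens: int, completion_tokens: int) -> float:
        """Add one call's usage to the step's totals and return its estimated cost in USD."""
        in_price, out_price = PRICES_PER_1M[model]
        cost = (prompt_tokens * in_price + completion_tokens * out_price) / 1_000_000
        entry = self.steps.setdefault(
            step, {"prompt_tokens": 0, "completion_tokens": 0, "cost_usd": 0.0}
        )
        entry["prompt_tokens"] += prompt_tokens
        entry["completion_tokens"] += completion_tokens
        entry["cost_usd"] += cost
        return cost

    def total_cost(self) -> float:
        """Estimated cost of the whole run across all steps."""
        return sum(e["cost_usd"] for e in self.steps.values())
```

Inside a step, a call like tracker.record("classify", MODEL, response.usage.prompt_tokens, response.usage.completion_tokens) would feed it, and tracker.steps can be written into the same report as the latencies.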

Error Recovery and Graceful Degradation

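Handle partial failures according to how critical each step is: retry required steps, fall back to a safe default for steps that can degrade, and substitute a placeholder for nice-to-have steps: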

def process_support_ticket_resilient(ticket_text: str, max_attempts: int = 2) -> PipelineResult:
    """Pipeline with graceful error recovery.

    success is False whenever the resolution step fell back to its default.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    classification = None
    resolution_ok = True

    # Step 1: Classification (required): up to max_attempts tries, then abort
    for attempt in range(1, max_attempts + 1):
        try:
            classification = classify_ticket(ticket_text)
            break
        except Exception as e:
            logger.warning("Classification failed (attempt %d/%d): %s", attempt, max_attempts, e)
            if attempt == max_attempts:
                logger.error("Classification failed after all attempts. Aborting.")
                raise

    # Step 2: Resolution (can degrade to a safe default)
    try:
        resolution = suggest_resolution(ticket_text, classification)
    except Exception as e:
        logger.warning("Resolution generation failed: %s. Using default.", e)
        resolution_ok = False
        resolution = ResolutionStep(
            suggested_action="Please contact support for further assistance.",
            estimated_time="1-2 hours",
            requires_escalation=True,
            escalation_reason="Automated resolution failed",
        )

    # Step 3: Rating (nice-to-have): substitute a neutral placeholder on failure.
    # These values are not a real assessment; needs_revision=True flags that.
    try:
        rating = rate_solution(ticket_text, resolution)
    except Exception as e:
        logger.warning("Solution rating failed: %s. Using placeholder rating.", e)
        rating = SolutionRating(
            clarity_score=3,
            feasibility_score=3,
            customer_satisfaction_estimate=50,
            needs_revision=True,
        )

    return PipelineResult(
        ticket_text=ticket_text,
        classification=classification,
        resolution=resolution,
        rating=rating,
        success=resolution_ok,  # False when the resolution is the fallback
    )
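The classification loop above retries immediately. Rate-limit and transient network errors usually clear faster when attempts are spaced out, so a common alternative is exponential backoff with jitter. A minimal sketch, using a hypothetical retry_with_backoff helper that retries any zero-argument callable; the sleep parameter exists so tests can skip real waiting:

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on retry_on, for at most max_attempts calls in total.

    Before attempt n+1 it sleeps a random time in
    [0, min(max_delay, base_delay * 2**(n-1))] ("full jitter"), so callers
    hitting the same rate limit do not retry in lockstep.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error unchanged
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** (attempt - 1))))
    raise AssertionError("unreachable")
```

Step 1 could then read classification = retry_with_backoff(lambda: classify_ticket(ticket_text), max_attempts=3). Narrowing retry_on to transient error types avoids retrying failures that will never succeed, such as a schema the API rejects.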

Testing Pipelines with Mock Schemas

Test the entire pipeline without hitting the API:

from unittest.mock import MagicMock, patch

# Assumes the pipeline code above lives in an importable module;
# "pipeline" is a hypothetical module name.
from pipeline import (
    TicketCategory,
    client,
    process_support_ticket,
    process_support_ticket_resilient,
)

def fake_response(content: str) -> MagicMock:
    """Build an object shaped like a chat completion with the given message content."""
    return MagicMock(choices=[MagicMock(message=MagicMock(content=content))])

def test_pipeline_full_success():
    """Test happy-path pipeline."""
    # Patch create() on the client instance the pipeline uses. openai.ChatCompletion
    # is the pre-1.0 API; patching it would not intercept OpenAI() calls.
    with patch.object(client.chat.completions, "create") as mock_create:
        # Mock responses for each step, consumed in order
        mock_create.side_effect = [
            fake_response('{"category": "bug", "priority": "high", "confidence": 0.95, "summary": "Login crash"}'),
            fake_response('{"suggested_action": "Clear cache", "estimated_time": "15 mins", "requires_escalation": false, "escalation_reason": ""}'),
            fake_response('{"clarity_score": 5, "feasibility_score": 4, "customer_satisfaction_estimate": 85, "needs_revision": false}'),
        ]

        result = process_support_ticket("The app crashes on login")

    assert result.success
    assert result.classification.category == TicketCategory.BUG
    assert result.resolution.suggested_action == "Clear cache"
    assert result.rating.clarity_score == 5

def test_pipeline_graceful_failure():
    """Test resilient handling of step failure."""
    with patch.object(client.chat.completions, "create") as mock_create:
        mock_create.side_effect = [
            # Step 1 succeeds
            fake_response('{"category": "support", "priority": "medium", "confidence": 0.8, "summary": "Needs help"}'),
            # Step 2 fails
            Exception("API error"),
            # Step 3 still runs on the fallback resolution; fail it too
            Exception("API error"),
        ]

        result = process_support_ticket_resilient("Help me")

    # The pipeline returns a result despite the Step 2 failure
    assert mock_create.call_count == 3
    assert result.classification.category == TicketCategory.SUPPORT
    assert result.resolution.requires_escalation is True  # default fallback
    assert result.rating.needs_revision is True  # placeholder rating
    assert result.success is False  # indicates partial failure
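MagicMock accepts any attribute access, so a field the mock never set (for example message.refusal) silently returns another truthy mock. An alternative is a small hand-written fake that exposes only the fields the pipeline reads. A sketch, assuming the pipeline touches only response.choices[0].message.content; FakeCompletions is a hypothetical test helper, not part of the OpenAI SDK:

```python
from types import SimpleNamespace


class FakeCompletions:
    """Hypothetical stand-in for client.chat.completions in tests.

    Each queued item is either a JSON string (returned as the message
    content) or an exception instance (raised), consumed in order.
    """

    def __init__(self, *queued):
        self.queued = list(queued)
        self.calls = []  # kwargs of every create() call, for later assertions

    def create(self, **kwargs):
        self.calls.append(kwargs)
        if not self.queued:
            raise AssertionError("unexpected extra API call")
        item = self.queued.pop(0)
        if isinstance(item, BaseException):
            raise item
        message = SimpleNamespace(content=item, refusal=None)
        return SimpleNamespace(choices=[SimpleNamespace(message=message)])
```

In a test it can be wired in with patch.object(client.chat.completions, "create", fake.create). Because the resilient pipeline catches Exception in its optional steps, assert on len(fake.calls) rather than relying on the "unexpected extra API call" error to surface.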

Key Takeaways

  • Design pipelines as a sequence of validated steps, each consuming and producing typed data.
  • Use Pydantic or Zod schemas at every step; compose intermediate outputs from previous steps.
  • Implement structured logging for all intermediates (input, output, latency) for debugging and optimization.
  • Add retry logic and graceful fallbacks at critical steps; skip or default non-critical steps on failure.
  • Test pipelines with mocked LLM responses; verify both happy-path and failure scenarios.
  • Monitor end-to-end latency and cost; log all pipeline runs for continuous improvement.

Frequently Asked Questions

Should I use the same LLM for every step or different models for different steps?

Use the cheapest model that reliably produces valid output for each step and that supports the structured-output mode you rely on. Early classification steps can often use a smaller, faster model; final reasoning steps may need a larger one. Track cost per step so you can see where the spend goes.
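One way to act on this is a cascade: call the cheaper model first and escalate to a larger one only when the output fails schema validation. A minimal sketch, where call_model and validate are hypothetical callables you would wire to your client and to a parser such as TicketClassification.model_validate_json; the model names in the usage are placeholders:

```python
from typing import Callable, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def cascade(
    call_model: Callable[[str], str],
    validate: Callable[[str], T],
    models: Sequence[str],
) -> Tuple[str, T]:
    """Try models in order (cheapest first) and return (model, parsed) for the
    first raw output that passes validation."""
    last_error: Optional[ValueError] = None
    for model in models:
        raw = call_model(model)
        try:
            return model, validate(raw)
        except ValueError as exc:
            # pydantic.ValidationError and json.JSONDecodeError both subclass
            # ValueError, so invalid output escalates to the next model.
            last_error = exc
    raise RuntimeError("no model produced valid output") from last_error
```

For example, cascade(call, TicketClassification.model_validate_json, ["small-model", "large-model"]) pays for the large model only on tickets the small one gets wrong; logging which model answered shows how often escalation happens.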

How do I handle data passing between steps without massive prompts?

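Pass only the context each step needs. Step 2 needs Step 1's output, so include the relevant fields (category and priority in the example) as structured values in the prompt rather than Step 1's full raw response or earlier conversation history. This keeps prompts focused and latency low.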
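A sketch of that idea: a hypothetical step_context helper keeps only whitelisted fields from a previous step's output and serializes them compactly for the next prompt. It works on a plain dict, such as the result of classification.model_dump(mode="json"):

```python
import json
from typing import Any, Dict, Iterable


def step_context(previous_output: Dict[str, Any], keep: Iterable[str]) -> str:
    """Return a compact JSON string with only the fields the next step needs."""
    selected = {key: previous_output[key] for key in keep if key in previous_output}
    # separators=(",", ":") drops the default spaces, saving a few tokens.
    return json.dumps(selected, separators=(",", ":"), ensure_ascii=False)
```

Keeping the whitelist next to each step's prompt makes it explicit which upstream fields a step depends on.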

What if one step's output is very large?

Use field constraints (for example Pydantic's max_length, which appears as maxLength in the generated JSON Schema) to keep intermediate outputs bounded. If Step 2 produces a large document, summarize it before passing it to Step 3.
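Pydantic enforces these limits when the response is validated, regardless of whether the provider's schema enforcement honors them, so an oversized field surfaces as a ValidationError that the retry and fallback logic can handle. A small illustration with a hypothetical StepSummary model:

```python
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class StepSummary(BaseModel):
    """Hypothetical intermediate output with a bounded text field."""

    summary: str = Field(..., max_length=200)


def parse_summary(raw_json: str) -> Optional[StepSummary]:
    """Return the parsed model, or None if the output exceeds the bound."""
    try:
        return StepSummary.model_validate_json(raw_json)
    except ValidationError:
        return None
```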

Should I retry the entire pipeline or just the failing step?

Retry only the failing step, using earlier outputs. Rerunning the entire pipeline wastes cost and time. If Step 2 fails, retry Step 2 with the same Step 1 output.
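A minimal sketch of step-level resumption, assuming each step is a function of the outputs accumulated so far and that completed results are kept in a dict. In production that store might be a database row or cache entry keyed by ticket ID; that storage choice is an assumption here. The run_with_checkpoints helper is hypothetical:

```python
from typing import Any, Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[Dict[str, Any]], Any]]


def run_with_checkpoints(steps: List[Step], checkpoint: Dict[str, Any]) -> Dict[str, Any]:
    """Run steps in order, reusing any result already stored in checkpoint.

    checkpoint is updated in place, so after an exception it holds every step
    that finished, and calling this again resumes at the failed step.
    """
    for name, fn in steps:
        if name in checkpoint:
            continue  # finished on an earlier run: reuse it instead of paying again
        checkpoint[name] = fn(checkpoint)
    return checkpoint
```

Because the checkpoint is updated in place, the caller can catch the exception, wait or fix the cause, and call run_with_checkpoints again with the same dict.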

How do I test a pipeline where each step is expensive (real API calls)?

Use CI/CD to run full pipelines on a schedule (nightly), not on every commit. For rapid iteration, use mocked responses. For validation, run against a sample of real data.
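For the runs against real data, a fixed random seed keeps the nightly sample stable, so a score change reflects a pipeline change rather than a different draw of tickets. A sketch with a hypothetical nightly_sample helper; rotating the seed periodically (an assumption about your process) avoids overfitting to one sample:

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def nightly_sample(items: Sequence[T], k: int, seed: int = 1234) -> List[T]:
    """Return a reproducible random sample of up to k items."""
    rng = random.Random(seed)  # private RNG: leaves the global random state untouched
    return rng.sample(list(items), min(k, len(items)))
```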

Further Reading