Real-World LLM Schema Patterns
Production LLM systems rely on recurring schema patterns that solve common problems. This article distills lessons from real deployments: how to design schemas for paginated queries, file operations, time-bounded tasks, batch operations, state machines, conditional parameters, and streamed results. These patterns have been used at scale. Each one pairs a schema that guides the model toward correct calls with an implementation that keeps the system reliable when a call is still wrong.
Pattern 1: Paginated Queries
Many tools return large result sets. Pagination lets the LLM fetch results in chunks:
{
"name": "search_documents",
"description": "Search a document database with optional pagination",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"minLength": 1,
"maxLength": 100,
"description": "Search query (1-100 chars)"
},
"page": {
"type": "integer",
"minimum": 1,
"default": 1,
"description": "Page number (1-indexed, default 1)"
},
"page_size": {
"type": "integer",
"minimum": 1,
"maximum": 100,
"default": 20,
"description": "Results per page (1-100, default 20)"
},
"sort_by": {
"type": "string",
"enum": ["relevance", "date_desc", "date_asc"],
"default": "relevance",
"description": "Sort order (default: relevance)"
}
},
"required": ["query"]
}
}
Implementation:
def search_documents(query: str, page: int = 1, page_size: int = 20,
                     sort_by: str = "relevance") -> dict:
    """Search with pagination support."""
    # Count all matches so the caller can see how many pages exist
    total = count_results(query)

    # Convert the 1-indexed page number into a row offset
    offset = (page - 1) * page_size
    limit = page_size

    # Fetch only the requested page
    results = fetch_results(query, offset, limit, sort_by)

    # Return the page together with pagination metadata
    return {
        "results": results,
        "page": page,
        "page_size": page_size,
        "total": total,
        "has_next": offset + limit < total,
        "total_pages": (total + page_size - 1) // page_size  # ceiling division
    }
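The response includes has_next so the LLM knows whether to fetch more pages, and total_pages so it can judge how much remains. Because page_size is capped at 100, no single call returns more than 100 results, however large the full result set is.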
Pattern 2: File Operations
Tools that read/write files need careful schemas to avoid path traversal attacks and accidental overwrites:
{
"name": "read_file",
"description": "Read contents of a file in the project workspace",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"pattern": "^[a-zA-Z0-9._/\\-]+$",
"description": "File path relative to project root. Must use only alphanumerics, dots, slashes, hyphens. Example: 'src/main.py' or 'README.md'"
},
"encoding": {
"type": "string",
"enum": ["utf-8", "ascii"],
"default": "utf-8",
"description": "File encoding (default: utf-8)"
}
},
"required": ["path"]
}
}
Implementation with safety checks:
from pathlib import Path

# Names that must never be read, wherever they appear in the path
SENSITIVE_NAMES = {".env", ".git", "secrets.json"}

def read_file(path: str, encoding: str = "utf-8") -> dict:
    """Read a file safely."""
    # Security: resolve symlinks and ".." before checking containment
    project_root = Path.cwd().resolve()
    file_path = (project_root / path).resolve()

    # Reject paths outside project root (path traversal attack).
    # A plain string prefix check would wrongly accept a sibling such as
    # "/srv/project-old" when the root is "/srv/project".
    if not file_path.is_relative_to(project_root):  # Python 3.9+
        return {"status": "error", "error": "Path outside project root"}

    # Reject sensitive files, including anything inside a .git directory
    relative_path = file_path.relative_to(project_root)
    if SENSITIVE_NAMES.intersection(relative_path.parts):
        return {"status": "error", "error": "Cannot read sensitive files"}

    try:
        with open(file_path, "r", encoding=encoding) as f:
            content = f.read()
        return {
            "status": "success",
            "path": str(relative_path),
            "size": len(content),  # length in characters, not bytes
            "content": content
        }
    except FileNotFoundError:
        return {"status": "error", "error": f"File not found: {path}"}
    except Exception as e:
        return {"status": "error", "error": str(e)}
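The read_file example covers path traversal; the accidental-overwrite half of the problem applies to writes. A minimal sketch of one way to guard against it follows. The write_file tool, its overwrite flag, and the root parameter (added so the demo can run in a temporary directory) are all assumptions for illustration, not part of the original tool set.

```python
import tempfile
from pathlib import Path
from typing import Optional

def write_file(path: str, content: str, overwrite: bool = False,
               root: Optional[Path] = None) -> dict:
    """Write a file inside the project root, refusing silent overwrites."""
    project_root = (root or Path.cwd()).resolve()
    file_path = (project_root / path).resolve()

    # Same containment check as for reads (Path.is_relative_to needs Python 3.9+)
    if not file_path.is_relative_to(project_root):
        return {"status": "error", "error": "Path outside project root"}

    # Mode "x" fails if the file already exists, so the existence check
    # and the creation happen in a single step
    mode = "w" if overwrite else "x"
    try:
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(file_path, mode, encoding="utf-8") as f:
            f.write(content)
    except FileExistsError:
        return {"status": "error",
                "error": f"File exists: {path}. Pass overwrite=true to replace it."}
    except OSError as e:
        return {"status": "error", "error": str(e)}
    return {"status": "success", "path": str(file_path.relative_to(project_root))}

# Demo in a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    workspace = Path(tmp)
    first = write_file("notes/todo.txt", "v1", root=workspace)
    second = write_file("notes/todo.txt", "v2", root=workspace)
    third = write_file("notes/todo.txt", "v2", overwrite=True, root=workspace)
    escaped = write_file("../outside.txt", "x", root=workspace)
    final_text = (workspace / "notes" / "todo.txt").read_text(encoding="utf-8")
```

Making overwrite default to false means the model has to state its intent to replace a file, and the error message tells it exactly how to do so.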
Pattern 3: Time-Bounded Operations
For tools that run long queries or expensive operations, enforce time limits:
{
"name": "run_analysis",
"description": "Run a data analysis job with optional timeout",
"parameters": {
"type": "object",
"properties": {
"dataset": {
"type": "string",
"enum": ["sales", "users", "events"],
"description": "Dataset to analyze"
},
"analysis_type": {
"type": "string",
"enum": ["summary", "correlation", "trend"],
"description": "Type of analysis"
},
"timeout_seconds": {
"type": "integer",
"minimum": 10,
"maximum": 300,
"default": 60,
"description": "Timeout in seconds (10-300, default 60). Analysis aborts if it exceeds this."
}
},
"required": ["dataset", "analysis_type"]
}
}
Implementation with timeout enforcement:
import signal
from contextlib import contextmanager

@contextmanager
def timeout(seconds: int):
    """Context manager for enforcing time limits.

    Relies on SIGALRM, so it works only on Unix and only in the main thread.
    """
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation exceeded {seconds} second timeout")

    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)  # Cancel the alarm
        signal.signal(signal.SIGALRM, previous_handler)  # Restore the old handler

def run_analysis(dataset: str, analysis_type: str, timeout_seconds: int = 60) -> dict:
    """Run analysis with timeout."""
    try:
        with timeout(timeout_seconds):
            result = perform_analysis(dataset, analysis_type)
        return {
            "status": "success",
            "result": result
        }
    except TimeoutError as e:
        return {
            "status": "timeout",
            "error": str(e),
            "hint": "Dataset may be too large. Try using a smaller dataset or increasing timeout."
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
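The signal-based timer relies on SIGALRM, which is available only on Unix and only in the main thread. Where that is a problem, one possible alternative is to run the work in a worker thread and stop waiting after the deadline. The sketch below assumes a hypothetical helper named run_with_deadline. Note the trade-off: it stops waiting but cannot kill the worker, so a timed-out task keeps running in the background until it finishes.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

def run_with_deadline(func, timeout_seconds: float, *args) -> dict:
    """Wait up to timeout_seconds for func(*args) running in a worker thread."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args)
    try:
        return {"status": "success",
                "result": future.result(timeout=timeout_seconds)}
    except FutureTimeout:
        return {"status": "timeout",
                "error": f"Operation exceeded {timeout_seconds} second timeout"}
    except Exception as e:
        return {"status": "error", "error": str(e)}
    finally:
        # Do not block on the worker; a timed-out task is left to finish on its own
        executor.shutdown(wait=False)

fast = run_with_deadline(lambda: 42, 1.0)
slow = run_with_deadline(time.sleep, 0.05, 0.5)   # sleeps 0.5 s, deadline 0.05 s
broken = run_with_deadline(lambda: 1 / 0, 1.0)
```

For work that must actually be stopped at the deadline, a separate process that can be terminated is the usual choice; the thread version only bounds how long the caller waits.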
Pattern 4: Batch Operations
Tools that process multiple items should define batch limits:
{
"name": "send_emails",
"description": "Send emails to multiple recipients",
"parameters": {
"type": "object",
"properties": {
"recipients": {
"type": "array",
"items": {
"type": "object",
"properties": {
"email": {
"type": "string",
"pattern": "^[^@]+@[^@]+$"
},
"name": {
"type": "string"
}
},
"required": ["email"]
},
"minItems": 1,
"maxItems": 100,
"description": "Recipients list (1-100 items). Each has email and optional name."
},
"subject": {
"type": "string",
"minLength": 1,
"maxLength": 200,
"description": "Email subject (1-200 chars)"
},
"body": {
"type": "string",
"minLength": 1,
"maxLength": 10000,
"description": "Email body (1-10000 chars)"
},
"dry_run": {
"type": "boolean",
"default": false,
"description": "If true, validate without sending (useful for testing)"
}
},
"required": ["recipients", "subject", "body"]
}
}
Implementation with batch safety:
def send_emails(recipients: list[dict], subject: str, body: str,
                dry_run: bool = False) -> dict:
    """Send emails in batch."""
    # Enforce the schema's batch limits again in code (minItems 1, maxItems 100)
    if not 1 <= len(recipients) <= 100:
        return {
            "status": "error",
            "error": "Recipient count must be between 1 and 100"
        }

    if dry_run:
        # Validate without sending
        valid = []
        invalid = []
        for i, recipient in enumerate(recipients):
            if "@" in recipient.get("email", ""):
                valid.append(i)
            else:
                invalid.append(i)
        return {
            "status": "validation",
            "valid_count": len(valid),
            "invalid_count": len(invalid),
            "invalid_indices": invalid,
            "message": f"Would send to {len(valid)} recipients. Fix {len(invalid)} invalid emails."
        }

    # Send for real, collecting failures instead of aborting the batch
    sent = 0
    failed = []
    for i, recipient in enumerate(recipients):
        try:
            send_email(
                to=recipient["email"],
                name=recipient.get("name", ""),
                subject=subject,
                body=body
            )
            sent += 1
        except Exception as e:
            failed.append({"index": i, "error": str(e)})

    return {
        # Report "partial" rather than "success" when any send failed
        "status": "success" if not failed else "partial",
        "sent": sent,
        "failed_count": len(failed),
        "failed": failed[:10]  # Only return first 10 failures
    }
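When a caller has more items than the schema's limit allows, the list has to be split before the tool is called. A minimal sketch of that splitting step follows. The names chunk_recipients, send_in_batches, and fake_send_batch are hypothetical, and fake_send_batch stands in for a call to send_emails that returns the same sent and failed_count fields.

```python
from typing import Callable, Iterator

MAX_BATCH = 100  # mirrors maxItems in the send_emails schema

def chunk_recipients(recipients: list, batch_size: int = MAX_BATCH) -> Iterator[list]:
    """Yield consecutive slices of at most batch_size recipients."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(recipients), batch_size):
        yield recipients[start:start + batch_size]

def send_in_batches(recipients: list, send_batch: Callable[[list], dict]) -> dict:
    """Call send_batch once per chunk and add up the per-batch counts."""
    totals = {"sent": 0, "failed_count": 0, "batches": 0}
    for batch in chunk_recipients(recipients):
        outcome = send_batch(batch)
        totals["sent"] += outcome["sent"]
        totals["failed_count"] += outcome["failed_count"]
        totals["batches"] += 1
    return totals

# Hypothetical stand-in for send_emails that "sends" everything successfully
def fake_send_batch(batch: list) -> dict:
    return {"sent": len(batch), "failed_count": 0}

recipients = [{"email": f"user{n}@example.com"} for n in range(250)]
summary = send_in_batches(recipients, fake_send_batch)
batch_sizes = [len(b) for b in chunk_recipients(recipients)]
```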
Pattern 5: State Machine Operations
Tools that manage state (workflows, approvals) should define valid state transitions:
{
"name": "update_ticket_state",
"description": "Update a ticket's state with validation of allowed transitions",
"parameters": {
"type": "object",
"properties": {
"ticket_id": {
"type": "string",
"pattern": "^TICKET-\\d{4,}$",
"description": "Ticket ID (e.g., TICKET-1234)"
},
"new_state": {
"type": "string",
"enum": ["open", "in_progress", "blocked", "closed"],
"description": "New state (open, in_progress, blocked, or closed)"
},
"comment": {
"type": "string",
"maxLength": 500,
"description": "Optional comment explaining the state change"
}
},
"required": ["ticket_id", "new_state"]
}
}
Implementation with state validation:
from datetime import datetime

VALID_TRANSITIONS = {
    "open": ["in_progress", "closed"],
    "in_progress": ["blocked", "closed", "open"],
    "blocked": ["in_progress", "open"],
    "closed": ["open"]
}

def update_ticket_state(ticket_id: str, new_state: str,
                        comment: str = "") -> dict:
    """Update ticket state with transition validation."""
    # Fetch current ticket
    ticket = fetch_ticket(ticket_id)
    if not ticket:
        return {"status": "error", "error": f"Ticket {ticket_id} not found"}
    current_state = ticket["state"]

    # Validate transition and tell the caller which moves are allowed
    allowed = VALID_TRANSITIONS.get(current_state, [])
    if new_state not in allowed:
        return {
            "status": "error",
            "error": f"Cannot transition from {current_state} to {new_state}",
            "allowed": allowed
        }

    # Update the state and record the transition, with or without a comment
    ticket["state"] = new_state
    ticket["history"].append({
        "timestamp": datetime.now().isoformat(),
        "from_state": current_state,
        "to_state": new_state,
        "comment": comment
    })
    save_ticket(ticket)

    return {
        "status": "success",
        "ticket_id": ticket_id,
        "old_state": current_state,
        "new_state": new_state
    }
Pattern 6: Conditional Parameters
Some parameters are required only if others have specific values:
{
"name": "deploy",
"description": "Deploy a service with environment-specific configuration",
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"enum": ["api", "web", "worker"],
"description": "Service to deploy"
},
"environment": {
"type": "string",
"enum": ["dev", "staging", "production"],
"description": "Target environment"
},
"auto_scale": {
"type": "boolean",
"description": "Enable auto-scaling. Required for production."
},
"min_instances": {
"type": "integer",
"minimum": 1,
"description": "Min instances. Required if auto_scale is true."
},
"max_instances": {
"type": "integer",
"minimum": 1,
"description": "Max instances. Required if auto_scale is true."
},
"approval": {
"type": "string",
"description": "Approval ticket ID (e.g., APPROVAL-123). Required for production."
}
},
"required": ["service", "environment"]
}
}
Implementation with conditional validation:
from typing import Optional

def deploy(service: str, environment: str, auto_scale: Optional[bool] = None,
           min_instances: Optional[int] = None, max_instances: Optional[int] = None,
           approval: Optional[str] = None) -> dict:
    """Deploy with conditional parameter validation."""
    errors = []

    # Production requires an approval ticket and an explicit auto_scale choice,
    # matching the parameter descriptions in the schema
    if environment == "production":
        if not approval:
            errors.append("production deployments require an approval ticket ID")
        if auto_scale is None:
            errors.append("production deployments must set auto_scale explicitly")

    # Auto-scaling requires min/max instances
    if auto_scale:
        if min_instances is None:
            errors.append("auto_scale requires min_instances")
        if max_instances is None:
            errors.append("auto_scale requires max_instances")
        if (min_instances is not None and max_instances is not None
                and min_instances > max_instances):
            errors.append("min_instances must be <= max_instances")

    # Report every problem at once so the LLM can fix them in a single retry
    if errors:
        return {
            "status": "error",
            "errors": errors
        }

    # Perform deployment
    result = perform_deployment(service, environment, {
        "auto_scale": bool(auto_scale),
        "min_instances": min_instances,
        "max_instances": max_instances,
        "approval": approval
    })
    return {
        "status": "success",
        "result": result
    }
Pattern 7: Result Streaming for Long Operations
For operations that take time, stream results back as they arrive:
from typing import Iterator

def stream_results(query: str) -> Iterator[dict]:
    """Stream results as they arrive."""
    count = 0  # Stays 0 if the query produces no results
    for i, result in enumerate(process_query(query)):
        yield {
            "status": "streaming",
            "chunk_index": i,
            "result": result
        }
        count = i + 1
    # Final event tells the consumer that the stream ended normally
    yield {
        "status": "complete",
        "total_results": count
    }
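The schema describes the base operation. The implementation yields {"status": "streaming", "chunk_index": ..., "result": ...} events as results arrive, then ends with a {"status": "complete", "total_results": ...} event so the consumer knows the stream finished.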
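On the receiving side, something has to gather these events into a single tool result for the LLM. A minimal sketch of such a consumer follows. The names collect_stream and fake_stream are hypothetical, and fake_stream only imitates the event shapes used by stream_results.

```python
from typing import Iterable, Iterator

def fake_stream(items: list) -> Iterator[dict]:
    """Hypothetical stand-in that emits the same event shapes as stream_results."""
    count = 0
    for i, item in enumerate(items):
        yield {"status": "streaming", "chunk_index": i, "result": item}
        count = i + 1
    yield {"status": "complete", "total_results": count}

def collect_stream(events: Iterable[dict]) -> dict:
    """Gather streamed chunks and check them against the final count."""
    results = []
    for event in events:
        if event["status"] == "streaming":
            results.append(event["result"])
        elif event["status"] == "complete":
            return {
                "status": "success",
                "results": results,
                "consistent": event["total_results"] == len(results),
            }
    # The stream ended without a "complete" event, so results may be missing
    return {"status": "incomplete", "results": results}

full = collect_stream(fake_stream(["a", "b", "c"]))
empty = collect_stream(fake_stream([]))
cut_off = collect_stream([{"status": "streaming", "chunk_index": 0, "result": "a"}])
```

Treating a missing completion event as its own status lets the LLM tell a finished stream with few results apart from one that was interrupted.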
Key Takeaways
- Pagination: Include page, page_size, and has_next for large result sets.
- File operations: Use pattern validation and path sanitization to prevent attacks.
- Time-bounded tasks: Enforce timeouts and return clear timeout errors.
- Batch operations: Set reasonable batch limits (e.g., max 100 items).
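- State machines: Define valid state transitions in code, reject invalid ones, and return the allowed transitions in the error response.
- Conditional parameters: Document interdependencies clearly; validate both the schema and business logic.
- Streaming: Emit results as chunked events for long operations and finish with a completion event that carries the total.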
Frequently Asked Questions
Should I put business logic validation in the schema or in the code?
Both. The schema validates type and format (e.g., email pattern); the code validates business logic (e.g., state transitions, limits). Keep the two responsibilities separate.
What's the max batch size I should allow?
It depends on your system. Common ranges are 10–100 items for I/O-bound work (emails, API calls) and 1–10 for CPU-bound work (image processing). Test with your LLM to find the balance between throughput and error rate.
How do I prevent the LLM from calling tools in the wrong order (e.g., update before create)?
Document the required order in your system prompt. You can also have tools return errors when preconditions aren't met (e.g., "Cannot update ticket that doesn't exist"); the LLM can read the error and correct its next call.
Should I implement idempotency for tools?
Yes, when possible. If an LLM retries a tool call, the second call should succeed without repeating the side effects of the first. Use request IDs (deduplication) or safe updates (last-write-wins).
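Request-ID deduplication can be sketched as a thin wrapper around a tool. The version below is illustrative only: IdempotentTool, charge_card, and the request ID values are hypothetical, and the in-memory dictionary stands in for what would normally be a shared store with expiry.

```python
from typing import Callable

class IdempotentTool:
    """Cache each result under its request ID so retries do not repeat side effects."""

    def __init__(self, func: Callable[..., dict]):
        self._func = func
        self._results: dict = {}  # request_id -> first result

    def call(self, request_id: str, **kwargs) -> dict:
        if request_id in self._results:
            # Retry: return the stored result without running the tool again
            return {**self._results[request_id], "deduplicated": True}
        result = self._func(**kwargs)
        self._results[request_id] = result
        return {**result, "deduplicated": False}

# Hypothetical tool with a visible side effect
charges = []

def charge_card(amount: int) -> dict:
    charges.append(amount)
    return {"status": "success", "charged": amount}

tool = IdempotentTool(charge_card)
first = tool.call("req-001", amount=50)
retry = tool.call("req-001", amount=50)   # same request ID: no second charge
other = tool.call("req-002", amount=20)   # new request ID: runs normally
```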
How do I handle tools that conflict (e.g., two users deleting the same resource simultaneously)?
Use optimistic locking: version numbers, timestamps, or ETags. Return a conflict error if the resource was modified since it was read, and let the LLM retry with the current version.
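A version-number variant of optimistic locking might look like the following sketch. VersionedStore and its method names are assumptions made for illustration; a real system would perform the version check and the write atomically in its database.

```python
class VersionedStore:
    """In-memory store where every update must name the version it was based on."""

    def __init__(self):
        self._items = {}

    def put_new(self, key: str, data: dict) -> None:
        self._items[key] = {"version": 1, "data": data}

    def get(self, key: str) -> dict:
        return dict(self._items[key])

    def update(self, key: str, data: dict, expected_version: int) -> dict:
        current = self._items[key]
        if current["version"] != expected_version:
            # Someone else changed the item after the caller read it
            return {
                "status": "conflict",
                "error": "Resource was modified; re-read and retry",
                "current_version": current["version"],
            }
        self._items[key] = {"version": expected_version + 1, "data": data}
        return {"status": "success", "version": expected_version + 1}

store = VersionedStore()
store.put_new("TICKET-1234", {"state": "open"})
snapshot = store.get("TICKET-1234")                      # version 1

ok = store.update("TICKET-1234", {"state": "closed"}, expected_version=1)
stale = store.update("TICKET-1234", {"state": "in_progress"}, expected_version=1)
retry = store.update("TICKET-1234", {"state": "open"},
                     expected_version=stale["current_version"])
```

Returning current_version in the conflict response gives the LLM what it needs to re-read the resource and retry without guessing.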