Designing LLM Steps for AI Automation Workflows
An LLM step is a workflow node that calls a large language model (e.g., GPT-4o, Claude, Llama) with a prompt built from context variables, then writes the LLM response back to context for downstream nodes to consume. The challenge is designing LLM steps that are reproducible, efficient, and aligned with the workflow's intent—not just slotting an LLM call into a generic "call API" node.
In this article, you will learn to design LLM steps with parameterized prompts, handle streaming output, manage token budgets, and implement structured outputs so that LLM responses integrate cleanly with the rest of your workflow.
What Makes an LLM Step Different?
An LLM step differs from a generic API call step in several ways:
- Prompts are context-aware: they embed values from context (customer name, support ticket body, etc.).
- Streaming matters: for user-facing workflows, streaming LLM output to a UI provides real-time feedback.
- Unpredictability: LLMs are stochastic; the same prompt may produce different responses. Workflows must handle variance.
- Token budgets: LLM calls are billed per token. A request that exceeds the model's context window is rejected outright, and a response that hits max_tokens is cut off mid-output.
- Structured output: workflows often need the LLM response parsed into a specific format (decision, entities, sentiment score).
Prompt Templates and Variable Substitution
A prompt template is a string with placeholders for context variables. When the LLM step executes, it substitutes values from context and sends the rendered prompt to the LLM.
from jinja2 import Template
from anthropic import Anthropic
import json


class LLMStep:
    """
    An LLM workflow step.

    Renders a Jinja2 prompt template with context variables and calls an LLM.
    """

    def __init__(self, name: str, model: str, prompt_template: str):
        self.name = name
        self.model = model
        self.template = Template(prompt_template)
        self.client = Anthropic()

    def execute(self, context: dict) -> dict:
        """
        Execute the LLM step.

        Args:
            context: Workflow context dictionary.

        Returns:
            Updated context with LLM response.
        """
        # 1. Render the prompt template with context variables.
        try:
            prompt = self.template.render(**context)
        except Exception as e:
            return {
                **context,
                f"{self.name}_error": f"Template rendering failed: {e}",
                f"{self.name}_success": False,
            }

        # 2. Call the LLM.
        try:
            message = self.client.messages.create(
                model=self.model,
                max_tokens=1024,
                messages=[
                    {
                        "role": "user",
                        "content": prompt,
                    }
                ],
            )
            response_text = message.content[0].text
        except Exception as e:
            return {
                **context,
                f"{self.name}_error": f"LLM call failed: {e}",
                f"{self.name}_success": False,
            }

        # 3. Write the response back to context.
        tokens_used = message.usage.input_tokens + message.usage.output_tokens
        return {
            **context,
            f"{self.name}_response": response_text,
            f"{self.name}_tokens_used": tokens_used,
            f"{self.name}_success": True,
        }
# Example: Analyze a support ticket
ticket_analyzer = LLMStep(
    name="analyze_ticket",
    model="claude-3-5-sonnet-20241022",
    prompt_template="""
Analyze this support ticket and extract key information.
Ticket ID: {{ ticket_id }}
Customer Email: {{ customer_email }}
Subject: {{ subject }}
Body: {{ body }}
Respond with a JSON object containing:
- summary: A one-sentence summary of the issue
- priority: One of ['low', 'medium', 'high', 'critical']
- category: One of ['billing', 'technical', 'feature_request', 'other']
- requires_escalation: A boolean
Return ONLY the JSON object, no other text.
""",
)

# Simulate a workflow context.
context = {
    "ticket_id": "TKT-12345",
    "customer_email": "[email protected]",
    "subject": "API is returning 500 errors",
    "body": "Started this morning in the us-west-2 region. Other regions seem fine.",
}

result = ticket_analyzer.execute(context)
print(result["analyze_ticket_response"])
print(result["analyze_ticket_success"])
Key patterns:
- Jinja2 templates: use {{ variable }} for substitution and {% if condition %} ... {% endif %} for conditionals.
- Error handling: catch template rendering errors and LLM API errors separately, and write both to context.
- Token tracking: include token counts in context so downstream nodes can track cumulative usage.
- Structured output: request JSON output and plan to parse it in a downstream step.
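As a minimal sketch of the downstream parsing step mentioned in the last bullet, the function below parses the analyze_ticket response and checks it against the fields the prompt asked for. The function name parse_ticket_analysis and the context keys it writes (analyze_ticket_parsed, analyze_ticket_parse_error) are hypothetical; only the field names and allowed values come from the prompt above.

```python
import json

# Allowed values copied from the ticket-analysis prompt.
ALLOWED_PRIORITIES = ("low", "medium", "high", "critical")
ALLOWED_CATEGORIES = ("billing", "technical", "feature_request", "other")


def parse_ticket_analysis(context: dict) -> dict:
    """Parse and validate the JSON written by the analyze_ticket step."""
    raw = context.get("analyze_ticket_response", "")
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError) as e:
        return {**context, "analyze_ticket_parse_error": f"Invalid JSON: {e}"}

    problems = []
    if not isinstance(data, dict):
        problems.append("top-level value is not an object")
    else:
        if not isinstance(data.get("summary"), str):
            problems.append("summary must be a string")
        if data.get("priority") not in ALLOWED_PRIORITIES:
            problems.append("priority is not an allowed value")
        if data.get("category") not in ALLOWED_CATEGORIES:
            problems.append("category is not an allowed value")
        if not isinstance(data.get("requires_escalation"), bool):
            problems.append("requires_escalation must be a boolean")

    if problems:
        return {**context, "analyze_ticket_parse_error": "; ".join(problems)}
    return {**context, "analyze_ticket_parsed": data}
```

Keeping the parsed object under its own key leaves the raw response available for logging or a retry.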
System Messages and Few-Shot Examples
For complex reasoning, include a system message that sets the model's role and tone, and add few-shot examples to the prompt to show the expected output:
from typing import Optional


class AdvancedLLMStep:
    """LLM step with system message and few-shot examples."""

    def __init__(self, name: str, model: str, system_message: str):
        self.name = name
        self.model = model
        self.system_message = system_message
        self.client = Anthropic()

    def execute(self, context: dict, few_shot_examples: Optional[list] = None) -> dict:
        """
        Execute with system message and optional few-shot examples.

        Args:
            context: Workflow context.
            few_shot_examples: List of {"input": "...", "output": "..."} dicts.

        Returns:
            Updated context.
        """
        # Build the user message: task, optional few-shot examples, then the
        # email to classify. The "Examples:" header is only added when
        # examples were supplied.
        user_message = "Classify this customer email.\n"
        if few_shot_examples:
            user_message += "\nExamples:\n"
            for example in few_shot_examples:
                user_message += f"\nInput: {example['input']}\nOutput: {example['output']}\n"
        user_message += f"\nCustomer email: {context.get('email_body', '')}\n"
        user_message += "\nClassification (one of: complaint, praise, neutral):"

        try:
            message = self.client.messages.create(
                model=self.model,
                max_tokens=256,
                system=self.system_message,
                messages=[
                    {"role": "user", "content": user_message}
                ],
            )
            classification = message.content[0].text.strip()
        except Exception as e:
            return {
                **context,
                f"{self.name}_error": str(e),
                f"{self.name}_success": False,
            }

        return {
            **context,
            f"{self.name}_classification": classification,
            f"{self.name}_success": True,
        }
# Example system message for sentiment classification.
system_msg = """You are an expert customer support triage agent.
Classify customer emails into sentiment categories.
Be concise; respond with only the classification label."""

classifier = AdvancedLLMStep(
    name="classify_sentiment",
    model="claude-3-5-sonnet-20241022",
    system_message=system_msg,
)

few_shots = [
    {
        "input": "Your service is amazing!",
        "output": "praise",
    },
    {
        "input": "The API is down and has been for hours.",
        "output": "complaint",
    },
]

context = {"email_body": "I would like to upgrade my plan."}
result = classifier.execute(context, few_shot_examples=few_shots)
print(result["classify_sentiment_classification"])
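An alternative to inlining examples in one user message is to pass each few-shot example as a prior user/assistant turn in the messages list. The sketch below only builds that list. build_few_shot_messages is a hypothetical helper, and it assumes the same {"input", "output"} example shape used above.

```python
def build_few_shot_messages(examples: list, email_body: str) -> list:
    """Turn few-shot examples into alternating user/assistant turns.

    The final user turn carries the email to classify, so the model's
    reply continues the pattern set by the examples.
    """
    messages = []
    for example in examples:
        messages.append({"role": "user", "content": example["input"]})
        messages.append({"role": "assistant", "content": example["output"]})
    messages.append({"role": "user", "content": email_body})
    return messages


messages = build_few_shot_messages(
    [
        {"input": "Your service is amazing!", "output": "praise"},
        {"input": "The API is down and has been for hours.", "output": "complaint"},
    ],
    "I would like to upgrade my plan.",
)
```

The resulting list could be passed as the messages argument of messages.create, with the system message unchanged.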
Handling Token Limits and Context Windows
Every LLM has a finite context window, measured in tokens, that must hold both the prompt and the response. If your context is large, truncate or summarize it before sending it to the LLM:
import tiktoken


class TokenBudgetedLLMStep:
    """LLM step that respects token budgets."""

    # Fixed prompt pieces are kept separate so only the history is truncated.
    PROMPT_HEAD = "\nSummarize the following support ticket history.\nHistory:\n"
    PROMPT_TAIL = "\nSummary:\n"
    TRUNCATION_NOTE = "[... older history truncated due to token limit ...]\n"

    def __init__(self, name: str, model: str, max_input_tokens: int = 4000):
        self.name = name
        self.model = model
        self.max_input_tokens = max_input_tokens
        self.client = Anthropic()
        # tiktoken only ships OpenAI encodings, so encoding_for_model() raises
        # KeyError for Claude model names. cl100k_base is used here as a rough
        # estimate; counts will not match Claude's tokenizer exactly.
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def execute(self, context: dict) -> dict:
        """
        Execute, truncating the history if the prompt exceeds the token budget.
        """
        history = context.get("ticket_history", "")
        prompt = self.PROMPT_HEAD + history + self.PROMPT_TAIL
        truncated = False

        # Count tokens in the prompt.
        if len(self.tokenizer.encode(prompt)) > self.max_input_tokens:
            # Reserve room for the fixed parts, then keep the most recent
            # history (assumes ticket_history is ordered oldest first).
            overhead = len(self.tokenizer.encode(
                self.PROMPT_HEAD + self.TRUNCATION_NOTE + self.PROMPT_TAIL
            ))
            keep = max(self.max_input_tokens - overhead, 0)
            history_tokens = self.tokenizer.encode(history)
            start = max(len(history_tokens) - keep, 0)
            truncated_history = self.tokenizer.decode(history_tokens[start:])
            prompt = (
                self.PROMPT_HEAD + self.TRUNCATION_NOTE
                + truncated_history + self.PROMPT_TAIL
            )
            truncated = True

        try:
            message = self.client.messages.create(
                model=self.model,
                max_tokens=512,
                messages=[{"role": "user", "content": prompt}],
            )
            summary = message.content[0].text
        except Exception as e:
            return {
                **context,
                f"{self.name}_error": str(e),
                f"{self.name}_success": False,
            }

        return {
            **context,
            f"{self.name}_summary": summary,
            f"{self.name}_truncated": truncated,
            f"{self.name}_success": True,
        }
Key patterns:
- Count tokens before sending: use the model's tokenizer to estimate cost.
- Truncate gracefully: if the prompt is too large, truncate the least-important parts (e.g., oldest history).
- Warn downstream steps: include a _tokens_used or _truncated flag in context so later steps know the data was lossy.
Streaming LLM Responses to the UI
For user-facing workflows (e.g., a workflow that generates a report in real time), stream the LLM output:
from typing import AsyncGenerator

from anthropic import AsyncAnthropic


class StreamingLLMStep:
    """LLM step that streams output."""

    def __init__(self, name: str, model: str):
        self.name = name
        self.model = model
        # The async client keeps the event loop free while chunks arrive;
        # iterating the sync client inside an async function would block it.
        self.client = AsyncAnthropic()

    async def execute_streaming(
        self, context: dict
    ) -> AsyncGenerator[str, None]:
        """
        Execute and stream the response chunk by chunk.

        Yields:
            Text chunks as they arrive from the LLM.
        """
        prompt = f"Write a report for: {context.get('topic', '')}"

        # Use the streaming API.
        full_response = ""
        async with self.client.messages.stream(
            model=self.model,
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                full_response += text
                yield text

        # After streaming completes, record the full response in context.
        # This only runs if the caller consumes the generator to the end.
        context[f"{self.name}_response"] = full_response
        context[f"{self.name}_success"] = True
# In a FastAPI handler:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.post("/workflows/report/stream")
async def stream_workflow_report(topic: str):
    """
    Endpoint that streams an LLM-generated report.
    """
    step = StreamingLLMStep(
        name="generate_report",
        model="claude-3-5-sonnet-20241022",
    )
    context = {"topic": topic}

    # StreamingResponse accepts the async generator directly and sends each
    # chunk to the client as it is yielded.
    return StreamingResponse(
        step.execute_streaming(context),
        media_type="text/plain",
    )
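The pattern of forwarding chunks while accumulating the full response can be exercised without an API call. The sketch below substitutes a hypothetical fake_llm_stream for the model and a plain list for the UI; neither name is part of any SDK, and forward_and_collect is an illustrative helper rather than a fixed interface.

```python
import asyncio
from typing import AsyncIterator, Callable


async def fake_llm_stream(chunks: list) -> AsyncIterator[str]:
    """Stand-in for a model stream: yields pre-baked chunks."""
    for chunk in chunks:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield chunk


async def forward_and_collect(
    stream: AsyncIterator[str],
    send: Callable[[str], None],
    context: dict,
    name: str,
) -> dict:
    """Forward each chunk via send() and return context with the full text."""
    parts = []
    try:
        async for chunk in stream:
            send(chunk)
            parts.append(chunk)
    except Exception as e:
        # Keep whatever arrived before the failure so it can be inspected.
        return {
            **context,
            f"{name}_response": "".join(parts),
            f"{name}_error": str(e),
            f"{name}_success": False,
        }
    return {**context, f"{name}_response": "".join(parts), f"{name}_success": True}


sent = []
result = asyncio.run(
    forward_and_collect(
        fake_llm_stream(["Quarterly ", "report ", "draft"]),
        sent.append,
        {"topic": "Q3"},
        "generate_report",
    )
)
```

Returning a new context, instead of mutating the one passed in, matches how the non-streaming steps in this article report their results.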
Key Takeaways
- LLM steps render Jinja2 templates with context variables, call an LLM, and write responses back to context.
- Use system messages to set tone and few-shot examples to guide behavior.
- Track token usage to manage costs and detect anomalies.
- Implement token budgets and gracefully truncate long inputs to fit context windows.
- For user-facing workflows, stream LLM output to provide real-time feedback.
- Always handle LLM errors gracefully—write error messages to context and set a success flag.
Frequently Asked Questions
How do I ensure reproducible LLM outputs in workflows?
Set temperature=0 to minimize variance, but understand that identical outputs are still not guaranteed. For workflows that need an audit trail, log the exact prompt, model, parameters, and response for each call. If a step must return the same result on reruns, store the first response and reuse it instead of calling the model again. Prompt caching (available on newer Claude models) lowers the cost and latency of repeated prompt prefixes, but it does not make outputs deterministic.
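One way to get repeatable results for identical requests is to log each prompt and response and replay the stored response on reruns. The sketch below keeps the log in memory; ResponseLog, request_key, and the call_llm callable are hypothetical names, and this is an application-level store, separate from any provider-side caching.

```python
import hashlib
import json
from typing import Callable


def request_key(model: str, prompt: str, temperature: float) -> str:
    """Stable hash of everything that defines the request."""
    payload = json.dumps(
        {"model": model, "prompt": prompt, "temperature": temperature},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ResponseLog:
    """In-memory audit log that replays stored responses for repeat requests."""

    def __init__(self):
        self._entries = {}

    def get_or_call(
        self,
        model: str,
        prompt: str,
        call_llm: Callable[[str], str],
        temperature: float = 0.0,
    ) -> str:
        key = request_key(model, prompt, temperature)
        if key not in self._entries:
            # First time this exact request is seen: call the model and log it.
            self._entries[key] = {
                "model": model,
                "prompt": prompt,
                "temperature": temperature,
                "response": call_llm(prompt),
            }
        return self._entries[key]["response"]
```

In a real workflow the entries would go to durable storage so that a rerun days later still finds them.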
What if the LLM response is not valid JSON?
Plan for this. After the LLM step completes, add a downstream validation step that checks if the response is valid JSON. If not, either (a) retry with a stricter prompt (e.g., "Return ONLY valid JSON, no other text"), or (b) escalate to a human for manual correction.
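The retry-then-escalate flow could look like the sketch below. call_until_valid_json and the keys of the dict it returns are hypothetical, and call_llm stands in for whatever function sends a prompt to the model and returns its text.

```python
import json
from typing import Callable

# Stricter instruction appended on retries, taken from option (a) above.
STRICT_SUFFIX = "\n\nReturn ONLY valid JSON, no other text."


def call_until_valid_json(
    prompt: str,
    call_llm: Callable[[str], str],
    max_attempts: int = 2,
) -> dict:
    """Call the model until it returns parseable JSON or attempts run out."""
    attempt_prompt = prompt
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        raw = call_llm(attempt_prompt)
        try:
            return {"success": True, "data": json.loads(raw), "attempts": attempt}
        except json.JSONDecodeError as e:
            last_error = str(e)
            # Retry with the stricter wording.
            attempt_prompt = prompt + STRICT_SUFFIX
    # Out of attempts: flag the item for manual correction.
    return {
        "success": False,
        "needs_human_review": True,
        "error": last_error,
        "attempts": max_attempts,
    }
```

Capping attempts keeps a stubbornly malformed response from turning into an unbounded loop of paid calls.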
Can I call multiple LLMs in a single workflow?
Yes. Each LLM step can use a different model. This is useful for cost optimization (use a faster/cheaper model for simple steps, a larger model for complex reasoning) or for multi-stage reasoning (e.g., analyze intent with one model, generate response with another).
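A minimal sketch of per-step model selection, assuming a hypothetical STEP_MODELS table keyed by step name. The model IDs are placeholders: the larger one is the ID used elsewhere in this article, and the smaller one stands in for whichever cheaper model you have access to.

```python
# Fallback for steps that have no explicit entry.
DEFAULT_MODEL = "claude-3-5-sonnet-20241022"

# Hypothetical routing table: simple classification goes to a smaller
# model, open-ended analysis to the larger one.
STEP_MODELS = {
    "classify_sentiment": "claude-3-5-haiku-20241022",
    "analyze_ticket": "claude-3-5-sonnet-20241022",
}


def model_for_step(step_name: str) -> str:
    """Return the model configured for a step, or the default."""
    return STEP_MODELS.get(step_name, DEFAULT_MODEL)
```

Keeping the mapping in one place makes it possible to change a step's model without touching the step's code.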
How do I prevent a runaway LLM step from consuming my entire token budget?
Set a max_tokens limit on each LLM step and track cumulative usage in context. Add a pre-execution check: if cumulative_tokens_used + estimated_this_step > budget, halt and raise an error. This prevents surprises.
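The pre-execution check can be sketched as follows. The cumulative_tokens_used context key comes from the answer above; TokenBudgetExceeded, check_budget, and record_usage are hypothetical names.

```python
class TokenBudgetExceeded(RuntimeError):
    """Raised when running a step would push the workflow past its budget."""


def check_budget(context: dict, estimated_this_step: int, budget: int) -> None:
    """Halt before the call if the estimate would exceed the budget."""
    used = context.get("cumulative_tokens_used", 0)
    if used + estimated_this_step > budget:
        raise TokenBudgetExceeded(
            f"{used} used + {estimated_this_step} estimated > budget {budget}"
        )


def record_usage(context: dict, tokens_used: int) -> dict:
    """Return a new context with the step's actual usage added."""
    total = context.get("cumulative_tokens_used", 0) + tokens_used
    return {**context, "cumulative_tokens_used": total}
```

A workflow runner would call check_budget before each LLM step and record_usage after it, using the token count the step wrote to context.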
What happens if an LLM step produces output that breaks downstream steps?
This is why structured outputs matter. Request JSON from the LLM and validate it in a downstream step before using it. If validation fails, retry the LLM step with a corrected prompt or escalate to a human.