Streaming Agent Outputs in Real-Time (2026)

When an agent runs for 10 seconds before returning a final result, users see a blank screen. Streaming agent outputs in real time (displaying reasoning steps, partial results, and tokens as they are generated) creates a responsive, engaging experience. This article covers token streaming, node-level event streaming with LangGraph, and delivering real-time agent outputs over WebSockets and Server-Sent Events (SSE).

Why Stream Agent Outputs

An agent researching a topic calls a search tool, waits for results, reasons over them, and returns a synthesis. Without streaming, the user waits 10+ seconds for the final answer. With streaming:

  • Tokens appear as the LLM generates them, so the response feels fast even when total latency is unchanged.
  • Tool execution is visible (for example, "searching for AI safety frameworks...").
  • Partial results accumulate, so the user sees progress instead of a spinner.
  • Costs stay under control: if the user interrupts, the run can be cancelled early instead of generating tokens nobody will read.

Streaming is especially valuable for longer agent runs (20-30 seconds), where perceived latency matters more than actual latency.

Token Streaming with LLMs

Modern LLM APIs support streaming. Each token arrives in real-time, allowing you to display it incrementally.

from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-20241022")

prompt = "Research AI safety and provide a brief summary."

# Token streaming: each token arrives as it's generated
print("Streaming response:")
for chunk in model.stream(prompt):
    # Each chunk is a partial message; its content holds the newly generated text
    print(chunk.content, end="", flush=True)

print("\n")

The stream() method yields message chunks as the model generates them. Each chunk carries a small piece of text (often one or a few tokens) that can be displayed immediately, creating a typewriter effect.
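The typewriter loop can be tried without an API key. The sketch below is a minimal illustration, assuming a hypothetical `fake_token_stream` generator in place of `model.stream()`; `render_stream` is also an illustrative name, not part of any SDK. It writes each chunk as it arrives and keeps the accumulated text for later use (logging, storing the final answer).

```python
import sys
import time
from typing import Iterable, Iterator


def fake_token_stream(text: str) -> Iterator[str]:
    """Hypothetical stand-in for model.stream(): yields small text pieces."""
    for word in text.split(" "):
        yield word + " "


def render_stream(chunks: Iterable[str], out=sys.stdout, delay: float = 0.0) -> str:
    """Write each chunk as soon as it arrives and return the full text."""
    parts = []
    for chunk in chunks:
        out.write(chunk)
        out.flush()  # flush so the text shows up immediately, not at line end
        parts.append(chunk)
        if delay:
            time.sleep(delay)  # optional pause to make the effect visible
    out.write("\n")
    return "".join(parts)


full_text = render_stream(
    fake_token_stream("AI safety studies how to keep systems aligned")
)
```

Keeping the accumulated string alongside the incremental output is a common choice because the UI needs the deltas while downstream code usually needs the complete answer.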

LangGraph Streaming

LangGraph's stream() method emits state updates and node transitions, not just tokens. This lets you track agent progress at the node level.

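from typing import TypedDict

from langgraph.graph import StateGraph

# Assumed state schema; adjust the fields to match your agent
class AgentState(TypedDict, total=False):
    query: str
    messages: list
    search_results: str
    analysis: str
    final_answer: str

# research_node, analyze_node, and synthesize_node are assumed to be defined
# elsewhere; each takes the state and returns a dict of updated fields.
graph = StateGraph(AgentState)
graph.add_node("research", research_node)
graph.add_node("analyze", analyze_node)
graph.add_node("synthesize", synthesize_node)
graph.add_edge("research", "analyze")
graph.add_edge("analyze", "synthesize")
graph.set_entry_point("research")
graph.set_finish_point("synthesize")

compiled_graph = graph.compile()

# Stream node events
print("Agent progress:")
for event in compiled_graph.stream({"query": "AI safety", "messages": []}):
    print(f"Event: {event}")

# Output example:
# Event: {'research': {'messages': [...], 'search_results': '...'}}
# Event: {'analyze': {'messages': [...], 'analysis': '...'}}
# Event: {'synthesize': {'messages': [...], 'final_answer': '...'}}

With the default stream mode of a compiled StateGraph ("updates"), each event is a dict that maps a node name to the state update that node returned. Other stream modes produce different shapes, so check the mode you are using. Node-level events are well suited to tracking multi-step workflows and reporting progress.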
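Because the event shape depends on the LangGraph version and stream mode, it can help to normalize events before handing them to the UI layer. The helper below is a sketch under that assumption: `iter_node_updates` and `progress_line` are hypothetical names, and the function accepts both a dict keyed by node name and a `(node_name, update)` tuple.

```python
from typing import Any, Dict, Iterator, Tuple


def iter_node_updates(event: Any) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (node_name, update) pairs from either supported event shape."""
    if isinstance(event, dict):
        # Shape A: {"research": {...}}; one event may cover several nodes
        for node_name, update in event.items():
            yield node_name, update or {}
    elif isinstance(event, tuple) and len(event) == 2:
        # Shape B: ("research", {...})
        node_name, update = event
        yield node_name, update or {}
    else:
        raise TypeError(f"Unrecognized event shape: {type(event).__name__}")


def progress_line(node_name: str, update: Dict[str, Any]) -> str:
    """Build a short, human-readable progress message for one node."""
    keys = ", ".join(sorted(update)) or "no changes"
    return f"[{node_name}] updated: {keys}"


sample_events = [
    {"research": {"search_results": "..."}},
    ("analyze", {"analysis": "..."}),
]
lines = [
    progress_line(node, update)
    for event in sample_events
    for node, update in iter_node_updates(event)
]
```

Downstream code then only deals with `(node_name, update)` pairs, regardless of how the graph emitted them.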

WebSocket Streaming to Frontend

For web applications, use WebSockets to push events to the client in real-time.

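from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio  # used by the backpressure example later on
import json

app = FastAPI()

@app.websocket("/ws/agent")
async def websocket_agent(websocket: WebSocket):
    """WebSocket endpoint for real-time agent streaming."""
    await websocket.accept()

    try:
        # Receive the user's query
        data = await websocket.receive_text()
        query = json.loads(data)["query"]

        final_answer = None

        # astream() is the async counterpart of stream(); using it keeps the
        # event loop free while the agent runs
        async for event in compiled_graph.astream({"query": query, "messages": []}):
            # Each event maps a node name to that node's state update
            for node_name, update in event.items():
                final_answer = update.get("final_answer", final_answer)

                # Send the event to the client; default=str stringifies values
                # (such as message objects) that JSON cannot encode directly
                await websocket.send_json({
                    "type": "progress",
                    "node": node_name,
                    "state": json.loads(json.dumps(update, default=str)),
                })

        # Send final result
        await websocket.send_json({"type": "done", "result": final_answer})

    except WebSocketDisconnect:
        # The client went away; there is nobody left to notify
        return

    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})

    await websocket.close()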

On the frontend:

const ws = new WebSocket("ws://localhost:8000/ws/agent");

// Send the query only after the connection is open;
// calling send() earlier throws an InvalidStateError
ws.onopen = () => {
    ws.send(JSON.stringify({ query: "Research AI safety" }));
};

ws.onmessage = (event) => {
    const message = JSON.parse(event.data);

    if (message.type === "progress") {
        console.log(`Node: ${message.node}`);
        updateUI(message.state);
    } else if (message.type === "done") {
        console.log("Agent finished:", message.result);
        displayFinalResult(message.result);
    } else if (message.type === "error") {
        console.error("Agent error:", message.message);
    }
};

The client receives real-time updates as the agent progresses through nodes and generates output.
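One practical detail when sending state over a socket: the payload has to be JSON-serializable, and agent state often contains message objects that are not. The sketch below shows one way to build a safe progress message using only the standard library. `to_jsonable`, `progress_message`, and the `FakeMessage` dataclass are hypothetical names used for illustration.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a chat message object held in agent state."""
    role: str
    content: str


def to_jsonable(value: Any) -> Any:
    """Recursively convert a value into something json.dumps accepts."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, dict):
        return {str(k): to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_jsonable(v) for v in value]
    if is_dataclass(value) and not isinstance(value, type):
        return to_jsonable(asdict(value))
    return str(value)  # last resort: fall back to the string form


def progress_message(node: str, state: dict) -> str:
    """Serialize one progress event in the article's message format."""
    return json.dumps(
        {"type": "progress", "node": node, "state": to_jsonable(state)}
    )


wire = progress_message(
    "research", {"messages": [FakeMessage("ai", "hi")], "hits": 3}
)
```

Converting at the boundary keeps the agent free to store rich objects in state while the wire format stays plain JSON.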

Server-Sent Events (SSE) Streaming

SSE is simpler than WebSockets if you only need server-to-client streaming (no bidirectional messages).

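from fastapi.responses import StreamingResponse
import json

# Reuses the `app` instance created in the WebSocket example
@app.get("/agent/stream")
async def agent_stream_sse(query: str):
    """Stream agent progress via Server-Sent Events."""

    async def event_generator():
        # astream() keeps the event loop free while the agent runs
        async for event in compiled_graph.astream({"query": query, "messages": []}):
            for node_name, update in event.items():
                # default=str stringifies values JSON cannot encode directly
                payload = json.dumps({"node": node_name, "state": update}, default=str)

                # Format as SSE: a "data:" line followed by a blank line
                yield f"data: {payload}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )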

Frontend with SSE:

const eventSource = new EventSource(`/agent/stream?query=Research%20AI%20safety`);

eventSource.onmessage = (event) => {
    const message = JSON.parse(event.data);
    console.log(`Node: ${message.node}`);
    updateUI(message.state);
};

eventSource.onerror = () => {
    console.error("Stream ended or errored");
    eventSource.close();
};

SSE is HTTP-based and easier to deploy behind proxies than WebSockets.
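The SSE wire format is plain text: optional `id:` and `event:` fields, one `data:` line per line of payload, and a blank line to end the frame. The helper below is a small sketch of that framing; `sse_frame` is an illustrative name, not a FastAPI function.

```python
import json
from typing import Any, Optional


def sse_frame(
    data: Any,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
) -> str:
    """Format one Server-Sent Events frame."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    payload = data if isinstance(data, str) else json.dumps(data)
    # A payload containing newlines must be split into several data: lines
    for line in payload.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the frame
    return "\n".join(lines) + "\n\n"


frame = sse_frame({"node": "research"}, event="progress", event_id="1")
```

Note that frames carrying an `event:` name are delivered to listeners registered with `addEventListener("progress", ...)` on the client, not to `onmessage`, which only receives unnamed events.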

Streaming LLM Tokens Within a Node

For long text generation (like synthesis), stream tokens within a single node.

from langchain_anthropic import ChatAnthropic
from langchain_core.callbacks import StreamingStdOutCallbackHandler

def synthesize_node_streaming(state: AgentState) -> dict:
    """Generate synthesis with token streaming."""

    model = ChatAnthropic(
        model="claude-3-5-sonnet-20241022",
        streaming=True,  # Assumed necessary so invoke() emits per-token callbacks
        callbacks=[StreamingStdOutCallbackHandler()],  # Print tokens as they arrive
    )

    prompt = f"Synthesize: {state['analysis']}"
    response = model.invoke(prompt)

    return {"final_answer": response.content}

The callback prints each token to stdout. In a web app, yield tokens to the frontend via WebSocket or SSE.
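A common way to get tokens from a callback to a WebSocket or SSE handler is to put a queue between them: the callback side pushes tokens, the handler side pulls and sends. The sketch below shows that hand-off with `asyncio.Queue`. All names are hypothetical, and `produce_tokens` stands in for whatever code receives tokens from the model.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Optional


async def produce_tokens(queue: asyncio.Queue, tokens: List[str]) -> None:
    """Stand-in for the callback side: push tokens, then a None sentinel."""
    for token in tokens:
        await queue.put(token)
    await queue.put(None)  # signals that generation has finished


async def consume_tokens(queue: asyncio.Queue) -> AsyncIterator[str]:
    """Handler side: yield tokens until the sentinel arrives."""
    while True:
        token: Optional[str] = await queue.get()
        if token is None:
            return
        yield token


async def demo() -> List[Dict[str, str]]:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(
        produce_tokens(queue, ["Stream", "ing ", "works"])
    )
    sent = []
    async for token in consume_tokens(queue):
        # In a real handler this would be: await websocket.send_json(...)
        sent.append({"type": "token", "text": token})
    await producer
    return sent


messages = asyncio.run(demo())
```

The queue decouples generation speed from delivery speed, and the sentinel gives the handler a clean signal to send its final "done" message.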

Combining Node and Token Streaming

For maximum responsiveness, stream both node events and tokens.

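@app.websocket("/ws/agent-advanced")
async def websocket_agent_advanced(websocket: WebSocket):
    """Stream both node progress and LLM tokens."""
    await websocket.accept()

    data = await websocket.receive_text()
    query = json.loads(data)["query"]

    # Request two LangGraph stream modes at once: "updates" for node progress
    # and "messages" for LLM tokens. With a list of modes, each streamed item
    # is a (mode, payload) tuple.
    async for mode, payload in compiled_graph.astream(
        {"query": query, "messages": []},
        stream_mode=["updates", "messages"],
    ):
        if mode == "messages":
            # payload is (message_chunk, metadata); metadata names the node
            chunk, metadata = payload
            if isinstance(chunk.content, str) and chunk.content:
                await websocket.send_json({
                    "type": "token",
                    "node": metadata.get("langgraph_node"),
                    "text": chunk.content,
                })
        else:
            # payload maps node name -> state update
            for node_name in payload:
                await websocket.send_json({
                    "type": "node",
                    "node": node_name
                })

    await websocket.close()

Clients now see tokens appearing in real time within each node's execution, followed by a node event when that node finishes. This version relies on LangGraph's "messages" stream mode rather than a hand-written callback, so check that your LangGraph version supports it; with other frameworks, attach a token callback through your LLM SDK instead.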
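On the receiving side, a mixed stream of token and node messages has to be folded into something displayable. The reducer below is a sketch of that step, assuming each token message carries a `node` field (an assumption of this example; tokens without one are grouped under "unknown"). `fold_stream` is a hypothetical name.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def fold_stream(messages: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Group streamed tokens by node and record which nodes have finished."""
    text_by_node: Dict[str, str] = defaultdict(str)
    finished: List[str] = []
    for msg in messages:
        if msg["type"] == "token":
            text_by_node[msg.get("node", "unknown")] += msg["text"]
        elif msg["type"] == "node":
            finished.append(msg["node"])
    return {"text": dict(text_by_node), "finished": finished}


view = fold_stream([
    {"type": "token", "node": "analyze", "text": "Key "},
    {"type": "token", "node": "analyze", "text": "risks"},
    {"type": "node", "node": "analyze"},
    {"type": "token", "node": "synthesize", "text": "Summary"},
])
```

A frontend would apply the same logic incrementally, updating the text for the active node on each token and marking the node complete when its node message arrives.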

Backpressure and Flow Control

If the client is slow or the agent is fast, events can pile up. Implement backpressure:

@app.websocket("/ws/agent-backpressure")
async def websocket_with_backpressure(websocket: WebSocket):
    await websocket.accept()

    data = await websocket.receive_text()
    query = json.loads(data)["query"]

    async for event in compiled_graph.astream({"query": query, "messages": []}):
        # default=str stringifies values JSON cannot encode directly
        safe_event = json.loads(json.dumps(event, default=str))
        await websocket.send_json({"type": "event", "data": safe_event})

        # Wait for client ack before sending next event
        try:
            ack = await asyncio.wait_for(websocket.receive_text(), timeout=5.0)
            if ack != "ack":
                break
        except asyncio.TimeoutError:
            # Client too slow; stop streaming
            break

    await websocket.close()

The client sends an "ack" after processing each event. This prevents buffer overflow if the client is slow.
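Acks are one form of backpressure; a bounded queue is another. The sketch below, using hypothetical names throughout, shows how `asyncio.Queue(maxsize=...)` makes a fast producer wait for a slow consumer: `put()` suspends whenever the queue is full, so the number of undelivered events stays bounded.

```python
import asyncio
from typing import List, Tuple

Log = List[Tuple[str, int]]


async def producer(queue: asyncio.Queue, count: int, log: Log) -> None:
    """Fast side: emits events as quickly as the queue allows."""
    for i in range(count):
        await queue.put(i)  # suspends here while the queue is full
        log.append(("put", i))
    await queue.put(None)  # sentinel: no more events


async def slow_consumer(queue: asyncio.Queue, log: Log) -> None:
    """Slow side: simulates a client that needs time per event."""
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.01)  # stand-in for slow network or rendering
        log.append(("got", item))


def max_lead(log: Log) -> int:
    """Largest number of events produced but not yet fully consumed."""
    lead = best = 0
    for kind, _ in log:
        lead += 1 if kind == "put" else -1
        best = max(best, lead)
    return best


async def demo() -> Log:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    log: Log = []
    await asyncio.gather(producer(queue, 5, log), slow_consumer(queue, log))
    return log


log = asyncio.run(demo())
```

With `maxsize=2`, the producer can never be more than three events ahead: two waiting in the queue plus one being processed. Without the bound, the lead would grow with the length of the run.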

Key Takeaways

  • Token streaming creates a responsive feel; LLM APIs support this natively via stream().
  • Node streaming tracks multi-step agent progress; LangGraph's stream() emits node events.
  • WebSockets enable bidirectional real-time communication; ideal for interactive agents.
  • SSE is simpler for one-way server-to-client streaming; easier to deploy.
  • Backpressure prevents buffer overflow on slow clients; implement acks or rate limiting.

Frequently Asked Questions

Does streaming cost more in tokens?

No. Streaming is just the delivery mechanism. The same tokens are generated whether you stream or batch. Total cost is identical.

Can I cancel a streamed agent mid-execution?

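Yes. Close the WebSocket or cancel the SSE stream on the client. Closing the connection only stops delivery, though: the server has to notice the disconnect, and the agent loop should check for cancellation between steps so it stops doing work.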
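Checking for cancellation between steps can be sketched with an `asyncio.Event` that the connection handler sets when the client disconnects. The names below (`run_steps`, `on_step`) are hypothetical; in this demo the "disconnect" is simulated inside the second step so the outcome is deterministic.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_steps(
    steps: List[str],
    cancel: asyncio.Event,
    on_step: Callable[[str], Awaitable[None]],
) -> List[str]:
    """Run steps in order, checking the cancel flag before each one."""
    completed = []
    for name in steps:
        if cancel.is_set():
            break  # stop before starting more work
        await on_step(name)
        completed.append(name)
    return completed


async def demo() -> List[str]:
    cancel = asyncio.Event()

    async def on_step(name: str) -> None:
        await asyncio.sleep(0)  # stand-in for the node's real work
        if name == "analyze":
            cancel.set()  # simulate the client disconnecting mid-run

    return await run_steps(["research", "analyze", "synthesize"], cancel, on_step)


completed = asyncio.run(demo())
```

The step that was already running finishes, but no new step starts after the flag is set, which is the behavior the answer above describes.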

What if a node takes a long time without generating tokens?

Display a spinner or a "thinking..." message. Streaming node-level events helps here, because the user at least sees which step is running. If a single node still shows no progress for a long time, consider breaking it into smaller steps.
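One way to drive such an indicator over SSE is a periodic heartbeat: while the node is still working, the server emits a comment frame (a line starting with a colon, which EventSource ignores) so the connection stays active. The sketch below assumes hypothetical names `with_heartbeat` and `slow_node`.

```python
import asyncio
from typing import AsyncIterator, Awaitable, List

HEARTBEAT = ": keep-alive\n\n"  # SSE comment frame; EventSource ignores it


async def with_heartbeat(work: Awaitable[str], interval: float) -> AsyncIterator[str]:
    """Yield heartbeat frames until `work` finishes, then its result frame."""
    task = asyncio.ensure_future(work)
    while True:
        done, _pending = await asyncio.wait({task}, timeout=interval)
        if done:
            yield f"data: {task.result()}\n\n"
            return
        yield HEARTBEAT  # nothing new yet; keep the connection warm


async def slow_node() -> str:
    """Stand-in for a node that takes a while and emits no tokens."""
    await asyncio.sleep(0.2)
    return "analysis complete"


async def demo() -> List[str]:
    return [frame async for frame in with_heartbeat(slow_node(), interval=0.05)]


frames = asyncio.run(demo())
```

To show a visible "thinking..." state rather than just keeping the connection open, the heartbeat could be a regular data frame with a status field instead of a comment.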

Should I stream intermediate agent state or just final results?

Stream everything if the user cares. For a research agent, show search results, analysis, and synthesis. For a simple calculator, just the final result. Match the UX expectation.

How do I handle errors in a streaming context?

Send an error event: {"type": "error", "message": "..."}. The client should treat it as terminal and close the stream. If you want to retry, make sure the agent is in a resumable state (for example, by checkpointing between steps).
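A generator wrapper is one way to guarantee that every stream ends with either a "done" or an "error" event. The sketch below uses hypothetical names (`stream_with_errors`, `flaky_agent`) and records the last node seen so the client knows where the run stopped.

```python
from typing import Any, Dict, Iterable, Iterator, List


def stream_with_errors(events: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Pass events through and always finish with a done or error event."""
    last_node = None
    try:
        for event in events:
            last_node = event.get("node", last_node)
            yield event
    except Exception as exc:
        # Report the failure in-band instead of dropping the connection
        yield {"type": "error", "message": str(exc), "last_node": last_node}
    else:
        yield {"type": "done"}


def flaky_agent() -> Iterator[Dict[str, Any]]:
    """Stand-in for an agent run that fails after the first node."""
    yield {"type": "progress", "node": "research"}
    raise RuntimeError("search tool timed out")


received: List[Dict[str, Any]] = list(stream_with_errors(flaky_agent()))
```

Including `last_node` in the error event is a design choice for this sketch: it gives a retry mechanism a hint about which checkpoint to resume from.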

Further Reading