
Monitoring and Observability for AI Workflows

When a workflow fails in production, you need to understand exactly what happened: which step failed, what was the input, what was the error, and how long did it take? Observability—the ability to infer internal state from external outputs—is critical. This article covers structured logging, distributed tracing, metrics collection, and practical debugging techniques for workflows.

Structured Logging: The Foundation

Structured logging records every significant event (step execution, errors, branch decisions) as a machine-readable JSON object, one per line. Each entry carries context (execution ID, step name), timing, and the outcome.

```python
import json
import logging
import time
import traceback
from datetime import datetime, timezone
from typing import Any, Dict, Optional


class WorkflowLogger:
    """
    Structured logger for workflow events. Each event is written as one JSON line.
    """

    def __init__(self, name: str = "workflow"):
        self.logger = logging.getLogger(name)
        # getLogger() returns the same object for the same name, so attach the
        # handler only once; otherwise every new instance would duplicate lines.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            # The message is already JSON, so emit it unchanged.
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
        self.logger.propagate = False  # don't print each event again via the root logger

    def _emit(self, level: int, event: Dict[str, Any]) -> None:
        """Add a UTC timestamp and write the event as a single JSON line."""
        # datetime.utcnow() is deprecated since Python 3.12; use an aware datetime.
        event["timestamp"] = datetime.now(timezone.utc).isoformat()
        # default=str keeps non-serializable values from raising inside logging.
        self.logger.log(level, json.dumps(event, default=str))

    def log_step_start(
        self,
        execution_id: str,
        step_name: str,
        step_type: str,
        context: Dict[str, Any],
    ):
        """Log the start of a step (context keys only, to keep values out of logs)."""
        self._emit(logging.INFO, {
            "event": "step_start",
            "execution_id": execution_id,
            "step_name": step_name,
            "step_type": step_type,
            "context_keys": list(context.keys()),
        })

    def log_step_complete(
        self,
        execution_id: str,
        step_name: str,
        duration_ms: float,
        success: bool,
        error: Optional[str] = None,
    ):
        """Log the completion of a step; failed steps are logged at ERROR level."""
        self._emit(logging.INFO if success else logging.ERROR, {
            "event": "step_complete",
            "execution_id": execution_id,
            "step_name": step_name,
            "duration_ms": duration_ms,
            "success": success,
            "error": error,
        })

    def log_branch_decision(self, execution_id: str, branch_name: str, path_taken: str):
        """Log a branch decision."""
        self._emit(logging.INFO, {
            "event": "branch_decision",
            "execution_id": execution_id,
            "branch_name": branch_name,
            "path_taken": path_taken,
        })

    def log_tool_call(
        self,
        execution_id: str,
        tool_name: str,
        arguments: Dict[str, Any],
        response: Any,
        duration_ms: float,
    ):
        """Log an external tool invocation (response keys only, not the payload)."""
        self._emit(logging.INFO, {
            "event": "tool_call",
            "execution_id": execution_id,
            "tool_name": tool_name,
            "arguments": arguments,
            "response_keys": list(response.keys()) if isinstance(response, dict) else None,
            "duration_ms": duration_ms,
        })

    def log_error(
        self,
        execution_id: str,
        step_name: str,
        error: str,
        traceback_str: Optional[str] = None,
    ):
        """Log an error at ERROR level so alerting on log severity picks it up."""
        self._emit(logging.ERROR, {
            "event": "error",
            "execution_id": execution_id,
            "step_name": step_name,
            "error": error,
            "traceback": traceback_str,
        })


# Usage (fetch_ticket_tool is a placeholder for your own async tool client):
logger = WorkflowLogger("support_workflow")


async def execute_workflow_with_logging(execution_id: str, context: dict):
    """Example workflow with comprehensive logging."""
    step_name = "fetch_ticket"
    logger.log_step_start(execution_id, step_name, "tool", context)
    start = time.perf_counter()  # monotonic clock, safe for measuring durations

    try:
        args = {"ticket_id": context["ticket_id"]}
        result = await fetch_ticket_tool.invoke(args)
        duration_ms = (time.perf_counter() - start) * 1000
        logger.log_tool_call(execution_id, step_name, args, result, duration_ms)
        logger.log_step_complete(execution_id, step_name, duration_ms, success=True)

        # Next step: branch.
        step_name = "route_ticket"
        logger.log_step_start(execution_id, step_name, "branch", context)
        priority = context.get("priority", "unknown")
        path = "escalate" if priority == "high" else "auto_reply"
        logger.log_branch_decision(execution_id, step_name, path)
        return path

    except Exception as e:
        # step_name identifies whichever step was running when the error occurred.
        duration_ms = (time.perf_counter() - start) * 1000
        logger.log_error(execution_id, step_name, str(e), traceback.format_exc())
        logger.log_step_complete(execution_id, step_name, duration_ms, success=False, error=str(e))
        raise  # don't swallow the failure; let the caller or engine handle it
```
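Because each line is a JSON object, reconstructing what happened to one execution comes down to filtering and sorting. Below is a minimal sketch. It assumes the log lines have already been collected into an iterable of strings, for example by reading a log file. execution_timeline and first_failure are illustrative helpers, not part of any library.

```python
import json
from typing import Iterable, List, Optional


def execution_timeline(lines: Iterable[str], execution_id: str) -> List[dict]:
    """Return one execution's events in timestamp order, skipping non-JSON lines."""
    events = []
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore stray non-JSON output mixed into the stream
        if isinstance(event, dict) and event.get("execution_id") == execution_id:
            events.append(event)
    # ISO-8601 UTC timestamps in the same format sort correctly as strings;
    # sorted() is stable, so events with equal timestamps keep their file order.
    return sorted(events, key=lambda e: e.get("timestamp", ""))


def first_failure(events: List[dict]) -> Optional[dict]:
    """Return the first error event or failed step_complete event, if any."""
    for event in events:
        if event.get("event") == "error":
            return event
        if event.get("event") == "step_complete" and event.get("success") is False:
            return event
    return None
```

The first failure event gives you the step name, error message, and traceback. Its timestamp also tells you where to look in traces and metrics for the same window.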

Distributed Tracing

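Distributed tracing follows one request (here, one workflow execution) across services and processes. Each span represents a unit of work, such as an LLM call or a tool invocation. Spans nest: every step's span is a child of the execution's root span, so a trace viewer shows the whole run as a tree. OpenTelemetry is the vendor-neutral standard for emitting traces, and backends such as Jaeger display them: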

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Export spans over OTLP/gRPC. Recent Jaeger versions ingest OTLP natively on
# port 4317; the older Thrift-based opentelemetry-exporter-jaeger is deprecated.
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)

resource = Resource.create({SERVICE_NAME: "workflow-engine"})
trace_provider = TracerProvider(resource=resource)
trace_provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(trace_provider)

tracer = trace.get_tracer(__name__)


async def execute_step_with_tracing(
    execution_id: str,
    step_name: str,
    step_type: str,
    step_func: Callable[[], Awaitable[Any]],
):
    """Execute a step inside a span that becomes a child of the current span."""
    with tracer.start_as_current_span(step_name) as span:
        span.set_attribute("execution_id", execution_id)
        span.set_attribute("step_type", step_type)
        start = time.perf_counter()
        try:
            result = await step_func()
            span.set_attribute("success", True)
            return result
        except Exception as e:
            # start_as_current_span also records the exception and sets an
            # ERROR status by default when the exception propagates out.
            span.set_attribute("success", False)
            span.set_attribute("error", str(e))
            raise
        finally:
            span.set_attribute("duration_ms", (time.perf_counter() - start) * 1000)


# Usage (fetch_ticket_tool is a placeholder for your own async tool client):
async def main():
    with tracer.start_as_current_span("workflow_execution") as root_span:
        root_span.set_attribute("execution_id", "EXEC-123")
        root_span.set_attribute("workflow_name", "support_ticket")

        # Each step is a child span of workflow_execution.
        result = await execute_step_with_tracing(
            "EXEC-123",
            "fetch_ticket",
            "tool",
            lambda: fetch_ticket_tool.invoke({"ticket_id": "TKT-999"}),
        )


asyncio.run(main())
```

Metrics: Throughput, Latency, Errors

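Metrics are numeric aggregates rather than per-event records: throughput (executions per minute), latency percentiles (p50/p95/p99 duration), and error rate. Prometheus is a common choice for collection, and prometheus_client is its official Python client: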

```python
import time

from prometheus_client import Counter, Gauge, Histogram

# Define metrics once at module level (a name can be registered only once per process).
workflow_executions_total = Counter(
    "workflow_executions_total",
    "Total workflow executions",
    ["workflow_name", "status"],  # status label: "success" or "failure"
)

workflow_duration_seconds = Histogram(
    "workflow_duration_seconds",
    "Workflow execution duration",
    ["workflow_name"],
    buckets=(0.1, 0.5, 1, 5, 10, 60),
)

step_duration_seconds = Histogram(
    "step_duration_seconds",
    "Step execution duration",
    ["step_name", "step_type"],
    buckets=(0.01, 0.1, 0.5, 1, 5, 10),
)

active_workflows = Gauge(
    "active_workflows",
    "Number of active workflow executions",
)


# Usage:
async def execute_workflow_with_metrics(execution_id: str, workflow_name: str):
    """Execute a workflow and record metrics."""
    active_workflows.inc()
    start_time = time.perf_counter()

    try:
        # ... execute workflow steps here, timing each one with e.g.:
        # step_duration_seconds.labels(step_name="fetch_ticket", step_type="tool").observe(elapsed)
        workflow_executions_total.labels(
            workflow_name=workflow_name,
            status="success",
        ).inc()
    except Exception:
        workflow_executions_total.labels(
            workflow_name=workflow_name,
            status="failure",
        ).inc()
        raise
    finally:
        # Runs on success and failure, so latency and the gauge stay accurate.
        duration = time.perf_counter() - start_time
        workflow_duration_seconds.labels(workflow_name=workflow_name).observe(duration)
        active_workflows.dec()
```
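A Prometheus Histogram does not store percentiles. It stores cumulative bucket counts, and PromQL's histogram_quantile() estimates p95 or p99 at query time. It finds the bucket that contains the target rank and interpolates linearly inside it. As a result, the estimate is only as precise as the bucket boundaries you choose.

Below is a minimal Python sketch of that estimation, for intuition only. It follows the documented rules: the lowest bucket's lower edge is treated as 0, and a quantile that lands in the +Inf bucket returns the highest finite bound. The example counts in the test are made up.

```python
import math
from typing import List, Tuple


def estimate_quantile(q: float, buckets: List[Tuple[float, int]]) -> float:
    """
    Estimate the q-quantile from cumulative histogram buckets by linear
    interpolation within the bucket containing rank q * total.

    buckets: (upper_bound, cumulative_count) pairs sorted by upper bound and
    ending with (math.inf, total). Assumes non-negative values (durations).
    """
    if not 0 < q < 1:
        raise ValueError("q must be strictly between 0 and 1")
    total = buckets[-1][1]
    if total == 0:
        return math.nan  # no observations yet
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                # Target falls in the +Inf bucket: report the last finite bound.
                return prev_bound
            in_bucket = count - prev_count  # > 0, since prev_count < rank <= count
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / in_bucket
        prev_bound, prev_count = bound, count
    return prev_bound
```

With workflow_duration_seconds buckets, a p95 that lands in the (5, 10] bucket can only be reported as somewhere in that range. Adding a bucket edge near your latency target makes the alert far more precise.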

Dashboard: Monitoring Live Workflows

A dashboard displays key metrics and lets you drill down into individual executions. Example alert thresholds:

| Metric | Alert Threshold | Action |
|---|---|---|
| Error Rate | > 5% | Page on-call, investigate root cause |
| P95 Latency | > 60 s | Identify slow steps, check external services |
| Active Executions | > 1000 | Check for backlog, scale workers |
| Queue Depth | > 10000 | Tasks accumulating; potential bottleneck |
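In practice these rules usually live in Prometheus alerting rules or in your dashboard tool. The Python sketch below just makes the threshold logic explicit by treating the table above as data. The snapshot keys (error_rate, p95_latency_seconds, and so on) are hypothetical names, and error_rate is assumed to be a fraction (0.05 for 5%).

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class AlertRule:
    metric: str        # key into a metrics snapshot (hypothetical names)
    threshold: float   # alert fires when the value is strictly greater
    action: str


# Thresholds and actions taken from the table above.
ALERT_RULES = [
    AlertRule("error_rate", 0.05, "Page on-call, investigate root cause"),
    AlertRule("p95_latency_seconds", 60, "Identify slow steps, check external services"),
    AlertRule("active_executions", 1000, "Check for backlog, scale workers"),
    AlertRule("queue_depth", 10000, "Tasks accumulating; potential bottleneck"),
]


def evaluate_alerts(snapshot: Dict[str, float], rules: List[AlertRule] = ALERT_RULES) -> List[str]:
    """Return one message per rule whose metric exceeds its threshold."""
    fired = []
    for rule in rules:
        value = snapshot.get(rule.metric)
        if value is not None and value > rule.threshold:
            fired.append(f"{rule.metric}={value} > {rule.threshold}: {rule.action}")
    return fired
```

Metrics missing from the snapshot are skipped rather than treated as zero. A real alerting system should also alert when a metric stops reporting altogether.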

Debugging Workflows: Replay and Step-Through

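When a workflow fails, replay it with identical inputs and detailed logging. If the replay fails the same way, the failure is deterministic, pointing to a code or data bug. If it succeeds or fails differently, suspect a transient or non-deterministic cause such as LLM sampling or a flaky external service: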

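```python
import traceback
from typing import Any, Optional


async def replay_workflow(
    execution_id: str,
    original_context: dict,
    original_result: Optional[Any] = None,
    original_error: Optional[str] = None,
):
    """
    Replay a workflow with identical inputs and compare the outcome.
    execute_workflow is your workflow entry point (a placeholder here).
    """
    replay_id = f"{execution_id}-replay"  # separate ID keeps replay logs distinguishable
    logger = WorkflowLogger(f"replay-{execution_id}")

    print(f"Replaying execution {execution_id}")
    print(f"Original context: {original_context}")

    try:
        result = await execute_workflow(execution_id=replay_id, context=original_context)
    except Exception as e:
        logger.log_error(replay_id, "replay", str(e), traceback.format_exc())
        same = original_error is not None and str(e) == original_error
        print(f"Replay failed with {'the same' if same else 'a different'} error: {e}")
        raise

    if original_error is not None:
        print("Replay succeeded where the original failed; suspect a transient cause")
    else:
        # Compare against the original run's output, not its input context.
        print(f"Replay succeeded; result differs from original: {result != original_result}")
    return result


# Usage (db and the record's context/result/error fields are placeholders for your store):
async def debug_execution():
    original_execution = await db.get_execution("EXEC-123")
    return await replay_workflow(
        "EXEC-123",
        original_execution.context,
        original_execution.result,
        original_execution.error,
    )
```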
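A replay only tells you whether the final outcome changed. To step through a run, suppose (hypothetically) that your engine persists an ordered list of (step_name, output) pairs for each execution. Comparing the original and replayed lists then pinpoints the first step whose output diverged, which is usually the place to start reading logs. first_divergence is an illustrative helper, not part of any library:

```python
from typing import Any, List, Optional, Tuple

StepRecord = Tuple[str, Any]  # (step_name, output)


def first_divergence(
    original_steps: List[StepRecord],
    replay_steps: List[StepRecord],
) -> Optional[Tuple[int, str, Any, Any]]:
    """
    Return (index, step_name, original_output, replay_output) for the first
    step that differs between two runs, or None if they match step for step.
    A step present in only one run counts as a divergence (missing output is None).
    """
    for i in range(max(len(original_steps), len(replay_steps))):
        orig = original_steps[i] if i < len(original_steps) else (None, None)
        rep = replay_steps[i] if i < len(replay_steps) else (None, None)
        if orig != rep:
            # Prefer the original's step name; fall back to the replay's.
            name = orig[0] if orig[0] is not None else rep[0]
            return i, name, orig[1], rep[1]
    return None
```

For example, if fetch_ticket returned the same ticket both times but route_ticket chose a different path, the divergence is in the branch logic or its inputs, not in the tool call.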

Health Checks and Alerting

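Expose a health check endpoint for load balancers and orchestrators, plus a metrics endpoint for Prometheus to scrape. Alerts are then defined on the scraped metrics: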

```python
from datetime import datetime, timedelta, timezone  # timedelta: used by the commented-out check

from fastapi import FastAPI, HTTPException, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

app = FastAPI()


@app.get("/health")
async def health_check():
    """Liveness and readiness check."""
    # Check if the workflow engine can execute a step.
    try:
        # Simple test: can we connect to the task queue? (task_queue is a placeholder.)
        # await task_queue.ping()

        # Check if key dependencies are healthy, e.g. report "degraded" when the
        # most recent error is very recent (db is a placeholder):
        # last_error_time = await db.get_latest_error_time()
        # if datetime.now(timezone.utc) - last_error_time < timedelta(seconds=10):
        #     return {"status": "degraded", "message": "Recent errors"}

        return {
            "status": "healthy",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    except Exception as e:
        # 503 tells load balancers and orchestrators to stop routing traffic here.
        raise HTTPException(
            status_code=503,
            detail=f"Unhealthy: {e}",
        )


@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics in the text exposition format."""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
```
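Each commented-out check in the health endpoint probes a single dependency. The sketch below runs several such probes concurrently, each with its own timeout, so that one hung dependency cannot hang the health endpoint itself. check_dependencies, the probe names in the test, and the 2-second default are illustrative assumptions.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def check_dependencies(
    probes: Dict[str, Callable[[], Awaitable[None]]],
    timeout_s: float = 2.0,
) -> Dict[str, str]:
    """
    Run every probe concurrently. A probe passes if it returns without raising
    within timeout_s. Returns {name: "ok" | "error: ...", "status": ...}.
    """
    async def run(name: str, probe: Callable[[], Awaitable[None]]):
        try:
            await asyncio.wait_for(probe(), timeout=timeout_s)
            return name, "ok"
        except asyncio.TimeoutError:
            return name, "error: timeout"
        except Exception as e:
            return name, f"error: {e}"

    results = dict(await asyncio.gather(*(run(n, p) for n, p in probes.items())))
    # The right-hand side is evaluated before "status" is added to the dict.
    results["status"] = "healthy" if all(v == "ok" for v in results.values()) else "unhealthy"
    return results
```

Inside /health you might pass coroutine functions such as task_queue.ping as probes. When the status is "unhealthy", raise HTTPException(status_code=503, detail=results).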

Key Takeaways

  • Structured logging captures every significant event (step start/end, branch decision, error) as JSON.
  • Distributed tracing (OpenTelemetry) tracks a workflow execution through all its steps and external calls.
  • Metrics (throughput, latency, error rate) identify performance issues and anomalies.
  • Dashboards visualize key metrics; set up alerts on error rate, latency, and queue depth.
  • Replay failed workflows with identical inputs to separate deterministic bugs from transient or non-deterministic failures.
  • Expose health checks and metrics endpoints for monitoring and alerting systems.

Frequently Asked Questions

How much should I log? Won't it slow down workflows?

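Log step start/end and branching decisions at INFO, and errors at ERROR. Log full context snapshots and LLM prompts at DEBUG, and enable DEBUG only outside production, since prompts can contain user data. To minimize the performance impact, log asynchronously: hand records to an in-memory queue and let a background thread write them.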
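Python's standard library implements the queue approach with logging.handlers.QueueHandler and QueueListener. The workflow only pays the cost of enqueuing a record, while the listener's thread does the slow I/O. Below is a minimal sketch; configure_async_logging is a hypothetical helper name.

```python
import logging
import logging.handlers
import queue


def configure_async_logging(
    logger: logging.Logger, target: logging.Handler
) -> logging.handlers.QueueListener:
    """
    Route the logger's records through an unbounded in-memory queue; a
    background QueueListener thread passes them to `target` (e.g. a file
    handler). Call listener.stop() at shutdown to flush queued records.
    """
    log_queue = queue.Queue(-1)  # -1 means no size limit
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    listener = logging.handlers.QueueListener(log_queue, target, respect_handler_level=True)
    listener.start()
    return listener


if __name__ == "__main__":
    log = logging.getLogger("support_workflow")
    log.setLevel(logging.INFO)
    listener = configure_async_logging(log, logging.StreamHandler())
    log.info('{"event": "step_start", "execution_id": "EXEC-123"}')
    listener.stop()  # drains the queue before the process exits
```

An unbounded queue never blocks the workflow, but it can grow without limit if the target handler falls behind. A bounded queue trades that risk for back-pressure.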

What is the difference between logging and tracing?

Logging records individual events (step start, error); tracing shows the path a request takes through the system (parent-child relationship between spans). Use both: logs for details, traces for flow.

How long should I retain logs?

Retain detailed logs (full context, LLM prompts) for 7–30 days, and keep aggregated metrics (throughput, latency) for a year or more. Compliance requirements (healthcare, finance) may mandate longer retention.

What if a trace is too large (thousands of steps)?

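Use sampling, trading detail for cost: for example, keep 100% of failed executions but only 10% of successful ones. In high-volume systems, sample successful executions rather than tracing every one. Keeping every error requires tail-based sampling, which decides after an execution finishes. A head-based sampler decides when the trace starts, before it can know whether the execution will fail.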
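OpenTelemetry's SDK ships head-based samplers such as TraceIdRatioBased (in opentelemetry.sdk.trace.sampling). Error-aware tail sampling is typically done in the OpenTelemetry Collector. The decision rule itself is small. Below is a minimal stdlib sketch; keep_trace is a hypothetical helper, and its 10% default mirrors the example rate above. Hashing the execution ID makes the decision deterministic, so every service that sees the same execution_id makes the same keep/drop call.

```python
import hashlib


def keep_trace(execution_id: str, had_error: bool, success_rate: float = 0.10) -> bool:
    """
    Tail-style sampling decision made after an execution finishes:
    always keep failed executions; keep roughly success_rate of successful ones.
    """
    if had_error:
        return True
    digest = hashlib.sha256(execution_id.encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a number in [0, 1).
    position = int.from_bytes(digest[:8], "big") / 2**64
    return position < success_rate
```

Because the decision depends only on the ID, re-running it (for example, in a replay) gives the same answer.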

How do I correlate logs from multiple services?

Use a shared execution_id (also called correlation ID or request ID). Pass it through all service calls and include it in every log line. This allows you to follow a workflow through multiple services.
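Within one Python process, you can attach the ID to every log line without threading it through each function call. Store it in a contextvars.ContextVar and stamp it onto records with a logging.Filter. Across process boundaries you still pass it explicitly, for example as a request header or message field. Below is a minimal sketch; current_execution_id, ExecutionIdFilter, and make_correlated_handler are hypothetical names.

```python
import contextvars
import logging

# Current execution's ID. Each asyncio task runs in a copy of the context
# captured when it was created, so concurrent executions don't overwrite each other.
current_execution_id = contextvars.ContextVar("current_execution_id", default="-")


class ExecutionIdFilter(logging.Filter):
    """Stamp every record with the execution ID from the current context."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.execution_id = current_execution_id.get()
        return True  # never drop records; only enrich them


def make_correlated_handler(stream=None) -> logging.Handler:
    """StreamHandler (stderr by default) that prefixes lines with the execution ID."""
    handler = logging.StreamHandler(stream)
    handler.addFilter(ExecutionIdFilter())
    handler.setFormatter(logging.Formatter("%(execution_id)s %(levelname)s %(message)s"))
    return handler
```

Set the variable once at the start of each execution (current_execution_id.set("EXEC-123")). Every log line emitted while handling that execution then carries the ID.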

Further Reading