Monitoring and Debugging Concurrent LLM Systems

Observability is the hidden infrastructure that separates chaos from control in scaled LLM systems. With thousands of concurrent requests flowing through multiple machines, queues, and providers, you cannot debug by intuition. You need structured logging, distributed tracing, and real-time metrics that answer: "Why did my latency spike? Which provider is failing? What's the queue depth? How many requests failed?" This article covers the three pillars of observability—metrics, logs, traces—and patterns to implement them in LLM systems.

The Three Pillars of Observability

Pillar | Data Type | Tools | Question Answered
Metrics | Time-series (counters, gauges, histograms) | Prometheus, Grafana | "Is latency spiking? Is throughput dropping?"
Logs | Structured text events | ELK, Loki, Datadog | "What happened to request XYZ?"
Traces | Request path across services | Jaeger, Datadog, Zipkin | "Why was this request slow? Where did the latency come from?"

A production LLM system needs all three. Metrics detect problems at macro level. Logs provide incident context. Traces pinpoint bottlenecks.

Metrics: Export and Alert on Performance

Prometheus metrics are the heartbeat of any distributed system. Export metrics from your LLM application:

import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Define metrics
llm_requests_total = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['provider', 'model', 'status'],  # Labels
)

llm_request_duration_seconds = Histogram(
    'llm_request_duration_seconds',
    'Request latency in seconds',
    ['provider', 'model'],
    buckets=[0.1, 0.5, 1, 2, 5, 10],  # Response time buckets
)

llm_request_tokens = Histogram(
    'llm_request_tokens',
    'Tokens per request',
    ['provider', 'model', 'token_type'],  # input vs output
    # The library's default buckets top out at 10, which suits seconds, not token counts
    buckets=[10, 50, 100, 500, 1000, 5000, 10000],
)

queue_depth = Gauge(
    'llm_queue_depth',
    'Pending requests in queue',
    ['queue_name'],
)

llm_cost_usd = Counter(
    'llm_cost_usd_total',
    'Total cost in USD',
    ['provider', 'model'],
)


async def fetch_llm_with_metrics(prompt: str, provider: str, model: str) -> str:
    """Fetch LLM response and record metrics.

    fetch_llm_response and calculate_cost are application helpers defined elsewhere.
    """
    start = time.perf_counter()  # monotonic clock, safe for measuring intervals

    try:
        response = await fetch_llm_response(prompt, provider, model)
    except Exception:
        llm_requests_total.labels(provider=provider, model=model, status="error").inc()
        raise

    # Record latency and outcome
    duration = time.perf_counter() - start
    llm_request_duration_seconds.labels(provider=provider, model=model).observe(duration)
    llm_requests_total.labels(provider=provider, model=model, status="success").inc()

    # Token counts: whitespace splitting is only a rough estimate.
    # Prefer the usage counts returned in the provider's API response.
    tokens_input = len(prompt.split())
    tokens_output = len(response.split())

    llm_request_tokens.labels(
        provider=provider,
        model=model,
        token_type="input",
    ).observe(tokens_input)

    llm_request_tokens.labels(
        provider=provider,
        model=model,
        token_type="output",
    ).observe(tokens_output)

    # Cost
    cost = calculate_cost(provider, tokens_input, tokens_output)
    llm_cost_usd.labels(provider=provider, model=model).inc(cost)

    return response


# Expose metrics endpoint for Prometheus to scrape
start_http_server(8000)  # Prometheus scrapes http://localhost:8000/metrics

Then configure Prometheus to scrape your service:

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'llm-worker'
    static_configs:
      - targets: ['localhost:8000']

Alert on critical metrics:

# alerts.yml
groups:
  - name: llm_alerts
    interval: 30s
    rules:
      - alert: HighLatency
        # histogram_quantile needs per-bucket rates, aggregated by the le label
        expr: histogram_quantile(0.99, sum by (le) (rate(llm_request_duration_seconds_bucket[5m]))) > 5
        for: 5m
        annotations:
          summary: "P99 latency exceeded 5 seconds"

      - alert: HighErrorRate
        # Ratio of failed requests to all requests, not errors per second
        expr: sum(rate(llm_requests_total{status="error"}[5m])) / sum(rate(llm_requests_total[5m])) > 0.05
        for: 2m
        annotations:
          summary: "Error rate exceeded 5%"

      - alert: QueueDepthHigh
        expr: llm_queue_depth > 1000
        for: 2m
        annotations:
          summary: "Queue depth exceeded 1000 items"
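
Prometheus histograms store cumulative counts per bucket (the `le` label holds each bucket's upper bound), and `histogram_quantile` estimates a percentile by interpolating inside the bucket that contains the target rank. The following is a simplified sketch of that idea, not Prometheus's actual implementation; the function name `estimate_quantile` and the sample counts are made up for illustration.

```python
from typing import List, Tuple


def estimate_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate a quantile from cumulative histogram buckets.

    `buckets` holds (upper_bound, cumulative_count) pairs sorted by upper
    bound; the last upper bound must be float("inf").
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    total = buckets[-1][1]
    if total == 0:
        return float("nan")

    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if bound == float("inf"):
                # The rank falls in the overflow bucket, so the best available
                # answer is the highest finite bound.
                return prev_bound
            # Assume observations are spread evenly inside the bucket.
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return prev_bound


# Illustrative counts for the latency buckets [0.1, 0.5, 1, 2, 5, 10]
sample = [(0.1, 10), (0.5, 60), (1, 90), (2, 98), (5, 100), (10, 100), (float("inf"), 100)]
p99 = estimate_quantile(0.99, sample)  # lands in the (2, 5] bucket
```

Because the estimate can never be finer than the bucket boundaries, it is worth placing a boundary near every alert threshold (here, 5 seconds).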

Structured Logging for Incident Response

Structured logs enable fast incident analysis. Use JSON-formatted logs with context:

import json
import logging
import time
from contextvars import ContextVar

# Context variable to track request ID across async tasks
request_id_context = ContextVar('request_id', default=None)


class JSONFormatter(logging.Formatter):
    """Format logs as JSON for easy parsing."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "request_id": request_id_context.get(),
        }

        # Add exception traceback if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Add custom fields (provider, model, etc.) if present
        for field in ("provider", "model", "duration_ms"):
            if hasattr(record, field):
                log_data[field] = getattr(record, field)

        return json.dumps(log_data)


# Configure logger
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())

logger = logging.getLogger("llm_service")
logger.addHandler(handler)
logger.setLevel(logging.INFO)


async def fetch_llm_with_logging(
    request_id: str,
    prompt: str,
    provider: str,
    model: str,
) -> str:
    """Fetch LLM response with structured logging."""
    request_id_context.set(request_id)

    start = time.perf_counter()

    try:
        # fetch_llm_response is the application's provider call, defined elsewhere
        response = await fetch_llm_response(prompt, provider, model)

        duration_ms = (time.perf_counter() - start) * 1000

        logger.info(
            "LLM request succeeded",
            extra={
                "provider": provider,
                "model": model,
                "duration_ms": duration_ms,
            },
        )

        return response

    except Exception as e:
        duration_ms = (time.perf_counter() - start) * 1000

        logger.error(
            f"LLM request failed: {e}",
            exc_info=True,
            extra={
                "provider": provider,
                "model": model,
                "duration_ms": duration_ms,
            },
        )
        raise

Query the logs with Loki (Grafana's log aggregation system) using LogQL:

# Query: count failed requests over the last hour, grouped by provider
sum by (provider) (count_over_time({job="llm-worker"} | json | level="ERROR" [1h]))

Distributed Tracing for Request Debugging

Tracing shows the complete path of a request through your system, with timing at each stage:

import asyncio

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Configure Jaeger exporter
# Note: recent OpenTelemetry releases deprecate this Thrift exporter in favor
# of OTLP, which Jaeger can also ingest
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

tracer = trace.get_tracer(__name__)

# Instrument async functions
AsyncioInstrumentor().instrument()


async def process_with_tracing(prompt: str, provider: str, model: str) -> str:
    """Process request with distributed tracing.

    tokenize, queue, fetch_llm_response, and postprocess are application
    code defined elsewhere.
    """

    with tracer.start_as_current_span("process_request") as span:
        span.set_attribute("prompt_length", len(prompt))
        span.set_attribute("provider", provider)

        # Stage 1: tokenize
        with tracer.start_as_current_span("tokenize"):
            tokens = tokenize(prompt)
            span.set_attribute("token_count", len(tokens))

        # Stage 2: queue (put blocks while a bounded queue is full)
        with tracer.start_as_current_span("queue_wait"):
            await queue.put(tokens)

        # Stage 3: LLM call
        with tracer.start_as_current_span("llm_inference"):
            result = await fetch_llm_response(prompt, provider, model)

        # Stage 4: post-processing
        with tracer.start_as_current_span("postprocess"):
            processed = postprocess(result)

        span.set_attribute("result_length", len(processed))

        return processed

View the trace in Jaeger UI (http://localhost:16686):

process_request (2000 ms total)
├── tokenize (50 ms)
├── queue_wait (150 ms)
├── llm_inference (1600 ms) <- bottleneck!
└── postprocess (10 ms)

From this trace, you immediately see that LLM inference is the bottleneck. Investigate provider latency next.

Log Aggregation and Analysis

Ship logs to a centralized system for analysis:

# fluent-bit.conf (log shipper)
[INPUT]
    Name    tail
    Path    /var/log/llm/*.log
    Parser  json

[OUTPUT]
    Name    es
    Match   *
    Host    elasticsearch
    Port    9200
    Index   llm-logs

Query aggregated logs:

# Elasticsearch query: P99 latency by provider
GET llm-logs*/_search
{
  "size": 0,
  "aggs": {
    "by_provider": {
      "terms": {"field": "provider"},
      "aggs": {
        "p99_latency": {
          "percentiles": {"field": "duration_ms", "percents": [99]}
        }
      }
    }
  }
}

The response lists the P99 latency for each provider, which shows which providers are consistently slow.
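
The same aggregation can be reproduced offline on a small sample of JSON log lines, which is handy for sanity-checking a query. This is a minimal sketch using the nearest-rank percentile; Elasticsearch computes percentiles approximately, so its numbers on large data sets may differ slightly. `p99_by_provider` and `nearest_rank_percentile` are hypothetical helper names, and the log lines are assumed to carry the `provider` and `duration_ms` fields.

```python
import json
import math
from collections import defaultdict
from typing import Dict, Iterable, List


def nearest_rank_percentile(values: List[float], percent: float) -> float:
    """Return the nearest-rank percentile of a non-empty list."""
    ordered = sorted(values)
    rank = max(1, math.ceil(percent * len(ordered) / 100))
    return ordered[rank - 1]


def p99_by_provider(log_lines: Iterable[str]) -> Dict[str, float]:
    """Group duration_ms by provider and compute each group's P99."""
    durations: Dict[str, List[float]] = defaultdict(list)
    for line in log_lines:
        event = json.loads(line)
        # Skip events that are not request logs
        if "provider" in event and "duration_ms" in event:
            durations[event["provider"]].append(event["duration_ms"])
    return {name: nearest_rank_percentile(vals, 99) for name, vals in durations.items()}
```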

Dashboard Template for LLM Systems

Create a Grafana dashboard with key metrics:

{
  "dashboard": {
    "title": "LLM System Observability",
    "panels": [
      {
        "title": "Request Rate (req/sec)",
        "targets": [
          {
            "expr": "sum(rate(llm_requests_total[5m]))"
          }
        ]
      },
      {
        "title": "P50 / P99 Latency",
        "targets": [
          {
            "expr": "histogram_quantile(0.5, sum by (le) (rate(llm_request_duration_seconds_bucket[5m])))"
          },
          {
            "expr": "histogram_quantile(0.99, sum by (le) (rate(llm_request_duration_seconds_bucket[5m])))"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "sum(rate(llm_requests_total{status=\"error\"}[5m])) / sum(rate(llm_requests_total[5m]))"
          }
        ]
      },
      {
        "title": "Queue Depth",
        "targets": [
          {
            "expr": "llm_queue_depth"
          }
        ]
      },
      {
        "title": "Cost by Provider",
        "targets": [
          {
            "expr": "sum by (provider) (increase(llm_cost_usd_total[1h]))"
          }
        ]
      }
    ]
  }
}

Debugging Patterns for Concurrent Systems

Pattern 1: Request Tracing with ID Propagation

Always propagate request IDs through async task chains:

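import asyncio
import logging
import uuid
from contextvars import ContextVar

logger = logging.getLogger("llm_service")
request_id_var = ContextVar('request_id', default=None)
queue: asyncio.Queue = asyncio.Queue()


async def handle_user_request(prompt: str) -> str:
    """HTTP handler."""
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)

    try:
        return await process_llm(prompt)
    except Exception:
        logger.error(f"Request {request_id} failed", exc_info=True)
        raise


async def process_llm(prompt: str) -> str:
    """Runs in the handler's task, so it inherits request_id from the context."""
    request_id = request_id_var.get()

    # A queue does not carry context to its consumer, so the request ID
    # must travel in the payload
    done = asyncio.get_running_loop().create_future()
    await queue.put({"request_id": request_id, "prompt": prompt, "done": done})
    return await done


async def queue_worker() -> None:
    """Long-lived consumer. The handler's context is lost here; use the ID from the item."""
    while True:
        item = await queue.get()
        request_id_var.set(item["request_id"])  # restore for downstream logging
        logger.info(f"Processing request {item['request_id']}")
        try:
            # fetch_llm_response is the application's provider call, defined elsewhere
            item["done"].set_result(await fetch_llm_response(item["prompt"]))
        except Exception as exc:
            item["done"].set_exception(exc)
        finally:
            queue.task_done()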
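
The difference between context inheritance and explicit passing can be checked with a small standalone script. The sketch below assumes a worker task that was started before the request arrived, which is the usual situation for a queue consumer; all names in it are illustrative.

```python
import asyncio
from contextvars import ContextVar
from typing import List, Optional, Tuple

request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)


async def child_task() -> Optional[str]:
    """A task created after set() receives a copy of the caller's context."""
    return request_id_var.get()


async def worker(queue: asyncio.Queue, seen: List[Tuple[Optional[str], str]]) -> None:
    """A worker started earlier has its own context; only the payload carries the ID."""
    item = await queue.get()
    seen.append((request_id_var.get(), item["request_id"]))
    queue.task_done()


async def main() -> Tuple[Optional[str], List[Tuple[Optional[str], str]]]:
    queue: asyncio.Queue = asyncio.Queue()
    seen: List[Tuple[Optional[str], str]] = []

    # The worker's context is copied here, before any request ID is set
    worker_task = asyncio.create_task(worker(queue, seen))

    request_id_var.set("req-123")
    # This task's context is copied after set(), so it sees the ID
    inherited = await asyncio.create_task(child_task())

    await queue.put({"request_id": "req-123"})
    await queue.join()
    await worker_task
    return inherited, seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The child task reports `req-123`, while the worker reads `None` from the context variable and only recovers the ID from the queue item.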

Pattern 2: Latency Attribution

Break down latency by stage to find bottlenecks:

# Assumes `import time`, plus logger, queue, tokenize, fetch_llm_response,
# and postprocess from the application code
async def process_with_latency_breakdown(prompt: str) -> dict:
    """Track latency at each pipeline stage."""

    stages = {}

    # Stage 1: tokenization
    start = time.perf_counter()
    tokens = tokenize(prompt)
    stages["tokenization"] = (time.perf_counter() - start) * 1000

    # Stage 2: queue wait (put blocks while a bounded queue is full)
    start = time.perf_counter()
    await queue.put(tokens)
    stages["queue_wait"] = (time.perf_counter() - start) * 1000

    # Stage 3: LLM inference (provider and model arguments omitted for brevity)
    start = time.perf_counter()
    response = await fetch_llm_response(prompt)
    stages["inference"] = (time.perf_counter() - start) * 1000

    # Stage 4: post-processing
    start = time.perf_counter()
    result = postprocess(response)
    stages["postprocessing"] = (time.perf_counter() - start) * 1000

    total = sum(stages.values())

    # Log breakdown
    logger.info(
        "Latency breakdown",
        extra={
            **stages,
            "total_ms": total,
        },
    )

    return {"result": result, "latency_breakdown": stages}
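
The repeated start/stop bookkeeping can be factored into a small context manager. This is a minimal sketch; `timed_stage` is a hypothetical helper name rather than part of any library, and the two stages in `demo` are stand-ins for real pipeline steps.

```python
import asyncio
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed_stage(stages: Dict[str, float], name: str) -> Iterator[None]:
    """Record the wall-clock duration of the enclosed block in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed stages are still timed
        stages[name] = (time.perf_counter() - start) * 1000


async def demo() -> Dict[str, float]:
    stages: Dict[str, float] = {}
    with timed_stage(stages, "tokenization"):
        _ = "some prompt text".split()   # stand-in for tokenize()
    with timed_stage(stages, "inference"):
        await asyncio.sleep(0.01)        # stand-in for the LLM call
    return stages
```

A plain `with` block works around `await` expressions because the timer only reads the clock on entry and exit.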

Pattern 3: Health Checks for Dependencies

Regularly check provider health:

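import asyncio
import logging
import time

from prometheus_client import Gauge

logger = logging.getLogger("llm_service")

# Metric name chosen for illustration
provider_health_gauge = Gauge(
    'llm_provider_healthy',
    'Provider health (1 = healthy, 0 = unhealthy)',
    ['provider'],
)


async def health_check_loop():
    """Periodically verify all providers are up."""

    while True:
        health = {}

        # openai, anthropic, and google are provider client objects defined
        # elsewhere, each assumed to expose .name and an async health_check()
        for provider in [openai, anthropic, google]:
            try:
                start = time.perf_counter()
                await provider.health_check()
                latency = (time.perf_counter() - start) * 1000
                health[provider.name] = {
                    "status": "healthy",
                    "latency_ms": latency,
                }
            except Exception as e:
                health[provider.name] = {
                    "status": "unhealthy",
                    "error": str(e),
                }

        # Publish to metrics
        for provider_name, status in health.items():
            provider_health_gauge.labels(provider=provider_name).set(
                1 if status["status"] == "healthy" else 0
            )

        logger.info("Health check", extra={"health": health})

        await asyncio.sleep(60)  # Check every minute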
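
A hung provider can stall a sequential health check indefinitely, so it is usually worth bounding each probe with a timeout and running the probes concurrently. The following is a sketch under the assumption that each provider is represented by an async callable; `check_provider`, `check_all`, and the 5-second default are illustrative choices, not part of any provider SDK.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict

HealthCheck = Callable[[], Awaitable[None]]


async def check_provider(check: HealthCheck, timeout_s: float = 5.0) -> dict:
    """Run one health probe, treating a timeout like any other failure."""
    start = time.perf_counter()
    try:
        await asyncio.wait_for(check(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"status": "unhealthy", "error": "timeout"}
    except Exception as exc:
        return {"status": "unhealthy", "error": str(exc)}
    latency_ms = (time.perf_counter() - start) * 1000
    return {"status": "healthy", "latency_ms": latency_ms}


async def check_all(checks: Dict[str, HealthCheck], timeout_s: float = 5.0) -> Dict[str, dict]:
    """Probe every provider concurrently so one slow provider does not delay the rest."""
    names = list(checks)
    results = await asyncio.gather(
        *(check_provider(checks[name], timeout_s) for name in names)
    )
    return dict(zip(names, results))
```

With this shape, one round of checks takes about as long as the slowest probe, capped at `timeout_s`.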

Key Takeaways

  • Metrics detect problems at macro level: Export latency, throughput, error rate, costs. Alert on thresholds.
  • Structured logs enable fast incident response: JSON logs with request ID let you trace any issue.
  • Distributed tracing pinpoints bottlenecks: See request path through system; identify slow stage immediately.
  • Dashboard aggregates key metrics: Single view of system health; no digging through logs.
  • Propagate request IDs through async chains: Track any request end-to-end for debugging.

Frequently Asked Questions

What metrics should I export first?

Start with: request count, latency (P50, P99), error rate, queue depth. Add cost tracking. Alert on latency > 5s and error rate > 5%.

How often should Prometheus scrape?

15–30 seconds is standard. Use a shorter interval (5 seconds) for critical applications and a longer one (60 seconds) for less critical ones. The trade-off is that shorter intervals produce more samples, which increases storage and cost.

Should I log every request?

No. Sample about 10% of successful requests in production (1 in 10) and log every error in full. Sampling cuts log volume by roughly 90% while keeping visibility into failures.
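
One way to apply this with the standard `logging` module is a filter that always passes warnings and errors and samples everything else by request. This is a sketch that assumes each record carries a `request_id` attribute (for example via `extra={"request_id": ...}`); `RequestSamplingFilter` is a hypothetical class name.

```python
import logging
import zlib


class RequestSamplingFilter(logging.Filter):
    """Keep all warnings and errors; keep other records for a sampled subset of requests."""

    def __init__(self, sample_rate: float = 0.1) -> None:
        super().__init__()
        self.sample_rate = sample_rate

    def filter(self, record: logging.LogRecord) -> bool:
        if record.levelno >= logging.WARNING:
            return True
        # Hash the request ID so every line of a given request gets the same decision
        request_id = str(getattr(record, "request_id", ""))
        bucket = zlib.crc32(request_id.encode("utf-8")) / 2**32  # value in [0, 1)
        return bucket < self.sample_rate
```

Attach it with `logger.addFilter(RequestSamplingFilter(0.1))`. Hashing the ID, rather than drawing a random number per record, keeps the log lines of a sampled request together.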

How do I debug a slow LLM provider?

(1) Check latency histogram by provider. (2) Look at traces for that provider. (3) Check provider's status page. (4) Run health check on their endpoint. (5) If slow, fail over to backup provider.

What's the cost of observability infrastructure?

Prometheus/Grafana: ~$50–200/month on cloud. ELK: ~$200–1000/month depending on log volume. Jaeger: ~$100–500/month. Budget 10–20% of app cost for observability.
