Measuring Reliability: Tracking LLM Output Success Rates
You can't improve what you don't measure. Many teams deploy validation systems but never track whether they're working. Without metrics, you can't tell whether a 90% success rate is good enough or whether you're leaving money on the table. This article teaches you how to measure reliability, from basic success counts to percentile dashboards and alert thresholds.
In 2026, leading teams track 10–15 reliability metrics per LLM task (Anthropic, 2025). This visibility drives continual improvement.
Core Reliability Metrics
Define a core set of metrics:
Success rate: Percentage of outputs that pass validation on first attempt.
success_rate = (valid_outputs / total_outputs) * 100
If you generate 1,000 outputs and 850 pass schema validation, your baseline success rate is 85%.
Fix rate: Percentage of failed outputs that are fixed by repair/fallback.
fix_rate = (repaired_outputs / failed_outputs) * 100
If 150 outputs failed initially and 120 were fixed by error feedback loops, your fix rate is 80%.
End-to-end success rate: Percentage of outputs that are eventually valid (after all retries/fallbacks).
e2e_success = (final_valid / total) * 100
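This is what matters most: of all your outputs, how many eventually become valid? Continuing the example, if 150 of 1,000 outputs failed initially and 120 of those were fixed, then 850 + 120 = 970 outputs are valid in the end, so your e2e success is 97%.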
Implementing Reliability Tracking
Build a metrics collector:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json
@dataclass
class ValidationMetrics:
    """Record of a single validated LLM output, from first attempt to final result.

    Each instance captures whether the output passed validation on the first
    attempt, how many retries were made, whether it was valid in the end, and
    which fallback (if any) was used along the way.
    """

    task_id: str  # identifier of the task this output belongs to
    timestamp: datetime  # when the attempt was recorded
    input_length: int
    output_length: int
    first_attempt_valid: bool  # True when the output passed validation without retries
    num_retries: int
    final_valid: bool  # True when the output was valid after all retries/fallbacks
    fallback_used: Optional[str]  # e.g., "cache", "secondary_llm", "template"
    processing_time_ms: float
    model_used: str
    # Defaults to None so that successful attempts need not pass it explicitly.
    error_type: Optional[str] = None  # e.g., "missing_field", "wrong_type"
class MetricsCollector:
    """Accumulate validation records in memory and derive reliability rates.

    All rates are returned as percentages in the range 0-100.
    """

    def __init__(self):
        self.metrics: list[ValidationMetrics] = []

    def record(self, metrics: ValidationMetrics) -> None:
        """Append one validation record to the collected history."""
        self.metrics.append(metrics)

    def success_rate(self) -> float:
        """Return the percentage of records valid on the first attempt.

        Returns 0.0 when nothing has been recorded yet.
        """
        total = len(self.metrics)
        if total == 0:
            return 0.0
        passed_first_try = len([m for m in self.metrics if m.first_attempt_valid])
        return (passed_first_try / total) * 100

    def fix_rate(self) -> float:
        """Return the percentage of first-attempt failures that ended up valid.

        Returns 100.0 when there are no first-attempt failures to repair.
        """
        initially_failed = [m for m in self.metrics if not m.first_attempt_valid]
        if initially_failed:
            repaired = len([m for m in initially_failed if m.final_valid])
            return (repaired / len(initially_failed)) * 100
        return 100.0

    def e2e_success_rate(self) -> float:
        """Return the percentage of records that were valid in the end.

        Returns 0.0 when nothing has been recorded yet.
        """
        total = len(self.metrics)
        if total == 0:
            return 0.0
        eventually_valid = len([m for m in self.metrics if m.final_valid])
        return (eventually_valid / total) * 100

    def error_distribution(self) -> dict[str, int]:
        """Return how many records carry each (truthy) error type."""
        counts = {}
        for kind in (m.error_type for m in self.metrics if m.error_type):
            counts[kind] = counts.get(kind, 0) + 1
        return counts

    def avg_retry_count(self) -> float:
        """Return the mean retry count over records that failed the first attempt.

        Returns 0.0 when no record failed its first attempt.
        """
        retry_counts = [m.num_retries for m in self.metrics if not m.first_attempt_valid]
        if not retry_counts:
            return 0.0
        return sum(retry_counts) / len(retry_counts)

    def fallback_breakdown(self) -> dict[str, int]:
        """Return how many records used each (truthy) fallback type."""
        usage = {}
        for name in (m.fallback_used for m in self.metrics if m.fallback_used):
            usage[name] = usage.get(name, 0) + 1
        return usage
# Usage: record a single attempt, then print the three headline rates.
collector = MetricsCollector()

# An extraction that failed validation at first and passed after two retries.
metrics = ValidationMetrics(
    # What was run and when
    task_id="customer_extraction_001",
    model_used="claude-3-5-sonnet-20241022",
    timestamp=datetime.now(),
    # Size and latency
    input_length=256,
    output_length=128,
    processing_time_ms=1250,
    # Validation outcome
    first_attempt_valid=False,
    error_type="missing_field",
    num_retries=2,
    fallback_used=None,
    final_valid=True,
)
collector.record(metrics)

print(f"Success rate: {collector.success_rate():.1f}%")
print(f"Fix rate: {collector.fix_rate():.1f}%")
print(f"E2E success: {collector.e2e_success_rate():.1f}%")
Per-Model and Per-Task Metrics
Track metrics separately by model and task:
from collections import defaultdict
class DetailedMetricsCollector:
    """Collect validation records and index them by model and by task."""

    def __init__(self):
        self.by_model = defaultdict(list)
        self.by_task = defaultdict(list)
        self.all_metrics = []

    def record(self, metrics: ValidationMetrics) -> None:
        """Store a record in the flat history and in both per-key indexes."""
        self.all_metrics.append(metrics)
        self.by_model[metrics.model_used].append(metrics)
        self.by_task[metrics.task_id].append(metrics)

    @staticmethod
    def _first_attempt_rates(groups) -> dict[str, float]:
        """Map each non-empty group key to its first-attempt success percentage."""
        return {
            key: (sum(1 for m in records if m.first_attempt_valid) / len(records)) * 100
            for key, records in groups.items()
            if records  # empty groups are left out rather than reported as 0%
        }

    def success_rate_by_model(self) -> dict[str, float]:
        """Return the first-attempt success percentage for each model."""
        return self._first_attempt_rates(self.by_model)

    def success_rate_by_task(self) -> dict[str, float]:
        """Return the first-attempt success percentage for each task."""
        return self._first_attempt_rates(self.by_task)
# Usage: simulate 100 attempts per model and print per-model success rates.
detailed = DetailedMetricsCollector()
for model in ["claude-3-5-sonnet-20241022", "claude-3-opus-20240229"]:
    for i in range(100):
        # The first 15 of every 100 attempts fail initially, giving exactly
        # 85% first-attempt success. (The previous `i % 15 > 0` expression
        # produced 93 successes per 100, not the intended 85.)
        failed_first = i < 15
        metrics = ValidationMetrics(
            task_id="sentiment_analysis",
            timestamp=datetime.now(),
            input_length=256,
            output_length=64,
            first_attempt_valid=not failed_first,  # 85% success
            num_retries=1 if failed_first else 0,
            final_valid=True,
            fallback_used=None,
            processing_time_ms=750,
            model_used=model,
            error_type=None,
        )
        detailed.record(metrics)
print("Success by model:")
for model, rate in detailed.success_rate_by_model().items():
    print(f" {model}: {rate:.1f}%")
Alert Thresholds and Anomaly Detection
Set alerts when metrics degrade:
from typing import Callable
class MetricsAlert:
    """Emit alerts when a collector's metrics drop below configured thresholds.

    Rate thresholds are percentages. The error-spike threshold is a multiplier
    applied to the per-error-type counts seen on the previous check.
    """

    def __init__(
        self,
        success_rate_threshold: float = 80.0,
        e2e_success_threshold: float = 95.0,
        error_spike_threshold: float = 2.0,  # 2x increase
    ):
        self.success_rate_threshold = success_rate_threshold
        self.e2e_success_threshold = e2e_success_threshold
        self.error_spike_threshold = error_spike_threshold
        self.baseline_errors = {}

    def check_degradation(
        self,
        collector: MetricsCollector,
        on_alert: Callable[[str], None],
    ) -> None:
        """Compare the collector's current metrics to the thresholds.

        Calls ``on_alert`` once per problem found: first-attempt success rate
        below its threshold, end-to-end success rate below its threshold, and
        any error type whose count exceeds the previous check's count times
        the spike multiplier. Afterwards the current error counts become the
        baseline for the next check.
        """
        first_attempt_pct = collector.success_rate()
        if first_attempt_pct < self.success_rate_threshold:
            on_alert(f"Success rate degraded: {first_attempt_pct:.1f}% < {self.success_rate_threshold}%")

        final_pct = collector.e2e_success_rate()
        if final_pct < self.e2e_success_threshold:
            on_alert(f"E2E success degraded: {final_pct:.1f}% < {self.e2e_success_threshold}%")

        # Error spike detection
        observed = collector.error_distribution()
        for error_type, count in observed.items():
            baseline = self.baseline_errors.get(error_type, 0)
            # Error types with no previous count are skipped: there is no
            # baseline to compare against.
            if baseline > 0:
                if count > baseline * self.error_spike_threshold:
                    on_alert(f"Error spike: {error_type} ({count} vs baseline {baseline})")

        # The counts just observed become the baseline for the next check.
        self.baseline_errors = observed
# Usage
def send_alert(message: str):
    """Print the alert message with an ``ALERT:`` prefix."""
    alert_line = f"ALERT: {message}"
    print(alert_line)
    # Send to Slack, PagerDuty, etc.


alert = MetricsAlert(success_rate_threshold=85.0)

# Check metrics every hour
alert.check_degradation(collector, send_alert)
Dashboard and Visualization
Export metrics for dashboarding:
import json
def export_metrics_for_dashboard(collector: MetricsCollector) -> dict:
    """Build a JSON-serializable snapshot of the collector for a dashboard.

    The snapshot contains an ISO-format export timestamp, summary rates,
    error and fallback counts, and p50/p95/p99 of ``processing_time_ms``.
    """
    exported_at = datetime.now().isoformat()
    records = collector.metrics

    summary = {
        "total_outputs": len(records),
        "first_attempt_success_rate": collector.success_rate(),
        "fix_rate": collector.fix_rate(),
        "e2e_success_rate": collector.e2e_success_rate(),
        "avg_retries": collector.avg_retry_count(),
    }
    errors = collector.error_distribution()
    fallbacks = collector.fallback_breakdown()

    def latency_ms(m):
        return m.processing_time_ms

    percentiles = {
        "p50_time_ms": percentile(records, 50, latency_ms),
        "p95_time_ms": percentile(records, 95, latency_ms),
        "p99_time_ms": percentile(records, 99, latency_ms),
    }

    return {
        "timestamp": exported_at,
        "summary": summary,
        "errors": errors,
        "fallbacks": fallbacks,
        "percentiles": percentiles,
    }
def percentile(data: list, p: int, accessor: Callable) -> float:
    """Return the p-th percentile of ``accessor(item)`` over ``data``.

    The value is picked by index from the sorted values (no interpolation):
    the element at position ``int(len(values) * p / 100)``, clamped to the
    valid index range.

    Args:
        data: Items to summarize.
        p: Percentile to compute, normally between 0 and 100.
        accessor: Function extracting the numeric value from each item.

    Returns:
        The selected value, or 0.0 when ``data`` is empty.
    """
    values = sorted(accessor(m) for m in data)
    if not values:
        # Previously this raised IndexError on ``values[-1]``; an empty data
        # set now reports 0.0 instead of crashing the caller.
        return 0.0
    idx = int(len(values) * (p / 100))
    # Clamp so p >= 100 yields the maximum and p < 0 yields the minimum,
    # rather than a negative index wrapping around to the end of the list.
    idx = max(0, min(idx, len(values) - 1))
    return values[idx]
# Export and log: build the snapshot, then pretty-print it as JSON.
dashboard_data = export_metrics_for_dashboard(collector)
dashboard_json = json.dumps(dashboard_data, indent=2)
print(dashboard_json)
Key Takeaways
- Track three core metrics: first-attempt success rate, fix rate, and end-to-end success rate.
- Separate metrics by model and task to identify weak spots.
- Set alert thresholds (typically 85% first-attempt, 95% e2e) to catch degradation.
- Log error types and fallback usage to drive improvements.
- Export metrics for real-time dashboarding and historical analysis.
Frequently Asked Questions
What's a good first-attempt success rate?
50–70% is typical with basic prompting. 80%+ is good. 90%+ is achievable with an excellent schema-in-prompt setup and guardrails. Target 95%+ e2e after repairs/fallbacks.
How often should I check metrics?
For production systems, check hourly or per-batch. Log daily/weekly summaries. Alert on real-time degradation.
Should I track latency as a reliability metric?
Yes, but separately. For latency-sensitive applications, a 95% success rate that takes 5 seconds per output can be worse than an 80% success rate at 500 ms. Track latency percentiles (p50, p95, p99) alongside success rates.
How do I know if a metric change is significant?
Use statistical significance testing. A change from 85% to 87% in 100 samples is noise. A change from 85% to 87% in 10,000 samples is significant. Consider sample size.