
Building a Continuous LLM Evaluation Harness

Evaluation doesn't stop at CI/CD gates. In production, your model runs on real queries, and you need continuous monitoring to catch distribution shift, feedback loops, and silent failures. A continuous evaluation harness is a system that (1) samples production traffic, (2) evaluates it against quality metrics, (3) detects anomalies, (4) collects human feedback, and (5) retrains or alerts. The difference between a system that degrades silently over months and one that alerts in minutes is a well-designed evaluation harness.

This article teaches you to build production evaluation systems: real-time metrics collection, alerting on anomalies, feedback collection loops, and model versioning for continuous improvement.

Continuous Evaluation Architecture

A production evaluation harness looks like this:

Production Queries
    ↓
Sample (e.g., 1% for cost efficiency)
    ↓
Deterministic Checks + Fast Metrics
    ↓
Metrics Database (Time-Series Store)
    ↓
Anomaly Detection (e.g., EWMA, isolation forest)
    ↓
Alert if metric < threshold
    ↓
Human Feedback Collection (optional)
    ↓
Golden Dataset Update (if pattern emerges)
    ↓
Model Retraining Pipeline (weekly/monthly)

The system runs 24/7, ingests production data, and triggers alerts or retraining automatically.

Real-Time Metrics Collection

Capture quality signals as users interact with your system. Use event logging and time-series storage (e.g., Prometheus, InfluxDB, CloudWatch).

import time
import uuid
from typing import Dict, Optional

class ProductionEvaluationLogger:
    """Log evaluation metrics from production queries."""

    def __init__(self, metrics_endpoint: str = 'http://localhost:9090/metrics'):
        self.metrics_endpoint = metrics_endpoint
        self.buffer = []

    def log_query_evaluation(
        self,
        query_id: str,
        query_text: str,
        model_output: str,
        timestamp: Optional[float] = None,
        metadata: Optional[dict] = None
    ) -> Dict:
        """
        Evaluate a production query and log metrics.
        Called immediately after serving a query to a user.
        """
        if timestamp is None:
            timestamp = time.time()
        metadata = metadata or {}

        metrics = {}

        # Deterministic checks (fast); these helpers are application-specific
        metrics['format_valid'] = validate_output_format(model_output)
        metrics['no_pii'] = not contains_pii(model_output)
        metrics['length'] = len(model_output.split())

        # Optional: semantic similarity (if a reference and a cheap embedding exist).
        # Check the value, not just the key, so {'reference': None} is skipped.
        reference = metadata.get('reference')
        if reference is not None:
            metrics['semantic_sim'] = compute_semantic_similarity_cached(
                model_output,
                reference
            )

        event = {
            'query_id': query_id,
            'timestamp': timestamp,
            'model_version': metadata.get('model_version', 'unknown'),
            'query_length': len(query_text.split()),
            'output_length': metrics['length'],
            'metrics': metrics
        }

        # Buffer for batch writing (reduce I/O)
        self.buffer.append(event)

        if len(self.buffer) >= 100:  # Write every 100 events
            self._flush_buffer()

        return event

    def _flush_buffer(self):
        """Write buffered events to time-series database."""
        if not self.buffer:
            return

        # Example: write to InfluxDB or Prometheus
        for event in self.buffer:
            self._write_to_backend(event)

        self.buffer = []

    def _write_to_backend(self, event: Dict):
        """Write individual event to metrics backend."""
        # In production: use Python client for InfluxDB, Prometheus, etc.
        # Format: metric_name{labels} value timestamp
        # Example:
        # eval_format_valid{query_id="abc", model="v2"} 1.0 1714574400
        # Note: a per-query label such as query_id is high-cardinality;
        # in Prometheus, prefer low-cardinality labels such as model version.
        pass

# Usage in your serving code
evaluator = ProductionEvaluationLogger()

def serve_query(user_query: str):
    """Your model serving endpoint."""
    model_output = generate_response(user_query)

    # Log evaluation
    evaluator.log_query_evaluation(
        query_id=str(uuid.uuid4()),
        query_text=user_query,
        model_output=model_output,
        metadata={'model_version': 'v2.1', 'reference': None}
    )

    return model_output

Log continuously but sample to reduce costs. At 1% sampling, 1 million production queries yield 10,000 evaluated queries, so any expensive metric (embedding similarity, an LLM judge) costs roughly 1% of what evaluating every query would.
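One way to implement the sampling step is to hash the query ID and keep a fixed fraction of the hash space. The sketch below is an illustration, not part of the logger above: `should_sample` and the `query-{i}` IDs are hypothetical names. Hashing makes the decision deterministic, so the same query is either always or never sampled, even across processes.

```python
import hashlib

def should_sample(query_id: str, rate: float = 0.01) -> bool:
    """Return True for roughly `rate` of all query IDs, deterministically."""
    digest = hashlib.sha256(query_id.encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a float in [0, 1)
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < rate

# Roughly 1% of 100,000 hypothetical query IDs should be selected
sampled = sum(should_sample(f"query-{i}", rate=0.01) for i in range(100_000))
print(f"sampled {sampled} of 100000 queries")
```

In serving code, the check would wrap the logging call, for example `if should_sample(query_id): evaluator.log_query_evaluation(...)`.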

Anomaly Detection on Metrics

Monitor metrics over time; alert when they deviate from baseline.

import numpy as np
from typing import Dict

class AnomalyDetector:
    """Detect anomalies in evaluation metrics using multiple methods."""

    def __init__(
        self,
        window_size: int = 100,
        sensitivity: float = 2.0  # std devs from baseline (either direction)
    ):
        self.window_size = window_size
        self.sensitivity = sensitivity
        self.baseline_mean = None
        self.baseline_std = None
        self.ewma = None  # running EWMA state, seeded by set_baseline

    def set_baseline(self, historical_values: list):
        """Calibrate baseline on historical data (e.g., last week)."""
        self.baseline_mean = float(np.mean(historical_values))
        self.baseline_std = float(np.std(historical_values))
        self.ewma = self.baseline_mean

    def detect_zscore_anomaly(self, value: float) -> Dict:
        """
        Z-score method: flag if value deviates >N std devs from baseline.
        """
        if self.baseline_mean is None:
            return {'is_anomaly': False, 'reason': 'no baseline'}

        z_score = (value - self.baseline_mean) / (self.baseline_std + 1e-6)
        is_anomaly = abs(z_score) > self.sensitivity

        return {
            'is_anomaly': is_anomaly,
            'z_score': z_score,
            'threshold': self.sensitivity
        }

    def detect_ewma_anomaly(self, value: float, alpha: float = 0.3) -> Dict:
        """
        Exponential weighted moving average: flag if the smoothed trend
        deviates from baseline. Good for gradual drift detection.
        """
        if self.baseline_mean is None:
            return {'is_anomaly': False, 'reason': 'no baseline'}

        # Update the running EWMA, then compare the smoothed value to baseline
        self.ewma = alpha * value + (1 - alpha) * self.ewma
        deviation = abs(self.ewma - self.baseline_mean)

        # Steady-state std of an EWMA of independent values:
        # sigma * sqrt(alpha / (2 - alpha))
        ewma_std = self.baseline_std * np.sqrt(alpha / (2 - alpha))
        is_anomaly = deviation > self.sensitivity * ewma_std

        return {
            'is_anomaly': is_anomaly,
            'ewma': self.ewma,
            'deviation': deviation
        }

    def detect_isolation_forest(
        self,
        recent_values: list,
        contamination: float = 0.05
    ) -> Dict:
        """
        Isolation forest: unsupervised anomaly detection.
        Good for multi-dimensional metrics (semantic_sim + length + toxicity).
        Accepts a list of scalars or a list of equal-length feature rows.
        """
        from sklearn.ensemble import IsolationForest

        if len(recent_values) < 10:
            return {'is_anomaly': False, 'reason': 'insufficient data'}

        X = np.asarray(recent_values, dtype=float)
        if X.ndim == 1:
            X = X.reshape(-1, 1)  # single metric: one feature column

        clf = IsolationForest(contamination=contamination)
        predictions = clf.fit_predict(X)

        latest_is_anomaly = predictions[-1] == -1

        return {
            'is_anomaly': latest_is_anomaly,
            'anomaly_score': clf.score_samples(X)[-1]
        }

# Usage: monitor semantic similarity in production
detector = AnomalyDetector(sensitivity=2.0)

# Calibrate on last week's data
# (last 7 days, assuming one aggregated value per minute)
detector.set_baseline(historical_semantic_sims[-7*24*60:])

# Real-time: check each query
def check_metric_anomaly(metric_value: float):
    z_score_result = detector.detect_zscore_anomaly(metric_value)

    if z_score_result['is_anomaly']:
        send_alert(
            f"ANOMALY: semantic_sim deviated to {metric_value:.3f} "
            f"(z={z_score_result['z_score']:.1f})"
        )

Anomaly detection catches distribution shift automatically. If your model starts outputting shorter answers or lower-quality text, alerts fire within minutes.
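Per-query values are noisy, so a single low score often means little. A common refinement is to test the mean of the last N values against the baseline, which shrinks the noise by a factor of sqrt(N). The sketch below assumes roughly independent observations; `WindowedMeanDetector` and the example numbers are hypothetical, not part of the detector above.

```python
from collections import deque
from math import sqrt
from statistics import fmean

class WindowedMeanDetector:
    """Flag when the mean of the last `window_size` values drifts from baseline."""

    def __init__(self, baseline_mean: float, baseline_std: float,
                 window_size: int = 100, sensitivity: float = 3.0):
        self.baseline_mean = baseline_mean
        self.baseline_std = baseline_std
        self.sensitivity = sensitivity
        self.window = deque(maxlen=window_size)

    def update(self, value: float) -> dict:
        self.window.append(value)
        if len(self.window) < self.window.maxlen:
            return {"is_anomaly": False, "reason": "window not full"}
        window_mean = fmean(self.window)
        # Standard error of a mean of N independent values: std / sqrt(N)
        standard_error = self.baseline_std / sqrt(len(self.window))
        z_score = (window_mean - self.baseline_mean) / (standard_error + 1e-12)
        return {
            "is_anomaly": abs(z_score) > self.sensitivity,
            "z_score": z_score,
            "window_mean": window_mean,
        }

# Hypothetical baseline: mean 0.80, std 0.05
detector = WindowedMeanDetector(0.80, 0.05, window_size=50)
for _ in range(50):
    healthy = detector.update(0.80)   # on-baseline traffic
for _ in range(50):
    degraded = detector.update(0.70)  # sustained drop in quality
print(healthy["is_anomaly"], degraded["is_anomaly"])
```

The trade-off is latency: the window must fill with degraded values before the alert fires, so a smaller window reacts faster but tolerates less noise.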

Human Feedback Integration

Collect feedback from users and use it to identify misclassified examples and model failures.

import time
from typing import Dict, Optional

class FeedbackCollector:
    """Collect and aggregate user feedback on model outputs."""

    def __init__(self, database_path: str = 'feedback.db'):
        self.database_path = database_path

    def log_feedback(
        self,
        query_id: str,
        user_feedback: str,
        rating: Optional[int] = None,
        improvement: Optional[str] = None,
        model_version: Optional[str] = None
    ) -> Dict:
        """
        User gives feedback: 'positive', 'negative', or 'neutral'.
        Optional: rating on 1–5 scale, or free-form improvement suggestion.
        """
        feedback_record = {
            'query_id': query_id,
            'feedback': user_feedback,  # 'positive', 'negative', 'neutral'
            'rating': rating,
            'improvement': improvement,
            'model_version': model_version,  # needed for per-version grouping
            'timestamp': time.time()
        }

        # Store in database
        self._store_feedback(feedback_record)

        # Immediately check if threshold exceeded
        negative_pct = self._get_recent_negative_pct()

        if negative_pct > 0.15:  # >15% negative feedback
            send_alert(f"High negative feedback rate: {negative_pct:.1%}")

        return feedback_record

    def identify_feedback_patterns(self) -> Dict:
        """
        Aggregate feedback to find patterns: are failures concentrated in
        specific query types, domains, or model versions?
        """
        all_feedback = self._load_feedback(days=7)

        # Avoid dividing by zero when no feedback has arrived yet
        if not all_feedback:
            return {'total_feedback': 0, 'negative_pct': 0.0,
                    'by_model_version': {}, 'by_domain': {}}

        patterns = {
            'total_feedback': len(all_feedback),
            'negative_pct': sum(1 for f in all_feedback if f['feedback'] == 'negative') / len(all_feedback),
            'by_model_version': {},
            'by_domain': {}  # fill the same way if records carry a domain field
        }

        # Group by model version
        for version in set(f.get('model_version') for f in all_feedback):
            version_feedback = [f for f in all_feedback if f.get('model_version') == version]
            negative = sum(1 for f in version_feedback if f['feedback'] == 'negative')
            patterns['by_model_version'][version] = {
                'negative_pct': negative / len(version_feedback)
            }

        return patterns

    def _store_feedback(self, record: Dict):
        """Store in database (simplified)."""
        # In production: use SQLite, PostgreSQL, or document store
        pass

    def _load_feedback(self, days: int = 7) -> list:
        """Load recent feedback."""
        # In production: query database with time filter
        return []

    def _get_recent_negative_pct(self) -> float:
        """Get % negative in last hour."""
        # In production: query database; the placeholder reports no negatives
        return 0.0

Feedback closes the loop: users tell you what's wrong, you fix it, and retraining improves the model. Over time, this creates a virtuous cycle.

Model Versioning and A/B Testing

Track model versions and use A/B testing to compare production performance.

import math
import random
from typing import Dict

import numpy as np

class ModelVersionController:
    """Manage model versions and traffic split."""

    def __init__(self):
        self.models = {}
        self.traffic_split = {}  # e.g., {'v1': 0.5, 'v2': 0.5}

    def register_model(self, version: str, model_path: str):
        """Register a new model version for serving."""
        self.models[version] = load_model(model_path)

    def set_traffic_split(self, split: Dict[str, float]):
        """Control % of traffic going to each version."""
        # Exact float equality fails for sums such as 0.7 + 0.2 + 0.1
        if not math.isclose(sum(split.values()), 1.0, abs_tol=1e-9):
            raise ValueError("traffic split must sum to 1.0")
        self.traffic_split = split

    def select_model(self) -> str:
        """Choose model version based on traffic split."""
        if not self.traffic_split:
            raise RuntimeError("no traffic split configured")

        rand = random.random()
        cumulative = 0.0

        for version, pct in self.traffic_split.items():
            cumulative += pct
            if rand < cumulative:
                return version

        # Floating-point rounding can leave cumulative just under 1.0
        return version

    def evaluate_version(
        self,
        version: str,
        metric_name: str,
        days: int = 7
    ) -> Dict:
        """Get performance metrics for a specific model version."""
        metrics_data = query_metrics_database(
            version=version,
            metric=metric_name,
            days=days
        )

        return {
            'version': version,
            'mean': np.mean(metrics_data),
            'std': np.std(metrics_data),
            'n': len(metrics_data)
        }

# Usage: gradual rollout
controller = ModelVersionController()
controller.register_model('v1', 'models/v1.bin')
controller.register_model('v2', 'models/v2.bin')

# Start: 90% v1, 10% v2
controller.set_traffic_split({'v1': 0.9, 'v2': 0.1})

# Monitor for 3 days...
# If v2 metrics are good, ramp up
v1_baseline = controller.evaluate_version('v1', 'semantic_sim', days=3)['mean']
v2_eval = controller.evaluate_version('v2', 'semantic_sim', days=3)
if v2_eval['mean'] >= v1_baseline:
    controller.set_traffic_split({'v1': 0.5, 'v2': 0.5})  # 50/50

Gradual rollouts reduce risk: if a new model is worse, only about 10% of requests see degraded quality, and you can revert by setting the traffic split back to the previous version.
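Comparing raw means can mislead when the candidate has served only a small share of traffic. A hedged sketch of a sturdier check follows: a two-sample z-test using the large-sample normal approximation, built on the standard library only. `compare_versions` and the sample scores are hypothetical; with small samples, a t-test would be the safer choice.

```python
from math import copysign, inf, sqrt
from statistics import NormalDist, fmean, variance
from typing import Dict, Sequence

def compare_versions(control: Sequence[float], candidate: Sequence[float],
                     alpha: float = 0.05) -> Dict:
    """Two-sided z-test on the difference of means (normal approximation)."""
    mean_diff = fmean(candidate) - fmean(control)
    # Standard error with unequal variances (Welch-style)
    standard_error = sqrt(variance(control) / len(control)
                          + variance(candidate) / len(candidate))
    if standard_error > 0:
        z = mean_diff / standard_error
    else:
        # Zero variance in both samples: any difference is unambiguous
        z = 0.0 if mean_diff == 0 else copysign(inf, mean_diff)
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))
    return {"mean_diff": mean_diff, "z": z, "p_value": p_value,
            "significant": p_value < alpha}

# Hypothetical per-query semantic similarity scores for each version
control = [0.80, 0.82, 0.78, 0.81, 0.79] * 40
candidate = [score - 0.10 for score in control]

result = compare_versions(control, candidate)
same = compare_versions(control, control)
print(result["significant"], same["significant"])
```

A rollout rule could then ramp up only when the candidate is not significantly worse, rather than whenever its mean happens to be higher.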

Alerting and Runbook Automation

When metrics exceed thresholds, alert immediately and trigger automated responses.

class EvaluationAlertSystem:
    """Alert on metric anomalies and trigger remediation."""

    def check_and_alert(self, metric_name: str, value: float):
        """
        Check metric against thresholds; alert and remediate if needed.
        """
        thresholds = {
            'semantic_sim': {'lower': 0.65, 'upper': 1.0},
            'toxicity': {'lower': 0.0, 'upper': 0.1},
            'latency_ms': {'lower': 0.0, 'upper': 2000},
        }

        if metric_name not in thresholds:
            return

        lower, upper = thresholds[metric_name]['lower'], thresholds[metric_name]['upper']

        if value < lower:
            self._alert_and_remediate(
                severity='high',
                metric=metric_name,
                value=value,
                threshold=lower,
                direction='below'
            )
        elif value > upper:
            self._alert_and_remediate(
                severity='medium',
                metric=metric_name,
                value=value,
                threshold=upper,
                direction='above'
            )

    def _alert_and_remediate(
        self,
        severity: str,
        metric: str,
        value: float,
        threshold: float,
        direction: str
    ):
        """Send alert and execute automated runbook."""

        alert_message = (
            f"[{severity.upper()}] {metric} {direction} threshold "
            f"(value={value:.3f}, threshold={threshold:.3f})"
        )

        # Alert humans
        send_slack_alert(alert_message)
        send_pagerduty_incident(alert_message)

        # Automated remediation
        if metric == 'semantic_sim' and direction == 'below':
            # Semantic quality dropped: revert to previous model version
            self._rollback_to_stable_version()

        elif metric == 'toxicity' and direction == 'above':
            # Toxicity increased: enable safety filter, alert security team
            self._enable_safety_filter()
            send_security_alert("Toxicity spike detected; safety filter enabled")

        elif metric == 'latency_ms' and direction == 'above':
            # Latency spiked: scale out
            scale_out_instances(target=double_current_capacity())

    def _rollback_to_stable_version(self):
        """Placeholder: route all traffic back to the last known-good version."""
        pass

    def _enable_safety_filter(self):
        """Placeholder: switch on an output safety filter."""
        pass

Automated remediation reduces MTTR (mean time to recovery). When something goes wrong, the system responds immediately while you're still waking up.
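Acting on a single out-of-range value risks rolling back a healthy model because of one noisy query. One possible guard, sketched below, is to require several consecutive breaches before firing. `BreachGate` and the sample values are hypothetical; the 0.65 floor matches the `semantic_sim` lower threshold used above.

```python
class BreachGate:
    """Fire once when a threshold has been breached `required` times in a row."""

    def __init__(self, required: int = 3):
        self.required = required
        self.streak = 0

    def observe(self, breached: bool) -> bool:
        # A single in-range value resets the streak
        self.streak = self.streak + 1 if breached else 0
        # Equality (not >=) fires exactly once per sustained breach
        return self.streak == self.required

gate = BreachGate(required=3)
semantic_sims = [0.70, 0.60, 0.72, 0.61, 0.62, 0.60, 0.59]  # hypothetical values
fired_at = [i for i, value in enumerate(semantic_sims) if gate.observe(value < 0.65)]
print(fired_at)
```

Here the isolated dip at index 1 is ignored, and remediation would trigger only once, at the third consecutive breach.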

Key Takeaways

  • Continuous evaluation runs 24/7: Sample production queries, evaluate in real-time, detect anomalies.
  • Anomaly detection catches drift automatically: Z-score, EWMA, and isolation forest catch quality degradation before users complain.
  • Feedback closes the loop: User feedback identifies failures; patterns guide retraining.
  • Gradual rollouts reduce risk: A/B test new models on 10% of traffic before full deployment.
  • Automated alerting and remediation: Respond in minutes, not hours or days.

Frequently Asked Questions

How much traffic should I sample for evaluation?

0.5–5% is typical: 0.5% if evaluation is expensive or sensitive (PII), 5% if cheap and non-invasive. Balance cost vs. signal: too little (0.1%) and anomalies hide; too much (>10%) and costs explode.

What metrics should I monitor continuously?

Start with: semantic similarity, output length, toxicity, format validity. Add domain-specific metrics: code generation accuracy, retrieval quality, latency. Pick 3–5 key metrics; more = noise.

How do I handle feedback lag (users rate queries hours later)?

Key every logged evaluation event by query ID, and join feedback to it whenever the feedback arrives. Store late feedback in a feedback database and regenerate dataset labels periodically. Treat feedback as a separate signal for retraining rather than as a live metric.
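A minimal in-memory sketch of that join is shown below, assuming each evaluation event was logged under a query ID. `FeedbackJoiner` and its method names are hypothetical; a production version would back this with a database and expire old events.

```python
from typing import Dict, List, Optional

class FeedbackJoiner:
    """Attach late-arriving feedback to previously logged evaluation events."""

    def __init__(self):
        self.events: Dict[str, dict] = {}

    def record_event(self, query_id: str, metrics: dict, model_version: str) -> None:
        self.events[query_id] = {
            "query_id": query_id,
            "metrics": metrics,
            "model_version": model_version,
        }

    def attach_feedback(self, query_id: str, feedback: str) -> Optional[dict]:
        event = self.events.get(query_id)
        if event is None:
            return None  # query was not sampled, or its event has expired
        event["feedback"] = feedback
        return event

    def labeled_examples(self) -> List[dict]:
        """Events that have received feedback, ready for periodic relabeling."""
        return [e for e in self.events.values() if "feedback" in e]

joiner = FeedbackJoiner()
joiner.record_event("q1", {"semantic_sim": 0.71}, "v2.1")
joiner.record_event("q2", {"semantic_sim": 0.88}, "v2.1")
joiner.attach_feedback("q1", "negative")          # arrives hours later
missing = joiner.attach_feedback("q9", "positive")  # no logged event
print(len(joiner.labeled_examples()), missing)
```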

Should I retrain on all user feedback?

No. Feedback is noisy: one user's "bad" is another's "acceptable". Aggregate feedback over time. When a clear pattern emerges (70%+ negative on a class), investigate and retrain if warranted.
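Whether a pattern is "clear" depends on how much feedback backs it: 8 negatives out of 10 is much weaker evidence than 160 out of 200. One way to account for this, sketched below, is to compare the lower bound of a Wilson score interval against the 70% bar. `wilson_lower_bound` is a hypothetical helper and the counts are made up for illustration.

```python
from math import sqrt
from statistics import NormalDist

def wilson_lower_bound(negatives: int, total: int, confidence: float = 0.95) -> float:
    """Lower bound of the Wilson score interval for a proportion."""
    if total == 0:
        return 0.0
    z = NormalDist().inv_cdf(1 - (1 - confidence) / 2)
    p = negatives / total
    denominator = 1 + z**2 / total
    center = (p + z**2 / (2 * total)) / denominator
    margin = z * sqrt(p * (1 - p) / total + z**2 / (4 * total**2)) / denominator
    return center - margin

small = wilson_lower_bound(8, 10)     # 80% negative, but only 10 responses
large = wilson_lower_bound(160, 200)  # 80% negative across 200 responses
print(f"small sample: {small:.2f}, large sample: {large:.2f}")
```

Only the larger sample clears the 70% bar with its lower bound, so it is the one that would justify investigating and possibly retraining.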

What's a reasonable alert threshold?

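Start conservative: alert at 2–3 standard deviations from baseline. If the metric is roughly normally distributed, a two-sided 2-sigma threshold flags about 4.6% of healthy values and a 3-sigma threshold about 0.3%, so per-query checks at 2 sigma will be noisy. After 2 weeks, review alert accuracy. Adjust thresholds to minimize alert fatigue while catching real issues.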

Further Reading