
Rollback automation and incident recovery

Rollback automation is the ability to revert a deployment to a previous version when metrics indicate a problem, without manual intervention. An automated rollback system monitors key metrics (error rate, latency, quality score) in near real time and triggers a rollback when they cross predefined thresholds. Rollback is fast when the previous version is already running (blue-green strategy) or cached: reverting is then a configuration change, not a rebuild, and can complete in seconds. A rolling revert, such as a Kubernetes rollout, takes longer. Incident recovery is the process of detecting an issue, executing the rollback, and communicating the resolution to users and the team. With automation, a bad deployment can be contained and reverted before most users notice a problem.

Detecting Deployment Failures

Effective rollback automation depends on rapid failure detection. Monitor three categories of metrics:

  1. System Metrics: error rate, HTTP 5xx responses, exception count, service unavailability.
  2. Performance Metrics: latency (p50, p99), throughput (requests per second), resource usage (CPU, memory, GPU).
  3. Quality Metrics: accuracy or relevance score (compared to baseline), hallucination rate, toxicity rate.

import time
from dataclasses import dataclass

@dataclass
class MetricThresholds:
    error_rate: float = 0.05                # 5% error rate
    latency_p99_ms: float = 3000            # 3s p99 latency
    quality_score: float = 0.75             # minimum quality
    toxic_output_rate: float = 0.01         # 1% toxic outputs
    deployment_cooldown_minutes: int = 5    # avoid thrashing

class AnomalyDetector:
    def __init__(self, thresholds: MetricThresholds):
        self.thresholds = thresholds
        self.last_rollback_time = 0

    def detect_anomaly(self, current_metrics: dict) -> tuple[bool, list]:
        """Check if current metrics indicate a problem."""
        anomalies = []

        if current_metrics.get("error_rate", 0) > self.thresholds.error_rate:
            anomalies.append(
                f"error_rate {current_metrics['error_rate']:.1%} > {self.thresholds.error_rate:.1%}"
            )

        if current_metrics.get("latency_p99_ms", 0) > self.thresholds.latency_p99_ms:
            anomalies.append(
                f"latency_p99 {current_metrics['latency_p99_ms']}ms > {self.thresholds.latency_p99_ms}ms"
            )

        if current_metrics.get("quality_score", 1.0) < self.thresholds.quality_score:
            anomalies.append(
                f"quality_score {current_metrics['quality_score']:.2f} < {self.thresholds.quality_score:.2f}"
            )

        if current_metrics.get("toxic_output_rate", 0) > self.thresholds.toxic_output_rate:
            anomalies.append(
                f"toxic_rate {current_metrics['toxic_output_rate']:.2%} > {self.thresholds.toxic_output_rate:.2%}"
            )

        # Avoid thrashing: only roll back if the cooldown has passed since the last rollback
        time_since_rollback = (time.time() - self.last_rollback_time) / 60
        if time_since_rollback < self.thresholds.deployment_cooldown_minutes:
            return False, []  # in cooldown period

        return len(anomalies) >= 2, anomalies  # require 2+ anomalies to trigger rollback
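
The detector above takes an already-aggregated metrics dict. As a minimal sketch of where those numbers might come from, the following computes error rate and p99 latency over a sliding window of recent requests. `RequestSample` and `WindowedMetrics` are hypothetical names introduced here for illustration, and the nearest-rank percentile is one simple choice among several; a production system would more likely query its observability backend.

```python
import math
from collections import deque
from dataclasses import dataclass


@dataclass
class RequestSample:
    latency_ms: float
    is_error: bool


class WindowedMetrics:
    """Keeps the most recent `max_samples` requests and aggregates them."""

    def __init__(self, max_samples: int = 1000):
        self.samples = deque(maxlen=max_samples)

    def record(self, latency_ms: float, is_error: bool) -> None:
        self.samples.append(RequestSample(latency_ms, is_error))

    def snapshot(self) -> dict:
        """Return metrics using the same keys the detector reads."""
        if not self.samples:
            return {}
        latencies = sorted(s.latency_ms for s in self.samples)
        # Nearest-rank p99: smallest value with at least 99% of samples at or below it.
        rank = math.ceil(len(latencies) * 99 / 100)
        errors = sum(1 for s in self.samples if s.is_error)
        return {
            "error_rate": errors / len(self.samples),
            "latency_p99_ms": latencies[rank - 1],
        }


window = WindowedMetrics(max_samples=200)
for i in range(100):
    # 10 of 100 requests fail; latencies run from 10 ms to 1000 ms
    window.record(latency_ms=10.0 * (i + 1), is_error=(i % 10 == 0))
metrics = window.snapshot()
```

An empty window returns an empty dict, which the detector treats as "no anomalies" because each check falls back to a healthy default.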

Automated Rollback Execution

When anomalies are detected, execute the rollback automatically. For blue-green deployments, this means switching the load balancer to the previous version. For containerized systems (Kubernetes), revert the image to the previous tag.

import json
import subprocess
from datetime import datetime, timezone

class AutomaticRollbackExecutor:
    def __init__(self, deployment_name: str, namespace: str = "production"):
        self.deployment_name = deployment_name
        self.namespace = namespace

    def _kubectl(self, *args: str) -> subprocess.CompletedProcess:
        """Run kubectl with an argument list (no shell, so `reason` cannot inject commands)."""
        return subprocess.run(
            ["kubectl", *args, "-n", self.namespace],
            capture_output=True, text=True,
        )

    def execute_rollback(self, reason: str) -> bool:
        """Execute rollback to previous version."""
        try:
            # Get current and previous images
            current_image = self._get_current_image()
            previous_image = self._get_previous_image()

            if not previous_image:
                print("ERROR: No previous image found. Rollback aborted.")
                return False

            print(f"ROLLBACK: {current_image} -> {previous_image}")
            print(f"Reason: {reason}")

            # Update deployment to previous image.
            # Assumes the container is named after the deployment.
            result = self._kubectl(
                "set", "image", f"deployment/{self.deployment_name}",
                f"{self.deployment_name}={previous_image}",
            )
            if result.returncode != 0:
                print(f"ERROR: Rollback command failed: {result.stderr}")
                return False

            # Record the reason in the rollout history. `kubectl set image` has no
            # --change-cause flag; the cause is stored as an annotation instead.
            self._kubectl(
                "annotate", f"deployment/{self.deployment_name}",
                f"kubernetes.io/change-cause=Automatic rollback: {reason}",
                "--overwrite",
            )

            # Wait for rollout to complete; raises CalledProcessError on failure or timeout
            status = self._kubectl(
                "rollout", "status", f"deployment/{self.deployment_name}", "--timeout=5m"
            )
            status.check_returncode()

            # Log the incident
            self._log_incident(current_image, previous_image, reason)

            # Send alert to on-call team
            self._send_incident_alert(current_image, previous_image, reason)

            return True

        except Exception as e:
            print(f"CRITICAL: Rollback execution failed: {e}")
            self._send_critical_alert(f"Rollback failed: {e}")
            return False

    def _get_current_image(self) -> str:
        """Get current image running in deployment."""
        result = self._kubectl(
            "get", "deployment", self.deployment_name,
            "-o", "jsonpath={.spec.template.spec.containers[0].image}",
        )
        return result.stdout.strip()

    def _get_previous_image(self) -> str:
        """Get previous image from rollout history."""
        # NOTE: the structure of `rollout history` output varies across kubectl
        # versions; verify this jsonpath against your cluster before relying on it.
        result = self._kubectl(
            "rollout", "history", f"deployment/{self.deployment_name}",
            "--output=jsonpath={.items[*].spec.template.spec.containers[0].image}",
        )
        images = result.stdout.split()
        # The second-to-last entry is the revision before the current one
        return images[-2] if len(images) >= 2 else ""

    def _log_incident(self, from_image: str, to_image: str, reason: str):
        """Log incident to audit system."""
        incident = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "type": "rollback",
            "deployment": self.deployment_name,
            "from_image": from_image,
            "to_image": to_image,
            "reason": reason,
        }
        print(f"INCIDENT_LOG: {json.dumps(incident)}")

    def _send_incident_alert(self, from_image: str, to_image: str, reason: str):
        """Send alert to on-call team (Slack, PagerDuty, etc.)."""
        message = (
            f"Automatic rollback triggered:\n"
            f"Deployment: {self.deployment_name}\n"
            f"From: {from_image}\n"
            f"To: {to_image}\n"
            f"Reason: {reason}"
        )
        # Pseudo-code: slack_client.send_message(channel="incidents", text=message)
        print(f"ALERT: {message}")

    def _send_critical_alert(self, error: str):
        """Send critical alert if rollback itself fails."""
        # Pseudo-code: pagerduty.trigger(severity="critical", message=error)
        print(f"CRITICAL_ALERT: {error}")
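
The executor above covers the Kubernetes path. For the blue-green case, where both versions are already running, rollback reduces to flipping which environment receives traffic. The sketch below models that with an in-memory router; `BlueGreenRouter` and its methods are hypothetical stand-ins for whatever load balancer or service-mesh API you actually use.

```python
from dataclasses import dataclass, field


@dataclass
class BlueGreenRouter:
    """In-memory stand-in for a load balancer with two live environments."""
    environments: dict            # e.g. {"blue": "api:v1", "green": "api:v2"}
    active: str                   # name of the environment receiving traffic
    history: list = field(default_factory=list)

    def switch_to(self, name: str) -> None:
        """Send traffic to `name`, remembering where it was before."""
        if name not in self.environments:
            raise ValueError(f"unknown environment: {name}")
        self.history.append(self.active)
        self.active = name

    def rollback(self) -> str:
        """Point traffic back at the previously active environment."""
        if not self.history:
            raise RuntimeError("no previous environment to roll back to")
        self.active = self.history.pop()
        return self.environments[self.active]


router = BlueGreenRouter({"blue": "api:v1", "green": "api:v2"}, active="blue")
router.switch_to("green")          # deploy: traffic moves to v2
restored = router.rollback()       # incident: traffic returns to v1
```

Because nothing is rebuilt or rescheduled, the cost of the rollback is the cost of one pointer update, which is why this strategy can revert in seconds.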

Monitoring Loop: Continuous Anomaly Detection

Run a monitoring loop that continuously polls metrics and executes rollbacks when needed.

import time
import threading

class MonitoringService:
    def __init__(self, detector: AnomalyDetector, executor: AutomaticRollbackExecutor, interval_seconds: int = 30):
        self.detector = detector
        self.executor = executor
        self.interval = interval_seconds
        self.running = False

    def start(self):
        """Start monitoring thread."""
        self.running = True
        thread = threading.Thread(target=self._monitor_loop, daemon=True)
        thread.start()

    def stop(self):
        self.running = False

    def _monitor_loop(self):
        """Continuous monitoring loop."""
        while self.running:
            try:
                # Fetch current metrics
                metrics = self._fetch_metrics()

                # Check for anomalies
                is_anomaly, anomalies = self.detector.detect_anomaly(metrics)

                if is_anomaly:
                    reason = "; ".join(anomalies)
                    print(f"ANOMALY DETECTED: {reason}")

                    # Execute rollback
                    success = self.executor.execute_rollback(reason)
                    if success:
                        print("Rollback completed successfully")
                        # Update cooldown timer
                        self.detector.last_rollback_time = time.time()
                    else:
                        print("ERROR: Rollback failed; manual intervention required")
                        self.running = False
                        break  # stop monitoring to avoid retry loop

                # Wait before next check
                time.sleep(self.interval)

            except Exception as e:
                print(f"ERROR in monitoring loop: {e}")
                time.sleep(self.interval)

    def _fetch_metrics(self) -> dict:
        """Fetch current metrics from observability system."""
        # Pseudo-code: query Prometheus, Datadog, or similar.
        # Static placeholder values; replace with a real query.
        return {
            "error_rate": 0.03,
            "latency_p99_ms": 1200,
            "quality_score": 0.88,
            "toxic_output_rate": 0.005,
        }

# Usage
detector = AnomalyDetector(MetricThresholds(error_rate=0.05, latency_p99_ms=3000))
executor = AutomaticRollbackExecutor("llm-api", namespace="production")
monitor = MonitoringService(detector, executor, interval_seconds=30)
monitor.start()

# The monitor runs in a daemon thread, so keep the main thread alive.
try:
    while monitor.running:
        time.sleep(1)
except KeyboardInterrupt:
    monitor.stop()

Incident Communication

When a rollback happens, communicate the status to users and the team. Document what happened, why the rollback was triggered, and what was done.

def post_incident_notification(incident_id: str, deployment: str, from_version: str, to_version: str, reason: str, status: str = "resolved") -> dict:
    """Build incident notifications for the status page and team channels."""

    # Post to status page (e.g., Atlassian Statuspage)
    status_update = {
        "incident_id": incident_id,
        "status": status,
        "title": f"{deployment} deployment incident",
        "description": f"Automatic rollback triggered. Reverted from {from_version} to {to_version}. Reason: {reason}",
        "component_id": deployment,
        "impact": "major" if status == "investigating" else "minor",
    }
    # Pseudo-code: statuspage_client.create_incident(status_update)

    # Post to Slack
    slack_message = (
        f":warning: Incident {incident_id}: {deployment}\n"
        f"Status: {status}\n"
        f"From: {from_version}\n"
        f"To: {to_version}\n"
        f"Reason: {reason}"
    )
    # Pseudo-code: slack_client.send_message(channel="incidents", text=slack_message)

    # Create incident ticket
    ticket = {
        "title": f"Incident {incident_id}: {deployment} rollback",
        "description": reason,
        "severity": "high",
        "assigned_to": "on-call-team",
    }
    # Pseudo-code: jira_client.create_issue(ticket)

    # Return the payloads so callers can send or inspect them
    return {"status_update": status_update, "slack_message": slack_message, "ticket": ticket}

Post-Incident Analysis: RCA

After a rollback, conduct a post-incident review that includes a root cause analysis (RCA), so the team understands what went wrong and can prevent recurrence.

def post_incident_analysis(deployment: str, from_version: str, to_version: str, metrics_before: dict, metrics_after: dict):
    """Analyze incident and document root cause."""

    analysis = {
        "summary": f"Automatic rollback of {deployment} from {from_version} to {to_version}",
        # Placeholder timeline; replace with timestamps from the incident log
        "timeline": [
            {"time": "10:00", "event": f"Deployment of {from_version}"},
            {"time": "10:02", "event": "Error rate spike detected"},
            {"time": "10:03", "event": "Automatic rollback triggered"},
            {"time": "10:04", "event": "Rollback completed, metrics normalized"},
        ],
        "root_cause": "TODO: investigate logs and code changes",
        "metrics_comparison": {
            "error_rate_before": metrics_before.get("error_rate"),
            "error_rate_after": metrics_after.get("error_rate"),
            "latency_p99_before": metrics_before.get("latency_p99_ms"),
            "latency_p99_after": metrics_after.get("latency_p99_ms"),
        },
        "preventive_actions": [
            "Add stricter automated tests for this scenario",
            "Increase canary duration to detect this earlier",
            "Update monitoring thresholds based on findings",
        ],
        "assigned_to": "on-call-team",
    }

    return analysis

Key Takeaways

  • Automated rollback detects metric anomalies (error rate, latency, quality) and reverts deployments within seconds.
  • Use blue-green or canary deployment architectures, which keep the previous version running, so rollback is a fast configuration change, not a rebuild.
  • Define thresholds per metric and anomaly detection logic that avoids false positives (require 2+ metrics to fail).
  • Implement a cooldown period after rollback to avoid thrashing (rapid deploy-rollback cycles).
  • Log and communicate all rollbacks; conduct post-incident reviews to prevent recurrence.

Frequently Asked Questions

What if the previous version also has the same bug?

Add a fallback chain: if v2 fails, roll back to v1. If v1 also fails, roll back to v0. Keep at least two previous versions available. Monitor v1 metrics closely after the first rollback; if they also degrade and no older healthy version remains, stop the automatic rollback loop and escalate to manual investigation.
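
A fallback chain of this kind can be sketched as a bounded walk through older versions. The function name `rollback_with_fallback`, the `is_healthy` callback, and the `max_steps` limit are all illustrative assumptions; in practice the health check would observe live metrics for each candidate after switching to it.

```python
from typing import Callable, Optional


def rollback_with_fallback(
    versions: list,
    is_healthy: Callable[[str], bool],
    max_steps: int = 2,
) -> Optional[str]:
    """Walk back through older versions until one is healthy.

    `versions` is ordered oldest to newest, with the failing version last.
    Returns the version to keep, or None if the caller should escalate.
    """
    # Newest-first list of older versions, capped at max_steps attempts
    candidates = list(reversed(versions[:-1]))[:max_steps]
    for version in candidates:
        if is_healthy(version):
            return version
    return None  # stop automatic rollback; hand off to a human


# v2 is the failing deployment; v1 shares the bug, v0 does not.
health = {"v0": True, "v1": False, "v2": False}
chosen = rollback_with_fallback(["v0", "v1", "v2"], lambda v: health[v])
exhausted = rollback_with_fallback(["v0", "v1", "v2"], lambda v: False)
```

Capping the number of steps is what prevents the chain from turning into an unbounded rollback loop: once the candidates are exhausted, the function returns `None` and the decision goes to a person.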

How do I handle rollbacks for database schema changes?

Database changes require careful coordination. Use a zero-downtime migration strategy: deploy new code that supports both the old and new schema, migrate data in the background, then deploy code that requires the new schema. While both schemas are supported, the code can be rolled back without touching the database. After the final step, a rollback must revert both code and schema to a previous state; alternatively, run blue-green databases in parallel.
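
The middle phase of that sequence, code that tolerates both schemas, is what keeps a code rollback safe while the migration is in flight. A minimal sketch, assuming a hypothetical migration that splits a `name` column into `first_name` and `last_name` (the column names and the `read_user` and `write_user` helpers are illustrative, not taken from any real schema):

```python
def read_user(row: dict) -> dict:
    """Read a user row written under either the old or the new schema."""
    if "first_name" in row:  # new schema
        return {"first_name": row["first_name"], "last_name": row["last_name"]}
    # Old schema: derive the new fields from the legacy column.
    first, _, last = row["name"].partition(" ")
    return {"first_name": first, "last_name": last}


def write_user(first_name: str, last_name: str) -> dict:
    """Dual-write: populate old and new columns so either code version can read."""
    return {
        "name": f"{first_name} {last_name}",
        "first_name": first_name,
        "last_name": last_name,
    }


old_row = {"name": "Ada Lovelace"}      # written before the migration
new_row = write_user("Grace", "Hopper")  # written during the migration
```

Because every write still fills the legacy `name` column, the previous code version can read rows created by the new one, so reverting the deployment requires no schema change.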

What if I need to roll back a model fine-tuning, not a code deployment?

Model fine-tuning rollbacks are slower because they require loading a previous model checkpoint. Store model versions as immutable artifacts (in a model registry like MLflow). Rollback means switching the active model version in your config (fast) and re-deploying if needed.

Should I send alerts for every rollback or only critical ones?

Send alerts for all rollbacks, even minor ones. Log the severity (minor, major, critical) and route notifications accordingly (Slack for minor, PagerDuty for critical). Frequent rollbacks indicate instability; investigate and fix the underlying issue.
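
One way to express that routing, along with a simple check for repeated rollbacks, is sketched below. The `ROUTES` table, the function names, and the limit of three rollbacks are assumptions made for illustration; the text only specifies Slack for minor and PagerDuty for critical.

```python
from collections import Counter

# Assumed routing table: sending "major" to both channels is an illustrative choice.
ROUTES = {
    "minor": ["slack"],
    "major": ["slack", "pagerduty"],
    "critical": ["pagerduty"],
}


def route_alert(severity: str) -> list:
    """Return the channels to notify; unknown severities escalate to be safe."""
    return ROUTES.get(severity, ["pagerduty"])


def is_unstable(rollback_deployments: list, deployment: str, limit: int = 3) -> bool:
    """Flag a deployment once it has rolled back `limit` or more times."""
    return Counter(rollback_deployments)[deployment] >= limit


# Deployment names from a hypothetical rollback log
recent = ["llm-api", "search", "llm-api", "llm-api"]
```

Here `route_alert("minor")` yields only the Slack channel, while `is_unstable(recent, "llm-api")` flags the service because it appears three times in the log.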

Can I test the rollback procedure without deploying a bad version?

Yes. Run a chaos engineering or resilience test, ideally in a staging environment: deploy a version that deliberately triggers anomalies (injected errors, added latency), confirm that the automatic rollback fires, and verify that the previous version is healthy afterward. This validates your rollback system before you need it for real.
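
Before running a drill against real infrastructure, the trigger logic itself can be exercised with synthetic metrics and a test double in place of the executor. This is a simplified sketch: `breaches`, `RecordingExecutor`, and `run_drill` are hypothetical helpers, and only "higher is worse" metrics are modeled, with the same two-breach rule described earlier.

```python
def breaches(metrics: dict, thresholds: dict) -> list:
    """Names of metrics above their threshold (higher is worse for all here)."""
    return [name for name, limit in thresholds.items() if metrics.get(name, 0) > limit]


class RecordingExecutor:
    """Test double: records rollback requests instead of touching a cluster."""

    def __init__(self):
        self.calls = []

    def execute_rollback(self, reason: str) -> bool:
        self.calls.append(reason)
        return True


def run_drill(metric_stream, thresholds, executor, min_breaches=2):
    """Feed synthetic metrics through the trigger rule; stop at first rollback."""
    for tick, metrics in enumerate(metric_stream):
        failed = breaches(metrics, thresholds)
        if len(failed) >= min_breaches:
            executor.execute_rollback("; ".join(failed))
            return tick
    return None


thresholds = {"error_rate": 0.05, "latency_p99_ms": 3000}
stream = [
    {"error_rate": 0.01, "latency_p99_ms": 900},    # healthy
    {"error_rate": 0.20, "latency_p99_ms": 1000},   # one breach: no trigger
    {"error_rate": 0.25, "latency_p99_ms": 4500},   # injected fault: both breach
]
executor = RecordingExecutor()
triggered_at = run_drill(stream, thresholds, executor)
```

The drill confirms two things at once: a single breached metric does not fire a rollback, and the injected fault on the third tick does.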

Further Reading