Shipping Agents to Production: Safety and Governance
Building a working agent and deploying it to production are fundamentally different challenges. In development, you can afford for an agent to make mistakes: you fix the code, rerun, and learn. In production, a single agent error can corrupt data, introduce security vulnerabilities, or cause customer-facing failures. This article covers how to ship agents responsibly: approval workflows, audit logging, gradual rollout, monitoring, and rollback strategies.
The Production Threat Model
What can go wrong when agents edit production code?
- Silent corruption: The agent makes an edit that looks correct but breaks a subtle contract (for example, changing a database query without the matching schema migration).
- Security vulnerabilities: The agent introduces insecure code (e.g., hardcoded secrets, SQL injection).
- Cascading failures: An agent change breaks one service in production, and the failure spreads to the systems that depend on it.
- Data loss: The agent deletes files or overwrites important data.
- Slow degradation: The agent makes many small changes that each seem harmless but together degrade system behavior.
Each requires a different control.
Strategy 1: Multi-Level Approval Workflow
Never deploy agent changes directly. Use an approval workflow:
┌───────────────────────────┐
│ Agent Proposes Change     │
└─────────────┬─────────────┘
              │
┌─────────────v─────────────┐
│ Level 1: Automated Tests  │
│ - Unit tests pass?        │
│ - Lint/format correct?    │
│ - No dangerous patterns?  │
└─────────────┬─────────────┘
              │ (fail: reject)
              │ (pass: continue)
┌─────────────v─────────────┐
│ Level 2: Code Review      │
│ - Human reviews change    │
│ - Approves or requests    │
│   modifications           │
└─────────────┬─────────────┘
              │ (reject: end)
              │ (approve: continue)
┌─────────────v─────────────┐
│ Level 3: Integration Test │
│ - Run full test suite     │
│ - Staging deployment      │
└─────────────┬─────────────┘
              │ (fail: reject)
              │ (pass: continue)
┌─────────────v─────────────┐
│ Level 4: Gradual Deploy   │
│ - Canary (1% traffic)     │
│ - Monitor metrics         │
│ - Full rollout if ok      │
└───────────────────────────┘
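import os
import shutil
import subprocess
import tempfile
import time
import uuid

class DeploymentApprovalPipeline:
    """Multi-level approval for agent changes."""
    def __init__(self):
        # Dicts preserve insertion order, so the levels run top to bottom.
        self.levels = {
            "tests": self._run_automated_tests,
            "review": self._request_code_review,
            "integration": self._run_integration_tests,
            "canary": self._deploy_canary
        }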
    def process_change(self, change: dict) -> dict:
        """Process a proposed agent change through the pipeline."""
        change_id = change["id"]
        status = {"change_id": change_id, "levels": {}}
        for level_name, level_fn in self.levels.items():
            result = level_fn(change)
            status["levels"][level_name] = result
            if not result.get("approved"):
                status["blocked_at"] = level_name
                status["reason"] = result.get("reason", "Unknown")
                return status
        # All levels passed
        status["approved"] = True
        status["deployed"] = True
        return status
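    def _run_automated_tests(self, change: dict) -> dict:
        """Level 1: Run automated tests."""
        # Apply the change to a throwaway copy of the repo
        with tempfile.TemporaryDirectory() as tmpdir:
            shutil.copytree(change["repo"], tmpdir, dirs_exist_ok=True)
            for edit in change["edits"]:
                filepath = os.path.join(tmpdir, edit["path"])
                with open(filepath) as f:
                    content = f.read()
                # Reject edits whose target text is missing instead of silently skipping them
                if edit["old_text"] not in content:
                    return {
                        "approved": False,
                        "reason": f"Edit target not found in {edit['path']}"
                    }
                # Replace only the first occurrence
                new_content = content.replace(edit["old_text"], edit["new_text"], 1)
                with open(filepath, 'w') as f:
                    f.write(new_content)
            # Run tests; passing an argument list avoids the need for shell=True
            try:
                result = subprocess.run(
                    ["pytest", "--tb=short"],
                    cwd=tmpdir,
                    capture_output=True,
                    text=True,
                    timeout=60
                )
            except subprocess.TimeoutExpired:
                return {"approved": False, "reason": "Automated tests timed out"}
        if result.returncode != 0:
            return {
                "approved": False,
                "reason": "Automated tests failed",
                # pytest writes failure details to stdout; keep the tail
                "output": result.stdout[-500:]
            }
        return {
            "approved": True,
            "tests_passed": True
        }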
    def _request_code_review(self, change: dict) -> dict:
        """Level 2: Request human code review."""
        review_id = str(uuid.uuid4())
        # Create review request
        review_request = {
            "id": review_id,
            "change_id": change["id"],
            "diff": change["diff"],
            "created_at": time.time(),
            "status": "pending"
        }
        # Store review request
        self._store_review(review_request)
        # In practice: send to a code review tool (GitHub, Gerrit, etc.) and wait for its decision.
        # For demo: check the stored status after a fixed 10-second wait.
        print(f"Review {review_id} created. Awaiting approval...")
        time.sleep(10)
        approval = self._get_review_status(review_id)
        # Fail closed: a missing decision counts as not approved
        approved = approval.get("approved", False)
        result = {
            "approved": approved,
            "review_id": review_id,
            "reviewer": approval.get("reviewer")
        }
        if not approved:
            result["reason"] = "Code review not approved"
        return result
    def _run_integration_tests(self, change: dict) -> dict:
        """Level 3: Full integration tests."""
        # Deploy to staging
        staging_url = self._deploy_to_staging(change)
        # Run integration tests against staging (argument list, so no shell is needed)
        result = subprocess.run(
            ["pytest", "tests/integration/", f"--base-url={staging_url}"],
            capture_output=True,
            timeout=300  # 5 minutes for full suite
        )
        if result.returncode != 0:
            return {
                "approved": False,
                "reason": "Integration tests failed",
                "staging_url": staging_url
            }
        return {
            "approved": True,
            "staging_url": staging_url,
            "integration_tests_passed": True
        }
    def _deploy_canary(self, change: dict) -> dict:
        """Level 4: Gradual canary deployment."""
        canary_result = self._deploy_with_percentage("canary", change, 1)
        if not canary_result["healthy"]:
            return {
                "approved": False,
                "reason": "Canary deployment unhealthy"
            }
        time.sleep(60)  # Monitor for 1 minute
        full_result = self._deploy_with_percentage("production", change, 100)
        return {
            "approved": full_result["healthy"],
            "canary_deployed": True,
            "full_deployed": full_result["healthy"]
        }
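The Level 1 box lists a "No dangerous patterns?" check that the pipeline class does not spell out. A minimal sketch of such a check follows, assuming the change carries a unified diff. The rule names and regular expressions are illustrative assumptions, not a complete security scanner, and `scan_added_lines` is a hypothetical helper.

```python
import re

# Hypothetical rule set; tune the patterns to your own codebase.
DANGEROUS_PATTERNS = {
    "hardcoded secret": re.compile(
        r"""(?i)\b(password|secret|api_key|token)\s*=\s*["'][^"']+["']"""
    ),
    "SQL built by string formatting": re.compile(
        r"""(?i)execute\(\s*f["']|execute\(\s*["'][^"']*["']\s*%"""
    ),
    "shell=True subprocess call": re.compile(r"shell\s*=\s*True"),
}


def scan_added_lines(diff: str) -> list[dict]:
    """Return one finding per (added line, matching rule) in a unified diff."""
    findings = []
    for line_no, line in enumerate(diff.splitlines(), start=1):
        # Only inspect lines the change adds; skip the "+++ b/file" header.
        if not line.startswith("+") or line.startswith("+++"):
            continue
        for label, pattern in DANGEROUS_PATTERNS.items():
            if pattern.search(line):
                findings.append(
                    {"line": line_no, "issue": label, "text": line[1:].strip()}
                )
    return findings
```

A non-empty result would map to {"approved": False, ...} at Level 1. Pattern matching of this kind only catches obvious cases, so it complements human review rather than replacing it.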
Strategy 2: Detailed Audit Logging
Every agent action must be logged for compliance and debugging:
import json
from datetime import datetime, timezone

class AuditLog:
    """Append-only audit trail of all agent changes."""
    def __init__(self, log_file: str = "/var/log/agent-audit.jsonl"):
        self.log_file = log_file
    def log_change(self, event: dict):
        """Record an agent action."""
        entry = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": event.get("type"),  # "change_proposed", "tested", "deployed"
            "change_id": event.get("change_id"),
            "agent_id": event.get("agent_id"),
            "action": event.get("action"),  # "edit", "delete", "create"
            "filepath": event.get("filepath"),
            "old_hash": event.get("old_hash"),  # SHA-256 of old content
            "new_hash": event.get("new_hash"),
            "user": event.get("user"),  # Who approved it
            "status": event.get("status"),  # "pending", "approved", "deployed", "rolled_back"
            "metadata": event.get("metadata", {})
        }
        # Append only; real immutability needs storage-level controls on the file
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(entry) + '\n')
        # Also send to centralized logging (Datadog, Splunk, etc.)
        self._send_to_logging_backend(entry)
    # log_change keeps only its known fields, so event-specific details go
    # under "metadata"; the entry timestamp records when each event happened.
    def log_deployment(self, change_id: str, deployed_at: str, version: str):
        """Record deployment."""
        self.log_change({
            "type": "deployment",
            "change_id": change_id,
            "status": "deployed",
            "metadata": {"deployed_at": deployed_at, "version": version}
        })
    def log_rollback(self, change_id: str, reason: str):
        """Record rollback."""
        self.log_change({
            "type": "rollback",
            "change_id": change_id,
            "status": "rolled_back",
            "metadata": {"reason": reason}
        })
    def log_failure(self, change_id: str, error: str, stage: str):
        """Record failure at a specific stage."""
        self.log_change({
            "type": "failure",
            "change_id": change_id,
            "status": "failed",
            # stage: "testing", "review", "integration", "canary"
            "metadata": {"stage": stage, "error": error}
        })
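An append-only file can still be edited by anyone with write access. One common way to make tampering detectable is to chain entries by hash, so each record commits to the one before it. The sketch below illustrates that idea on an in-memory list. It is an assumption layered on top of the AuditLog class, not part of it, and `append_chained`, `verify_chain`, and the `prev_hash` and `entry_hash` fields are hypothetical names.

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder "previous hash" for the first entry


def entry_hash(entry: dict) -> str:
    """SHA-256 over a canonical (sorted-key) JSON encoding of the entry."""
    payload = json.dumps(entry, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def append_chained(log: list[dict], event: dict) -> dict:
    """Append an event that commits to the hash of the previous record."""
    prev = log[-1]["entry_hash"] if log else GENESIS
    body = {**event, "prev_hash": prev}
    record = {**body, "entry_hash": entry_hash(body)}
    log.append(record)
    return record


def verify_chain(log: list[dict]) -> bool:
    """Recompute every hash; any edited or removed record breaks the chain."""
    prev = GENESIS
    for record in log:
        body = {k: v for k, v in record.items() if k != "entry_hash"}
        if record.get("prev_hash") != prev:
            return False
        if record.get("entry_hash") != entry_hash(body):
            return False
        prev = record["entry_hash"]
    return True
```

Running verify_chain periodically, or before an audit, shows whether earlier records were altered after the fact. It does not prevent truncation of the newest entries, which is why shipping entries to a separate logging backend still matters.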
Strategy 3: Monitoring and Observability
Instrument agent-deployed code so that regressions surface quickly in production:
import time

class AgentHealthMonitor:
    """Monitor agent-deployed code for regressions."""
    def __init__(self, metrics_client):
        self.metrics = metrics_client
        self.baseline = {}  # Baseline metrics (pre-deployment)
    def establish_baseline(self, duration_seconds: int = 300):
        """Establish baseline metrics pre-deployment."""
        print(f"Collecting baseline metrics for {duration_seconds}s...")
        metrics_to_track = [
            "request_latency_p99",
            "error_rate",
            "database_query_count",
            "cache_hit_ratio"
        ]
        # Sample every metric on each tick so the whole baseline takes
        # duration_seconds in total, not duration_seconds per metric.
        samples = {metric: [] for metric in metrics_to_track}
        for _ in range(max(1, duration_seconds // 10)):
            for metric in metrics_to_track:
                samples[metric].append(self._fetch_metric(metric))
            time.sleep(10)
        for metric, values in samples.items():
            self.baseline[metric] = {
                "mean": sum(values) / len(values),
                "p99": sorted(values)[int(0.99 * len(values))],
                "max": max(values)
            }
        print(f"Baseline established: {self.baseline}")
    def monitor_post_deployment(self, duration_seconds: int = 300) -> dict:
        """Monitor metrics after deployment."""
        print(f"Monitoring post-deployment for {duration_seconds}s...")
        regressions = []
        deadline = time.monotonic() + duration_seconds
        while True:
            for metric, baseline in self.baseline.items():
                current_value = self._fetch_metric(metric)
                mean = baseline["mean"]
                # Latency regresses at >20% over baseline, error rate at >50%.
                # Match on "latency" anywhere: the tracked name is request_latency_p99.
                if "latency" in metric:
                    limit = mean * 1.2
                elif metric == "error_rate":
                    limit = mean * 1.5
                else:
                    continue
                if current_value > limit:
                    regressions.append({
                        "metric": metric,
                        "baseline": mean,
                        "current": current_value,
                        # Undefined when the baseline mean is zero
                        "degradation_percent": (current_value - mean) / mean * 100 if mean else None
                    })
            # Stop at the first regression, or once the monitoring window ends
            if regressions or time.monotonic() >= deadline:
                break
            time.sleep(10)
        if regressions:
            return {
                "healthy": False,
                "regressions": regressions,
                "action": "rollback"
            }
        return {
            "healthy": True,
            "message": "No regressions detected"
        }
    def _fetch_metric(self, metric_name: str) -> float:
        """Fetch current metric value from monitoring system."""
        # In practice: query Prometheus, Datadog, CloudWatch, etc.
        return self.metrics.query(metric_name)
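The monitor above only flags metrics where a higher value is worse. Some tracked metrics, such as cache_hit_ratio, degrade in the other direction. The sketch below shows one way to make the comparison direction-aware. The `RULES` table, its tolerances, and `find_regressions` are illustrative assumptions rather than recommended thresholds.

```python
# Hypothetical per-metric rules: which direction is bad, and how much
# relative drift from the baseline to tolerate before flagging.
RULES = {
    "request_latency_p99": {"worse": "higher", "tolerance": 0.20},
    "error_rate": {"worse": "higher", "tolerance": 0.50},
    "cache_hit_ratio": {"worse": "lower", "tolerance": 0.10},
}


def find_regressions(baseline: dict, current: dict, rules: dict = RULES) -> list:
    """Return the names of metrics that moved too far in their bad direction."""
    regressions = []
    for metric, rule in rules.items():
        if metric not in baseline or metric not in current:
            continue  # no data for this metric, nothing to compare
        base, now = baseline[metric], current[metric]
        if rule["worse"] == "higher":
            regressed = now > base * (1 + rule["tolerance"])
        else:
            regressed = now < base * (1 - rule["tolerance"])
        if regressed:
            regressions.append(metric)
    return regressions
```

For example, with a baseline of 200 ms latency, a 1% error rate, and a 0.90 cache hit ratio, current readings of 230 ms, 2%, and 0.75 would flag error_rate and cache_hit_ratio but not latency, which is still inside its 20% tolerance.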
Strategy 4: Rollback Procedures
If a deployment fails, roll back immediately:
import subprocess

class RollbackManager:
    """Manage rollback of failed deployments."""
    def __init__(self, git_repo: str, audit_log: AuditLog):
        self.repo = git_repo
        self.audit_log = audit_log
    def rollback_to_previous(self, change_id: str, reason: str = ""):
        """Rollback a failed deployment to previous version."""
        # Step 1: Find the commit before the failed change
        failed_commit = self._find_commit_for_change(change_id)
        if failed_commit is None:
            return {
                "success": False,
                "error": f"No commit found for change {change_id}"
            }
        previous_commit = self._get_previous_commit(failed_commit)
        # Step 2: Reset to previous commit (argument list, so no shell is needed).
        # Note: a hard reset also discards any commits made after the failed change.
        result = subprocess.run(
            ["git", "reset", "--hard", previous_commit],
            cwd=self.repo,
            capture_output=True,
            text=True
        )
        if result.returncode != 0:
            return {
                "success": False,
                "error": "Git reset failed",
                "stderr": result.stderr
            }
        # Step 3: Re-deploy previous version
        result = subprocess.run(
            ["./deploy.sh"],
            cwd=self.repo,
            capture_output=True,
            timeout=300
        )
        if result.returncode != 0:
            return {
                "success": False,
                "error": "Deployment failed after rollback"
            }
        # Step 4: Log the rollback
        self.audit_log.log_rollback(change_id, reason)
        # Step 5: Alert team
        self._alert_team(f"Rolled back change {change_id}: {reason}")
        return {
            "success": True,
            "rolled_back_to": previous_commit,
            "reason": reason
        }
    def _find_commit_for_change(self, change_id: str) -> str | None:
        """Find the commit hash for a change ID, or None if no commit mentions it."""
        # Argument list instead of shell=True: change_id is never parsed by a shell
        result = subprocess.run(
            ["git", "log", "--fixed-strings", f"--grep={change_id}", "--format=%H", "-n", "1"],
            cwd=self.repo,
            capture_output=True,
            text=True
        )
        commit = result.stdout.strip()
        return commit or None
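A hard reset rewrites the branch, which is awkward when other commits have landed since the failed change. As an alternative sketch, `git revert --no-edit <commit>` adds a new commit that undoes only the failed one. The helper names below are hypothetical, and the hash validation is an assumption added to keep unexpected strings out of the command line.

```python
import re
import subprocess

# Abbreviated or full hex object names (SHA-1 or SHA-256 repositories).
_COMMIT_RE = re.compile(r"[0-9a-f]{7,64}")


def build_revert_command(commit: str) -> list[str]:
    """Build the argument list for reverting a single commit."""
    if not _COMMIT_RE.fullmatch(commit):
        raise ValueError(f"Not a commit hash: {commit!r}")
    # --no-edit keeps git's default revert message instead of opening an editor
    return ["git", "revert", "--no-edit", commit]


def revert_change(repo: str, commit: str) -> bool:
    """Run the revert in the given repository; True if git exited cleanly."""
    result = subprocess.run(
        build_revert_command(commit),
        cwd=repo,
        capture_output=True,
        text=True,
    )
    return result.returncode == 0
```

Because a revert is itself a commit, it shows up in history and in the audit trail, and it leaves unrelated later changes in place. It can still fail on conflicts, in which case a human needs to step in.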
Strategy 5: Gradual Rollout with Canary
Deploy to a small percentage of users first, monitor, then expand:
import time

class CanaryDeployment:
    """Gradually roll out changes to detect regressions."""
    def deploy_canary(self, change_id: str, percentage: int = 1):
        """Deploy to a small percentage of traffic."""
        # Step 1: Deploy new version
        deployment_id = self._deploy_version(change_id)
        # Step 2: Configure load balancer to send X% of traffic
        self._configure_traffic_split(deployment_id, percentage)
        # Step 3: Monitor for issues
        print(f"Deployed to {percentage}% of users. Monitoring...")
        for _ in range(10):  # Monitor for ~5 minutes (10 * 30s checks)
            health = self._check_canary_health(deployment_id)
            if not health["healthy"]:
                # Roll back at the first failed check instead of waiting out the window
                self._configure_traffic_split(deployment_id, 0)  # Stop traffic to new version
                return {
                    "success": False,
                    "issues": [health],
                    "rolled_back": True
                }
            time.sleep(30)
        # Step 4: Gradually increase traffic, re-checking health after each soak
        for percent in [5, 25, 50, 100]:
            self._configure_traffic_split(deployment_id, percent)
            print(f"Increased to {percent}%")
            time.sleep(120)  # Soak at each level for 2 minutes
            health = self._check_canary_health(deployment_id)
            if not health["healthy"]:
                self._configure_traffic_split(deployment_id, 0)
                return {
                    "success": False,
                    "issues": [health],
                    "rolled_back": True
                }
        return {
            "success": True,
            "message": "Fully deployed after canary validation"
        }
    def _check_canary_health(self, deployment_id: str) -> dict:
        """Check if canary deployment is healthy."""
        # Query metrics for this specific deployment
        error_rate = self._get_error_rate(deployment_id)
        latency_p99 = self._get_latency(deployment_id)
        # Thresholds
        if error_rate > 0.02:  # > 2% error rate
            return {"healthy": False, "issue": "Error rate too high"}
        if latency_p99 > 500:  # > 500ms p99
            return {"healthy": False, "issue": "Latency too high"}
        return {"healthy": True}
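The class above leaves the traffic split to the load balancer. When the split is made in application code instead, one common approach is to hash a stable identifier into a bucket, so a given user always sees the same version and raising the percentage only adds users. The sketch below assumes a string user ID. The function name and the `salt` parameter are hypothetical.

```python
import hashlib


def in_canary(user_id: str, percentage: float, salt: str = "canary-v1") -> bool:
    """Deterministically decide whether a user falls inside the canary slice.

    percentage is 0-100. The salt (an illustrative default) can be changed
    per rollout so the same users are not always the first to get new code.
    """
    digest = hashlib.sha256(f"{salt}:{user_id}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10_000  # 0..9999
    # 1% maps to buckets 0..99, 100% maps to all 10,000 buckets
    return bucket < percentage * 100
```

Because the bucket depends only on the salt and the user ID, anyone included at 5% is still included at 25%, which keeps a user's experience stable as the rollout expands.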
Key Takeaways
- Never deploy agent changes directly; use a multi-level approval workflow.
- Log every agent action for audit and debugging.
- Monitor post-deployment metrics to catch regressions early.
- Deploy with a canary (small percentage first), then expand gradually.
- Roll back immediately if health checks fail.
- Alert the team on failures; track rollbacks for learning.
Frequently Asked Questions
How long should I monitor before full deployment?
For a canary, monitor for at least 5–10 minutes at each traffic level. After full deployment, keep watching for 24 hours before considering the release stable. Prioritize detecting issues quickly: a fast, automated rollback limits damage more than a longer observation window does.
What if approval takes too long?
Automate approvals where possible (automated tests, integration tests). For code review, set SLAs (e.g., a 4-hour review window). Urgent fixes may bypass the normal human approval step, but they should still pass the automated checks and must receive a post-deployment review.
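A review SLA needs a defined outcome when the window lapses. The sketch below polls for a decision and fails closed on timeout, so an unanswered review is never treated as approval. `wait_for_review` and its parameters are hypothetical. `get_decision` stands in for whatever call your review tool provides, and the injectable `clock` and `sleep` arguments exist only to make the function testable.

```python
import time
from typing import Callable, Optional


def wait_for_review(
    get_decision: Callable[[], Optional[bool]],
    sla_seconds: float = 4 * 60 * 60,  # the 4-hour window used as an example
    poll_seconds: float = 60,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until a reviewer decides or the SLA window expires.

    get_decision returns True (approved), False (rejected), or None (pending).
    """
    deadline = clock() + sla_seconds
    while True:
        decision = get_decision()
        if decision is not None:
            return {"approved": decision, "expired": False}
        if clock() >= deadline:
            # Fail closed: escalate an expired review, never auto-approve it
            return {"approved": False, "expired": True}
        sleep(poll_seconds)
```

An expired result is a signal to escalate, for example by paging a second reviewer, rather than a reason to let the change through.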
Can I deploy multiple agents in parallel?
Not recommended. Deploy serially. If two agents deploy changes to the same file simultaneously, one rollback will undo the other. Use mutual exclusion or queue-based deployment.
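Queue-based deployment can be sketched with a single worker thread that drains a FIFO queue, so at most one change is in flight at a time. This is an in-process illustration only: agents running on separate machines would need a shared queue or distributed lock instead. `DeploymentQueue` and `deploy_fn` are hypothetical names.

```python
import queue
import threading


class DeploymentQueue:
    """Serialize deployments: one worker, first come, first served."""

    def __init__(self, deploy_fn):
        self._deploy_fn = deploy_fn  # callable that deploys one change ID
        self._pending = queue.Queue()
        self.failed = []  # (change_id, exception) pairs for later inspection
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def submit(self, change_id: str) -> None:
        """Enqueue a change; it deploys after everything submitted before it."""
        self._pending.put(change_id)

    def wait(self) -> None:
        """Block until every submitted change has been processed."""
        self._pending.join()

    def _run(self) -> None:
        while True:
            change_id = self._pending.get()
            try:
                self._deploy_fn(change_id)
            except Exception as exc:
                # Record the failure and keep the worker alive for later changes
                self.failed.append((change_id, exc))
            finally:
                self._pending.task_done()
```

Since each deployment finishes, or fails, before the next one starts, a rollback of one change cannot interleave with another agent's deployment.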
What metrics should I monitor?
Essential: request latency (p99), error rate, database query latency. Optional: resource usage, cache hit ratio, feature flags. Define alerts for each (e.g., error rate > 2% → rollback).