Skip to main content

Human-in-the-Loop Approval Gates for AI Workflows

Not all workflow decisions should be automated. Financial transactions, policy changes, and sensitive customer actions often require human review. An approval gate is a workflow node that pauses execution, notifies a human, waits for a decision (approve/reject), and resumes or halts based on that decision. Building approval gates correctly requires careful handling of timeouts, escalations, audit trails, and re-entrancy (resuming a workflow after a human decides).

In this article, you will learn to design approval gates, implement decision capture, handle timeouts and escalations, and maintain audit logs for compliance.

What Is an Approval Gate?

An approval gate is a non-executing node that:

  1. Pauses the workflow.
  2. Notifies one or more humans (email, Slack, dashboard).
  3. Waits for a decision (approve, reject, request more info).
  4. Resumes the workflow based on the decision.
  5. Logs who decided what and when (audit trail).

Unlike LLM or tool steps, approval gates introduce external latency (seconds to days) and require a mechanism to resume a paused workflow when the decision arrives.

Designing Approval Gate Steps

from enum import Enum
from datetime import datetime, timedelta
from typing import Optional
import uuid

class ApprovalDecision(str, Enum):
APPROVED = "approved"
REJECTED = "rejected"
PENDING = "pending"

class ApprovalGate:
"""
A workflow node that pauses and waits for human approval.
"""

def __init__(
self,
name: str,
approvers: list[str],
title: str,
description: str,
timeout_hours: int = 24,
):
"""
Args:
name: Name of the approval gate.
approvers: List of user IDs or email addresses who can approve.
title: Short title for the approval request.
description: Detailed description shown to approvers.
timeout_hours: How long to wait before escalating.
"""
self.name = name
self.approvers = approvers
self.title = title
self.description = description
self.timeout_hours = timeout_hours

def execute(self, context: dict) -> dict:
"""
Pause the workflow and create an approval request.

This returns immediately; the actual resumption happens when
a human submits a decision via the approval API.
"""
approval_id = str(uuid.uuid4())
due_at = datetime.utcnow() + timedelta(hours=self.timeout_hours)

# Create the approval request (store in database).
approval_record = {
"approval_id": approval_id,
"workflow_execution_id": context.get("execution_id"),
"gate_name": self.name,
"title": self.title,
"description": self.description,
"approvers": self.approvers,
"created_at": datetime.utcnow().isoformat(),
"due_at": due_at.isoformat(),
"context_snapshot": context, # Save context for later resumption
"decision": ApprovalDecision.PENDING.value,
"decided_at": None,
"decided_by": None,
}

# In production: await db.save_approval(approval_record)
print(f"Approval request created: {approval_id}")

# Notify approvers (email, Slack, etc.).
# In production: await notify_approvers(approval_record)

# Return "paused" state; the workflow stops here.
return {
**context,
f"{self.name}_approval_id": approval_id,
f"{self.name}_status": "paused",
f"{self.name}_awaiting_approval": True,
f"{self.name}_due_at": due_at.isoformat(),
}

# Example: Approval gate for large transactions
transaction_approval = ApprovalGate(
name="approve_transaction",
approvers=["[email protected]", "[email protected]"],
title="Approve Large Transaction",
description="A transaction of $50,000 requires approval.",
timeout_hours=4,
)

# In workflow:
context = {
"execution_id": "EXEC-123",
"amount": 50000,
"recipient": "[email protected]",
}

result = transaction_approval.execute(context)
print(f"Workflow paused, approval ID: {result['approve_transaction_approval_id']}")

Capturing Approval Decisions

When a human approves or rejects, the decision is captured and the workflow resumes. Here is the API handler:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class ApprovalDecisionRequest(BaseModel):
approval_id: str
decision: ApprovalDecision
decided_by: str
comment: Optional[str] = None

async def resume_workflow(approval_record: dict):
"""
Resume a paused workflow after approval.
This is called after a human decision is recorded.
"""
execution_id = approval_record["workflow_execution_id"]
context = approval_record["context_snapshot"]
gate_name = approval_record["gate_name"]

# Add decision information to context.
context[f"{gate_name}_decision"] = approval_record["decision"]
context[f"{gate_name}_decided_by"] = approval_record["decided_by"]
context[f"{gate_name}_decided_at"] = approval_record["decided_at"]

if approval_record["decision"] == ApprovalDecision.APPROVED.value:
# Continue workflow to the next node.
# In production: await workflow_engine.resume(execution_id, context)
print(f"Workflow {execution_id} resumed after approval")
return True
else:
# Reject the workflow; no further steps execute.
# In production: await workflow_engine.halt(execution_id, context)
print(f"Workflow {execution_id} rejected")
return False

@app.post("/approvals/{approval_id}/decide")
async def submit_approval_decision(
approval_id: str,
decision_req: ApprovalDecisionRequest,
):
"""
Submit a decision for an approval gate.
"""
# 1. Validate the approval exists.
# approval_record = await db.get_approval(approval_id)
# if not approval_record:
# raise HTTPException(status_code=404, detail="Approval not found")

# 2. Validate the submitter is an approver.
# if decision_req.decided_by not in approval_record["approvers"]:
# raise HTTPException(status_code=403, detail="Not an approver for this gate")

# 3. Validate the approval is still pending.
# if approval_record["decision"] != ApprovalDecision.PENDING.value:
# raise HTTPException(status_code=400, detail="Already decided")

# 4. Check if approval is past due.
# if datetime.fromisoformat(approval_record["due_at"]) < datetime.utcnow():
# raise HTTPException(status_code=400, detail="Approval past due; escalating")

# 5. Record the decision.
# approval_record["decision"] = decision_req.decision.value
# approval_record["decided_at"] = datetime.utcnow().isoformat()
# approval_record["decided_by"] = decision_req.decided_by
# approval_record["comment"] = decision_req.comment
# await db.update_approval(approval_record)

# 6. Resume or halt the workflow.
# success = await resume_workflow(approval_record)

return {
"approval_id": approval_id,
"status": "recorded",
"decision": decision_req.decision.value,
}

Handling Timeouts and Escalations

If an approval is not decided within the timeout, escalate to a manager or auto-reject:

from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

async def check_overdue_approvals():
"""
Periodically check for overdue approvals and escalate them.
Run every 5 minutes.
"""
now = datetime.utcnow()

# Query approvals that are pending and past due.
# overdue_approvals = await db.query_approvals(
# decision=ApprovalDecision.PENDING.value,
# due_at_before=now,
# )

# for approval in overdue_approvals:
# await escalate_approval(approval)

async def escalate_approval(approval_record: dict):
"""
Escalate an overdue approval to a manager or executive.
"""
execution_id = approval_record["workflow_execution_id"]

# Option 1: Auto-reject after timeout.
# approval_record["decision"] = ApprovalDecision.REJECTED.value
# approval_record["decided_at"] = datetime.utcnow().isoformat()
# approval_record["decided_by"] = "system-timeout"
# await db.update_approval(approval_record)
# await resume_workflow(approval_record)

# Option 2: Escalate to a manager.
escalation_approvers = ["[email protected]", "[email protected]"]
approval_record["approvers"] = escalation_approvers
approval_record["due_at"] = (
datetime.utcnow() + timedelta(hours=2)
).isoformat()

# await db.update_approval(approval_record)
# await notify_approvers(approval_record, subject="ESCALATED: Approval needed")

print(f"Escalated approval {approval_record['approval_id']}")

# Register the scheduler job.
scheduler.add_job(
check_overdue_approvals,
CronTrigger(minute='*/5'),
id='check-overdue-approvals',
)

scheduler.start()

Audit Logging for Compliance

Every approval decision must be logged for compliance (financial, healthcare, etc.):

class AuditLog:
"""Simple audit logger for approval decisions."""

@staticmethod
async def log_approval_decision(
approval_record: dict,
decision: str,
decided_by: str,
comment: str = None,
):
"""
Log an approval decision to an audit trail.
"""
log_entry = {
"event_type": "approval_decision",
"approval_id": approval_record["approval_id"],
"workflow_execution_id": approval_record["workflow_execution_id"],
"gate_name": approval_record["gate_name"],
"decision": decision,
"decided_by": decided_by,
"decided_at": datetime.utcnow().isoformat(),
"comment": comment,
"approvers": approval_record["approvers"],
"context_snapshot": approval_record["context_snapshot"],
}

# In production: write to immutable audit log (database, cloud logging service)
# await audit_db.insert(log_entry)

print(f"Audit log: {log_entry}")

return log_entry

# Usage:
async def submit_approval_decision_with_audit(approval_id: str, decision_req):
approval_record = {} # fetch from DB

# Record decision.
approval_record["decision"] = decision_req.decision.value
approval_record["decided_by"] = decision_req.decided_by

# Log for compliance.
await AuditLog.log_approval_decision(
approval_record,
decision=decision_req.decision.value,
decided_by=decision_req.decided_by,
comment=decision_req.comment,
)

# Resume workflow.
await resume_workflow(approval_record)

Multi-Approver Gates with Consensus

For sensitive decisions, require multiple approvers to agree:

class MultiApproverGate:
"""
An approval gate that requires N out of M approvers to agree.
"""

def __init__(
self,
name: str,
approvers: list[str],
required_approvals: int,
title: str,
description: str,
):
self.name = name
self.approvers = approvers
self.required_approvals = required_approvals
self.title = title
self.description = description

def execute(self, context: dict) -> dict:
"""Create a multi-approver gate."""
if self.required_approvals > len(self.approvers):
return {
**context,
f"{self.name}_error": "required_approvals exceeds number of approvers",
f"{self.name}_success": False,
}

approval_id = str(uuid.uuid4())

approval_record = {
"approval_id": approval_id,
"workflow_execution_id": context.get("execution_id"),
"gate_name": self.name,
"title": self.title,
"description": self.description,
"approvers": self.approvers,
"required_approvals": self.required_approvals,
"decisions": {}, # Tracks each approver's decision
"created_at": datetime.utcnow().isoformat(),
"context_snapshot": context,
}

# In production: await db.save_approval(approval_record)

return {
**context,
f"{self.name}_approval_id": approval_id,
f"{self.name}_status": "awaiting_decisions",
}

async def record_multi_approval_decision(approval_id: str, approver: str, decision: str):
"""Record one approver's decision in a multi-approver gate."""
# approval_record = await db.get_approval(approval_id)

# approval_record["decisions"][approver] = {
# "decision": decision,
# "decided_at": datetime.utcnow().isoformat(),
# }

# approvals_count = sum(
# 1 for d in approval_record["decisions"].values()
# if d["decision"] == ApprovalDecision.APPROVED.value
# )

# if approvals_count >= approval_record["required_approvals"]:
# # Enough approvals received; proceed.
# approval_record["final_decision"] = ApprovalDecision.APPROVED.value
# await resume_workflow(approval_record)
# elif any(
# d["decision"] == ApprovalDecision.REJECTED.value
# for d in approval_record["decisions"].values()
# ):
# # Any rejection vetos the entire gate.
# approval_record["final_decision"] = ApprovalDecision.REJECTED.value
# await resume_workflow(approval_record)

pass

Key Takeaways

  • Approval gates pause workflow execution and notify humans; they resume only after a decision is recorded.
  • Store the full context snapshot with each approval for safe resumption.
  • Always enforce approval timeouts and escalate overdue decisions to managers.
  • Log every approval decision to an immutable audit trail for compliance.
  • Support multi-approver gates (require N out of M to agree) for sensitive decisions.
  • Implement proper access control: only designated approvers can decide on a gate.

Frequently Asked Questions

What if an approver disappears or is unavailable?

Use timeouts and escalation. If an approval is pending for X hours, escalate to a manager or auto-reject. Escalation rules should be configurable (business logic, not hardcoded).

Can I request more information from a human and continue the same approval?

Yes, add a "request_more_info" decision type. When selected, send a question back to the workflow and wait for a response. This is more complex than approve/reject but is valuable for complex decisions.

How do I resume a workflow after a long pause?

Store the execution context in the approval record. When the decision arrives, resume by restoring the context and executing the next node. Use an execution ID to identify which workflow to resume.

Should the context snapshot be encrypted?

If the context contains sensitive data (passwords, API keys), encrypt it before storing. Always scrub secrets before saving context snapshots. In production, use encryption at rest (database, cloud storage).

What if the same approval gate fires multiple times in a workflow?

Generate a unique approval_id for each gate invocation. Use the workflow execution_id + gate name + invocation index to ensure uniqueness. This allows a workflow to have multiple approval gates without conflicts.

Further Reading