Building Event-Driven Triggers in AI Workflows
A trigger is the entry point that detects an external event and spawns a workflow execution. Building reliable triggers requires handling three challenges: (1) ensuring that each event spawns exactly one execution (deduplication), (2) making triggers idempotent so that a replay does not corrupt state, and (3) validating and normalizing incoming payloads before passing them to the workflow engine.
In this article, you will implement three trigger types (webhook, scheduled, and manual) and learn patterns commonly used to prevent duplicate executions and handle failure cases gracefully.
What Makes a Good Trigger?
A trigger must be:
- Reliable: It detects every event, even if the network is flaky.
- Deduplicating: Duplicate events (e.g., a webhook retried by the source system) do not spawn duplicate executions.
- Atomic: The trigger either fully initializes a workflow run or fails without partial state.
- Fast: The source system expects a quick acknowledgment, not a 5-second wait.
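The key insight is that the trigger handler should hand the event off to a durable queue and then immediately return a success response to the source, leaving the actual workflow execution to run asynchronously. This decoupling keeps the source system from being blocked by slow workflow processing, and the durable hand-off ensures that an acknowledged event is not lost if the process crashes.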
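To make the acknowledge-then-queue split concrete, here is a minimal in-process sketch using `asyncio.Queue`. The names `accept_event` and `worker` are illustrative, not part of any framework, and the in-memory queue is only a stand-in: it loses events on a crash, so a real deployment would put a durable broker in its place. The handler puts the event on the queue and returns without waiting; a separate worker task does the slow part.

```python
import asyncio
import uuid


async def accept_event(queue: asyncio.Queue, payload: dict) -> dict:
    """Trigger handler: hand the event off, then acknowledge without waiting."""
    execution_id = str(uuid.uuid4())
    # put_nowait never blocks on an unbounded queue, so the ack is immediate.
    queue.put_nowait({"execution_id": execution_id, "payload": payload})
    return {"status": "accepted", "execution_id": execution_id}


async def worker(queue: asyncio.Queue, processed: list) -> None:
    """Consumer: runs the slow workflow outside the request path."""
    while True:
        job = await queue.get()
        await asyncio.sleep(0.01)  # stand-in for slow workflow processing
        processed.append(job["execution_id"])
        queue.task_done()


async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    consumer = asyncio.create_task(worker(queue, processed))
    ack = await accept_event(queue, {"ticket_id": "TKT-1"})
    acked_before_processing = not processed  # the worker has not run yet
    await queue.join()  # wait for the worker to finish the job
    consumer.cancel()
    return ack, processed, acked_before_processing


ack, processed, acked_early = asyncio.run(main())
print(ack["status"], processed == [ack["execution_id"]], acked_early)  # accepted True True
```

The third value confirms that the acknowledgment was produced before the worker touched the job.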
Webhook Triggers: HTTP Entry Points
A webhook trigger is an HTTP endpoint that receives a POST request from an external system. The request body becomes the initial workflow context.
```python
import asyncio
import hashlib
import json
import uuid
from datetime import datetime, timedelta, timezone

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, field_validator  # Pydantic v2

app = FastAPI()


class TicketWebhookPayload(BaseModel):
    ticket_id: str
    customer_email: str
    subject: str
    body: str

    @field_validator("ticket_id")
    @classmethod
    def ticket_id_must_be_valid(cls, v: str) -> str:
        if not v.startswith("TKT-"):
            raise ValueError("ticket_id must start with TKT-")
        return v


# Shared state for the deduplication window. This dict only works within a
# single process; in production, use Redis so all replicas share it.
recent_triggers: dict[str, datetime] = {}

# asyncio keeps only weak references to tasks, so hold them here until done.
background_tasks: set[asyncio.Task] = set()


async def store_trigger_id(trigger_id: str, ttl_seconds: int = 30) -> None:
    """Mark a trigger as processed for the deduplication window."""
    recent_triggers[trigger_id] = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds)
    # In production, claim the ID atomically so concurrent retries cannot both
    # pass the duplicate check, e.g. with redis-py:
    #   await redis_client.set(f"trigger:{trigger_id}", "1", nx=True, ex=ttl_seconds)


async def is_duplicate_trigger(trigger_id: str) -> bool:
    """Return True if this trigger ID was seen and its window has not expired."""
    expires_at = recent_triggers.get(trigger_id)
    if expires_at is None:
        return False
    if expires_at <= datetime.now(timezone.utc):
        del recent_triggers[trigger_id]  # window elapsed; treat as a new event
        return False
    return True


def compute_trigger_id(payload: dict) -> str:
    """
    Generate a deterministic trigger ID from the payload, so that a replayed
    (identical) payload maps to the same ID.

    If the source sends its own unique event or delivery ID, prefer it:
    hashing the body also merges two distinct events whose payloads are identical.
    """
    # sort_keys makes the serialization independent of key order.
    payload_str = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(payload_str.encode()).hexdigest()


async def simulate_queue_workflow(context: dict) -> None:
    """Simulate queueing the workflow (in production, this goes to a task queue)."""
    await asyncio.sleep(0.1)  # simulate queue latency
    print(f"Workflow queued: execution_id={context['execution_id']}")


@app.post("/workflows/ticket-handler")
async def handle_ticket_webhook(payload: TicketWebhookPayload):
    """
    Entry point for ticket creation events.

    This handler:
    1. Validates the payload (FastAPI rejects invalid bodies with a 422).
    2. Computes a deterministic trigger ID.
    3. Checks for duplicates.
    4. Marks the trigger as seen.
    5. Builds the initial execution context.
    6. Queues the workflow without waiting for it to run.
    7. Returns immediately to the source.
    """
    # 1. Validation already happened when FastAPI parsed the body.
    payload_dict = payload.model_dump()

    # 2. Compute the trigger ID from the normalized payload.
    trigger_id = compute_trigger_id(payload_dict)

    # 3. Check for duplicates in the recent window.
    if await is_duplicate_trigger(trigger_id):
        return {
            "status": "duplicate",
            "message": "Workflow for this event is already queued or executing.",
            "trigger_id": trigger_id,
        }

    # 4. Mark this trigger as seen.
    await store_trigger_id(trigger_id)

    # 5. Create the initial execution context.
    execution_id = str(uuid.uuid4())
    context = {
        "execution_id": execution_id,
        "trigger_id": trigger_id,
        "triggered_at": datetime.now(timezone.utc).isoformat(),
        "payload": payload_dict,
    }

    # 6. Queue the workflow in the background. This is a simulation: a real
    # handler should await a durable enqueue before acknowledging (as the retry
    # variant later in this article does), or a crash here loses the event.
    task = asyncio.create_task(simulate_queue_workflow(context))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # 7. Return immediately.
    return {
        "status": "accepted",
        "execution_id": execution_id,
        "trigger_id": trigger_id,
    }
```
Key patterns:
- Payload validation via Pydantic: reject malformed requests before creating an execution.
- Deterministic trigger ID: hash the normalized payload to detect retries.
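- Deduplication window: store trigger IDs in Redis with a TTL (30 seconds in this example). A duplicate that arrives inside the window is acknowledged but does not spawn a new execution. Choose a TTL that covers the source's retry schedule, since webhook providers may retry over minutes or hours.
- Async queueing: acknowledge the source as soon as the event is safely enqueued; the workflow itself runs asynchronously.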
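One gap in the handler above: `is_duplicate_trigger` and `store_trigger_id` are two separate steps, so two deliveries of the same event that arrive concurrently can both pass the check. A shared store closes that gap by claiming the ID atomically; with redis-py, a single `set(key, "1", nx=True, ex=ttl_seconds)` call does this and returns a truthy value only when the key was newly created. The sketch below is a hypothetical in-memory stand-in (`TTLDedupStore` is not a library class) that shows the same check-and-claim semantics within one process.

```python
import threading
import time
from typing import Callable, Dict


class TTLDedupStore:
    """Hypothetical in-memory stand-in for a Redis SET with NX and EX.

    claim() checks and records a trigger ID as one step, so two concurrent
    deliveries of the same event cannot both pass the duplicate check.
    """

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so tests can control time
        self._expires_at: Dict[str, float] = {}
        self._lock = threading.Lock()

    def claim(self, trigger_id: str) -> bool:
        """Return True for the first delivery in the window, False for a duplicate."""
        with self._lock:
            now = self.clock()
            expires_at = self._expires_at.get(trigger_id)
            if expires_at is not None and expires_at > now:
                return False  # seen recently: a retry or replay
            # New ID, or its window has lapsed: start a fresh window.
            self._expires_at[trigger_id] = now + self.ttl_seconds
            return True


store = TTLDedupStore(ttl_seconds=30)
print(store.claim("trigger-123"), store.claim("trigger-123"))  # True False
```

A Redis key also expires on its own; this dictionary keeps stale entries until the same ID is claimed again, which is fine for a sketch but not for a long-running process.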
Scheduled Triggers: Cron-Based Execution
A scheduled trigger fires on a timer, typically using a cron expression. Unlike webhooks, scheduled triggers have no external payload—they are initialized with fixed parameters or by reading state from a database.
```python
import uuid
from datetime import datetime, timezone

# APScheduler 3.x API.
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = AsyncIOScheduler()


async def trigger_daily_report_workflow():
    """
    Trigger a daily report workflow at 9 AM UTC.
    No external payload; context is built from fixed parameters.
    """
    now = datetime.now(timezone.utc)  # read the clock once so all fields agree
    context = {
        "execution_id": str(uuid.uuid4()),
        # One trigger ID per calendar day: a second run on the same day is a duplicate.
        "trigger_id": f"schedule-daily-report-{now.date().isoformat()}",
        "triggered_at": now.isoformat(),
        "trigger_type": "schedule",
        "report_date": now.date().isoformat(),
    }
    # Queue the workflow; the queue or worker should drop duplicate trigger IDs.
    # await task_queue.enqueue(execute_workflow, "daily_report", context)
    print(f"Scheduled workflow queued: {context['execution_id']}")


# Register the scheduled trigger.
scheduler.add_job(
    trigger_daily_report_workflow,
    CronTrigger(hour=9, minute=0, timezone="UTC"),
    id="daily-report-9am",
)


async def trigger_every_5_minutes_cleanup():
    """Trigger a cleanup workflow every 5 minutes."""
    now = datetime.now(timezone.utc)
    # 300-second bucket: every run in the same 5-minute window shares one ID.
    # The datetime must be timezone-aware; .timestamp() on a naive utcnow()
    # value is interpreted as local time.
    bucket = int(now.timestamp()) // 300
    context = {
        "execution_id": str(uuid.uuid4()),
        "trigger_id": f"schedule-cleanup-{bucket}",
        "triggered_at": now.isoformat(),
        "trigger_type": "schedule",
    }
    # await task_queue.enqueue(execute_workflow, "cleanup", context)
    print(f"Cleanup workflow queued: {context['execution_id']}")


scheduler.add_job(
    trigger_every_5_minutes_cleanup,
    CronTrigger(minute="*/5", timezone="UTC"),
    id="cleanup-every-5min",
)


# Start the scheduler on the same event loop the app runs on. Starting it from
# the app's startup hook guarantees that (newer FastAPI versions prefer a
# lifespan handler for this).
@app.on_event("startup")
async def start_scheduler():
    scheduler.start()
```
Key patterns:
- Cron expressions: use a library such as APScheduler, or a managed scheduler such as Google Cloud Scheduler or Amazon EventBridge.
- Deduplication via trigger ID: for recurring schedules, include the time bucket in the trigger ID (the calendar date for the daily report, a 5-minute bucket for schedule-cleanup-{bucket}), so a re-run within the same window is treated as a duplicate.
- No external payload: context is built from fixed parameters or fetched from a database at trigger time.
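The time-bucket idea can be isolated into a small helper. This is a sketch, and `bucket_trigger_id` is a hypothetical name. Where your scheduler makes it available, pass the time a run was scheduled for rather than the moment it happened to start, so that a run delayed by a few seconds still lands in its intended bucket. The helper also insists on a timezone-aware datetime, because `.timestamp()` on a naive value is interpreted as local time.

```python
from datetime import datetime, timezone


def bucket_trigger_id(job_name: str, scheduled_at: datetime, bucket_seconds: int) -> str:
    """Return a trigger ID shared by every run whose time falls in the same bucket.

    scheduled_at must be timezone-aware so the bucket does not depend on the
    host's local time zone.
    """
    if scheduled_at.tzinfo is None:
        raise ValueError("scheduled_at must be timezone-aware")
    bucket = int(scheduled_at.timestamp()) // bucket_seconds
    return f"schedule-{job_name}-{bucket}"


run_a = datetime(2026, 4, 1, 9, 0, 0, tzinfo=timezone.utc)
run_b = datetime(2026, 4, 1, 9, 4, 59, tzinfo=timezone.utc)
print(bucket_trigger_id("cleanup", run_a, 300) == bucket_trigger_id("cleanup", run_b, 300))  # True
```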
Manual Triggers: User-Initiated Workflows
A manual trigger is initiated by a user clicking a "Run Workflow" button in a UI. The user may provide input parameters (e.g., "Run the export workflow for April 2026").
```python
import uuid
from datetime import datetime, timezone
from typing import Optional

from fastapi import Body


@app.post("/workflows/{workflow_name}/run")
async def run_workflow_manually(
    workflow_name: str,
    user_id: str,
    parameters: Optional[dict] = Body(default=None),
):
    """
    Manual trigger endpoint.

    A user may trigger a workflow only if they have permission for it.
    The `parameters` dict (the JSON request body) is passed as initial context.
    `user_id` arrives as a query parameter here for brevity; in production,
    take it from the authenticated session so callers cannot spoof it.
    """
    # 1. Validate permissions (pseudocode; fetch_user_permissions is hypothetical).
    # user_perms = await fetch_user_permissions(user_id)
    # if workflow_name not in user_perms.allowed_workflows:
    #     raise HTTPException(status_code=403, detail="Access denied")

    # 2. Create the execution context.
    execution_id = str(uuid.uuid4())
    trigger_id = f"manual-{user_id}-{execution_id}"  # unique per user/click
    context = {
        "execution_id": execution_id,
        "trigger_id": trigger_id,
        "triggered_at": datetime.now(timezone.utc).isoformat(),
        "trigger_type": "manual",
        "initiated_by": user_id,  # audit trail
        "parameters": parameters or {},
    }

    # 3. Queue the workflow.
    # await task_queue.enqueue(execute_workflow, workflow_name, context)
    print(f"Manual workflow queued: {context['execution_id']}")

    return {
        "status": "accepted",
        "execution_id": execution_id,
        "workflow_name": workflow_name,
    }
```
Key patterns:
- Permission checks: verify that the user may trigger this workflow before queueing.
- User audit trail: include `initiated_by` in the context so you can later see who started the workflow.
- Unique trigger ID per click: combining the user ID with the execution ID makes every manual trigger distinct. The trade-off is that no deduplication happens, so two rapid clicks start two executions.
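The commented-out permission check in the endpoint can be made concrete with a small, framework-free sketch. `ALLOWED_WORKFLOWS`, `PermissionDenied`, and `build_manual_context` are hypothetical: in practice the permission data would come from your identity provider or database, and the endpoint would translate `PermissionDenied` into an `HTTPException` with status 403. The returned context carries the same audit fields as the endpoint.

```python
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional, Set

# Hypothetical permission table: user ID -> workflows that user may run.
ALLOWED_WORKFLOWS: Dict[str, Set[str]] = {
    "alice": {"export", "daily_report"},
    "bob": {"daily_report"},
}


class PermissionDenied(Exception):
    """Raised when a user is not allowed to run the requested workflow."""


def build_manual_context(
    user_id: str, workflow_name: str, parameters: Optional[dict] = None
) -> dict:
    """Check permissions first, then build an auditable execution context."""
    if workflow_name not in ALLOWED_WORKFLOWS.get(user_id, set()):
        raise PermissionDenied(f"{user_id} may not run {workflow_name}")
    execution_id = str(uuid.uuid4())
    return {
        "execution_id": execution_id,
        "trigger_id": f"manual-{user_id}-{execution_id}",
        "triggered_at": datetime.now(timezone.utc).isoformat(),
        "trigger_type": "manual",
        "initiated_by": user_id,
        "parameters": parameters or {},
    }


ctx = build_manual_context("alice", "export", {"month": "2026-04"})
print(ctx["initiated_by"], ctx["parameters"])  # alice {'month': '2026-04'}
```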
Handling Trigger Failures and Retries
Enqueueing can fail for transient reasons such as network timeouts or an unavailable database or broker. A common approach is to retry with exponential backoff and acknowledge the source only after the enqueue succeeds:
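```python
import uuid
from datetime import datetime, timezone

from fastapi import HTTPException
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # Re-raise the last underlying exception instead of tenacity's RetryError.
    reraise=True,
)
async def queue_workflow_with_retry(workflow_name: str, context: dict) -> None:
    """
    Attempt to queue a workflow up to 3 times with exponential backoff.
    If every attempt fails, the last exception propagates to the caller,
    which is responsible for logging and alerting.

    The retries add several seconds to the webhook response; keep the total
    below the source's delivery timeout.
    """
    try:
        # await task_queue.enqueue(execute_workflow, workflow_name, context)
        print(f"Queued {workflow_name} with execution_id {context['execution_id']}")
    except Exception as e:
        print(f"Failed to queue workflow: {e}")
        raise


@app.post("/workflows/ticket-handler-with-retry")
async def handle_ticket_webhook_with_retry(payload: TicketWebhookPayload):
    """Webhook handler that acknowledges only after the enqueue succeeds."""
    payload_dict = payload.model_dump()
    trigger_id = compute_trigger_id(payload_dict)
    # Check-then-store leaves a small race window for concurrent duplicates;
    # an atomic claim (e.g., Redis SET with NX) closes it.
    if await is_duplicate_trigger(trigger_id):
        return {"status": "duplicate", "trigger_id": trigger_id}

    execution_id = str(uuid.uuid4())
    context = {
        "execution_id": execution_id,
        "trigger_id": trigger_id,
        "triggered_at": datetime.now(timezone.utc).isoformat(),
        "payload": payload_dict,
    }
    try:
        await queue_workflow_with_retry("ticket_handler", context)
    except Exception as e:
        # Log and alert; do NOT return success, so the source retries.
        print(f"Fatal trigger failure for {trigger_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to queue workflow") from e

    # Record the trigger only after a successful enqueue, so a failed attempt
    # does not cause the source's retry to be dropped as a duplicate.
    await store_trigger_id(trigger_id)
    return {"status": "accepted", "execution_id": execution_id}
```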
Key Takeaways
- Triggers are entry points that detect external events and spawn workflow executions; they must be reliable, deduplicating, and fast.
- Webhook triggers validate payloads, compute deterministic trigger IDs, and check for duplicates within a time window (e.g., 30 seconds).
- Scheduled triggers use cron expressions and are deduplicated by time bucket (e.g., per day or per 5-minute window).
- Manual triggers are initiated by users, require permission checks, and include audit information.
- All triggers should hand the workflow off to a durable queue and return promptly, so the source system is not blocked while the workflow executes.
- Implement retry logic with exponential backoff for transient queue failures.
Frequently Asked Questions
What happens if a webhook source retries the same payload multiple times?
The platform computes the same trigger ID for identical payloads and treats repeats within the deduplication window (e.g., 30 seconds) as duplicates. Only the first delivery spawns an execution; later retries are acknowledged without starting a new one. Duplicates still receive an HTTP 200 OK (with a "duplicate" status in the body) rather than 409 Conflict, because many sources treat any non-2xx response as a failed delivery and keep retrying.
Can I trigger a workflow without a webhook?
Yes. Scheduled triggers use cron, manual triggers use a UI button, and event-driven triggers subscribe to a message broker such as Kafka or Google Cloud Pub/Sub. Webhooks are just one option, and many platforms let a single workflow have several trigger types.
How do I prevent a user from triggering the same workflow twice in a row?
Guard the manual trigger endpoint: before accepting a manual trigger, check whether an execution of that workflow is already running and reject the request if so. Alternatively, or in addition, apply a per-user rate limit (e.g., at most 1 trigger per 10 seconds), which acts as a debounce for repeated clicks.
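Both options can be combined in one small guard. This is an in-process sketch with a hypothetical `ManualTriggerGuard` class: the cooldown is tracked per user and workflow, `finish` must be called when the execution completes (for example, by the worker), and a multi-instance deployment would need the same state in a shared store such as Redis.

```python
import time
from typing import Callable, Dict, Set, Tuple


class ManualTriggerGuard:
    """Reject a manual trigger if the workflow is running or the user is in cooldown."""

    def __init__(self, cooldown_seconds: float = 10.0, clock: Callable[[], float] = time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self._last_trigger: Dict[Tuple[str, str], float] = {}
        self._running: Set[str] = set()

    def try_start(self, user_id: str, workflow_name: str) -> bool:
        now = self.clock()
        if workflow_name in self._running:
            return False  # an execution of this workflow is already in flight
        last = self._last_trigger.get((user_id, workflow_name))
        if last is not None and now - last < self.cooldown_seconds:
            return False  # same user triggered it again within the cooldown
        self._last_trigger[(user_id, workflow_name)] = now
        self._running.add(workflow_name)
        return True

    def finish(self, workflow_name: str) -> None:
        """Call when the execution completes so the workflow can run again."""
        self._running.discard(workflow_name)


guard = ManualTriggerGuard(cooldown_seconds=10)
print(guard.try_start("alice", "export"), guard.try_start("alice", "export"))  # True False
```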
What if the task queue is down when a trigger fires?
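If enqueueing still fails after retries, return a 500 error so that a webhook source retries later, and store the failed trigger event in a dead-letter queue for later replay. Scheduled triggers have no caller that will retry, so for them the dead-letter queue (plus an alert) is the main safety net. Together, these prevent silent loss of workflow runs.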
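The fallback path might look like this sketch. `enqueue` and `dead_letters` are hypothetical stand-ins for a task-queue client and a durable dead-letter store, and backoff between attempts is omitted for brevity. The caller returns a 500 when the function reports failure.

```python
import json
from typing import Callable, List


def enqueue_or_dead_letter(
    enqueue: Callable[[dict], None],
    dead_letters: List[str],
    context: dict,
    max_attempts: int = 3,
) -> bool:
    """Try to enqueue; after max_attempts failures, park the event for replay.

    Returns True if the workflow was queued, False if it went to the dead letters.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            enqueue(context)
            return True
        except Exception:
            if attempt == max_attempts:
                # Serialize the full context so it can be replayed later as-is.
                dead_letters.append(json.dumps(context, sort_keys=True))
    return False


def queue_is_down(ctx: dict) -> None:
    raise ConnectionError("queue unavailable")


dlq: List[str] = []
print(enqueue_or_dead_letter(queue_is_down, dlq, {"execution_id": "e1"}), len(dlq))  # False 1
```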
How should I handle webhook payloads from untrusted sources?
Always validate the payload schema (a Pydantic BaseModel), check that the Content-Type header is application/json, and verify a cryptographic signature (typically HMAC-SHA256 over the raw request body, using a shared secret) whenever the source supports one. Reject payloads that fail signature verification.
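Signature verification is easy to get subtly wrong, so here is a minimal sketch using only the standard library. It assumes the sender places a bare hex HMAC-SHA256 digest of the raw request body in a header; real providers vary (GitHub, for example, prefixes the digest with `sha256=`), so adapt the parsing to your source's documentation. Compute the HMAC over the raw bytes (in FastAPI, `await request.body()`), not over re-serialized JSON, and compare with `hmac.compare_digest` to avoid leaking timing information.

```python
import hashlib
import hmac


def verify_signature(raw_body: bytes, signature_header: str, secret: bytes) -> bool:
    """Return True if signature_header is the hex HMAC-SHA256 of raw_body.

    Assumes a bare hex digest in the header (an assumption; formats vary).
    """
    expected = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str input, and an
    # attacker controls the header. The comparison runs in constant time.
    return hmac.compare_digest(expected.encode(), signature_header.encode())


secret = b"shared-secret"
body = b'{"ticket_id": "TKT-1"}'
sig = hmac.new(secret, body, hashlib.sha256).hexdigest()
print(verify_signature(body, sig, secret), verify_signature(body + b" ", sig, secret))  # True False
```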