Branching and Conditional Logic in AI Workflows
Real workflows are not always linear; they branch based on conditions. A support ticket workflow might escalate high-priority tickets to a specialist but auto-reply to low-priority ones. A loan application workflow might route approval/denial based on a credit score. Branching nodes enable this kind of adaptive behavior by evaluating a condition and directing the workflow down one of several paths. The challenge is ensuring that all paths are well-defined, that they merge correctly at the end, and that the context is consistent across all branches.
In this article, you will learn to design conditional branches, implement decision nodes, handle multi-path execution, and debug workflows that split and rejoin.
Branch Nodes: Decision Points
A branch node is a non-executing step that evaluates a condition and routes the workflow. Unlike LLM or tool steps that transform context, branch nodes only read context and decide which path to follow.
from enum import Enum
from typing import Any, Callable, Dict, List
class BranchNode:
"""
A workflow node that evaluates a condition and routes to one of several paths.
"""
def __init__(
self,
name: str,
condition: Callable[[dict], str],
paths: Dict[str, str],
):
"""
Args:
name: Name of the branch node.
condition: Function that takes context and returns a path name.
paths: Dict mapping path names to next node names.
E.g., {"high": "escalate_node", "low": "auto_reply_node"}
"""
self.name = name
self.condition = condition
self.paths = paths
def execute(self, context: dict) -> tuple[dict, str]:
"""
Evaluate the condition and return the next node.
Args:
context: Workflow context.
Returns:
Tuple of (updated_context, next_node_name).
"""
try:
# Evaluate the condition.
path_name = self.condition(context)
# Validate the path.
if path_name not in self.paths:
return (
{
**context,
f"{self.name}_error": f"Condition returned unknown path: {path_name}",
},
None,
)
# Record which path was taken.
return (
{
**context,
f"{self.name}_path_taken": path_name,
f"{self.name}_success": True,
},
self.paths[path_name],
)
except Exception as e:
return (
{
**context,
f"{self.name}_error": str(e),
f"{self.name}_success": False,
},
None,
)
# Example: Route ticket based on priority
def ticket_priority_condition(context: dict) -> str:
"""
Determine routing based on ticket priority.
Returns: 'escalate', 'investigate', or 'auto_reply'.
"""
priority = context.get("priority", "unknown")
if priority in ["critical", "high"]:
return "escalate"
elif priority == "medium":
return "investigate"
else:
return "auto_reply"
ticket_router = BranchNode(
name="route_ticket",
condition=ticket_priority_condition,
paths={
"escalate": "page_oncall",
"investigate": "create_investigation",
"auto_reply": "send_auto_reply",
},
)
# Test the branch.
context = {"priority": "high", "ticket_id": "TKT-12345"}
updated_context, next_node = ticket_router.execute(context)
print(f"Path taken: {updated_context['route_ticket_path_taken']}")
print(f"Next node: {next_node}")
Multi-Condition Branches
For complex routing, use nested or multi-condition branches:
class MultiConditionBranch:
"""
A branch that evaluates multiple conditions in sequence.
Each condition can have its own nested branches.
"""
def __init__(
self,
name: str,
conditions: List[tuple[Callable[[dict], bool], str]],
default_path: str,
):
"""
Args:
name: Name of the branch.
conditions: List of (predicate, path_name) tuples.
Evaluated in order; first True predicate wins.
default_path: Path if no condition matches.
"""
self.name = name
self.conditions = conditions
self.default_path = default_path
def execute(self, context: dict) -> tuple[dict, str]:
"""Evaluate conditions and route."""
try:
for predicate, path_name in self.conditions:
if predicate(context):
return (
{
**context,
f"{self.name}_path_taken": path_name,
f"{self.name}_success": True,
},
path_name,
)
# No condition matched; use default.
return (
{
**context,
f"{self.name}_path_taken": self.default_path,
f"{self.name}_success": True,
},
self.default_path,
)
except Exception as e:
return (
{
**context,
f"{self.name}_error": str(e),
f"{self.name}_success": False,
},
None,
)
# Example: Complex loan approval logic
def is_high_income(context: dict) -> bool:
return context.get("annual_income", 0) > 100000
def is_good_credit(context: dict) -> bool:
return context.get("credit_score", 0) > 700
def is_low_debt(context: dict) -> bool:
return context.get("debt_to_income_ratio", 1.0) < 0.5
loan_approval_router = MultiConditionBranch(
name="assess_loan",
conditions=[
(lambda ctx: (
is_high_income(ctx) and is_good_credit(ctx) and is_low_debt(ctx)
), "auto_approve"),
(lambda ctx: is_good_credit(ctx) and is_low_debt(ctx), "manual_review"),
(lambda ctx: is_high_income(ctx), "escalate_to_specialist"),
],
default_path="decline",
)
# Test with different applicants.
applicant_a = {
"annual_income": 150000,
"credit_score": 750,
"debt_to_income_ratio": 0.3,
}
context_a, next_node_a = loan_approval_router.execute(applicant_a)
print(f"Applicant A routed to: {next_node_a}") # auto_approve
applicant_b = {
"annual_income": 50000,
"credit_score": 680,
"debt_to_income_ratio": 0.6,
}
context_b, next_node_b = loan_approval_router.execute(applicant_b)
print(f"Applicant B routed to: {next_node_b}") # decline
Merging Parallel Branches
When multiple paths rejoin, you need a merge node that waits for all branches to complete and combines their results:
class MergeNode:
"""
Waits for multiple parallel branches to complete and merges context.
"""
def __init__(self, name: str, branch_names: List[str]):
"""
Args:
name: Name of the merge node.
branch_names: List of branch node names to wait for.
"""
self.name = name
self.branch_names = branch_names
def execute(self, contexts: Dict[str, dict]) -> dict:
"""
Merge multiple branch contexts into a single context.
Args:
contexts: Dict mapping branch name to its final context.
Returns:
Merged context (all branch results included).
"""
try:
merged = {}
for branch_name in self.branch_names:
if branch_name not in contexts:
return {
**merged,
f"{self.name}_error": f"Missing branch result: {branch_name}",
f"{self.name}_success": False,
}
branch_context = contexts[branch_name]
# Merge branch results into the main context.
# Use a prefix to avoid key conflicts.
for key, value in branch_context.items():
merged[f"{branch_name}_{key}"] = value
# Preserve common context.
merged[f"{self.name}_success"] = True
merged[f"{self.name}_merged_branches"] = self.branch_names
return merged
except Exception as e:
return {
f"{self.name}_error": str(e),
f"{self.name}_success": False,
}
# Example: In a workflow graph, this is represented as
# multiple paths that converge at a merge node.
merge_node = MergeNode(
name="merge_results",
branch_names=["escalate_path", "auto_reply_path"],
)
# Simulate two parallel branch completions.
escalate_context = {
"escalated": True,
"oncall_id": "ENG-123",
}
auto_reply_context = {
"auto_replied": True,
"reply_sent_at": "2026-06-02T15:00:00Z",
}
merged = merge_node.execute({
"escalate_path": escalate_context,
"auto_reply_path": auto_reply_context,
})
print(f"Merged context keys: {list(merged.keys())}")
# Output: escalate_path_escalated, escalate_path_oncall_id, auto_reply_path_auto_replied, ...
Workflow DAG with Branches
A complete workflow graph with branching looks like:
class WorkflowGraph:
"""
A directed acyclic graph (DAG) of workflow nodes, supporting branches.
"""
def __init__(self):
self.nodes = {} # name -> node object
self.edges = {} # name -> list of next node names (or callable for branches)
def add_node(self, name: str, node: Any):
"""Register a node in the graph."""
self.nodes[name] = node
self.edges[name] = []
def add_edge(self, from_node: str, to_node: str):
"""Add an edge from one node to another."""
if from_node not in self.edges:
self.edges[from_node] = []
self.edges[from_node].append(to_node)
async def execute(self, initial_context: dict, start_node: str) -> dict:
"""
Execute the workflow starting from start_node.
Handles both linear sequences and branches.
"""
context = initial_context
current_node = start_node
visited = set()
while current_node:
# Prevent infinite loops.
if current_node in visited:
raise RuntimeError(f"Cycle detected at node: {current_node}")
visited.add(current_node)
node = self.nodes.get(current_node)
if not node:
raise RuntimeError(f"Unknown node: {current_node}")
# Execute the node.
if isinstance(node, BranchNode):
context, current_node = node.execute(context)
else:
# Assume it's a tool or LLM step.
context = await node.execute(context)
# Move to the next node in the sequence.
next_nodes = self.edges.get(current_node, [])
current_node = next_nodes[0] if next_nodes else None
return context
# Build the workflow graph.
graph = WorkflowGraph()
graph.add_node("fetch_ticket", ToolStep("fetch_ticket", fetch_ticket_tool))
graph.add_node("analyze", LLMStep("analyze", "claude-3-5-sonnet-20241022", analyze_template))
graph.add_node("route", ticket_router)
graph.add_node("escalate", ToolStep("escalate", escalate_tool))
graph.add_node("auto_reply", ToolStep("auto_reply", auto_reply_tool))
graph.add_node("log", ToolStep("log", log_event_tool))
graph.add_edge("fetch_ticket", "analyze")
graph.add_edge("analyze", "route")
# Branch routing is handled by the BranchNode itself.
graph.add_edge("escalate", "log")
graph.add_edge("auto_reply", "log")
# Execute.
context = {"ticket_id": "TKT-12345"}
result = await graph.execute(context, "fetch_ticket")
Key Takeaways
- Branch nodes evaluate conditions and route the workflow without modifying context.
- Use multi-condition branches for complex routing logic; evaluate conditions in order.
- Merge nodes combine results from parallel branches back into a single context.
- Use prefixes (e.g.,
branch_name_key) to avoid key conflicts when merging. - Workflow DAGs support both linear sequences and branching with proper cycle detection.
- Always validate that all paths are defined and that branches merge correctly.
Frequently Asked Questions
What if a condition throws an exception?
Catch the exception in the branch node, write the error to context, and set a success flag to False. The workflow engine should halt on branch failure; do not silently default to a path.
Can I have nested branches (a branch within a branch)?
Yes. A branch can route to another branch node instead of a tool or LLM step. However, deeply nested branches (more than 3 levels) become hard to debug. Consider refactoring into separate workflows.
What happens if multiple paths write to the same context key?
This is undefined behavior and should be avoided. Use branch-specific prefixes or ensure that each branch writes to disjoint keys. At merge time, you can consolidate results into a canonical key if needed.
How do I test a workflow with branches?
Test each branch independently with different context inputs that trigger each condition. Use mocked tool steps and LLM steps. Verify that the final merged context contains all expected keys and values.
Can a branch dynamically define its paths at runtime?
Yes, but with caution. If the number of paths is unknown, you might want to use a loop node (next article) instead of a fixed branch. Fixed branches are more testable and easier to observe.