Loop and Retry Strategies in Workflow Automation
Real workflows must handle two challenges: processing multiple items in a loop (e.g., analyze all comments in a ticket thread) and recovering from transient failures (network timeouts, rate limits). This article covers loop nodes (for iteration) and retry strategies (for resilience), with practical examples of how to combine them without creating infinite loops or runaway costs.
Loop Nodes: Iterating Over Collections
A loop node executes a body step or sub-workflow multiple times, once per item in a collection. The loop maintains a loop counter and a result accumulator, updating context after each iteration.
from typing import Any, Callable


class LoopNode:
    """
    A workflow node that iterates over a list and executes a body step once per item.
    """

    def __init__(
        self,
        name: str,
        items_key: str,
        body_step: Any,
        max_iterations: int = 1000,
    ):
        """
        Args:
            name: Name of the loop node.
            items_key: Context key containing the list to iterate over.
            body_step: The step/node to execute for each item.
            max_iterations: Upper bound on list length; longer inputs are rejected.
        """
        self.name = name
        self.items_key = items_key
        self.body_step = body_step
        self.max_iterations = max_iterations

    async def execute(self, context: dict) -> dict:
        """
        Execute the loop.

        Args:
            context: Workflow context.

        Returns:
            Updated context with loop results.
        """
        items = context.get(self.items_key, [])

        # Validate the input before doing any work.
        if not isinstance(items, list):
            return {
                **context,
                f"{self.name}_error": f"{self.items_key} is not a list",
                f"{self.name}_success": False,
            }
        if len(items) > self.max_iterations:
            return {
                **context,
                f"{self.name}_error": f"Too many items ({len(items)}); max is {self.max_iterations}",
                f"{self.name}_success": False,
            }

        # Initialize loop state.
        results = []
        errors = []
        loop_context = context.copy()

        try:
            for i, item in enumerate(items):
                # Publish the loop name so the body step can build the
                # "<loop name>_item" key, then add the current item and index.
                loop_context["_loop_name"] = self.name
                loop_context[f"{self.name}_item"] = item
                loop_context[f"{self.name}_index"] = i

                # Execute the body step.
                loop_context = await self.body_step.execute(loop_context)

                # Check for errors in the body step.
                body_success_key = f"{self.body_step.name}_success"
                if not loop_context.get(body_success_key, True):
                    errors.append({
                        "index": i,
                        "item": item,
                        "error": loop_context.get(f"{self.body_step.name}_error"),
                    })

                # Accumulate results.
                results.append({
                    "index": i,
                    "item": item,
                    "success": loop_context.get(body_success_key, True),
                })

            return {
                **loop_context,
                f"{self.name}_results": results,
                f"{self.name}_errors": errors,
                f"{self.name}_iterations": len(items),
                f"{self.name}_success": len(errors) == 0,
            }
        except Exception as e:
            # The body step raised: report the failure on the original context.
            return {
                **context,
                f"{self.name}_error": str(e),
                f"{self.name}_success": False,
            }
# Example: Process all comments in a support ticket
import asyncio


class ProcessCommentStep:
    """A simple step that processes a single comment."""

    def __init__(self, name: str):
        self.name = name

    async def execute(self, context: dict) -> dict:
        """Assign a placeholder sentiment to the current comment."""
        # Relies on the enclosing loop publishing its name under "_loop_name".
        comment = context.get(f"{context.get('_loop_name', '')}_item")
        if not comment:
            return {
                **context,
                f"{self.name}_success": False,
                f"{self.name}_error": "No item in loop context",
            }

        # Simulate processing with a length heuristic (not real sentiment analysis).
        sentiment = "positive" if len(comment) > 50 else "neutral"
        return {
            **context,
            f"{self.name}_sentiment": sentiment,
            f"{self.name}_processed_comment": comment[:100],
            f"{self.name}_success": True,
        }


process_comment = ProcessCommentStep(name="process_comment")
comment_loop = LoopNode(
    name="process_comments",
    items_key="comments",
    body_step=process_comment,
    max_iterations=500,
)


# Test. `await` is only valid inside a coroutine, so run the loop from main().
async def main():
    context = {
        "comments": [
            "This is great!",
            "The API documentation is very comprehensive and well-written with many examples.",
            "Not bad.",
        ],
    }
    result = await comment_loop.execute(context)
    print(f"Processed {result['process_comments_iterations']} comments")
    print(f"Errors: {len(result['process_comments_errors'])}")


asyncio.run(main())
Conditional Loops: While and Until
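A loop can also repeat while a condition holds instead of running once per item. The condition callable receives the context and returns True to keep looping; a max_iterations cap guards against a condition that never becomes false: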
class ConditionalLoopNode:
    """
    A loop that repeats while a condition holds or until a max iteration count is hit.
    """

    def __init__(
        self,
        name: str,
        body_step: Any,
        condition: Callable[[dict], bool],
        max_iterations: int = 100,
    ):
        """
        Args:
            name: Name of the loop.
            body_step: Step to execute each iteration.
            condition: Function that takes context and returns True to continue looping.
            max_iterations: Safety limit.
        """
        self.name = name
        self.body_step = body_step
        self.condition = condition
        self.max_iterations = max_iterations

    async def execute(self, context: dict) -> dict:
        """Execute until condition is false or max iterations reached."""
        iterations = 0
        loop_context = context.copy()

        try:
            while self.condition(loop_context) and iterations < self.max_iterations:
                loop_context[f"{self.name}_iteration"] = iterations
                # Execute body.
                loop_context = await self.body_step.execute(loop_context)
                iterations += 1

            # Re-check the condition rather than comparing counters: the condition
            # may turn false on the very last permitted iteration.
            exited_normally = not self.condition(loop_context)

            return {
                **loop_context,
                f"{self.name}_iterations": iterations,
                f"{self.name}_loop_exited_normally": exited_normally,
                f"{self.name}_success": True,
            }
        except Exception as e:
            return {
                **context,
                f"{self.name}_error": str(e),
                f"{self.name}_success": False,
            }
# Example: Poll an async job until it reports completion
import asyncio
import random


class FetchStatusStep:
    """Fetch the status of an async job and check if it's done."""

    def __init__(self, name: str):
        self.name = name

    async def execute(self, context: dict) -> dict:
        """Fetch job status; simulate polling."""
        job_id = context.get("job_id")  # unused by the simulation below

        # Simulate polling an external service. A real poller would also
        # pause between polls instead of calling back to back.
        is_done = random.random() > 0.7  # 30% chance done each iteration
        return {
            **context,
            f"{self.name}_is_done": is_done,
            f"{self.name}_success": True,
        }


fetch_status = FetchStatusStep(name="fetch_status")
poll_loop = ConditionalLoopNode(
    name="wait_for_job",
    body_step=fetch_status,
    condition=lambda ctx: not ctx.get("fetch_status_is_done", False),
    max_iterations=30,
)


# Test. `await` is only valid inside a coroutine, so run the loop from main().
async def main():
    context = {"job_id": "JOB-999"}
    result = await poll_loop.execute(context)
    print(f"Polled {result['wait_for_job_iterations']} times")


asyncio.run(main())
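The polling example above calls the status step back to back with no pause, which is fine for a simulation but would hammer a real service. The following is a minimal sketch of polling with a fixed interval, written as a standalone function rather than as part of ConditionalLoopNode. The names poll_with_interval and make_countdown_check are hypothetical, and the countdown check merely stands in for a real status call.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


async def poll_with_interval(
    check: Callable[[], Awaitable[bool]],
    interval_secs: float,
    max_polls: int,
) -> Tuple[bool, int]:
    """Call `check` until it returns True or `max_polls` polls have been made.

    Returns (done, polls_made). Sleeps between polls, but not after the last one.
    """
    for polls_made in range(1, max_polls + 1):
        if await check():
            return True, polls_made
        if polls_made < max_polls:
            await asyncio.sleep(interval_secs)
    return False, max_polls


def make_countdown_check(polls_needed: int) -> Callable[[], Awaitable[bool]]:
    """Hypothetical stand-in for a status call: reports done on the Nth poll."""
    calls = 0

    async def check() -> bool:
        nonlocal calls
        calls += 1
        return calls >= polls_needed

    return check


done, polls = asyncio.run(
    poll_with_interval(make_countdown_check(3), interval_secs=0.01, max_polls=10)
)
print(done, polls)  # → True 3
```

The interval and the poll cap together bound the total waiting time at roughly interval_secs * (max_polls - 1), which makes the worst case easy to reason about.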
Retry Logic with Exponential Backoff
Retries are essential for handling transient failures. Use exponential backoff (wait increasingly longer between retries) to avoid overwhelming a struggling service:
import asyncio
import random


class RetryableStep:
    """
    A workflow step that retries on failure with exponential backoff.
    """

    def __init__(
        self,
        name: str,
        step: Any,
        max_retries: int = 3,
        initial_backoff_secs: float = 1.0,
        backoff_multiplier: float = 2.0,
        jitter: bool = True,
    ):
        """
        Args:
            name: Name of the retryable wrapper.
            step: The underlying step to retry.
            max_retries: Maximum number of retries after the first attempt.
            initial_backoff_secs: Initial backoff duration.
            backoff_multiplier: Multiply backoff by this factor each retry.
            jitter: Add random jitter to backoff to avoid thundering herd.
        """
        self.name = name
        self.step = step
        self.max_retries = max_retries
        self.initial_backoff_secs = initial_backoff_secs
        self.backoff_multiplier = backoff_multiplier
        self.jitter = jitter

    async def execute(self, context: dict) -> dict:
        """Execute with retries."""
        last_error = None
        backoff_secs = self.initial_backoff_secs
        total_attempts = self.max_retries + 1  # first attempt plus retries

        for attempt in range(total_attempts):
            try:
                # Attempt the step.
                result = await self.step.execute(context)

                # Check if the step succeeded.
                if result.get(f"{self.step.name}_success", True):
                    return {
                        **result,
                        f"{self.name}_attempts": attempt + 1,
                        f"{self.name}_success": True,
                    }

                # Step reported failure; remember the error and retry.
                last_error = result.get(f"{self.step.name}_error", "Unknown error")
            except Exception as e:
                # Exception during step execution; retry.
                last_error = str(e)

            # Wait before the next attempt, unless this was the last one.
            if attempt < self.max_retries:
                wait_time = backoff_secs
                if self.jitter:
                    # Scale the wait by a random factor in [0.5, 1.5).
                    wait_time *= 0.5 + random.random()
                await asyncio.sleep(wait_time)
                backoff_secs *= self.backoff_multiplier

        # All retries exhausted.
        return {
            **context,
            f"{self.name}_error": f"Failed after {self.max_retries} retries: {last_error}",
            f"{self.name}_success": False,
            f"{self.name}_attempts": total_attempts,
        }
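# Example: Retry a flaky API call
# ToolStep and flaky_api_tool are not defined in this article. They are assumed
# to be provided elsewhere, with ToolStep exposing `name` and an async
# `execute(context)` like the other steps shown here.
flaky_step = ToolStep("call_flaky_api", flaky_api_tool)
resilient_step = RetryableStep(
    name="resilient_api_call",
    step=flaky_step,
    max_retries=3,
    initial_backoff_secs=1.0,
    backoff_multiplier=2.0,
    jitter=True,
)


# Execute. `await` is only valid inside a coroutine, so run the step from main().
async def main():
    context = {"endpoint": "/api/data"}
    result = await resilient_step.execute(context)
    attempts = result["resilient_api_call_attempts"]
    if result["resilient_api_call_success"]:
        print(f"Succeeded after {attempts} attempts")
    else:
        print(f"Failed after {attempts} attempts: {result['resilient_api_call_error']}")


asyncio.run(main())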
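RetryableStep as written retries on every exception and every reported failure, including ones that will never succeed on a second try (invalid input, for example). A possible refinement is to retry only errors that are plausibly transient. The sketch below is a standalone illustration, not part of RetryableStep: retry_transient, make_flaky, and the choice of which exception types count as transient are all assumptions made for the example.

```python
import asyncio

# Assumption: only these exception types are treated as transient here.
TRANSIENT_ERRORS = (asyncio.TimeoutError, TimeoutError, ConnectionError)


async def retry_transient(operation, max_retries=3, initial_backoff_secs=0.01):
    """Retry `operation` on transient errors only; other errors propagate at once."""
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except TRANSIENT_ERRORS:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last transient error
            # Exponential backoff: initial, 2x initial, 4x initial, ...
            await asyncio.sleep(initial_backoff_secs * 2 ** attempt)


def make_flaky(failures_before_success):
    """Hypothetical operation that fails N times with ConnectionError, then succeeds."""
    state = {"calls": 0}

    async def operation():
        state["calls"] += 1
        if state["calls"] <= failures_before_success:
            raise ConnectionError("connection reset")
        return "ok"

    return operation, state


operation, state = make_flaky(failures_before_success=2)
outcome = asyncio.run(retry_transient(operation))
print(outcome, state["calls"])  # → ok 3
```

A ValueError raised by the operation would pass straight through on the first call, so permanent failures cost one attempt instead of max_retries + 1.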
Combining Loops and Retries
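Real workflows often need both: iterate over items and retry each item on failure. Because RetryableStep has the same name and execute interface as any other step, and reports its outcome under its own name_success and name_error keys, it can be passed to LoopNode as the body step without changes to either class.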
# Wrap the body step in retry logic before passing to loop.
resilient_comment_processor = RetryableStep(
    name="resilient_process_comment",
    step=process_comment,
    max_retries=2,
)
comment_loop_with_retries = LoopNode(
    name="process_comments_with_retries",
    items_key="comments",
    body_step=resilient_comment_processor,
    max_iterations=500,
)


# Test. `await` is only valid inside a coroutine, so run the loop from main().
async def main():
    context = {
        "comments": [
            "Comment 1",
            "Comment 2",
            "Comment 3",
        ],
    }
    result = await comment_loop_with_retries.execute(context)
    print(f"Processed {result['process_comments_with_retries_iterations']} comments")
    print(f"Total errors: {len(result['process_comments_with_retries_errors'])}")


asyncio.run(main())
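Because the loop processes items one after another, retry waits add up across items. A rough worst-case estimate, assuming every item fails every attempt and using RetryableStep's defaults (initial backoff of 1 second, multiplier of 2, jitter factor below 1.5), can be sketched as follows. The function names are hypothetical, and the estimate ignores the time the step itself takes.

```python
def worst_case_attempts(num_items: int, max_retries: int) -> int:
    """Each item gets one initial attempt plus up to max_retries retries."""
    return num_items * (max_retries + 1)


def worst_case_wait_secs(
    num_items: int,
    max_retries: int,
    initial_backoff_secs: float = 1.0,
    backoff_multiplier: float = 2.0,
    jitter: bool = True,
) -> float:
    """Upper bound on total sleep time when every attempt fails.

    A sleep follows each failed attempt except the last, so there are
    max_retries sleeps per item: initial, initial * m, initial * m**2, ...
    """
    per_item = sum(
        initial_backoff_secs * backoff_multiplier ** k for k in range(max_retries)
    )
    if jitter:
        per_item *= 1.5  # the jitter factor is drawn from [0.5, 1.5)
    return num_items * per_item


# The configuration above: up to 500 comments, max_retries=2.
print(worst_case_attempts(500, 2))   # → 1500
print(worst_case_wait_secs(500, 2))  # → 2250.0
```

Under these assumptions a fully failing batch of 500 items would spend up to about 37 minutes sleeping, which is the kind of runaway cost the max_iterations and max_retries limits are meant to keep visible.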
Key Takeaways
- Loop nodes iterate over collections, maintaining a counter and accumulator.
- Conditional loops repeat while a condition holds, with a max iteration safety limit.
- Exponential backoff (with jitter) retries transient failures without overwhelming the service.
- Wrap step nodes in RetryableStep to add retries without changing the step logic.
- Always enforce max iteration and max retry limits to prevent runaway execution.
- Log loop errors and include them in context for observability.
Frequently Asked Questions
How do I break out of a loop early?
The LoopNode shown above always visits every item, so early exit means adding a break condition to its for loop. For counted loops, a common choice is an error threshold (e.g., halt once 50% of the items processed so far have failed). For conditional loops, the condition itself controls exit.
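One way to implement an error threshold is sketched below as a standalone, synchronous function rather than as a change to LoopNode. The function name, the min_items_before_check parameter (which avoids halting on the first one or two failures), and the result keys are assumptions made for illustration.

```python
from typing import Any, Callable, Dict, List


def process_with_error_threshold(
    items: List[Any],
    process: Callable[[Any], Any],
    max_failure_ratio: float = 0.5,
    min_items_before_check: int = 4,
) -> Dict[str, Any]:
    """Process items in order; stop once the failure ratio reaches the threshold."""
    results: List[Dict[str, Any]] = []
    failures = 0
    halted = False

    for index, item in enumerate(items):
        try:
            results.append({"index": index, "success": True, "value": process(item)})
        except Exception as exc:
            failures += 1
            results.append({"index": index, "success": False, "error": str(exc)})

        processed = index + 1
        # Only evaluate the ratio once enough items have been seen.
        if processed >= min_items_before_check and failures / processed >= max_failure_ratio:
            halted = True
            break

    return {"results": results, "failures": failures, "halted": halted}


# Dividing by zero fails for three of the first four items, so the loop halts there.
outcome = process_with_error_threshold([1, 0, 0, 0, 5, 6], lambda x: 10 / x)
print(outcome["halted"], len(outcome["results"]), outcome["failures"])  # → True 4 3
```

Items after the halt point are left unprocessed, so the caller can tell from the length of results how far the loop got.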
What is the recommended exponential backoff formula?
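A common formula is wait_time = min(initial_backoff * multiplier ** attempt, max_wait) + jitter, where attempt starts at 0 and ** denotes exponentiation. Typical starting values are initial_backoff = 1 second, multiplier = 2, and max_wait = 60 seconds, with jitter drawn uniformly between 0 and the capped wait. The cap keeps the delay from growing without bound; with additive jitter the total stays below twice max_wait. Note that the RetryableStep shown earlier uses multiplicative jitter (0.5x to 1.5x) and has no max_wait cap, so either keep max_retries small or add a cap.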
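The formula can be written out as a small function. This is a sketch under the parameter values suggested above; backoff_delay and its rng parameter (injected so the jitter can be switched off for inspection) are hypothetical names, not part of RetryableStep.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    initial_backoff: float = 1.0,
    multiplier: float = 2.0,
    max_wait: float = 60.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Capped exponential delay plus additive jitter in [0, capped delay)."""
    capped = min(initial_backoff * multiplier ** attempt, max_wait)
    return capped + rng() * capped


# With jitter disabled, the schedule doubles until it reaches the 60-second cap.
schedule = [backoff_delay(a, rng=lambda: 0.0) for a in range(8)]
print(schedule)  # → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the default rng, each delay lands somewhere between the capped value and just under twice that value, which spreads out clients that failed at the same moment.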
Should I retry inside the loop body or wrap the entire loop?
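In most cases, retry inside the loop body by wrapping the body step. Individual items are what usually fail transiently, and a per-item retry repeats only the failed item. Wrapping the entire loop in a retry restarts it from the first item and reprocesses items that already succeeded, so reserve that for cases where the loop as a whole fails (for example, the input list cannot be loaded) and reprocessing is safe.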
How do I know if a loop exhausted max iterations vs. completed normally?
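Check the loop's _loop_exited_normally flag, which is stored under the loop's name (for the polling example, wait_for_job_loop_exited_normally). True means the condition became false; False means the loop ran for max_iterations. Log this to understand whether the loop is hitting safety limits.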
Can I have nested loops?
Yes. A loop body can execute another loop node. However, deeply nested loops (3+ levels) are hard to debug. Consider flattening with a single loop over a flattened list or using a sub-workflow.
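As an illustration of the flattening approach, the sketch below turns a two-level structure (tickets, each with comments) into one flat list that a single loop can iterate over. The ticket data and the flatten_comments helper are made up for the example.

```python
from typing import Any, Dict, List

# Hypothetical input: each ticket carries its own list of comments.
tickets = [
    {"id": "T-1", "comments": ["First", "Second"]},
    {"id": "T-2", "comments": []},
    {"id": "T-3", "comments": ["Third"]},
]


def flatten_comments(tickets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Produce one work item per comment, keeping a reference to its ticket."""
    return [
        {"ticket_id": ticket["id"], "comment_index": i, "comment": comment}
        for ticket in tickets
        for i, comment in enumerate(ticket["comments"])
    ]


work_items = flatten_comments(tickets)
print(len(work_items))  # → 3
print(work_items[2])    # → {'ticket_id': 'T-3', 'comment_index': 0, 'comment': 'Third'}
```

Each work item keeps its ticket_id and comment_index, so per-item errors reported by a single loop can still be traced back to the ticket they came from.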