AI Workflow Tool Integration: Add External APIs
A tool step in a workflow is a node that calls an external API, database query, or service (e.g., Slack API, Stripe, your company's user database). Unlike LLM steps, which call language models, tool steps call deterministic functions and return structured results. When combined with LLM steps, tools allow workflows to read real data and take actions—moving from pure reasoning to action-oriented automation.
In this article, you will define tool schemas, implement reliable tool steps, handle tool errors, and learn how to compose multiple tools into coherent workflows that integrate with external systems.
What Is a Tool Definition?
A tool definition is a schema that describes an external capability: its name, purpose, input parameters, and expected output. Tools can be HTTP APIs, database queries, cloud services, or any deterministic function. A well-defined tool allows any node (including LLM steps) to invoke it correctly.
from typing import Any, Dict, Callable
from pydantic import BaseModel, Field
import httpx
import json
class ToolDefinition:
"""
Describes an external tool (API, database query, function).
"""
def __init__(
self,
name: str,
description: str,
input_schema: BaseModel,
handler: Callable,
):
self.name = name
self.description = description
self.input_schema = input_schema
self.handler = handler
def to_dict(self) -> dict:
"""Export the tool definition as a dict (useful for APIs)."""
return {
"name": self.name,
"description": self.description,
"input_schema": self.input_schema.model_json_schema(),
}
async def invoke(self, arguments: dict) -> dict:
"""
Invoke the tool with the given arguments.
Args:
arguments: Dict matching the input_schema.
Returns:
Dict with "success" bool and "result" or "error" key.
"""
try:
# Validate arguments against the schema.
validated = self.input_schema(**arguments)
# Call the handler.
result = await self.handler(validated)
return {
"success": True,
"result": result,
}
except Exception as e:
return {
"success": False,
"error": str(e),
}
# Example: Fetch customer data from a CRM API
class FetchCustomerInput(BaseModel):
customer_id: str = Field(..., description="The unique customer ID")
include_history: bool = Field(
default=False, description="Include purchase history"
)
async def fetch_customer_handler(input_params: FetchCustomerInput) -> dict:
"""
Fetch customer data from a CRM API.
"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://crm.example.com/api/customers/{input_params.customer_id}",
headers={"Authorization": "Bearer YOUR_API_KEY"},
)
response.raise_for_status()
customer = response.json()
if input_params.include_history:
hist_response = await client.get(
f"https://crm.example.com/api/customers/{input_params.customer_id}/history",
headers={"Authorization": "Bearer YOUR_API_KEY"},
)
hist_response.raise_for_status()
customer["history"] = hist_response.json()
return customer
fetch_customer_tool = ToolDefinition(
name="fetch_customer",
description="Fetch customer details from the CRM, optionally including purchase history.",
input_schema=FetchCustomerInput,
handler=fetch_customer_handler,
)
# Example: Log an event to an analytics system
class LogEventInput(BaseModel):
event_name: str = Field(..., description="Name of the event (e.g., 'purchase', 'login')")
user_id: str = Field(..., description="User ID")
properties: dict = Field(
default_factory=dict, description="Additional event properties"
)
async def log_event_handler(input_params: LogEventInput) -> dict:
"""
Log an event to the analytics system.
"""
async with httpx.AsyncClient() as client:
payload = {
"event_name": input_params.event_name,
"user_id": input_params.user_id,
"properties": input_params.properties,
"timestamp": datetime.utcnow().isoformat(),
}
response = await client.post(
"https://analytics.example.com/api/events",
json=payload,
headers={"Authorization": "Bearer YOUR_API_KEY"},
)
response.raise_for_status()
return {"event_id": response.json().get("event_id")}
log_event_tool = ToolDefinition(
name="log_event",
description="Log a user event to the analytics system.",
input_schema=LogEventInput,
handler=log_event_handler,
)
Tool Steps in a Workflow
A tool step is a node that invokes a tool and writes the result to context:
from datetime import datetime
class ToolStep:
"""
A workflow node that invokes an external tool.
"""
def __init__(self, name: str, tool: ToolDefinition):
self.name = name
self.tool = tool
async def execute(self, context: dict) -> dict:
"""
Execute the tool step.
Reads input variables from context, invokes the tool,
and writes the result back to context.
"""
# Build tool arguments from context.
# For simplicity, assume argument names match context keys.
tool_args = {}
for field_name, field_info in self.tool.input_schema.model_fields.items():
if field_name in context:
tool_args[field_name] = context[field_name]
elif field_info.is_required():
return {
**context,
f"{self.name}_success": False,
f"{self.name}_error": f"Missing required argument: {field_name}",
}
# Invoke the tool.
result = await self.tool.invoke(tool_args)
if result["success"]:
return {
**context,
f"{self.name}_result": result["result"],
f"{self.name}_success": True,
}
else:
return {
**context,
f"{self.name}_error": result["error"],
f"{self.name}_success": False,
}
# Usage in a workflow:
fetch_customer_step = ToolStep(
name="fetch_customer",
tool=fetch_customer_tool,
)
log_event_step = ToolStep(
name="log_event",
tool=log_event_tool,
)
# Simulate a workflow context.
context = {
"customer_id": "CUST-12345",
"include_history": True,
}
# Execute the fetch.
context = await fetch_customer_step.execute(context)
print(f"Fetch success: {context['fetch_customer_success']}")
print(f"Customer: {context.get('fetch_customer_result', {}).get('name')}")
# Now log an event.
context["event_name"] = "customer_viewed"
context["user_id"] = "USER-999"
context["properties"] = {"customer_id": context["customer_id"]}
context = await log_event_step.execute(context)
print(f"Log success: {context['log_event_success']}")
Handling Tool Errors and Retries
Tools can fail for transient reasons (network timeouts, rate limits). Implement retries within the tool step:
from tenacity import retry, stop_after_attempt, wait_exponential
class ResilientToolStep:
"""Tool step with built-in retry logic."""
def __init__(
self,
name: str,
tool: ToolDefinition,
max_retries: int = 3,
backoff_base: float = 2.0,
):
self.name = name
self.tool = tool
self.max_retries = max_retries
self.backoff_base = backoff_base
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
)
async def invoke_with_retry(self, tool_args: dict) -> dict:
"""Invoke the tool with exponential backoff."""
return await self.tool.invoke(tool_args)
async def execute(self, context: dict) -> dict:
"""Execute with retry logic."""
# Extract arguments from context (same as before).
tool_args = {}
for field_name, field_info in self.tool.input_schema.model_fields.items():
if field_name in context:
tool_args[field_name] = context[field_name]
elif field_info.is_required():
return {
**context,
f"{self.name}_success": False,
f"{self.name}_error": f"Missing required argument: {field_name}",
}
# Invoke with retry.
try:
result = await self.invoke_with_retry(tool_args)
except Exception as e:
return {
**context,
f"{self.name}_success": False,
f"{self.name}_error": f"Tool failed after {self.max_retries} attempts: {e}",
}
if result["success"]:
return {
**context,
f"{self.name}_result": result["result"],
f"{self.name}_success": True,
}
else:
return {
**context,
f"{self.name}_error": result["error"],
f"{self.name}_success": False,
}
Tool Composition: Using Multiple Tools
Many workflows chain multiple tools together. Each tool's output becomes input to the next:
class ToolChain:
"""Execute a sequence of tool steps in order."""
def __init__(self, name: str, steps: list):
self.name = name
self.steps = steps # List of ToolStep instances
async def execute(self, initial_context: dict) -> dict:
"""
Execute all steps in sequence.
If any step fails, halt and return.
"""
context = initial_context
for step in self.steps:
context = await step.execute(context)
# Check for failure.
if not context.get(f"{step.name}_success", False):
# Log the failure.
context[f"{self.name}_failed_at_step"] = step.name
return context
context[f"{self.name}_success"] = True
return context
# Example: Customer onboarding workflow
onboarding_chain = ToolChain(
name="customer_onboarding",
steps=[
ResilientToolStep("fetch_customer", fetch_customer_tool),
ResilientToolStep("log_event", log_event_tool),
# Add more tools as needed
],
)
# Execute the chain.
initial_context = {
"customer_id": "CUST-12345",
"event_name": "onboarding_started",
"user_id": "USER-999",
}
result = await onboarding_chain.execute(initial_context)
if result["customer_onboarding_success"]:
print("Onboarding completed successfully.")
else:
print(f"Onboarding failed at step: {result.get('customer_onboarding_failed_at_step')}")
Table: Common Tool Integrations
| Tool Type | Example | Input | Output |
|---|---|---|---|
| Database | Query user table | SQL or object ID | Rows as JSON |
| HTTP API | Slack, Stripe, OpenAI | API endpoint + parameters | JSON response |
| Cloud Service | S3 file upload, BigQuery | File data + destination | Success/URL |
| Message Queue | Pub/Sub, Kafka | Topic + message | Message ID |
| Async Task | Background job | Task definition + args | Job ID + status URL |
Key Takeaways
- Tools are external capabilities defined by a schema (name, description, input, output).
- Tool steps invoke tools, read input from context, and write output back to context.
- Always validate input arguments against the tool schema before invoking.
- Implement retry logic with exponential backoff to handle transient failures.
- Use tool chains to sequence multiple tools, halting gracefully on failure.
- Log tool invocations and errors for observability and debugging.
Frequently Asked Questions
How do I handle secrets (API keys, database passwords) in tool definitions?
Store secrets in a secure vault (AWS Secrets Manager, HashiCorp Vault) and fetch them at runtime using the tool's handler. Never hardcode secrets in tool definitions or logs. Rotate secrets regularly and monitor for unauthorized access.
What if a tool returns data that does not match its schema?
Validate the response immediately after the tool returns. If the response does not match expectations, treat it as an error, log it, and escalate or retry. Use TypeScript/Pydantic to enforce response schemas.
Can an LLM step call a tool, or does a tool call always require a separate node?
Some platforms (OpenAI, Anthropic) support tool calling directly from LLM steps. The LLM can decide to call a tool, the platform invokes it, and the result is fed back to the LLM. This is more powerful than separate nodes but requires careful orchestration.
How do I rate-limit tool calls to avoid overwhelming external services?
Use a token bucket or sliding window rate limiter. Before invoking a tool, check the rate limit; if exceeded, either queue the request or return an error. Most external services document rate limits; respect them.
Should I cache tool responses?
Yes, if the tool is idempotent and the data does not change frequently. Cache tool responses (with a TTL) to reduce API costs and improve latency. For example, cache a customer fetch result for 1 hour; if the workflow runs again within that window, reuse the cached result.