Provider Failover Strategies for Multimodel Resilience
Provider failover is the practice of automatically switching to an alternative API provider when the primary one fails. Instead of relying on a single LLM API (OpenAI, Anthropic, Cohere), a robust system can call multiple providers and fall back gracefully when one goes down. Teams using multiple providers stayed online through the November 2024 OpenAI outage, while single-provider users lost service entirely (CloudWare Incident Report, 2024).
Failover is different from load balancing. Load balancing distributes normal traffic across multiple providers to reduce load and cost. Failover activates only when a primary provider is unhealthy. The strategies range from simple (try A, then B if A fails) to sophisticated (actively monitor health, weight by cost/speed, mirror requests for comparison).
Active/Passive Failover (Simplest)
Active/passive failover means one provider is primary (active) and the others are secondary (passive). Requests go to the primary. If the primary fails, the request is retried against the secondary. This is the easiest strategy to implement and works well for most workloads.
Here is a simple implementation:
import time
from enum import Enum

import requests


class Provider(Enum):
    PRIMARY = "openai"
    SECONDARY = "anthropic"


class FailoverClient:
    def __init__(self, openai_key: str, anthropic_key: str):
        self.openai_key = openai_key
        self.anthropic_key = anthropic_key

    def call_llm(self, prompt: str, max_retries: int = 1) -> str:
        """Try primary provider; fall back to secondary on failure."""
        providers = [
            (Provider.PRIMARY, self.call_openai),
            (Provider.SECONDARY, self.call_anthropic),
        ]
        last_error = None
        for provider, func in providers:
            # Each provider gets max_retries + 1 attempts before we move on
            for attempt in range(max_retries + 1):
                try:
                    print(f"Trying {provider.value}...")
                    result = func(prompt)
                    print(f"Success with {provider.value}")
                    return result
                except Exception as e:
                    last_error = e
                    print(f"{provider.value} failed: {e}")
                    if attempt < max_retries:
                        time.sleep(2 ** attempt)  # Backoff within provider
        raise Exception(f"All providers failed. Last error: {last_error}")

    def call_openai(self, prompt: str) -> str:
        """Call OpenAI API."""
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {self.openai_key}"},
            json={
                "model": "gpt-4",
                "messages": [{"role": "user", "content": prompt}],
            },
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    def call_anthropic(self, prompt: str) -> str:
        """Call Anthropic API."""
        response = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": self.anthropic_key,
                "anthropic-version": "2023-06-01",  # Required by the Messages API
            },
            json={
                "model": "claude-3-sonnet-20240229",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": prompt}],
            },
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["content"][0]["text"]


# Usage
client = FailoverClient(openai_key="sk-...", anthropic_key="sk-ant-...")
result = client.call_llm("What is the capital of France?")
This approach is simple but has limitations: the secondary provider sits idle during normal operation (wasting capacity), and you discover failures only when a live request fails (there are no proactive health checks).
Weighted Failover (Balanced Load)
Weighted failover distributes normal traffic across multiple providers based on reliability, cost, or performance. When one provider fails, more traffic shifts to others until it recovers.
import random
import requests


class WeightedFailoverClient:
    """Route requests across providers by weight."""

    def __init__(self, providers: dict):
        """
        Args:
            providers: Dict of {name: (weight, callable)}
                Example: {"openai": (0.7, self.call_openai), "anthropic": (0.3, self.call_anthropic)}
        """
        self.providers = providers
        self.provider_names = list(providers.keys())
        self.weights = [providers[name][0] for name in self.provider_names]
        self.failure_counts = {name: 0 for name in self.provider_names}

    def select_provider(self) -> tuple:
        """Select a provider based on weights and recent failures."""
        # Adjust weights based on recent failures (penalize failing providers)
        adjusted_weights = []
        for i, name in enumerate(self.provider_names):
            base_weight = self.weights[i]
            # Halve the weight for each consecutive failure;
            # the count resets to zero on the next success
            penalty = 0.5 ** max(0, self.failure_counts[name])
            adjusted_weights.append(base_weight * penalty)
        # Normalize weights
        total = sum(adjusted_weights)
        normalized = [w / total for w in adjusted_weights]
        # Select provider by weighted random choice
        selected = random.choices(self.provider_names, weights=normalized, k=1)[0]
        return selected, self.providers[selected][1]

    def call_llm(self, prompt: str, max_retries: int = 2) -> str:
        """Route request to selected provider with failover."""
        last_error = None
        for attempt in range(len(self.provider_names) + max_retries):
            provider_name, func = self.select_provider()
            try:
                print(f"Trying {provider_name}...")
                result = func(prompt)
                # Clear failure count on success
                self.failure_counts[provider_name] = 0
                print(f"Success with {provider_name}")
                return result
            except Exception as e:
                self.failure_counts[provider_name] += 1
                last_error = e
                print(f"{provider_name} failed (count: {self.failure_counts[provider_name]}): {e}")
        raise Exception(f"All providers exhausted. Last error: {last_error}")


# Usage
# call_openai and call_anthropic are assumed to be module-level functions
# that take a prompt string and return the completion text.
client = WeightedFailoverClient({
    "openai": (0.7, call_openai),        # 70% of traffic
    "anthropic": (0.3, call_anthropic),  # 30% of traffic
})
result = client.call_llm("What is the capital of France?")
This approach balances load and provides gradual degradation: if one provider has a minor issue, it gets fewer requests but is not completely bypassed.
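To see how the penalty shifts traffic, the following sketch recomputes the routing probabilities for the 70/30 split after consecutive failures on one provider. It repeats the arithmetic from select_provider in isolation; the helper name adjusted_shares is an assumption made for illustration.

```python
def adjusted_shares(base_weights: dict, failure_counts: dict) -> dict:
    """Return normalized routing probabilities after the failure penalty."""
    # Halve a provider's weight for each consecutive failure
    adjusted = {
        name: weight * 0.5 ** failure_counts.get(name, 0)
        for name, weight in base_weights.items()
    }
    total = sum(adjusted.values())
    return {name: weight / total for name, weight in adjusted.items()}


base = {"openai": 0.7, "anthropic": 0.3}
for failures in range(4):
    shares = adjusted_shares(base, {"openai": failures})
    print(failures, {name: round(p, 3) for name, p in shares.items()})
```

With these numbers, the share routed to openai falls from 70% to roughly 54% after one failure and roughly 37% after two, so a struggling provider keeps receiving some traffic and can recover its weight on the next success.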
Health-Check Based Failover
More sophisticated systems monitor provider health proactively. Rather than waiting for requests to fail, you periodically send small test requests to detect outages early.
import time
import threading
from typing import Dict, Callable


class HealthCheckedFailover:
    """Monitor provider health and route based on real-time status."""

    def __init__(self, providers: Dict[str, Callable], check_interval: float = 30.0):
        self.providers = providers
        self.check_interval = check_interval
        self.health_status = {name: True for name in providers.keys()}
        self.lock = threading.Lock()
        # Start health check thread
        self.running = True
        check_thread = threading.Thread(target=self._health_check_loop, daemon=True)
        check_thread.start()

    def _health_check_loop(self):
        """Periodically check provider health."""
        while self.running:
            for provider_name, func in self.providers.items():
                try:
                    # Send a minimal prompt as a probe; note that this is a
                    # real completion request and counts against quota
                    func("ping")
                    with self.lock:
                        if not self.health_status[provider_name]:
                            print(f"{provider_name} has recovered")
                        self.health_status[provider_name] = True
                except Exception as e:
                    with self.lock:
                        if self.health_status[provider_name]:
                            print(f"{provider_name} is down: {e}")
                        self.health_status[provider_name] = False
            # Sleep once per full pass over all providers
            time.sleep(self.check_interval)

    def call_llm(self, prompt: str) -> str:
        """Call the first healthy provider."""
        # Snapshot the healthy providers under the lock, then call outside it
        with self.lock:
            healthy_providers = [
                (name, func)
                for name, func in self.providers.items()
                if self.health_status[name]
            ]
        if not healthy_providers:
            raise Exception("No healthy providers available")
        # Try healthy providers in order; unhealthy ones are skipped entirely
        for provider_name, func in healthy_providers:
            try:
                return func(prompt)
            except Exception as e:
                print(f"{provider_name} failed during request: {e}")
        raise Exception("All healthy providers failed during request")

    def shutdown(self):
        """Stop health check loop."""
        self.running = False
Health-check based failover detects an outage within roughly one check interval (30 seconds by default in the code above) and routes around it proactively. The downside is that the health-check calls themselves consume quota.
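The quota cost is easy to estimate before turning health checks on. The sketch below counts probe requests per day for a given interval and number of providers. The function name and the per-probe token figure are hypothetical placeholders, not measured values, and the result is an upper bound because each pass also spends time waiting on the probes themselves.

```python
def daily_health_checks(check_interval_s: float, provider_count: int) -> int:
    """Upper bound on probe requests sent per day across all providers."""
    seconds_per_day = 24 * 60 * 60
    checks_per_provider = int(seconds_per_day // check_interval_s)
    return checks_per_provider * provider_count


# 30-second interval, two providers
probes = daily_health_checks(30.0, 2)
print(probes)  # 5760

# Hypothetical token cost per probe (prompt plus a short reply);
# replace with a figure measured on your own traffic
ASSUMED_TOKENS_PER_PROBE = 20
print(probes * ASSUMED_TOKENS_PER_PROBE)  # 115200
```

Lengthening the interval cuts the cost proportionally but also lengthens the window in which an outage goes unnoticed.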
Request Mirroring (Testing)
Request mirroring sends a request to the primary provider and a secondary provider simultaneously, compares results, and uses the primary result if both succeed. This is useful for validating new providers before switching to them entirely.
async function callLLMWithMirror(
  prompt: string,
  primaryCall: () => Promise<string>,
  secondaryCall: () => Promise<string>
): Promise<{ primary: string; secondary?: string; agreement: boolean }> {
  // Send both requests in parallel
  const [primaryResult, secondaryResult] = await Promise.allSettled([
    primaryCall(),
    secondaryCall()
  ]);

  let primary: string;
  let secondary: string | undefined;
  let agreement = false;

  if (primaryResult.status === "fulfilled") {
    primary = primaryResult.value;
  } else if (secondaryResult.status === "fulfilled") {
    // Primary failed; use secondary as fallback
    primary = secondaryResult.value;
  } else {
    throw new Error("Both primary and secondary failed");
  }

  // Compare only when both calls succeeded. In the fallback case the
  // secondary result is already in `primary`, so comparing would always "agree".
  if (primaryResult.status === "fulfilled" && secondaryResult.status === "fulfilled") {
    secondary = secondaryResult.value;
    // Exact equality is a strict test; free-form LLM output rarely matches verbatim
    agreement = primary === secondary;
    console.log(`Mirror results agree for "${prompt}": ${agreement}`);
  }
  return { primary, secondary, agreement };
}

// Usage: log mirror results to detect divergence
const result = await callLLMWithMirror(
  "What is 2+2?",
  () => callOpenAI("What is 2+2?"),
  () => callAnthropic("What is 2+2?")
);
if (!result.agreement && result.secondary) {
  console.warn(`Results diverged: OpenAI="${result.primary}" vs Anthropic="${result.secondary}"`);
}
Mirroring is expensive (2x API calls) but is invaluable for building confidence in multi-provider systems.
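Exact string equality is a strict test for LLM output, since two correct answers rarely match character for character. One looser option, sketched here in Python with the standard library's difflib, is to normalize both answers and compare a similarity ratio against a threshold. The function name and the 0.8 threshold are assumptions to tune against your own data, and textual similarity is only a rough proxy for agreement in meaning.

```python
from difflib import SequenceMatcher


def answers_agree(primary: str, secondary: str, threshold: float = 0.8) -> bool:
    """Return True when two answers are textually similar after normalization."""
    # Lowercase and collapse whitespace so formatting differences are ignored
    a = " ".join(primary.lower().split())
    b = " ".join(secondary.lower().split())
    if not a and not b:
        return True
    # ratio() is 1.0 for identical strings and 0.0 when nothing matches
    return SequenceMatcher(None, a, b).ratio() >= threshold


print(answers_agree("The capital is Paris.", "the capital is  Paris."))  # True
print(answers_agree("Paris", "I am not able to answer that question."))  # False
```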
Key Takeaways
- Provider failover distributes risk across multiple APIs and survives individual outages.
- Active/passive is simplest but wastes secondary capacity; weighted failover balances load.
- Health-check based failover detects outages proactively but adds overhead.
- Request mirroring validates provider consistency at the cost of double API calls.
- Combine strategies: use weighted failover for load distribution and health checks for detection.
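The last takeaway can be sketched as a weighted choice restricted to providers currently marked healthy. This is a minimal illustration, not a full client: the function name pick_provider and the shape of the weights and health dictionaries are assumptions, with the health map standing in for whatever a background health checker maintains.

```python
import random
from typing import Optional


def pick_provider(weights: dict, health: dict, rng: Optional[random.Random] = None) -> str:
    """Weighted random choice among providers whose health flag is True."""
    chooser = rng if rng is not None else random
    candidates = [name for name in weights if health.get(name, False)]
    if not candidates:
        raise RuntimeError("No healthy providers available")
    candidate_weights = [weights[name] for name in candidates]
    return chooser.choices(candidates, weights=candidate_weights, k=1)[0]


weights = {"openai": 0.7, "anthropic": 0.3}

# Both healthy: traffic follows the 70/30 split
print(pick_provider(weights, {"openai": True, "anthropic": True}))

# openai marked down by the health checker: all traffic goes to anthropic
print(pick_provider(weights, {"openai": False, "anthropic": True}))  # anthropic
```

Filtering before the weighted draw means an unhealthy provider receives no traffic at all, while the remaining providers keep their relative proportions.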
Frequently Asked Questions
Should I use failover or just one provider with better error handling?
Failover is insurance against catastrophic outages (provider downtime, widespread API failures). Good error handling (retries, exponential backoff) is necessary but not sufficient. If your provider goes down for 2 hours, error handling alone cannot save you. Failover can.
How do I avoid duplicating logic across providers?
Create an abstract LLM client interface and implement it for each provider:
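from abc import ABC, abstractmethod


class LLMClient(ABC):
    @abstractmethod
    async def complete(self, prompt: str) -> str: ...


class OpenAIClient(LLMClient):
    async def complete(self, prompt: str) -> str: ...  # Provider-specific call goes here


class AnthropicClient(LLMClient):
    async def complete(self, prompt: str) -> str: ...  # Provider-specific call goes here


# Polymorphic failover: the loop only depends on the shared interface
async def complete_with_failover(clients: list, prompt: str) -> str:
    last_error = None
    for client in clients:
        try:
            return await client.complete(prompt)
        except Exception as e:
            last_error = e  # Fall through to the next client
    raise RuntimeError(f"All clients failed. Last error: {last_error}")


clients = [OpenAIClient(), AnthropicClient()]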
What if different providers have different model capabilities?
Track capabilities per provider and route based on request requirements:
# Illustrative values only; check each provider's current documentation
capabilities = {
    "openai": {"vision": True, "function_calls": True},
    "anthropic": {"vision": True, "function_calls": False},
}


def select_provider_for_task(task: str) -> str:
    if task == "vision":
        return "openai"  # Both support it; pick primary
    elif task == "function_calls":
        return "openai"  # Only openai in this example table
    else:
        return "anthropic"  # Use cheaper option for basic tasks
How do I handle cost differences across providers?
Weight providers by both cost and reliability, giving cheaper but less reliable providers a lower weight. You can also fail over by cost: try the cheap provider first, then fall back to the more expensive one if it fails.
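The cost-ordered fallback can be sketched as a loop over providers sorted by price. The function name, the (cost, callable) tuple layout, the stub providers, and the per-request costs below are all made up for illustration; real prices depend on the model and token counts.

```python
from typing import Callable, Dict, Tuple


def call_cheapest_first(
    prompt: str,
    providers: Dict[str, Tuple[float, Callable[[str], str]]],
) -> str:
    """Try providers in ascending cost order; return the first successful result."""
    last_error = None
    ordered = sorted(providers.items(), key=lambda item: item[1][0])
    for name, (cost, func) in ordered:
        try:
            return func(prompt)
        except Exception as e:
            last_error = e  # Move on to the next cheapest provider
    raise RuntimeError(f"All providers failed. Last error: {last_error}")


# Stub callables standing in for real provider calls
def cheap_but_down(prompt: str) -> str:
    raise ConnectionError("cheap provider unavailable")


def expensive_and_up(prompt: str) -> str:
    return f"answer to: {prompt}"


providers = {
    "expensive": (0.010, expensive_and_up),
    "cheap": (0.002, cheap_but_down),
}
print(call_cheapest_first("What is the capital of France?", providers))
# prints "answer to: What is the capital of France?"
```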