
Testing Resilient Integrations: Chaos and Faults

Testing resilience is fundamentally different from testing correctness. A unit test verifies that a function returns the right value when everything works. A resilience test verifies that the system still functions when things break. This requires intentionally injecting faults (timeouts, errors, latency) and observing how the system responds. Chaos engineering formalizes this: systematically break things in controlled ways to find weaknesses before they cause incidents in production.

The key insight is that you cannot know if your system is resilient until you test it under failure. If you have never tested what happens when the API times out, you do not know that your timeout handling works. If you have never tested what happens when two providers fail simultaneously, you do not know that your failover logic is correct.
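As a minimal sketch of what "injecting a fault" can look like in code, the wrapper below makes any callable fail at a configured rate. The name `FaultInjector` and its parameters are hypothetical, chosen for illustration; they are not part of any library used in this article.

```python
import random


class FaultInjector:
    """Wraps a callable and raises an injected fault at a configured rate.

    Hypothetical helper for illustration only.
    """

    def __init__(self, func, failure_rate, exc_factory=None, seed=None):
        self.func = func
        self.failure_rate = failure_rate
        # Default fault is a timeout; pass exc_factory to inject other errors.
        self.exc_factory = exc_factory or (lambda: TimeoutError("injected timeout"))
        # A seeded generator keeps test runs reproducible.
        self.rng = random.Random(seed)
        self.injected = 0

    def __call__(self, *args, **kwargs):
        # random() returns a value in [0.0, 1.0), so a rate of 1.0 always
        # fails and a rate of 0.0 never fails.
        if self.rng.random() < self.failure_rate:
            self.injected += 1
            raise self.exc_factory()
        return self.func(*args, **kwargs)


# Usage: a dependency that is "down" 30% of the time, reproducibly.
flaky_dependency = FaultInjector(lambda: "ok", failure_rate=0.3, seed=42)
```

Wrapping the dependency rather than editing the code under test keeps the injected failure close to where a real one would originate.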

Unit Testing Patterns

Start with unit tests for individual components:

Testing Exponential Backoff

import time
import unittest
from unittest.mock import Mock, patch

# exponential_backoff_retry is the retry helper under test; import it from
# wherever your project defines it.


class TestExponentialBackoff(unittest.TestCase):
    def test_retry_on_transient_failure(self):
        """Verify retry succeeds after transient failure."""
        func = Mock(side_effect=[Exception("timeout"), Exception("timeout"), "success"])

        result = exponential_backoff_retry(func, max_retries=3, base_delay=0.01)

        self.assertEqual(result, "success")
        self.assertEqual(func.call_count, 3)

    def test_exponential_wait_times(self):
        """Verify wait times grow exponentially."""
        func = Mock(side_effect=Exception("fail"))

        # Patching time.sleep only works if the helper calls time.sleep(...)
        # through the time module rather than via "from time import sleep".
        with patch('time.sleep') as mock_sleep:
            try:
                exponential_backoff_retry(func, max_retries=3, base_delay=1.0)
            except Exception:
                pass

        # Verify sleep was called with exponential times
        sleep_calls = [call[0][0] for call in mock_sleep.call_args_list]
        self.assertAlmostEqual(sleep_calls[0], 1.0, places=1)
        self.assertAlmostEqual(sleep_calls[1], 2.0, places=1)
        self.assertAlmostEqual(sleep_calls[2], 4.0, places=1)

    def test_max_retries_exceeded(self):
        """Verify exception is raised after max retries."""
        func = Mock(side_effect=Exception("always fails"))

        # A small base_delay keeps the test from sleeping for seconds.
        with self.assertRaises(Exception) as context:
            exponential_backoff_retry(func, max_retries=2, base_delay=0.01)

        self.assertIn("always fails", str(context.exception))
        self.assertEqual(func.call_count, 3)  # 1 attempt + 2 retries
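The tests above assume a helper named `exponential_backoff_retry` without showing it. A minimal sketch that is consistent with those assertions (one initial attempt plus `max_retries` retries, delays of `base_delay * 2**attempt`, no jitter) might look like the following. It is an assumption about the helper's shape, not the definitive implementation.

```python
import time


def exponential_backoff_retry(func, max_retries=3, base_delay=1.0):
    """Call func, retrying up to max_retries times with doubling delays.

    Sketch only: retries on any Exception and adds no jitter.
    """
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            # Delays are base_delay, 2*base_delay, 4*base_delay, ...
            time.sleep(base_delay * (2 ** attempt))
```

Production retry helpers usually add random jitter to the delay. If yours does, the exact-delay assertions in `test_exponential_wait_times` need to check a range instead of a fixed value.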

Testing Circuit Breaker

# CircuitBreaker and CircuitBreakerOpen are the classes under test; import them
# from wherever your project defines them. This block reuses the time, unittest,
# and Mock imports from the previous example.

class TestCircuitBreaker(unittest.TestCase):
    def test_circuit_opens_on_threshold(self):
        """Circuit opens after N failures."""
        breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=1.0)
        func = Mock(side_effect=Exception("fail"))

        # Three failures should open the circuit
        for _ in range(3):
            try:
                breaker.call(func)
            except Exception:
                pass

        self.assertEqual(breaker.state, "open")

        # Fourth call should fail immediately without calling func
        with self.assertRaises(CircuitBreakerOpen):
            breaker.call(func)

        # func was only called 3 times (not on the 4th)
        self.assertEqual(func.call_count, 3)

    def test_circuit_recovery(self):
        """Circuit enters half-open and recovers on success."""
        breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.1)
        failing_func = Mock(side_effect=Exception("fail"))
        success_func = Mock(return_value="success")

        # Open the circuit
        for _ in range(2):
            try:
                breaker.call(failing_func)
            except Exception:
                pass

        self.assertEqual(breaker.state, "open")

        # Wait for recovery timeout
        time.sleep(0.2)

        # Circuit should be half-open now, so a trial call is let through
        result = breaker.call(success_func)
        self.assertEqual(result, "success")

        # After 2 successes, circuit closes
        breaker.call(success_func)
        self.assertEqual(breaker.state, "closed")
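The circuit breaker tests rely on a `CircuitBreaker` class and a `CircuitBreakerOpen` exception that are not shown here. The sketch below is one possible shape that satisfies those tests. The `success_threshold` parameter, the `"half-open"` state name, and the private attributes are assumptions made for illustration, and the class is not thread-safe.

```python
import time


class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Minimal single-threaded circuit breaker (sketch under assumptions)."""

    def __init__(self, failure_threshold=3, recovery_timeout=30.0, success_threshold=2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold  # assumed: successes needed to close
        self.state = "closed"
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.monotonic() - self._opened_at < self.recovery_timeout:
                # Fail fast without touching the protected function.
                raise CircuitBreakerOpen("circuit is open")
            # Recovery timeout elapsed: let trial calls through.
            self.state = "half-open"
            self._successes = 0
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_failure(self):
        self._failures += 1
        # Any failure while half-open re-opens the circuit immediately.
        if self.state == "half-open" or self._failures >= self.failure_threshold:
            self.state = "open"
            self._opened_at = time.monotonic()

    def _on_success(self):
        if self.state == "half-open":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "closed"
                self._failures = 0
        else:
            # A success while closed resets the consecutive-failure count.
            self._failures = 0
```

Using `time.monotonic()` rather than wall-clock time keeps the recovery window unaffected by system clock adjustments.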

Integration Testing with Fault Injection

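Integration tests replace the real API with mocked HTTP responses that simulate failures. The examples here use the responses library, which intercepts calls made through requests: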

import unittest

import responses

# ResilientLLMClient and ProviderType are the client classes under test; import
# them from your own package. These tests assume a synchronous complete() built
# on requests, which is the HTTP library that responses intercepts.


class TestResilientClient(unittest.TestCase):
    @responses.activate
    def test_retry_on_503(self):
        """Client retries after 503 Service Unavailable."""
        api_url = "https://api.example.com/v1/complete"

        # First call returns 503; second succeeds
        responses.add(
            responses.POST,
            api_url,
            status=503,
            body="Service Unavailable"
        )
        responses.add(
            responses.POST,
            api_url,
            json={"result": "success"},
            status=200
        )

        client = ResilientLLMClient(api_key="test")
        result = client.complete("test prompt")

        self.assertEqual(result, "success")
        self.assertEqual(len(responses.calls), 2)  # 1 fail + 1 success

    @responses.activate
    def test_failover_on_provider_down(self):
        """Client fails over to secondary provider."""
        openai_url = "https://api.openai.com/v1/chat/completions"
        anthropic_url = "https://api.anthropic.com/v1/messages"

        # OpenAI is down
        responses.add(responses.POST, openai_url, status=503)

        # Anthropic responds
        responses.add(
            responses.POST,
            anthropic_url,
            json={"content": [{"text": "response"}]},
            status=200
        )

        client = ResilientLLMClient(
            providers={
                ProviderType.OPENAI: "sk-...",
                ProviderType.ANTHROPIC: "sk-ant-..."
            }
        )

        # No await here: the test method is a plain (non-async) function.
        result = client.complete("test")
        self.assertEqual(result, "response")

    @responses.activate
    def test_timeout_handling(self):
        """Client handles timeout gracefully."""
        api_url = "https://api.example.com/v1/complete"

        # Simulate a dropped connection: responses raises an exception
        # instance that is passed as the body.
        responses.add(
            responses.POST,
            api_url,
            body=ConnectionError("connection timeout")
        )

        client = ResilientLLMClient(api_key="test")

        with self.assertRaises(Exception):
            client.complete("test", timeout=1.0)

Load Testing and Chaos Engineering

Simulate production load and failures:

import concurrent.futures
import random
from unittest.mock import patch

# ResilientLLMClient is the client under test; import it from your own package.


def load_test_resilient_client():
    """Simulate 100 concurrent requests and report the success rate."""
    client = ResilientLLMClient(api_key="test")

    def make_request(request_id: int):
        prompt = f"Request {request_id}: summarize this text"
        try:
            result = client.complete(prompt, timeout=5.0)
            return {"request_id": request_id, "status": "success", "result": result}
        except Exception as e:
            return {"request_id": request_id, "status": "failure", "error": str(e)}

    # Simulate 100 concurrent requests across 20 worker threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(make_request, i) for i in range(100)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]

    # Analyze results
    successes = sum(1 for r in results if r["status"] == "success")
    failures = sum(1 for r in results if r["status"] == "failure")

    print(f"Success rate: {successes}/{len(results)} ({100*successes//len(results)}%)")
    print(f"Failures: {failures}")

    return successes / len(results) >= 0.95  # 95% success rate


def chaos_test_with_fault_injection():
    """Deliberately inject faults and measure recovery."""
    client = ResilientLLMClient(api_key="test")

    # Simulate API returning random errors
    def mock_api_call(*args, **kwargs):
        if random.random() < 0.3:  # 30% failure rate
            if random.random() < 0.5:
                raise TimeoutError("request timeout")
            else:
                raise Exception("HTTP 503 Service Unavailable")
        return "success"

    # Replace the client's provider call with the fault-injecting version
    with patch.object(client, "_call_provider", side_effect=mock_api_call):
        success_count = 0
        for _ in range(100):
            try:
                result = client.complete("test prompt")
                if result == "success":
                    success_count += 1
            except Exception:
                pass

    print(f"Success rate under chaos: {success_count}/100")

    # Verify resilience: even with 30% failure rate, should succeed >90% of time
    return success_count >= 90
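Whether the 90% threshold in the chaos test is realistic depends on how many attempts the client makes per request. As a rough check, assuming each attempt fails independently with the same probability and ignoring circuit breakers and failover, a request succeeds unless every attempt fails. The function name below is hypothetical.

```python
def success_probability(failure_rate: float, attempts: int) -> float:
    """Probability that at least one of `attempts` independent tries succeeds."""
    return 1.0 - failure_rate ** attempts


# Expected per-request success rate at a 30% injected failure rate.
for attempts in (1, 2, 3, 4):
    print(attempts, round(success_probability(0.3, attempts), 4))
```

With a 30% failure rate, two attempts give an expected success rate of about 91% and three attempts about 97.3%. A client that retries only once therefore sits right at the 90-out-of-100 threshold and will fail the check intermittently by chance, so either seed the random generator, raise the request count, or set the threshold with the retry budget in mind.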

Observability and Monitoring

Instrument your code to understand behavior under failure:

import logging
import time

from prometheus_client import Counter, Gauge, Histogram

logger = logging.getLogger(__name__)

# Prometheus metrics
request_counter = Counter(
    "llm_requests_total",
    "Total requests",
    ["provider", "status"]
)

request_latency = Histogram(
    "llm_request_latency_seconds",
    "Request latency",
    ["provider"]
)

circuit_breaker_state = Gauge(
    "llm_circuit_breaker_state",
    "Circuit breaker state (0=closed, 1=open)",
    ["provider"]
)


class ObservableResilientClient(ResilientLLMClient):
    async def complete(self, prompt: str, **kwargs) -> str:
        """Complete with observability."""
        start_time = time.perf_counter()  # monotonic clock for latency
        status = "success"

        try:
            return await super().complete(prompt, **kwargs)
        except Exception:
            status = "failure"
            raise
        finally:
            elapsed = time.perf_counter() - start_time

            # Assumption: the parent client records which provider served the
            # request in a (hypothetical) last_provider attribute. Adapt this
            # to whatever your client actually exposes.
            provider_used = getattr(self, "last_provider", None)

            # Record metrics
            request_counter.labels(
                provider=provider_used or "unknown",
                status=status
            ).inc()

            if provider_used:
                request_latency.labels(provider=provider_used).observe(elapsed)

            # Log
            logger.info(
                "Request completed",
                extra={
                    "provider": provider_used,
                    "status": status,
                    "latency": elapsed,
                    "prompt_length": len(prompt)
                }
            )

Continuous Testing

Run resilience tests in CI/CD:

# .github/workflows/resilience-tests.yml
name: Resilience Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.x"

      - name: Install dependencies
        # Assumes dependencies (including pytest) are listed in requirements.txt
        run: pip install -r requirements.txt

      - name: Run unit tests
        run: python -m pytest tests/unit -v

      - name: Run integration tests
        run: python -m pytest tests/integration -v

      - name: Run load test
        run: python -m pytest tests/load -v
        env:
          LOAD_TEST_CONCURRENCY: 100
          LOAD_TEST_REQUESTS: 1000

      - name: Run chaos test
        run: python -m pytest tests/chaos -v

      - name: Upload metrics
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: resilience-metrics
          path: metrics/

Key Takeaways

  • Test resilience by deliberately injecting faults (timeouts, errors, latency).
  • Unit test each pattern (backoff, circuit breaker, failover) in isolation.
  • Integration test with fault-injection tools (responses library, chaos servers).
  • Load test to verify behavior under concurrent requests.
  • Monitor and instrument production to detect failures in the wild.

Frequently Asked Questions

How often should I run chaos tests?

Run chaos tests on every pull request (in CI). Run extended chaos tests (hours-long) weekly on staging. In production, run continuous chaos engineering (Netflix-style) with careful blast radius control.

What failure modes should I test?

Test: timeouts, 503 errors, 429 rate limits, connection drops, slow responses (>10 seconds), partial responses, and simultaneous failures (two providers down). Prioritize by likelihood and impact.

How do I test without hitting real APIs?

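Use mocking tools (the responses library, WireMock, Prism) that simulate API behavior. Record real API responses and replay them in tests. Do not point automated test suites at production endpoints; use staging or mocks. Controlled chaos experiments in production are a separate practice with their own blast radius controls.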
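The record-and-replay idea can be sketched with the standard library alone. The `RecordReplay` class below is hypothetical and deliberately minimal: it keys responses by the JSON-serialized request payload, calls the real transport only on a miss, and can run in replay-only mode with no transport at all.

```python
import json


class RecordReplay:
    """Records responses keyed by request payload, then replays them offline.

    Hypothetical helper for illustration; real tools also handle headers,
    streaming, and cassette files on disk.
    """

    def __init__(self, send=None, cassette=None):
        self.send = send  # real transport; None means replay-only
        self.cassette = dict(cassette or {})

    def _key(self, payload):
        # sort_keys makes the key independent of dict insertion order.
        return json.dumps(payload, sort_keys=True)

    def request(self, payload):
        key = self._key(payload)
        if key in self.cassette:
            return self.cassette[key]
        if self.send is None:
            raise KeyError(f"no recorded response for {key}")
        response = self.send(payload)
        self.cassette[key] = response
        return response
```

In a test suite, the cassette would typically be recorded once against staging, saved to a file, and loaded in replay-only mode in CI so that no network access is needed.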

Should I test every code path?

Focus on critical paths (request completion, failover logic) and edge cases (all providers down, circuit breaker recovery). Aim for 80%+ coverage of resilience code; pytest-cov can measure this.

Further Reading