Cache Invalidation and Staleness Management
Cache invalidation is famously one of the hard problems in computer science: when should a cached response be discarded and recomputed? For semantic caches, the answer depends on your domain. If you are caching historical facts (e.g., "What year was Python released?", answer: 1991), invalidation is rarely needed. If you are caching LLM analysis of live data (e.g., "Summarize today's earnings calls"), invalidation must be frequent and precise.
This article covers three invalidation strategies: time-based TTL (simple, stateless), event-triggered invalidation (precise, complex), and version pinning (for model/data updates). You will learn to detect staleness, test cache coherence, and measure the trade-off between freshness and cost.
Time-Based TTL (Time-To-Live)
The simplest invalidation strategy assigns each cached response an expiration time. After expiration, the cache entry is ignored and a new LLM call is made. TTL works well for responses whose validity window is predictable.
Example: TTL implementation
from datetime import datetime, timezone

import numpy as np


class CacheEntryWithTTL:
    """A cached entry with time-based expiration."""

    def __init__(self, embedding, response: str, ttl_seconds: int):
        self.embedding = embedding
        self.response = response
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        self.created_at = datetime.now(timezone.utc)
        self.ttl_seconds = ttl_seconds

    def age_seconds(self) -> float:
        """Return the age of this entry in seconds."""
        return (datetime.now(timezone.utc) - self.created_at).total_seconds()

    def is_expired(self) -> bool:
        """Check if the entry has exceeded its TTL."""
        return self.age_seconds() > self.ttl_seconds


class SemanticCacheWithTTL:
    """Semantic cache with per-entry TTL invalidation."""

    def __init__(self, threshold: float = 0.95):
        self.threshold = threshold
        self.cache = []  # List of (embedding, CacheEntryWithTTL, metadata)

    def find_valid_similar(self, query_embedding):
        """
        Search for similar cached responses that are not expired.
        Returns: (cached_response, similarity_score) if found, else None.
        """
        best_match = None
        best_similarity = self.threshold
        for cached_embedding, cache_entry, metadata in self.cache:
            # Skip expired entries
            if cache_entry.is_expired():
                continue
            # The dot product equals cosine similarity only for unit-normalized embeddings
            similarity = np.dot(query_embedding, cached_embedding)
            if similarity > best_similarity:
                best_similarity = similarity
                best_match = (cache_entry.response, similarity)
        return best_match

    def store(self, query: str, embedding, response: str, ttl_seconds: int):
        """Store a cache entry with a TTL."""
        cache_entry = CacheEntryWithTTL(embedding, response, ttl_seconds)
        metadata = {"query": query, "ttl_seconds": ttl_seconds}
        self.cache.append((embedding, cache_entry, metadata))

    def cleanup_expired(self):
        """Remove expired entries from cache (optional, but reduces memory)."""
        initial_size = len(self.cache)
        self.cache = [
            (e, c, m) for e, c, m in self.cache
            if not c.is_expired()
        ]
        removed = initial_size - len(self.cache)
        print(f"Cleaned {removed} expired entries")
TTL guidelines by domain:
| Domain | Typical TTL | Reason |
|---|---|---|
| Historical facts | 30–90 days or never | Historical events and definitions rarely change |
| Current events | 1–24 hours | News, trending topics, breaking information |
| Product info | 12–24 hours | Pricing, features, availability may change |
| Code examples | 7–30 days | APIs and libraries evolve; documentation becomes stale |
| User-specific data | 1–6 hours | User profile, preferences, permissions may be updated by user actions |
| Real-time analytics | 5–30 minutes | Dashboards, metrics, live data must reflect recent state |
In practice, a mixed TTL strategy (longer TTLs for fact-based, shorter for live data) is most effective. Set TTLs on a per-response or per-query-category basis.
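A per-category TTL policy can be as small as a lookup table. The sketch below is one possible shape, not a prescribed one: the category names are hypothetical, and the values are simply picked from the ranges in the table above.

```python
from datetime import timedelta

# Hypothetical category names; TTL values are assumptions taken from the
# ranges in the guideline table, not measured recommendations.
CATEGORY_TTLS = {
    "historical_fact": timedelta(days=30),
    "current_events": timedelta(hours=1),
    "product_info": timedelta(hours=12),
    "code_example": timedelta(days=7),
    "user_specific": timedelta(hours=1),
    "realtime_analytics": timedelta(minutes=5),
}

# Unknown categories fall back to the shortest TTL, erring toward freshness.
DEFAULT_TTL = min(CATEGORY_TTLS.values())


def ttl_seconds_for(category: str) -> int:
    """Return the TTL in seconds for a query category."""
    return int(CATEGORY_TTLS.get(category, DEFAULT_TTL).total_seconds())
```

The returned value can be passed as `ttl_seconds` when storing an entry. Falling back to the shortest TTL is a design choice: a misclassified query costs an extra LLM call rather than a stale answer.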
Event-Triggered Invalidation
For higher precision, invalidate the cache when specific events occur: a database record changes, a configuration update happens, or a model is re-deployed.
Example: Event-driven invalidation
from datetime import datetime, timezone


class SemanticCacheWithEvents:
    """Semantic cache with event-triggered invalidation."""

    def __init__(self, threshold: float = 0.95):
        self.threshold = threshold
        self.cache = []  # List of (embedding, response, metadata)
        # Map event types to affected cache entry IDs
        self.event_subscriptions = {}  # event_type -> set(entry_ids)

    def store(self, query: str, embedding, response: str,
              entry_id: str, event_tags: list[str]):
        """
        Store a cache entry associated with specific events.
        Example: entry_id='user_123_profile', event_tags=['user.updated', 'config.changed']
        """
        metadata = {
            "query": query,
            "entry_id": entry_id,
            "event_tags": event_tags,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        self.cache.append((embedding, response, metadata))
        # Register this entry for invalidation on certain events
        for tag in event_tags:
            self.event_subscriptions.setdefault(tag, set()).add(entry_id)

    def on_event(self, event_type: str) -> int:
        """
        Handle an invalidation event. Remove all entries tagged with this event.
        Returns the number of entries actually removed.
        """
        entry_ids_to_remove = self.event_subscriptions.pop(event_type, None)
        if not entry_ids_to_remove:
            return 0
        initial_size = len(self.cache)
        self.cache = [
            (e, r, m) for e, r, m in self.cache
            if m["entry_id"] not in entry_ids_to_remove
        ]
        # Count entries that really left the cache, not subscribed IDs
        removed = initial_size - len(self.cache)
        # Drop the removed IDs from other subscriptions so later events
        # do not reference entries that no longer exist
        for tag in list(self.event_subscriptions):
            self.event_subscriptions[tag] -= entry_ids_to_remove
            if not self.event_subscriptions[tag]:
                del self.event_subscriptions[tag]
        print(f"Event '{event_type}' invalidated {removed} cache entries")
        return removed
Usage example: Webhook-driven invalidation
cache = SemanticCacheWithEvents()

# Cache a response associated with a user profile.
# embed_text is assumed to be your own embedding helper; it is not defined here.
embedding = embed_text("What is my account status?")
response = "Your account is active..."
cache.store(
    query="What is my account status?",
    embedding=embedding,
    response=response,
    entry_id="user_456_account",
    event_tags=["user_456.updated", "billing.changed"],
)


# When the user updates their profile, fire an event
def on_user_update(user_id: str):
    # Invalidates every entry tagged with this user's update event
    cache.on_event(f"user_{user_id}.updated")


on_user_update("456")  # Clears entries tagged "user_456.updated"
Version Pinning and Model Rollouts
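When you deploy a new embedding model or LLM version, cached entries from the old model can no longer be trusted. Embeddings produced by different models are not comparable, so similarity scores against old vectors are meaningless, and responses from an older LLM may differ in quality or format from what the current model would return. Version pinning solves this by recording which model versions produced each entry.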
Example: Version-pinned cache
from datetime import datetime

import numpy as np


class VersionedSemanticCache:
    """Semantic cache with model version tracking."""

    CURRENT_EMBEDDING_MODEL = "text-embedding-3-small"
    CURRENT_LLM_MODEL = "gpt-4o-2026-06-02"  # Illustrative pinned version name

    def __init__(self, threshold: float = 0.95):
        self.threshold = threshold
        self.cache = []  # List of (embedding, response, metadata)

    def store(self, query: str, embedding, response: str):
        """Store response with the current model versions."""
        metadata = {
            "query": query,
            "embedding_model": self.CURRENT_EMBEDDING_MODEL,
            "llm_model": self.CURRENT_LLM_MODEL,
            # Naive UTC timestamp in ISO format, read back by measure_staleness
            "created_at": datetime.utcnow().isoformat(),
        }
        self.cache.append((embedding, response, metadata))

    def find_similar(self, query_embedding):
        """
        Search for similar responses from the CURRENT model version only.
        Ignore entries from older versions.
        """
        best_match = None
        best_similarity = self.threshold
        for cached_embedding, cached_response, metadata in self.cache:
            # Only use entries from the current model version
            if metadata["embedding_model"] != self.CURRENT_EMBEDDING_MODEL:
                continue
            if metadata["llm_model"] != self.CURRENT_LLM_MODEL:
                continue
            # The dot product equals cosine similarity only for unit-normalized embeddings
            similarity = np.dot(query_embedding, cached_embedding)
            if similarity > best_similarity:
                best_similarity = similarity
                best_match = (cached_response, similarity)
        return best_match
When deploying a new model, set CURRENT_EMBEDDING_MODEL and CURRENT_LLM_MODEL to the new versions. The old cache entries are preserved but ignored. Once you verify the new model performs well, purge old entries or run a background job to re-embed and recompute.
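The purge step can be sketched as a partition over the stored tuples. This is a minimal illustration that assumes entries keep the `(embedding, response, metadata)` shape used by `VersionedSemanticCache`; the helper name `split_by_version` and the model names in the sample data are hypothetical.

```python
def split_by_version(entries, embedding_model: str, llm_model: str):
    """Partition (embedding, response, metadata) tuples into (current, outdated)."""
    current, outdated = [], []
    for entry in entries:
        metadata = entry[2]
        is_current = (
            metadata.get("embedding_model") == embedding_model
            and metadata.get("llm_model") == llm_model
        )
        (current if is_current else outdated).append(entry)
    return current, outdated


# Illustrative data; "embed-v1", "llm-v2", etc. are placeholder model names.
entries = [
    ([1.0, 0.0], "old answer", {"embedding_model": "embed-v1", "llm_model": "llm-v1"}),
    ([0.0, 1.0], "new answer", {"embedding_model": "embed-v2", "llm_model": "llm-v2"}),
    ([0.5, 0.5], "mixed answer", {"embedding_model": "embed-v2", "llm_model": "llm-v1"}),
]
current, outdated = split_by_version(entries, "embed-v2", "llm-v2")
print(f"kept {len(current)}, outdated {len(outdated)}")
```

Applied to the class above, this could look like `cache.cache, outdated = split_by_version(cache.cache, cache.CURRENT_EMBEDDING_MODEL, cache.CURRENT_LLM_MODEL)`. The `outdated` list can then be dropped or handed to a background job that re-embeds the stored queries and recomputes the responses.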
Measuring Staleness and Coherence
To ensure cache freshness, monitor:
- Cache age distribution: Histogram of how old cached entries are.
- Staleness ratio: Fraction of served cached responses that are beyond your freshness SLO.
- Cache coherence: Detect mismatches between cached responses and current LLM outputs.
Example: Staleness monitoring
import random
from datetime import datetime, timezone

import numpy as np


def measure_staleness(cache: VersionedSemanticCache):
    """Analyze staleness of cached entries."""
    if not cache.cache:
        print("Cache is empty")
        return
    now = datetime.now(timezone.utc)
    ages = []
    for _, _, m in cache.cache:
        created = datetime.fromisoformat(m["created_at"])
        if created.tzinfo is None:
            # Timestamps written with datetime.utcnow() are naive UTC
            created = created.replace(tzinfo=timezone.utc)
        ages.append((now - created).total_seconds() / 3600)
    over_24h = sum(1 for a in ages if a > 24)
    print(f"Cache entries: {len(ages)}")
    print(f"Median age: {np.median(ages):.1f} hours")
    print(f"Max age: {np.max(ages):.1f} hours")
    print(f"Entries >24h old: {over_24h} ({100 * over_24h / len(ages):.1f}%)")


def check_cache_coherence(cache, sample_size: int = 10):
    """
    Random sample: fetch a cached response and recompute it.
    Measure if the LLM produces the same answer.
    """
    sample_size = min(sample_size, len(cache.cache))
    if sample_size == 0:
        print("Cache is empty")
        return
    samples = random.sample(cache.cache, sample_size)
    mismatches = 0
    for cached_embedding, cached_response, metadata in samples:
        query = metadata["query"]
        # call_llm is assumed to be your own LLM client wrapper; it is not defined here
        fresh_response = call_llm(query)  # Recompute
        # Simple check: do the responses share at least 60% of their unique words?
        cached_words = set(cached_response.lower().split())
        fresh_words = set(fresh_response.lower().split())
        largest = max(len(cached_words), len(fresh_words))
        # Two empty responses count as identical (avoids division by zero)
        overlap = len(cached_words & fresh_words) / largest if largest else 1.0
        if overlap < 0.6:
            mismatches += 1
            print(f"Coherence issue on '{query[:50]}': {overlap:.1%} overlap")
    print(f"Coherence check: {mismatches}/{sample_size} mismatches ({100 * mismatches / sample_size:.1f}%)")
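The functions above look at what is stored. The staleness ratio from the monitoring list is about what is served, so it has to be recorded at hit time. The following is a minimal sketch of such a counter; the class name `StalenessTracker` and the idea of calling it from the cache's lookup path are assumptions, not part of the cache classes shown earlier.

```python
class StalenessTracker:
    """Counts served cache hits and how many exceeded a freshness SLO."""

    def __init__(self, freshness_slo_seconds: float):
        self.freshness_slo_seconds = freshness_slo_seconds
        self.served = 0
        self.stale = 0

    def record_hit(self, age_seconds: float) -> None:
        """Call once per served cache hit with the age of the entry."""
        self.served += 1
        if age_seconds > self.freshness_slo_seconds:
            self.stale += 1

    def staleness_ratio(self) -> float:
        """Fraction of served hits older than the SLO (0.0 if nothing served)."""
        return self.stale / self.served if self.served else 0.0


tracker = StalenessTracker(freshness_slo_seconds=3600)
for age in (100, 4000, 7200, 10):
    tracker.record_hit(age)
print(f"Staleness ratio: {tracker.staleness_ratio():.0%}")
```

A ratio that climbs while the hit rate stays flat suggests TTLs are too long for the freshness target.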
Key Takeaways
- TTL (time-based) invalidation is simple and stateless; choose TTLs by domain (hours for live data, days/weeks for facts).
- Event-triggered invalidation is precise but requires event infrastructure; use it when a specific data change (for example, one user's profile in a multi-tenant system) should invalidate only the entries that depend on it.
- Version pinning prevents model rollout issues by ignoring cached entries from old embedding/LLM models; enables gradual migration.
- Monitor cache staleness (age distribution, TTL compliance) and coherence (sample mismatches between cached and fresh responses) to tune invalidation policies.
Frequently Asked Questions
How do I choose between TTL and event-driven invalidation?
For simplicity and horizontal scalability, use TTL. For precision and cost efficiency in production, layer both: short TTLs as a safety net, events for precise invalidation. TTL is a default; events are your optimization.
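Layering the two can be sketched as a store where every entry carries both a TTL deadline and a set of event tags. This is an illustration under simplifying assumptions: entries are looked up by a hypothetical `entry_id` rather than by embedding similarity, and the class and parameter names are invented for the example.

```python
import time
from dataclasses import dataclass, field


@dataclass
class LayeredEntry:
    response: str
    expires_at: float  # TTL safety-net deadline, in clock seconds
    event_tags: frozenset = field(default_factory=frozenset)


class LayeredInvalidationStore:
    """Entries expire by TTL and can also be dropped early by a tagged event."""

    def __init__(self, clock=time.time):
        self._clock = clock  # injectable so tests can control time
        self._entries = {}   # entry_id -> LayeredEntry

    def put(self, entry_id, response, ttl_seconds, event_tags=()):
        deadline = self._clock() + ttl_seconds
        self._entries[entry_id] = LayeredEntry(response, deadline, frozenset(event_tags))

    def get(self, entry_id):
        """Return the response, or None if missing or past its TTL."""
        entry = self._entries.get(entry_id)
        if entry is None:
            return None
        if self._clock() >= entry.expires_at:
            del self._entries[entry_id]
            return None
        return entry.response

    def on_event(self, event_type) -> int:
        """Drop every entry tagged with event_type; return how many were dropped."""
        doomed = [k for k, e in self._entries.items() if event_type in e.event_tags]
        for key in doomed:
            del self._entries[key]
        return len(doomed)
```

If an event is lost or never fired, the TTL still bounds how long the stale entry can be served, which is what makes it a safety net.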
Can I have different TTLs for different types of queries?
Yes. Tag queries by category (e.g., "fact", "live_data", "user_specific") and assign TTLs per category. Or compute TTL dynamically based on the cached response content (e.g., if response mentions "today", use 1-hour TTL).
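The content-based variant might look like the sketch below. The keyword list and both TTL values are assumptions chosen for illustration; a real deployment would tune them against its own traffic.

```python
import re

# Hypothetical list of phrases that suggest a time-sensitive response.
TIME_SENSITIVE = re.compile(
    r"\b(today|tonight|currently|right now|this (?:morning|week)|latest)\b",
    re.IGNORECASE,
)


def dynamic_ttl_seconds(response: str,
                        default_ttl: int = 7 * 24 * 3600,
                        volatile_ttl: int = 3600) -> int:
    """Use a short TTL when the response text mentions time-sensitive phrases."""
    return volatile_ttl if TIME_SENSITIVE.search(response) else default_ttl


print(dynamic_ttl_seconds("Earnings were strong today."))
print(dynamic_ttl_seconds("Python was first released in 1991."))
```

Keyword matching is crude: it misses time-sensitive answers that use none of the listed phrases, so it works best combined with per-category TTLs rather than as a replacement for them.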
What happens to cache entries during an LLM model upgrade?
With version pinning, old cached entries are ignored. Requests will recompute with the new model, filling the cache with new entries. After a week, audit old entries for coherence and purge if necessary to free memory.
How do I test cache coherence before deploying to production?
Run a staging environment: cache responses with the old model, then switch to the new model and sample 100–1000 cached entries. Measure how often fresh responses differ significantly (for example, token overlap below 0.8, a stricter bar than the 0.6 used in the coherence check above). If the mismatch rate exceeds 5%, investigate before the production rollout.
Can expired entries still be served if nothing else matches?
Yes, you can implement a fallback: if no non-expired entries match, serve the best expired entry with a staleness warning in metadata. This trades freshness for availability; use only if you can flag stale responses to the user.
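One way to sketch this fallback is a lookup that tracks the best fresh and best expired match separately and flags which one it returns. The function name, the `(embedding, response, is_expired)` tuple shape, and the plain-Python dot product are assumptions made to keep the example self-contained; embeddings are assumed to be unit-normalized.

```python
def dot(a, b):
    """Dot product of two equal-length sequences of floats."""
    return sum(x * y for x, y in zip(a, b))


def find_with_stale_fallback(entries, query_embedding, threshold: float = 0.95):
    """
    entries: iterable of (embedding, response, is_expired) tuples.
    Returns (response, similarity, is_stale), preferring any fresh match
    over an expired one, or None if nothing clears the threshold.
    """
    best_fresh = None
    best_stale = None
    for embedding, response, is_expired in entries:
        similarity = dot(query_embedding, embedding)
        if similarity <= threshold:
            continue
        if is_expired:
            if best_stale is None or similarity > best_stale[1]:
                best_stale = (response, similarity, True)
        elif best_fresh is None or similarity > best_fresh[1]:
            best_fresh = (response, similarity, False)
    return best_fresh if best_fresh is not None else best_stale
```

The third element of the result is the staleness flag; the caller is responsible for surfacing it to the user or logging it.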