Scaling Knowledge Graphs in Production
Scaling a knowledge graph from millions to hundreds of millions of entities requires careful architecture: strategic indexing, query optimization, and data partitioning. Large production knowledge graphs (Google Knowledge Graph, LinkedIn Graph, Facebook Social Graph) serve very high query volumes every day. This final article walks through deployment patterns that aim to keep query latency under 100 ms at enterprise scale.
Scaling Challenges and Solutions
As graphs grow, challenges compound:
| Challenge | Scale | Solution |
|---|---|---|
| Query latency | 10M entities: 50 ms | Indexing + query caching |
| Storage | 100M entities: 50–100 GB | Graph compression, data partitioning |
| Write throughput | 10K writes/sec | Batch loading, asynchronous updates |
| Graph traversal | Multi-hop queries slow down | Precomputed indexes, graph summarization |
| Data consistency | Multiple sources conflict | Conflict resolution strategies |
A reference architecture that addresses all of these:
[User Requests]
       |
       v
[Query Router]  (route based on shard key)
       |
       v
[Cache Layer]   (Redis: 100K QPS, 10 ms latency)
       |
       v
[Shard 1]  [Shard 2]  [Shard 3]  [Shard 4]   (4–16 Neo4j instances)
    |          |          |          |
    v          v          v          v
[SSD Storage: 25–50 GB/shard, indexed]
Indexing Strategies
Indexes are critical. Without them, property lookups degrade to O(N) scans. The statements below use Neo4j 5 syntax (index names are illustrative). Create indexes on:
// Index on frequently filtered properties
CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.name);
CREATE INDEX company_name IF NOT EXISTS FOR (c:Company) ON (c.name);
// Composite index for compound filters
CREATE INDEX person_company_hire IF NOT EXISTS FOR (p:Person) ON (p.company_id, p.hire_year);
// Full-text search index for text queries
CREATE FULLTEXT INDEX full_person_search IF NOT EXISTS FOR (p:Person) ON EACH [p.name, p.title];
// Relationship property index (for filtering on edges)
CREATE INDEX works_for_start_date IF NOT EXISTS FOR ()-[r:WORKS_FOR]-() ON (r.start_date);
// Uniqueness constraint (backed by its own index, so :Person(email) needs no separate index)
CREATE CONSTRAINT person_email_unique IF NOT EXISTS FOR (p:Person) REQUIRE p.email IS UNIQUE;
Impact on query performance:
// Without index: ~5 seconds on 100M people
MATCH (p:Person {name: "Alice Johnson"}) RETURN p;
// With index: ~5 ms
CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.name);
MATCH (p:Person {name: "Alice Johnson"}) RETURN p;
Always inspect query plans with EXPLAIN, which shows the plan without running the query, or PROFILE, which runs it and reports actual rows and database hits:
EXPLAIN MATCH (p:Person)-[:WORKS_FOR]->(c:Company {name: "Google"})
WHERE p.salary > 150000
RETURN p.name, p.salary;
// Examine the plan (simplified):
//   NodeIndexSeek on :Company(name)   <- fast: uses the index
//   Expand (c)<-[:WORKS_FOR]-(p)
//   Filter p.salary > 150000
//   Estimated rows: 500
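Plan inspection can also be automated, for example in a CI check that rejects queries falling back to full scans. The sketch below assumes the plan is available as a nested dict with `operatorType` and `children` keys (the shape the Neo4j Python driver is expected to expose on the result summary of an EXPLAIN query); the helper name `find_scans` and the sample plan are hypothetical.

```python
from typing import Dict, List

# Operators that read every node (or every node with a label) instead of using an index.
SCAN_OPERATORS = ("AllNodesScan", "NodeByLabelScan")

def find_scans(plan: Dict) -> List[str]:
    """Return the operator names in a plan tree that indicate a full scan."""
    found = []
    stack = [plan]
    while stack:
        node = stack.pop()
        op = node.get("operatorType", "")
        # Operator names may carry a suffix such as "@neo4j", so match by prefix.
        if op.startswith(SCAN_OPERATORS):
            found.append(op)
        stack.extend(node.get("children", []))
    return found

# Hand-built plan for illustration; a real one would come from running
# "EXPLAIN ..." through the driver and reading the result summary.
sample_plan = {
    "operatorType": "ProduceResults@neo4j",
    "children": [{
        "operatorType": "Filter@neo4j",
        "children": [{"operatorType": "NodeByLabelScan@neo4j", "children": []}],
    }],
}
print(find_scans(sample_plan))
```

An empty list means no full scan was found in the plan; it does not by itself prove the query is fast.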
Query Result Caching
Cache frequent queries to avoid redundant graph traversals:
import hashlib
import json
import time

class CachedGraphExecutor:
    """Execute read queries with in-memory, TTL-based result caching."""

    def __init__(self, driver, cache_size: int = 10000, ttl_seconds: int = 1800):
        self.driver = driver
        self.cache = {}  # cache_key -> (query text, records, timestamp)
        self.cache_size = cache_size
        self.ttl = ttl_seconds

    def _cache_key(self, query: str, params: dict) -> str:
        """Generate a cache key from query and parameters."""
        # default=str lets non-JSON parameter types (e.g. dates) be serialized
        key_str = json.dumps({"query": query, "params": params}, sort_keys=True, default=str)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def execute(self, query: str, params: dict = None) -> list:
        """Execute with caching."""
        params = params or {}
        cache_key = self._cache_key(query, params)
        # Check cache
        if cache_key in self.cache:
            _, cached_result, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.ttl:
                return cached_result
            del self.cache[cache_key]  # entry expired
        # Cache miss; execute
        with self.driver.session() as session:
            result = session.run(query, params)
            records = [dict(record) for record in result]
        # Store in cache
        if len(self.cache) >= self.cache_size:
            # Simple eviction: remove oldest entry (O(n) scan over the cache)
            oldest_key = min(self.cache, key=lambda k: self.cache[k][2])
            del self.cache[oldest_key]
        self.cache[cache_key] = (query, records, time.time())
        return records

    def invalidate(self, query_pattern: str = None):
        """Invalidate cached results whose query text contains query_pattern (call after writes)."""
        if not query_pattern:
            self.cache.clear()
            return
        # Keys are SHA-256 digests, so match against the stored query text instead
        to_delete = [k for k, entry in self.cache.items() if query_pattern in entry[0]]
        for k in to_delete:
            del self.cache[k]

# Usage
# executor = CachedGraphExecutor(driver, cache_size=10000)
# result1 = executor.execute("MATCH (p:Person {name: $name}) RETURN p", {"name": "Alice"})  # Computed
# result2 = executor.execute("MATCH (p:Person {name: $name}) RETURN p", {"name": "Alice"})  # From cache
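The executor above evicts by scanning every entry for the oldest timestamp, which costs O(n) per insert once the cache is full. A common alternative is least-recently-used eviction on an ordered dictionary, which is O(1). The following is a minimal sketch of that idea, not a drop-in replacement; the class name `LruTtlCache` and the injectable `clock` parameter are assumptions made for illustration.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional

class LruTtlCache:
    """Bounded cache with least-recently-used eviction and per-entry expiry."""

    def __init__(self, max_size: int, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_size = max_size
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._data: "OrderedDict[Hashable, tuple]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self.clock() - stored_at >= self.ttl:
            del self._data[key]  # expired
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = (value, self.clock())
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # drop the least recently used entry

cache = LruTtlCache(max_size=2, ttl_seconds=60)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")       # touch "a" so "b" becomes the eviction candidate
cache.put("c", 3)    # evicts "b"
print(cache.get("a"), cache.get("b"), cache.get("c"))
```

Because `get` returns `None` on a miss, this sketch cannot distinguish a miss from a cached `None`; query results stored as lists avoid that ambiguity.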
Sharding: Partitioning Across Multiple Instances
For 100M+ entities, shard the graph by entity type, by a hash of the entity ID, or by ID range:
import hashlib
from neo4j import GraphDatabase

class ShardedGraphExecutor:
    """Route queries to the appropriate shard."""

    def __init__(self, shard_uris: list, auth: tuple = ("neo4j", "password")):
        """
        Args:
            shard_uris: List of Neo4j URIs, e.g.,
                ["bolt://shard1:7687", "bolt://shard2:7687", ...]
            auth: (user, password) tuple; load real credentials from configuration.
        """
        self.drivers = [GraphDatabase.driver(uri, auth=auth) for uri in shard_uris]
        self.num_shards = len(shard_uris)

    def get_shard_for_entity(self, entity_id: str) -> int:
        """
        Determine which shard holds an entity.
        Strategy: hash-based or range-based.
        """
        # Hash-based sharding. Python's built-in hash() is salted per process for
        # strings, so use a stable digest to get the same shard in every process.
        digest = hashlib.sha256(entity_id.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % self.num_shards

    def query_by_entity(self, entity_id: str, query_template: str) -> list:
        """Query for a specific entity using its shard."""
        shard_idx = self.get_shard_for_entity(entity_id)
        driver = self.drivers[shard_idx]
        with driver.session() as session:
            result = session.run(query_template, entity_id=entity_id)
            return [dict(record) for record in result]

    def query_all_shards(self, query: str) -> list:
        """Execute a query across all shards (scatter-gather), one shard at a time."""
        results = []
        for driver in self.drivers:
            with driver.session() as session:
                result = session.run(query)
                results.extend([dict(record) for record in result])
        return results

    def close_all(self):
        for driver in self.drivers:
            driver.close()

# Usage: Sharding by entity type
# Shard 0: Person nodes
# Shard 1: Company nodes
# Shard 2: Location nodes
class TypeBasedSharding(ShardedGraphExecutor):
    """Shard by entity type."""

    ENTITY_SHARD_MAP = {
        "Person": 0,
        "Company": 1,
        "Location": 2,
    }

    def get_shard_for_entity_type(self, entity_type: str) -> int:
        # Unknown types fall back to shard 0
        return self.ENTITY_SHARD_MAP.get(entity_type, 0)
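Because `query_all_shards` visits shards one after another, its latency is the sum of all shard latencies. Running the per-shard calls in parallel brings that closer to the slowest shard. The sketch below shows the pattern with a thread pool and a merge step; `scatter_gather` and the stand-in shard callables are hypothetical, and real callables would wrap a driver session per shard.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

def scatter_gather(shard_fns: List[Callable[[], List[Dict]]],
                   sort_key: str, limit: int) -> List[Dict]:
    """Run one callable per shard in parallel, then merge and keep the top rows."""
    with ThreadPoolExecutor(max_workers=max(len(shard_fns), 1)) as pool:
        # map() preserves shard order and re-raises any shard's exception
        per_shard = list(pool.map(lambda fn: fn(), shard_fns))
    merged = [row for rows in per_shard for row in rows]
    # Each shard only knows its local ordering, so sort again after merging
    merged.sort(key=lambda row: row[sort_key], reverse=True)
    return merged[:limit]

# Stand-ins for per-shard queries; real callables would wrap session.run(...)
fake_shards = [
    lambda: [{"name": "Alice", "score": 0.9}, {"name": "Bob", "score": 0.4}],
    lambda: [{"name": "Carol", "score": 0.7}],
    lambda: [],
]
top = scatter_gather(fake_shards, sort_key="score", limit=2)
print([row["name"] for row in top])
```

Pushing the same `ORDER BY ... LIMIT` into each shard's query keeps the merged set small, since no shard needs to return more than `limit` rows.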
Data Ingestion at Scale
Loading millions of entities efficiently requires batching and streaming:
from typing import Dict, List
from neo4j import GraphDatabase

class BulkGraphLoader:
    """Load large datasets efficiently into a graph database."""

    def __init__(self, uri: str, user: str, password: str, batch_size: int = 1000):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.batch_size = batch_size

    def load_entities_batch(self, entities: List[Dict], entity_type: str):
        """Load entities in batches."""
        # Labels cannot be passed as Cypher parameters, so validate before interpolating
        if not entity_type.isidentifier():
            raise ValueError(f"Invalid label: {entity_type!r}")
        # UNWIND inserts the whole batch in a single statement
        cypher = f"""
        UNWIND $batch AS e
        CREATE (n:{entity_type})
        SET n = e
        """
        with self.driver.session() as session:
            for i in range(0, len(entities), self.batch_size):
                batch = entities[i:i + self.batch_size]
                # consume() forces any server-side error to surface here
                session.run(cypher, batch=batch).consume()
                print(f"Loaded {i + len(batch)} {entity_type} entities")

    def load_relations_batch(self, relations: List[Dict]):
        """Load relationships in batches."""
        # Without a label, these MATCH clauses cannot use an index; add labels
        # (e.g. (source:Person {id: ...})) when the endpoint types are known.
        batch_cypher = """
        UNWIND $batch AS r
        MATCH (source {id: r.source_id})
        MATCH (target {id: r.target_id})
        CREATE (source)-[rel:RELATION]->(target)
        SET rel.type = r.relation_type,
            rel.confidence = coalesce(r.confidence, 0.5)
        """
        single_cypher = """
        MATCH (source {id: $source_id})
        MATCH (target {id: $target_id})
        CREATE (source)-[rel:RELATION]->(target)
        SET rel.type = $rel_type, rel.confidence = $confidence
        """
        with self.driver.session() as session:
            for i in range(0, len(relations), self.batch_size):
                batch = relations[i:i + self.batch_size]
                try:
                    session.run(batch_cypher, batch=batch).consume()
                except Exception as e:
                    print(f"Batch insert failed: {e}")
                    # Fall back to individual inserts so one bad row does not sink the batch
                    for rel in batch:
                        try:
                            session.run(single_cypher,
                                        source_id=rel["source_id"],
                                        target_id=rel["target_id"],
                                        rel_type=rel["relation_type"],
                                        confidence=rel.get("confidence", 0.5)).consume()
                        except Exception as row_err:
                            # Report the failure instead of silently dropping the row
                            print(f"Skipped relation {rel.get('source_id')} -> "
                                  f"{rel.get('target_id')}: {row_err}")
                print(f"Processed {i + len(batch)} relations")

    def close(self):
        self.driver.close()

# Example
# loader = BulkGraphLoader("bolt://localhost:7687", "neo4j", "password", batch_size=5000)
# entities = [...list of 1M dicts...]
# loader.load_entities_batch(entities, "Person")
# loader.close()
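The loader above expects the full entity list in memory, which stops being practical well before hundreds of millions of rows. For the streaming half of the story, a generator can cut any iterable into fixed-size batches as rows arrive. This is a minimal sketch; `chunked` and `generate_people` are hypothetical names, and the source could just as well be a CSV or JSONL reader.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List

def chunked(rows: Iterable[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Yield lists of at most batch_size rows without materializing the input."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch

def generate_people(n: int) -> Iterator[Dict]:
    """Hypothetical source; in practice this might read a CSV or JSONL file."""
    for i in range(n):
        yield {"id": i, "name": f"person-{i}"}

sizes = [len(batch) for batch in chunked(generate_people(2500), 1000)]
print(sizes)
```

Each yielded batch can be passed as the `$batch` parameter of an UNWIND statement, so only one batch is held in memory at a time.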
Monitoring and Alerting
Monitor graph health in production:
from datetime import datetime

class GraphHealthMonitor:
    """Monitor graph database health metrics."""

    def __init__(self, driver):
        self.driver = driver

    def get_metrics(self) -> dict:
        """Retrieve health metrics."""
        with self.driver.session() as session:
            # Node count
            node_count = session.run("MATCH (n) RETURN count(n) AS count").single()["count"]
            # Relationship count (directed pattern, so each relationship is counted once;
            # an undirected pattern would match every relationship twice)
            rel_count = session.run("MATCH ()-[r]->() RETURN count(r) AS count").single()["count"]
        # Query performance (average execution time) requires query logging to be enabled.
        # Storage size is not directly available via the driver.
        return {
            "timestamp": datetime.now().isoformat(),
            "node_count": node_count,
            "relationship_count": rel_count,
            # Each relationship contributes to the degree of two endpoints
            "avg_node_degree": 2 * rel_count / max(node_count, 1),
        }

    def check_index_health(self) -> list:
        """List indexes; inspect each entry's state to spot failed or still-populating ones."""
        with self.driver.session() as session:
            result = session.run("SHOW INDEXES")
            indexes = [dict(record) for record in result]
        return indexes

    def alert_if_slow(self, slow_query_threshold_ms: int = 1000):
        """Alert if slow queries detected."""
        # In practice, integrate with a query logging service
        # e.g., Prometheus, DataDog, New Relic
        pass

# Usage
# monitor = GraphHealthMonitor(driver)
# metrics = monitor.get_metrics()
# print(f"Nodes: {metrics['node_count']}, Avg degree: {metrics['avg_node_degree']:.2f}")
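The `alert_if_slow` method is left as a stub. One way to fill it in at the application level, before wiring up an external monitoring service, is to keep a sliding window of observed latencies and compare a high percentile against the threshold. The sketch below is an assumption about how that could look; the class `LatencyWindow` and the sample latencies are illustrative only.

```python
import math
from collections import deque

class LatencyWindow:
    """Keep the most recent query latencies and report a percentile."""

    def __init__(self, max_samples: int = 1000):
        self.samples = deque(maxlen=max_samples)  # old samples fall off automatically

    def record(self, latency_ms: float) -> None:
        self.samples.append(latency_ms)

    def percentile(self, pct: float) -> float:
        """Nearest-rank percentile over the current window."""
        if not self.samples:
            raise ValueError("no samples recorded")
        ordered = sorted(self.samples)
        rank = max(math.ceil(pct / 100 * len(ordered)), 1)
        return ordered[rank - 1]

    def is_slow(self, threshold_ms: float, pct: float = 95) -> bool:
        """True when the chosen percentile exceeds the threshold."""
        return self.percentile(pct) > threshold_ms

window = LatencyWindow()
for ms in [12, 15, 11, 14, 13, 16, 12, 18, 1500, 2200]:
    window.record(ms)
print(window.percentile(50), window.is_slow(threshold_ms=1000))
```

Alerting on a percentile rather than the mean keeps a handful of very slow queries visible even when most requests are served from cache.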
Disaster Recovery and Backups
Regular backups and recovery procedures are essential:
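#!/bin/bash
# Back up a Neo4j database (Neo4j 5 syntax).
# `database dump` is an offline operation: stop the database first, or use
# `neo4j-admin database backup` (Enterprise) for online backups.
set -euo pipefail
BACKUP_DIR="/backups/neo4j"
DB_NAME="neo4j"
STAMP="$(date +%Y%m%d-%H%M%S)"
# Full dump: writes $DB_NAME.dump into the target directory
mkdir -p "$BACKUP_DIR/$STAMP"
neo4j-admin database dump "$DB_NAME" --to-path="$BACKUP_DIR/$STAMP"
# Compress
gzip "$BACKUP_DIR/$STAMP/$DB_NAME.dump"
# Upload to S3
aws s3 cp "$BACKUP_DIR/$STAMP" "s3://my-backup-bucket/neo4j/$STAMP/" --recursive
# Retention: keep last 30 days (files only, so the backup root is never removed)
find "$BACKUP_DIR" -type f -name '*.dump.gz' -mtime +30 -delete
Recovery:
# Restore from a dump (the target database must be stopped).
# Dumps are restored with `load`; `restore` applies to `database backup` artifacts.
gunzip "$BACKUP_DIR/20260601-120000/neo4j.dump.gz"
neo4j-admin database load neo4j --from-path="$BACKUP_DIR/20260601-120000" --overwrite-destination=true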
Key Takeaways
- Indexing on frequently filtered properties is critical: in the example above, an indexed lookup drops from ~5 seconds to ~5 ms.
- Query caching (in-memory or Redis) serves hot queries in milliseconds.
- Sharding partitions the graph across multiple instances so that single-entity lookups scale out to 100M+ entities; cross-shard queries still pay a scatter-gather cost.
- Bulk loading with batching and unwind operations handles millions of entities efficiently.
- Monitoring health metrics and slow query logs prevents production incidents.
- Regular backups and disaster recovery procedures protect against data loss.
Frequently Asked Questions
How do I choose between vertical scaling (bigger hardware) and horizontal scaling (sharding)?
Vertical scaling is usually cheaper at first: a single machine with 512 GB RAM can handle roughly 50M entities. Beyond that, horizontal scaling becomes hard to avoid. Sharding adds operational complexity, but capacity then grows with the number of shards.
What's the maximum latency for a graph query in production?
Aim for <100 ms for single-hop queries and <500 ms for multi-hop queries on a graph with 100M+ entities. Cache results for frequently accessed patterns. Use EXPLAIN or PROFILE to find and optimize slow queries.
How do I handle updates when sharding by entity type?
Cross-shard relationships require coordination. Use a distributed transaction protocol (e.g., two-phase commit or a saga) or a meta-shard that tracks entity locations. Alternatively, accept eventual consistency for cross-shard updates.
Can I shard Neo4j without application-level routing?
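Neo4j Enterprise can query across shards through Fabric (composite databases in Neo4j 5), but you still decide how the data is partitioned; it does not split an existing graph automatically. For Community Edition, implement routing in your application layer.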
What's the cost of a production knowledge graph at scale?
On AWS: 4–8 Neo4j instances, each with 256 GB RAM and 2 TB NVMe storage, cost roughly USD 50K–100K per month. With cloud auto-scaling and spot instances, this can come down to around USD 20K–40K per month.