
Scaling Vectors: Sharding and Replication

Scaling a vector database beyond a single node requires careful architecture. Sharding divides vectors across multiple nodes by a partition key; replication copies data across replicas for high availability and read throughput. Together, they let vector databases store and serve billions of embeddings across clusters. A poor sharding strategy causes data skew, uneven load, and bottlenecks.

Single-Node Limits and Why Sharding Is Necessary

A single vector database node has finite resources: RAM (for HNSW graph structure), disk (for persistent storage), CPU (for search). Typical limits:

| Metric | Typical single-node limit |
|---|---|
| Vectors | 100M–1B (depends on dimensionality and index) |
| Memory | 256 GB–1 TB |
| Queries per second | 1k–10k QPS |
| Network bandwidth | 10–100 Gbps |

Consider a RAG system whose corpus (every chunk of every document) amounts to 10B vectors and that serves 1M users at 1k QPS. The query rate alone is within one node's range, but no single node can hold an index of this size, so you must shard it across multiple nodes.
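A rough memory estimate shows the gap. This is a sketch under stated assumptions: 768-dimensional float32 embeddings and an HNSW graph with M = 16, counting hnswlib-style 2·M neighbor links per vector on the bottom layer and ignoring upper layers, metadata, and payloads. The function `index_memory_bytes` is an illustrative helper, not a library API.

```python
def index_memory_bytes(num_vectors, dim, bytes_per_component=4,
                       links_per_node=32, bytes_per_link=4):
    """Rough RAM estimate: raw vectors plus HNSW bottom-layer neighbor links."""
    vector_bytes = dim * bytes_per_component        # float32 = 4 bytes per component
    graph_bytes = links_per_node * bytes_per_link   # 2*M links with M = 16 (assumed)
    return num_vectors * (vector_bytes + graph_bytes)


total = index_memory_bytes(10_000_000_000, dim=768)
print(f"{total / 1e12:.1f} TB")  # 32.0 TB, before replication
```

Even this lower bound, about 32 TB, is 32 times the largest node in the table above, and keeping three replicas triples it.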

Sharding Strategies

Hash-based sharding (simple, data-oblivious):

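Partition vectors by a hash of their ID. Each shard stores roughly 1/N of all vectors:

```python
import hashlib


def shard_id(point_id, num_shards):
    """Return a stable shard index for a point ID."""
    # Python's built-in hash() is salted per process for str/bytes, so routers
    # in different processes could disagree; use a deterministic digest instead.
    digest = hashlib.md5(str(point_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Point 12345 with 8 shards
shard = shard_id(12345, num_shards=8)  # an integer in 0..7
```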

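Advantages: Simple; spreads data and load evenly.

Disadvantages: No semantic or metadata awareness: related documents hash to different shards, so every query needs a broadcast search (search all shards, then merge results). Changing num_shards also remaps most IDs: going from N to N+1 shards moves about N/(N+1) of the vectors, so resharding with plain modulo hashing is expensive. Consistent hashing reduces the movement to roughly 1/(N+1).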

Range-based sharding (metadata-aware):

Partition vectors by a metadata field (e.g., created_at, category):

```python
from datetime import date


def shard_range(created_date, num_shards=8):
    """Assign a shard by creation-date range."""
    date_ranges = [
        (date(2024, 1, 1), date(2024, 6, 30), 0),
        (date(2024, 7, 1), date(2024, 12, 31), 1),
        (date(2025, 1, 1), date(2025, 6, 30), 2),
        # ... more ranges
    ]
    for start, end, shard in date_ranges:
        if start <= created_date <= end:
            return shard
    # Dates outside every configured range fall into the last shard
    return num_shards - 1
```

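Advantages: Queries filtered to a date range touch only the shards covering that range (often just one), so no broadcast is needed.

Disadvantages: Distribution becomes uneven when data is skewed; with date ranges, every new write lands on the newest shard, which becomes a write hot spot. Resharding is complex.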
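A minimal sketch of date-range routing with sorted shard boundaries, assuming each shard owns the dates from its start boundary up to the next one. The boundaries and the helpers `shard_for_date` and `shards_for_range` are hypothetical. Because a filtered query maps to a contiguous run of shards, the router can skip all the others.

```python
from bisect import bisect_right
from datetime import date

# Hypothetical boundaries: shard i owns [SHARD_STARTS[i], SHARD_STARTS[i + 1])
SHARD_STARTS = [date(2024, 1, 1), date(2024, 7, 1), date(2025, 1, 1), date(2025, 7, 1)]


def shard_for_date(d):
    """Shard whose range contains d; earlier dates go to shard 0, the last shard is open-ended."""
    return max(bisect_right(SHARD_STARTS, d) - 1, 0)


def shards_for_range(start, end):
    """Shards a query filtered to the closed range [start, end] has to search."""
    return list(range(shard_for_date(start), shard_for_date(end) + 1))


print(shards_for_range(date(2024, 5, 1), date(2024, 8, 1)))  # [0, 1]
```

The last shard is open-ended, which is exactly where the write hot spot described above forms.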

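Consistent hashing (for dynamic addition/removal of shards):

Used in distributed systems when shards are added or removed dynamically. Each shard is placed at many points (virtual nodes) on a hash ring, and a point belongs to the first virtual node clockwise from its hash, so adding or removing one of N shards moves only about 1/N of the points:

```python
from bisect import bisect_right
from hashlib import md5


def _hash(key):
    """Deterministic 128-bit hash of a string key."""
    return int(md5(key.encode()).hexdigest(), 16)


class ConsistentHash:
    def __init__(self, nodes, vnodes=100):
        # vnodes = virtual nodes per shard (not data replicas); more virtual
        # nodes give a smoother key distribution across shards.
        self.nodes = list(nodes)
        self.vnodes = vnodes
        self._rebuild()

    def _rebuild(self):
        self.ring = {}
        for node in self.nodes:
            for i in range(self.vnodes):
                self.ring[_hash(f"{node}:{i}")] = node
        self.sorted_keys = sorted(self.ring)  # cached, not re-sorted per lookup

    def add_node(self, node):
        self.nodes.append(node)
        self._rebuild()

    def remove_node(self, node):
        self.nodes.remove(node)
        self._rebuild()

    def get_node(self, point_id):
        if not self.ring:
            return None
        # First virtual node clockwise from the key's hash (wrapping around)
        idx = bisect_right(self.sorted_keys, _hash(str(point_id)))
        return self.ring[self.sorted_keys[idx % len(self.sorted_keys)]]


# Usage
hash_ring = ConsistentHash(["shard-0", "shard-1", "shard-2"])
shard = hash_ring.get_node(12345)  # one of "shard-0", "shard-1", "shard-2"
```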
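The difference from modulo hashing is easy to measure. This self-contained sketch builds its own minimal ring rather than reusing the class above (100 virtual nodes per shard is an assumption) and counts how many of 10,000 IDs change owner when a fourth shard joins three existing ones.

```python
import hashlib
from bisect import bisect_right


def stable_hash(key):
    """Deterministic 64-bit hash of a string key."""
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")


def modulo_owner(point_id, nodes):
    return nodes[stable_hash(str(point_id)) % len(nodes)]


def build_ring(nodes, vnodes=100):
    """Sorted virtual-node positions and the shard owning each position."""
    ring = sorted((stable_hash(f"{node}:{i}"), node) for node in nodes for i in range(vnodes))
    return [pos for pos, _ in ring], [node for _, node in ring]


def ring_owner(point_id, ring):
    positions, owners = ring
    idx = bisect_right(positions, stable_hash(str(point_id)))
    return owners[idx % len(owners)]


def fraction_moved(owner_before, owner_after, num_keys=10_000):
    return sum(owner_before(k) != owner_after(k) for k in range(num_keys)) / num_keys


old_nodes = ["shard-0", "shard-1", "shard-2"]
new_nodes = old_nodes + ["shard-3"]
old_ring, new_ring = build_ring(old_nodes), build_ring(new_nodes)

modulo_moved = fraction_moved(lambda k: modulo_owner(k, old_nodes),
                              lambda k: modulo_owner(k, new_nodes))
ring_moved = fraction_moved(lambda k: ring_owner(k, old_ring),
                            lambda k: ring_owner(k, new_ring))
print(f"modulo: {modulo_moved:.0%} moved, ring: {ring_moved:.0%} moved")
```

With modulo hashing an ID keeps its shard only if hash % 3 == hash % 4, so about 75% of IDs move. On the ring, only IDs captured by the new shard's virtual nodes move (roughly a quarter), and every one of them moves to shard-3.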

Replication: High Availability and Read Scaling

Replication copies data across multiple replicas (nodes). A common setup is one leader and two followers per shard.

Write path: Client writes to the leader. Leader replicates to followers (synchronously or asynchronously).

Read path: Client reads from any replica (follower or leader) for read scaling.

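Qdrant replication example (Qdrant configures sharding and replication per collection):

```python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

client = QdrantClient(url="http://localhost:6333")

# 8 shards, each stored on 3 replicas (needs a cluster of at least 3 nodes);
# a write succeeds once 2 replicas have applied it.
# 768-dimensional cosine vectors are an example choice.
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=768, distance=Distance.COSINE),
    shard_number=8,
    replication_factor=3,
    write_consistency_factor=2,
)

# A write for a point goes to every replica of the point's shard; if a node
# fails, its shards stay available from the surviving replicas.
```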

Failover logic:

If the leader fails, followers detect the failure (via heartbeat timeout) and elect a new leader:

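```python
class RaftNode:
    """Simplified Raft leader election (no log-freshness check, no RPC layer)."""
    def __init__(self, node_id, peers):
        self.node_id, self.peers = node_id, peers
        self.term, self.voted_for, self.state = 0, None, "follower"

    def request_vote(self, term, candidate_id):
        if term > self.term:  # newer term: step down and forget any earlier vote
            self.term, self.voted_for, self.state = term, None, "follower"
        if term == self.term and self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id  # grant at most one vote per term
        return term == self.term and self.voted_for == candidate_id

    def on_heartbeat_timeout(self):
        self.term, self.voted_for, self.state = self.term + 1, self.node_id, "candidate"
        votes = 1 + sum(peer.request_vote(self.term, self.node_id) for peer in self.peers)
        if votes > (len(self.peers) + 1) // 2:  # strict majority of the whole cluster
            self.state = "leader"  # next: send heartbeats and replicate log entries
```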

Failover typically takes 5–30 seconds, depending mostly on the configured heartbeat and election timeouts and on how quickly clients discover the new leader.

Vector Sharding + Replication Combined

A production cluster combines both:

```
┌──────────────────────────────────────────┐
│ Client (load balancer)                   │
│ - Shard by hash(point_id) % 8            │
│ - Route query to shard leader            │
└──────────┬───────────────────────────────┘
           │
     ┌─────┴──────┬─────────────────┐
     │            │                 │
┌────▼─────┐ ┌────▼─────┐      ┌────▼─────┐
│ Shard 0  │ │ Shard 1  │ ...  │ Shard 7  │
│ (3 reps) │ │ (3 reps) │      │ (3 reps) │
│ L  F1 F2 │ │ L  F1 F2 │      │ L  F1 F2 │
└──────────┘ └──────────┘      └──────────┘
        (125M vectors per shard)
```

Each shard stores 1/8 of the vectors and has 3 replicas (1 leader + 2 followers). Total capacity: 8 shards × 125M vectors/shard = 1B vectors, served by 24 nodes (8 shards × 3 replicas).

Write consistency:

  • Leader receives upsert, writes to local storage, replicates to followers.
  • Quorum writes (at least 2 of 3 replicas must acknowledge) ensure durability: an acknowledged write survives the loss of any single replica.
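A minimal in-memory sketch of a quorum write, assuming a synchronous fan-out to all replicas and ignoring timeouts and rollback of partially applied writes. The `Replica` class and `quorum_write` function are hypothetical, not any database's API.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """In-memory stand-in for one copy of a shard."""
    name: str
    healthy: bool = True
    data: dict = field(default_factory=dict)

    def apply(self, point_id, vector):
        """Store the write and acknowledge it; an unhealthy replica never acks."""
        if not self.healthy:
            return False
        self.data[point_id] = vector
        return True


def quorum_write(replicas, point_id, vector, write_quorum=2):
    """Send the write to every replica; succeed if at least write_quorum acknowledge."""
    acks = sum(replica.apply(point_id, vector) for replica in replicas)
    return acks >= write_quorum


replicas = [Replica("leader"), Replica("follower-1"), Replica("follower-2", healthy=False)]
print(quorum_write(replicas, 42, [0.1, 0.2]))  # True: 2 of 3 replicas acknowledged
```

Note that a failed quorum write can still leave the vector on the replicas that did acknowledge, which a production system must reconcile (for example, through retries).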

Read consistency:

  • Reads from the shard leader are consistent: they see the latest acknowledged writes.
  • Reads from any replica spread load and can be faster, but are only eventually consistent.
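Putting the pieces together, here is a sketch of how a router could map a point ID to a shard and then to a node for point reads and writes. The 8 × 3 layout mirrors the diagram above; the node names, the `CLUSTER` table, and the `route` function are hypothetical, and `replicas[0]` is assumed to be the shard leader (real systems track leadership dynamically).

```python
import hashlib
import random

NUM_SHARDS, REPLICAS = 8, 3
# Hypothetical static layout: shard s lives on nodes 3s (leader), 3s+1, 3s+2
CLUSTER = {
    s: [f"node-{s * REPLICAS + r}" for r in range(REPLICAS)] for s in range(NUM_SHARDS)
}


def shard_of(point_id):
    """Stable hash routing: hash(point_id) % NUM_SHARDS."""
    digest = hashlib.md5(str(point_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS


def route(point_id, op, consistent=False, rng=random):
    """Return the node that should serve op ("write" or "read") for point_id."""
    replicas = CLUSTER[shard_of(point_id)]
    if op == "write" or consistent:
        return replicas[0]  # writes and consistent reads go to the shard leader
    return rng.choice(replicas)  # other reads: any replica, possibly stale


print(route(12345, "write"), route(12345, "read"))
```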

Resharding: Adding Capacity

When a shard exceeds capacity, split it:

Before: Shard 0 → 200M vectors (all IDs with hash % 8 == 0)

After:
Shard 0 → ~100M vectors (keep vectors with hash % 16 == 0)
Shard 8 → ~100M vectors (new shard; migrated vectors with hash % 16 == 8)

Resharding requires:

  1. Create new shard: Provision new nodes for shard 8.
  2. Migrate data: Scan shard 0, identify vectors for shard 8, copy to shard 8.
  3. Rebalance routing: Update load balancer; route new writes to shard 8.
  4. Sync trailing data: Catch up any writes that occurred during migration.
  5. Clean up: Once shard 8 is serving its keys, delete the migrated vectors from shard 0. (To scale back down later, reverse the process: merge shard 8 back into shard 0 and decommission it.)
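The split rule (IDs on shard 0 with hash % 16 == 8 move to a new shard 8) and steps 2, 4, and 5 can be sketched on in-memory dicts. This assumes a stable MD5-based hash, a copy that works from a snapshot of shard 0, and writes that arrive during the copy being logged and replayed afterwards; `route_after_split` and `split_shard_0` are hypothetical helpers, not a database API.

```python
import hashlib


def stable_hash(point_id):
    return int.from_bytes(hashlib.md5(str(point_id).encode()).digest()[:8], "big")


def route_after_split(point_id, num_shards=8):
    """Hash routing after shard 0 is split in two."""
    h = stable_hash(point_id)
    shard = h % num_shards
    if shard == 0 and h % (2 * num_shards) == num_shards:
        return num_shards  # hash % 16 == 8 -> new shard 8
    return shard


def split_shard_0(shard0, shard8, writes_during_copy):
    """shard0/shard8: {point_id: vector}; writes_during_copy: logged (id, vector) pairs."""
    # Step 2: copy every vector that now routes to shard 8
    moved = [pid for pid in shard0 if route_after_split(pid) == 8]
    for pid in moved:
        shard8[pid] = shard0[pid]
    # Step 4: replay writes logged during the copy, using the new routing
    for pid, vector in writes_during_copy:
        (shard8 if route_after_split(pid) == 8 else shard0)[pid] = vector
    # Step 5: drop the migrated copies from shard 0
    for pid in moved:
        del shard0[pid]
```

Splitting on the next hash bit keeps every other shard's routing unchanged, so only shard 0's data moves.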

Resharding 100M vectors typically takes on the order of 1–6 hours, depending on network bandwidth and index rebuild time, and latency rises during the migration.

Distributed Querying Across Shards

When querying across shards:

  1. Broadcast search: Query all shards, aggregate top-k results.
```python
import heapq
from concurrent.futures import ThreadPoolExecutor, as_completed


def distributed_search(query_vector, shards, k=10):
    """Search all shards in parallel and return the global top-k (smallest distance)."""
    if not shards:
        return []

    all_results = []
    with ThreadPoolExecutor(max_workers=len(shards)) as executor:
        # Each shard's local top-k is enough to contain the global top-k
        futures = [executor.submit(shard.search, query_vector, k=k) for shard in shards]
        for future in as_completed(futures):
            all_results.extend(future.result())

    # Merge: keep the k results with the smallest distance across all shards
    return heapq.nsmallest(k, all_results, key=lambda r: r.distance)
```
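  2. Latency: A broadcast query finishes only when the slowest shard responds, so its p99 sits above the single-shard p99. Including the merge, expect roughly 1.5–2× the single-shard p99, depending on how heavy the shards' latency tails are.

  3. Optimization: Use metadata filtering to prune shards before searching:

```python
from datetime import date


def date_range_overlap(a_start, a_end, b_start, b_end):
    """True if the closed ranges [a_start, a_end] and [b_start, b_end] intersect."""
    return a_start <= b_end and b_start <= a_end


# Searching documents from 2026 only: a shard holding only 2024–2025 data is skipped.
# s.min_date / s.max_date are hypothetical per-shard metadata.
query_start, query_end = date(2026, 1, 1), date(2026, 12, 31)
shards_to_search = [
    s for s in shards
    if date_range_overlap(s.min_date, s.max_date, query_start, query_end)
]
results = distributed_search(query_vector, shards=shards_to_search, k=10)
```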
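A quick Monte Carlo sketch of the fan-out effect, assuming independent per-shard latencies drawn from a lognormal distribution (median 5 ms, sigma 0.5; both made-up parameters) and ignoring merge cost. It compares the p99 of a single shard with the p99 of the slowest of N shards.

```python
import math
import random


def p99(samples):
    """Empirical 99th percentile of a list of latencies."""
    ordered = sorted(samples)
    return ordered[int(0.99 * (len(ordered) - 1))]


def simulate_fanout(num_shards, trials=20_000, median_ms=5.0, sigma=0.5, seed=0):
    """Return (single-shard p99, broadcast p99) in ms under an assumed lognormal model."""
    rng = random.Random(seed)
    mu = math.log(median_ms)  # a lognormal's median is exp(mu)

    def one_shard():
        return rng.lognormvariate(mu, sigma)

    single = [one_shard() for _ in range(trials)]
    # A broadcast query waits for the slowest of its shards
    broadcast = [max(one_shard() for _ in range(num_shards)) for _ in range(trials)]
    return p99(single), p99(broadcast)


single_p99, broadcast_p99 = simulate_fanout(num_shards=8)
print(f"single-shard p99: {single_p99:.1f} ms, 8-shard broadcast p99: {broadcast_p99:.1f} ms")
```

The ratio depends heavily on how wide the latency tail is (sigma here), which is why the 1.5–2× figure is only a starting estimate; measure your own shards.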

Code Example: Sharding in Milvus

Milvus (Kubernetes-native) handles sharding automatically:

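```python
from pymilvus import (
    Collection, CollectionSchema, DataType, FieldSchema, connections,
)

connections.connect("default", host="milvus-coordinator", port=19530)

# Example schema (assumed): integer primary key + 768-dim float vector
schema = CollectionSchema([
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768),
])

# shards_num sets how many shards the collection is split into
collection = Collection(name="documents", schema=schema, using="default", shards_num=8)

# Insert the corpus in batches (batches: your own iterable of rows);
# Milvus distributes rows across shards by primary key
for batch in batches:
    collection.insert(batch)
collection.flush()

# An index is required before loading; replica_number sets in-memory replicas
collection.create_index(
    field_name="vector",
    index_params={"index_type": "HNSW", "metric_type": "COSINE",
                  "params": {"M": 16, "efConstruction": 200}},
)
collection.load(replica_number=3)

# Search fans out to all shards and merges the results
results = collection.search(
    data=[query_vector],
    anns_field="vector",
    param={"metric_type": "COSINE", "params": {"ef": 64}},
    limit=10,
)
```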

Milvus handles much of the sharding, replication, and failover itself: you set the shard count (shards_num) when creating the collection and the number of in-memory replicas (replica_number) when loading it, and Milvus manages placement.

Key Takeaways

  • Hash-based sharding is simple and distributes load evenly; range-based sharding enables single-shard queries.
  • Replication (3 replicas, quorum writes) ensures durability and enables read scaling.
  • Broadcast search across shards raises tail latency (often 1.5–2× the single-shard p99); use metadata filtering to prune shards.
  • Resharding (adding shards) requires careful coordination and can take hours (roughly 1–6 hours per 100M vectors).
  • Distributed vector databases and managed services (Milvus, Qdrant Cloud) automate much of the sharding and replication.

Frequently Asked Questions

How many shards should I provision?

A rule of thumb: provision shards so each shard stores 100M–500M vectors. For 1B vectors, use 2–10 shards. Each shard still needs 3 replicas (1 leader + 2 followers), so 6–30 nodes total. Start with fewer shards and increase as you grow.
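The rule of thumb reduces to simple arithmetic. A sketch, where `plan_cluster` is a hypothetical helper that assumes one replica per node and 3 replicas per shard by default:

```python
def plan_cluster(total_vectors, vectors_per_shard, replicas=3):
    """Return (shards, nodes) for a target per-shard size, one replica per node."""
    shards = -(-total_vectors // vectors_per_shard)  # integer ceiling division
    return shards, shards * replicas


print(plan_cluster(1_000_000_000, 500_000_000))  # (2, 6)
print(plan_cluster(1_000_000_000, 100_000_000))  # (10, 30)
```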

What is the latency overhead of broadcast search across N shards?

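A broadcast query is as slow as its slowest shard plus the merge step, so per-shard tail latency dominates. With N independent shards, the chance that at least one exceeds its own p99 on a given query is 1 − 0.99^N, about 8% for N = 8, so the global p99 lands above the single-shard p99. For 8 shards at 10 ms p99 each, 15–20 ms (1.5–2×) is a reasonable first estimate; measure under real load.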

Can I reshard without downtime?

Not entirely. Resharding requires a brief cutover window in which some writes may be delayed or must still be routed to the old shard, so most teams schedule it during low-traffic periods. Consistent hashing and orchestrated online migration (for example, on Kubernetes) minimize the disruption but do not eliminate it entirely.

How does eventual consistency affect my application?

If you allow reads from followers (eventual consistency), a recently written vector may not be searchable on every replica immediately; the delay equals the replication lag, often 100 ms–1 s. Use leader reads for queries that must see the latest writes, and follower reads for latency-sensitive queries that tolerate slightly stale results.

Further Reading