Bulk Ingestion and Upserts in Production
Bulk ingestion and upserts are critical for production vector systems. Initial bulk loading of millions of embeddings must complete in hours, not days. Real-time upserts (update-or-insert) must handle fresh data without blocking searches. Deletes must remove outdated or sensitive data reliably. A poor ingestion strategy causes delayed deployments, data inconsistency, and production outages.
Bulk Ingestion: Strategies and Throughput
Bulk ingestion is the initial load of millions of embeddings into a vector database. You typically embed documents/images in a data pipeline, batch them, and load into the database.
Single-threaded ingestion (baseline, slow):
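from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
import time

client = QdrantClient("localhost", port=6333)
batch_size = 10_000

start = time.time()
# Upsert sequentially in batches from a single thread; one request carrying
# every point would be too large to send reliably
for start_idx in range(0, len(all_embeddings), batch_size):
    end_idx = min(start_idx + batch_size, len(all_embeddings))
    points = [
        PointStruct(
            id=i,
            vector=all_embeddings[i],
            payload={
                "doc_id": f"doc_{i}",
                "title": f"Document {i}",
            },
        )
        for i in range(start_idx, end_idx)
    ]
    client.upsert(collection_name="documents", points=points)

elapsed = time.time() - start
total = len(all_embeddings)
print(f"Loaded {total} vectors in {elapsed:.1f}s ({total / elapsed:.0f} vec/s)")
# Rough expectation: 2K–5K vec/s, depending on vector dimension, network, and hardware
This approach is simple but slow: a single thread spends most of its time waiting on each request, so throughput is typically only a few thousand vectors/second (roughly 2K–5K, depending on vector dimension, network, and hardware). For 100M vectors, this takes 5–15 hours of blocking upserts.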
Parallelized ingestion (faster):
Multiple threads/processes batch and insert concurrently:
from concurrent.futures import ThreadPoolExecutor, as_completed
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
import time

def batch_insert(client, collection_name, batch):
    """Insert a batch of points."""
    client.upsert(collection_name=collection_name, points=batch)

client = QdrantClient("localhost", port=6333)
batch_size = 10_000
num_workers = 8

start = time.time()
batches = [
    [
        PointStruct(
            id=i,
            vector=all_embeddings[i],
            payload={"doc_id": f"doc_{i}"}
        )
        for i in range(start_idx, min(start_idx + batch_size, len(all_embeddings)))
    ]
    for start_idx in range(0, len(all_embeddings), batch_size)
]

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    futures = [
        executor.submit(batch_insert, client, "documents", batch)
        for batch in batches
    ]
    for future in as_completed(futures):
        future.result()  # Re-raises any exception from the worker
elapsed = time.time() - start
throughput = len(all_embeddings) / elapsed
print(f"Loaded {len(all_embeddings)} vectors in {elapsed:.1f}s ({throughput:.0f} vec/s)")
# Rough expectation: 10K–30K vec/s, depending on vector dimension, network, and hardware
Parallelized ingestion with 8 workers typically reaches roughly 10K–30K vectors/second, because several requests are in flight while others wait on the network and the server. For 100M vectors, this completes in about 1–3 hours.
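The parallel example builds the full list of batches in memory before any upload starts. For datasets that do not fit comfortably in RAM, batches can be produced lazily instead. The following is a minimal sketch of that idea; `iter_batches` is a hypothetical helper written for this illustration, not part of qdrant-client.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most batch_size items from any iterable."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(items)
    while True:
        # islice pulls only the next batch_size items, so the source
        # is never materialized as a whole
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    for batch in iter_batches(range(7), 3):
        print(batch)
```

Each yielded batch can be submitted to the executor as it is produced, so only the batches currently in flight are held in memory.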
Advanced: Disable indexing during bulk load, rebuild after:
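Some vector databases let you defer index construction during a bulk load and build the index once afterwards. In Qdrant, you can set the optimizer's indexing_threshold to 0 before loading and restore it when the load finishes; in Milvus, you can insert the data first and create the index after it is loaded: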
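from qdrant_client import QdrantClient
from qdrant_client.models import OptimizersConfigDiff

client = QdrantClient("localhost", port=6333)

# Disable index building during the load (Qdrant: indexing_threshold=0).
# Older qdrant-client versions name this argument optimizer_config.
# Milvus equivalent: insert first, then call create_index() after loading.
client.update_collection(
    collection_name="documents",
    optimizers_config=OptimizersConfigDiff(indexing_threshold=0),
)

# Bulk insert without index updates
for batch in batches:
    client.upsert(collection_name="documents", points=batch)

# Re-enable indexing; the optimizer then builds the index in the background.
# 20000 (KB) is Qdrant's documented default threshold; adjust to your setup.
client.update_collection(
    collection_name="documents",
    optimizers_config=OptimizersConfigDiff(indexing_threshold=20000),
)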
This bypasses incremental index construction during the load (which is slow) and builds the index once at the end. Load throughput typically improves by 2–5x; searches remain slow until the index has finished building.
Handling Upserts: Update-or-Insert Logic
An upsert updates an existing vector/payload by ID, or inserts if the ID does not exist. Most vector databases support upserts natively:
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

client = QdrantClient("localhost", port=6333)

# Upsert a single point
client.upsert(
    collection_name="documents",
    points=[
        PointStruct(
            id=123,  # If ID 123 exists, update it; else insert.
            vector=[0.1, 0.2, 0.3, ...],
            payload={"title": "Updated Title", "rating": 4.9}
        )
    ]
)
Upsert is idempotent: running it twice with the same data is safe. It replaces both the vector and the payload of an existing point, so include every payload field you want to keep.
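Idempotent upserts only help if the same document always maps to the same point ID. One common way to get that is to derive the ID from the document's own identifier. The sketch below assumes string document IDs and uses a UUID version 5, which Qdrant accepts as a point ID; `DOC_NAMESPACE` and `point_id_for` are hypothetical names chosen for this illustration.

```python
import uuid

# Hypothetical namespace for this pipeline; any fixed UUID works as long as
# it never changes between runs
DOC_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "documents.example.com")


def point_id_for(doc_id: str) -> str:
    """Derive a stable point ID from a document ID (UUIDv5 is deterministic)."""
    return str(uuid.uuid5(DOC_NAMESPACE, doc_id))


if __name__ == "__main__":
    # The same input always yields the same ID, so a re-run overwrites
    # the existing point instead of creating a duplicate
    print(point_id_for("doc_1") == point_id_for("doc_1"))
```

Python's built-in `hash()` is not suitable here because string hashes are randomized per process.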
Selective payload update (metadata only, no vector change):
Some databases support updating only the payload without re-indexing the vector:
# Qdrant: set_payload merges these keys into the point's existing payload
client.set_payload(
    collection_name="documents",
    payload={"rating": 5.0, "reviewed": True},
    points=[123],
)
Payload-only updates are faster (no vector re-indexing), but they overwrite the given fields in place: the previous values are not retained.
Real-Time Ingestion: Streaming and Batching
In production RAG systems, you often ingest documents continuously: new articles, logs, or user-generated content. Batching is critical here, because upserting one point per request pays a full network round trip for every vector and is slow.
Streaming ingestion with batching:
from queue import Empty, Queue
from threading import Event, Thread
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
import logging
import time

class StreamingIngester:
    def __init__(self, client, collection_name, batch_size=1000, flush_interval=5):
        self.client = client
        self.collection_name = collection_name
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = Queue()
        self.batch = []  # Touched only by the worker until flush() joins it
        self.last_flush = time.time()
        self._stop = Event()
        # Start background worker
        self.worker = Thread(target=self._worker, daemon=True)
        self.worker.start()

    def add_point(self, point_id, vector, payload):
        """Queue a point for insertion."""
        self.queue.put(PointStruct(id=point_id, vector=vector, payload=payload))

    def _write_batch(self):
        """Upsert the current batch; on failure the batch is kept for retry."""
        if self.batch:
            self.client.upsert(
                collection_name=self.collection_name,
                points=self.batch
            )
            self.batch = []
        self.last_flush = time.time()

    def _worker(self):
        """Batch points and flush to database."""
        while not (self._stop.is_set() and self.queue.empty()):
            try:
                self.batch.append(self.queue.get(timeout=0.1))
            except Empty:
                pass  # No new point; still check the flush interval below
            # Flush if batch is full or flush interval exceeded
            should_flush = (
                len(self.batch) >= self.batch_size or
                time.time() - self.last_flush > self.flush_interval
            )
            if should_flush:
                try:
                    self._write_batch()
                except Exception:
                    # Do not swallow failures silently; the batch is retried
                    logging.exception("Upsert failed; batch kept for retry")

    def flush(self):
        """Stop the worker, drain the queue, and flush remaining points."""
        self._stop.set()
        self.worker.join()
        self._write_batch()  # Errors here propagate to the caller

# Usage
client = QdrantClient("localhost", port=6333)
ingester = StreamingIngester(client, "documents", batch_size=1000, flush_interval=5)
# Continuously add points from a data stream
for doc in data_stream:
    embedding = embed_model.encode(doc["text"])
    ingester.add_point(
        point_id=doc["id"],
        vector=embedding,
        payload={"title": doc["title"], "source": doc["source"]}
    )
# Flush before shutdown
ingester.flush()
This batches points and flushes every 1000 points or 5 seconds, whichever comes first. Real-world throughput: 10K–50K vectors/second.
Handling Deletes: Soft and Hard Deletes
Deletes remove vectors from the database. Two strategies:
Hard delete: Remove the point entirely. Fast and permanent.
from qdrant_client.models import PointIdsList

client.delete(
    collection_name="documents",
    points_selector=PointIdsList(points=[123, 124, 125]),
)
Soft delete: Mark the point as deleted (via a deleted payload field) but keep it in the database. Enables recovery and audit trails.
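from datetime import datetime, timezone
from qdrant_client.models import FieldCondition, Filter, MatchValue

# Soft delete: flag the point instead of removing it
client.set_payload(
    collection_name="documents",
    payload={"deleted": True, "deleted_at": datetime.now(timezone.utc).isoformat()},
    points=[123],
)

# Filter out deleted points in search (pass this as the query filter)
not_deleted = Filter(
    must_not=[
        FieldCondition(key="deleted", match=MatchValue(value=True))
    ]
)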
Soft deletes are safer (reversible) but consume storage. Hard deletes save space but are irreversible.
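Because soft-deleted points keep consuming storage, a periodic sweep can select the ones that have outlived a retention window so they can then be hard deleted. The sketch below works on plain (id, payload) pairs and assumes `deleted_at` is stored as a timezone-aware ISO 8601 string; `expired_soft_deletes` and the 30-day window are hypothetical choices for this illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List, Tuple


def expired_soft_deletes(
    records: Iterable[Tuple[int, Dict]],
    now: datetime,
    retention: timedelta,
) -> List[int]:
    """Return IDs of soft-deleted points whose retention window has passed."""
    expired = []
    for point_id, payload in records:
        if not payload.get("deleted"):
            continue  # Live point
        deleted_at = payload.get("deleted_at")
        if deleted_at is None:
            continue  # Flagged but undated; leave for manual review
        if now - datetime.fromisoformat(deleted_at) >= retention:
            expired.append(point_id)
    return expired


if __name__ == "__main__":
    sample = [
        (1, {"deleted": True, "deleted_at": "2024-01-01T00:00:00+00:00"}),
        (2, {"deleted": True, "deleted_at": "2024-01-20T00:00:00+00:00"}),
        (3, {"title": "still live"}),
    ]
    now = datetime(2024, 1, 31, tzinfo=timezone.utc)
    print(expired_soft_deletes(sample, now, timedelta(days=30)))
```

The returned IDs can be passed to a hard delete call, which keeps the recovery window while bounding storage growth.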
Data Consistency During Ingestion
In production, ensure consistency:
- Idempotency: Ingestion should be re-runnable. Use fixed point IDs (e.g., hash of document ID) so re-running inserts the same data without duplicates.
- Atomicity: Most vector databases do not provide transactions that span multiple upsert calls, so for critical data ingest in checkpoints and resume from the last successful one. Example:
# Ingest in checkpoints
checkpoint_size = 10_000
for i in range(0, len(all_points), checkpoint_size):
    batch = all_points[i:i + checkpoint_size]
    client.upsert(collection_name="documents", points=batch)
    # Log checkpoint (i + len(batch) stays exact for a short final batch)
    logger.info(f"Ingested {i + len(batch)} points")
# On failure, retry from last checkpoint
- Validation: After ingestion, verify counts and sample searches:
# Count points
count_result = client.count(collection_name="documents", exact=True)
expected_count = len(all_points)
assert count_result.count == expected_count, f"Count mismatch: {count_result.count} vs {expected_count}"
# Sample search to verify vectors are searchable
# (query_points replaces client.search, which is deprecated in recent qdrant-client versions)
results = client.query_points(
    collection_name="documents",
    query=all_embeddings[0],
    limit=10,
).points
assert len(results) > 0, "Search returned no results!"
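The checkpoint loop in the Atomicity item logs progress but does not persist it, so a crashed process has no record of where to resume. A minimal sketch of a resumable variant follows. It assumes the points arrive in a stable order and that IDs are deterministic, so repeating the failed batch is harmless; `ingest_with_checkpoints`, the `upsert_batch` callable, and the JSON checkpoint file are hypothetical names for this illustration.

```python
import json
from pathlib import Path
from typing import Callable, Sequence


def ingest_with_checkpoints(
    points: Sequence,
    upsert_batch: Callable[[Sequence], None],
    checkpoint_path: Path,
    checkpoint_size: int = 10_000,
) -> int:
    """Upsert points in order, persisting the next offset after each batch.

    Returns the number of points written during this call.
    """
    start = 0
    if checkpoint_path.exists():
        # Resume from the offset recorded by a previous run
        start = json.loads(checkpoint_path.read_text())["next_offset"]
    written = 0
    for offset in range(start, len(points), checkpoint_size):
        batch = points[offset:offset + checkpoint_size]
        # If this raises, the checkpoint still points at this batch
        upsert_batch(batch)
        checkpoint_path.write_text(
            json.dumps({"next_offset": offset + len(batch)})
        )
        written += len(batch)
    return written
```

With a Qdrant client, `upsert_batch` could be `lambda batch: client.upsert(collection_name="documents", points=batch)`. Delete the checkpoint file before starting a fresh load.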
Key Takeaways
- Parallelized bulk ingestion with 8–16 workers typically reaches roughly 10K–30K vectors/second, several times the single-threaded rate.
- Disable index updates during bulk load and rebuild once for 2–5x throughput gain.
- Batch real-time upserts (1000 points per batch, flush every 5 seconds) to maintain 10K–50K vec/s throughput.
- Use soft deletes (payload flags) for reversibility; hard deletes for space-critical systems.
- Validate counts and sample searches after ingestion to catch consistency errors early.
Frequently Asked Questions
How long should bulk ingestion take for 100M vectors?
With optimized parallel ingestion (8 workers, batch size 10K, disabled indexing), 100M vectors should complete in 30–90 minutes. With single-threaded ingestion, expect 5–15 hours. If you are seeing slower performance, check network latency, disk I/O, and CPU utilization.
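To check whether a load is on track, measure throughput over a few batches and extrapolate to the full dataset. The arithmetic is simple enough to sketch; the 25,000 vec/s rate below is an assumed measurement used purely for illustration, and both function names are hypothetical.

```python
def estimate_load_seconds(num_vectors: int, vectors_per_second: float) -> float:
    """Extrapolate total load time from a measured ingestion rate."""
    if vectors_per_second <= 0:
        raise ValueError("vectors_per_second must be positive")
    return num_vectors / vectors_per_second


def format_duration(seconds: float) -> str:
    """Render a duration as hours and minutes, e.g. '1h 06m'."""
    hours, remainder = divmod(int(round(seconds)), 3600)
    return f"{hours}h {remainder // 60:02d}m"


if __name__ == "__main__":
    # Assumed rate measured on a sample of batches
    seconds = estimate_load_seconds(100_000_000, 25_000)
    print(format_duration(seconds))  # 4000 s -> 1h 06m
```

If the projected duration is far outside the expected range, that is the cue to look at network latency, disk I/O, and CPU utilization before committing to the full load.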
Can I upsert while queries are running?
Yes. Most vector databases support concurrent upserts and queries. However, upserts during heavy query load may increase query latency. Schedule bulk upserts during low-traffic windows if possible.
What happens if an upsert fails mid-batch?
The behavior depends on your database. Some databases roll back the entire batch (atomic); others partially commit. Check your vector database's transaction semantics. Use checkpoints and idempotent IDs to handle partial failures safely.
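Because idempotent IDs make a repeated upsert harmless, a failed batch can simply be sent again. The sketch below retries with exponential backoff; it is one possible policy, not a database feature, and `upsert_with_retry`, the `upsert_batch` callable, and the default delays are hypothetical choices for this illustration.

```python
import time
from typing import Callable, Sequence


def upsert_with_retry(
    upsert_batch: Callable[[Sequence], None],
    batch: Sequence,
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Send one batch, retrying on failure; returns the attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            upsert_batch(batch)
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise  # Give up and surface the last error
            # Delays grow as base_delay, 2x, 4x, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` parameter is injectable so the policy can be tested without waiting. In production you would likely narrow the `except` clause to the client's transient network and timeout errors.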
Should I hard delete or soft delete sensitive data?
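For GDPR/privacy compliance, hard delete is the safer choice: soft delete leaves the data on disk and risks accidental exposure. Use hard delete for PII and sensitive data, and confirm that your database physically purges deleted records, since some engines only mark them as deleted and reclaim the space during a later optimization or compaction pass. Snapshots and backups that contain the data need the same treatment.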