
Embedding Pipelines: Converting Text to Vectors

An embedding pipeline converts text into dense numerical vectors (typically 384–3072 dimensions) that capture semantic meaning. These vectors enable similarity search, deduplication, and semantic clustering, all core features of modern RAG systems. Generating embeddings for millions of documents is a computational bottleneck: embedding 10 million documents (50 GB of text) takes 500+ GPU hours with local models. With cloud APIs the bill scales with token count: at the $0.02 per 1M tokens quoted below and roughly 4 characters per token, one full pass over 50 GB costs about $250, and that cost recurs on every re-embed and grows with pricier models. A production embedding pipeline batches requests for efficiency, caches results to avoid re-embedding, handles API rate limits, and monitors embedding quality. According to Hugging Face, 78% of teams fail their first embedding pipeline deployment due to cost overruns or stalled batches hitting rate limits.

Understanding Embeddings

An embedding is a fixed-size vector representing text meaning. Two texts with similar meaning produce vectors with high cosine similarity (close to 1.0). Example:

Text: "A dog is a loyal companion."
Embedding: [0.12, -0.45, 0.87, ..., 0.33] # 1536 dimensions for OpenAI text-embedding-3-small

Text: "Dogs are faithful pets."
Embedding: [0.14, -0.43, 0.85, ..., 0.31] # Similar vector, high cosine similarity
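
Cosine similarity is the dot product of two vectors divided by the product of their lengths. The following is a minimal sketch in plain Python; the 4-dimensional vectors are made up for illustration and are not real model output, and the function name is an assumption rather than part of any library.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


# Toy vectors (assumed values); real embeddings have hundreds of dimensions.
dog_companion = [0.12, -0.45, 0.87, 0.33]
dogs_pets = [0.14, -0.43, 0.85, 0.31]
unrelated = [-0.80, 0.10, -0.20, 0.55]

print(f"similar pair:   {cosine_similarity(dog_companion, dogs_pets):.4f}")
print(f"unrelated pair: {cosine_similarity(dog_companion, unrelated):.4f}")
```

The similar pair scores close to 1.0, while the unrelated pair scores much lower (here it is negative).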

Popular embedding models (2026):

Model                  | Provider     | Dimensions | Cost                | Latency | Best for
-----------------------|--------------|------------|---------------------|---------|-----------------------------
text-embedding-3-small | OpenAI       | 1536       | $0.02 per 1M tokens | 50 ms   | Production, high quality
all-MiniLM-L6-v2       | Hugging Face | 384        | Free (open-source)  | 5 ms    | Local/self-hosted, speed
nomic-embed-text-v1    | Nomic        | 768        | Free (open-source)  | 10 ms   | Production-grade open-source
UAE-Large-V1           | WhereIsAI    | 1024       | Free (open-source)  | 15 ms   | High quality (English)
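
The dimension count also determines how much storage each vector needs. As a rough sketch, assuming vectors are stored as 4-byte float32 values (as the pipeline in the next section does) and ignoring any index overhead, the raw size can be estimated like this. The helper name `raw_index_size_gb` is hypothetical.

```python
BYTES_PER_FLOAT32 = 4


def raw_index_size_gb(num_vectors: int, dimensions: int) -> float:
    """Raw float32 storage in gigabytes (10^9 bytes), excluding index overhead."""
    return num_vectors * dimensions * BYTES_PER_FLOAT32 / 1e9


# Dimension counts taken from the table above; 10 million vectors each.
for model, dims in [
    ("all-MiniLM-L6-v2", 384),
    ("nomic-embed-text-v1", 768),
    ("UAE-Large-V1", 1024),
    ("text-embedding-3-small", 1536),
]:
    size = raw_index_size_gb(10_000_000, dims)
    print(f"{model:<24} {dims:>5} dims  {size:6.2f} GB per 10M vectors")
```

At 10 million vectors, the 384-dimension model needs about 15 GB of raw storage and the 1536-dimension model about 61 GB.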

Building a Production Embedding Pipeline

Here is a scalable pipeline using OpenAI's API with caching, batching, and error handling:

import hashlib
import sqlite3
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional

import numpy as np
import openai


class EmbeddingPipeline:
    """Generate and cache embeddings at scale."""

    def __init__(self, api_key: str, model: str = "text-embedding-3-small", db_path: str = "embeddings.db"):
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model
        self.db = sqlite3.connect(db_path)
        self._init_cache()

    def _init_cache(self):
        """Create cache table for embeddings."""
        # Key on (text_hash, model) so the same text can be cached once per model.
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS embedding_cache (
                text_hash TEXT,
                original_text TEXT,
                embedding BLOB,
                model TEXT,
                dimensions INTEGER,
                created_at TIMESTAMP,
                PRIMARY KEY (text_hash, model)
            )
        """)
        self.db.commit()

    def _compute_text_hash(self, text: str) -> str:
        """Hash text for cache lookup."""
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def _serialize_embedding(self, vector: List[float]) -> bytes:
        """Serialize embedding vector to bytes (float32)."""
        return np.array(vector, dtype=np.float32).tobytes()

    def _deserialize_embedding(self, blob: bytes) -> List[float]:
        """Deserialize embedding vector from bytes."""
        return np.frombuffer(blob, dtype=np.float32).tolist()

    def get_cached_embedding(self, text: str) -> Optional[List[float]]:
        """Retrieve embedding from cache if available."""
        text_hash = self._compute_text_hash(text)
        result = self.db.execute(
            "SELECT embedding FROM embedding_cache WHERE text_hash = ? AND model = ?",
            (text_hash, self.model)
        ).fetchone()

        if result:
            return self._deserialize_embedding(result[0])
        return None

    def cache_embedding(self, text: str, embedding: List[float]):
        """Store embedding in cache."""
        text_hash = self._compute_text_hash(text)
        self.db.execute(
            "INSERT OR REPLACE INTO embedding_cache VALUES (?, ?, ?, ?, ?, ?)",
            (
                text_hash,
                text[:5000],  # Store truncated original for debugging
                self._serialize_embedding(embedding),
                self.model,
                len(embedding),
                datetime.now(timezone.utc).isoformat()
            )
        )
        self.db.commit()

    def embed_batch(self, texts: List[str], batch_size: int = 100) -> List[Dict]:
        """Embed texts in input order, calling the API only for cache misses."""
        results: List[Optional[Dict]] = [None] * len(texts)
        pending = []  # Indices of texts not found in the cache

        for i, text in enumerate(texts):
            # Try cache first
            cached = self.get_cached_embedding(text)
            if cached is not None:
                results[i] = {"text": text, "embedding": cached, "cached": True}
            else:
                pending.append(i)

        # Send cache misses to the API, up to batch_size texts per request
        for start in range(0, len(pending), batch_size):
            chunk = pending[start:start + batch_size]
            embeddings = self._call_api([texts[i] for i in chunk])
            for i, embedding in zip(chunk, embeddings):
                results[i] = {"text": texts[i], "embedding": embedding, "cached": False}
                self.cache_embedding(texts[i], embedding)

        cache_hits = len(texts) - len(pending)
        print(f"✓ Embedded {len(texts)} texts ({cache_hits} cache hits)")
        return results

    def _call_api(self, texts: List[str]) -> List[List[float]]:
        """Call OpenAI embedding API with retry logic."""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = self.client.embeddings.create(
                    input=texts,
                    model=self.model
                )
                # Sort by index to match input order
                embeddings = sorted(response.data, key=lambda x: x.index)
                return [e.embedding for e in embeddings]
            except openai.RateLimitError:
                print(f"Rate limited, attempt {attempt + 1}/{max_retries}")
                time.sleep((2 ** attempt) * 10)  # Exponential backoff: 10s, 20s, 40s
            except Exception as e:
                raise RuntimeError(f"Embedding API failed: {e}") from e

        raise RuntimeError("Max retries exceeded")

# Usage
pipeline = EmbeddingPipeline(api_key="your_api_key")

texts = [
    "Machine learning is transforming AI.",
    "Deep learning models achieve state-of-the-art results.",
    "Neural networks are inspired by biological neurons."
]

results = pipeline.embed_batch(texts)

# Compute similarity between first and second text.
# OpenAI embeddings are normalized to unit length, so the dot product
# equals the cosine similarity.
sim = np.dot(results[0]["embedding"], results[1]["embedding"])
print(f"Cosine similarity: {sim:.4f}")

This pipeline:

  1. Caches embeddings so re-running on the same texts costs nothing.
  2. Batches requests to the API for efficiency (100 texts per batch).
  3. Retries with exponential backoff on rate limits.
  4. Stores embeddings durably in SQLite for later retrieval.

Local Embedding Models for Cost Savings

For very large datasets or to avoid API costs, use local open-source models:

from sentence_transformers import SentenceTransformer
import numpy as np

class LocalEmbeddingPipeline:
    """Generate embeddings using local Hugging Face models."""

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_texts(self, texts: list) -> list:
        """Embed texts using local model."""
        embeddings = self.model.encode(texts, convert_to_numpy=True)
        return embeddings.tolist()

    def compute_similarity_matrix(self, texts: list) -> np.ndarray:
        """Compute pairwise similarity between texts."""
        embeddings = self.model.encode(texts, convert_to_numpy=True)
        # Normalize for cosine similarity
        embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
        return np.dot(embeddings, embeddings.T)

# Usage: runs locally, zero API cost
pipeline = LocalEmbeddingPipeline()
texts = [
    "Machine learning is transforming AI.",
    "Deep learning models achieve state-of-the-art results.",
    "Neural networks are inspired by biological neurons.",
]
embeddings = pipeline.embed_texts(texts)

# Find near-duplicates
similarity_matrix = pipeline.compute_similarity_matrix(texts)
for i in range(len(texts)):
    for j in range(i + 1, len(texts)):
        if similarity_matrix[i][j] > 0.95:
            print(f"Near-duplicate: {texts[i]} ~ {texts[j]}")

Local models avoid per-token API fees and, per the latency figures in the table above, respond roughly 10x faster (5 ms vs. 50 ms), at slightly lower quality. Use local models for:

  • Deduplication (high recall needed, high precision less critical).
  • Rapid prototyping.
  • Cost-sensitive pipelines.
  • Offline systems (no internet).

Use cloud APIs (OpenAI, Cohere) for:

  • Production retrieval (highest quality needed).
  • Multilingual data (better cross-language understanding).
  • Specialized domains (healthcare, legal; fine-tuned models available).

Handling Large-Scale Embedding Generation

For embedding billions of documents:

  1. Distribute across GPU clusters: Use Apache Spark or Ray to parallelize embedding generation.
  2. Stream results to vector database: Don't accumulate embeddings in memory; stream directly to Pinecone, Weaviate, or Milvus.
  3. Monitor cost: Track API spend, set budgets. A naive full-dataset embed of 1B documents costs $20k+.
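
The cost figure in step 3 can be reproduced with simple arithmetic. The sketch below assumes about 5,000 characters per document (50 GB across 10 million documents, as in the introduction), roughly 4 characters per token (a common rule of thumb for English, not an exact figure), and the $0.02 per 1M tokens price from the model table. The function names and the budget value are hypothetical.

```python
def estimate_api_cost(
    num_docs: int,
    avg_chars_per_doc: float,
    price_per_million_tokens: float = 0.02,  # from the model table
    chars_per_token: float = 4.0,            # assumed rule of thumb
) -> float:
    """Estimate embedding API spend in dollars for one full pass."""
    total_tokens = num_docs * avg_chars_per_doc / chars_per_token
    return total_tokens / 1_000_000 * price_per_million_tokens


def within_budget(estimated_cost: float, budget: float) -> bool:
    """Return True if the estimated spend fits the budget."""
    return estimated_cost <= budget


cost = estimate_api_cost(num_docs=1_000_000_000, avg_chars_per_doc=5_000)
print(f"Estimated cost for 1B documents: ${cost:,.0f}")
print(f"Fits a $20,000 budget: {within_budget(cost, 20_000)}")
```

Under these assumptions the estimate comes to about $25,000, in line with the "$20k+" figure above. Running the estimate before launching a job makes it easy to stop early when it exceeds the budget.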

Example using Ray for distributed embedding:

import ray
from typing import List

# Assumes LocalEmbeddingPipeline (defined above) is importable on every worker.

@ray.remote(num_gpus=1)
def embed_batch_gpu(texts: List[str]) -> List[List[float]]:
    """Embed a batch on a GPU worker."""
    # Note: this loads the model once per task; larger batches amortize that cost.
    pipeline = LocalEmbeddingPipeline()
    return pipeline.embed_texts(texts)

# Initialize Ray cluster (e.g., 4 GPU machines)
ray.init(address="auto")

# Split 1 million texts into 10k batches
all_texts = [f"Document {i}" for i in range(1_000_000)]
batch_size = 100
futures = []

for i in range(0, len(all_texts), batch_size):
    batch = all_texts[i:i + batch_size]
    future = embed_batch_gpu.remote(batch)
    futures.append(future)

# Collect results (blocks until every batch finishes and holds all results in memory)
results = ray.get(futures)
print(f"Embedded {len(all_texts)} texts across GPU cluster")
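
The Ray example above gathers every result in memory at the end, which works for a demo but not for the streaming approach recommended in step 2. The following is a minimal, framework-free sketch of the streaming pattern: embed one batch, hand it to a writer, and drop it before moving on. `embed_fn` and `write_fn` are hypothetical callables; in practice `write_fn` would wrap whatever upsert call your vector database client provides.

```python
from typing import Callable, Iterable, Iterator, List

Vector = List[float]


def batched(items: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Yield successive lists of at most batch_size items."""
    batch: List[str] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def stream_embeddings(
    texts: Iterable[str],
    embed_fn: Callable[[List[str]], List[Vector]],
    write_fn: Callable[[List[str], List[Vector]], None],
    batch_size: int = 100,
) -> int:
    """Embed and write one batch at a time; return the number of texts processed."""
    total = 0
    for batch in batched(texts, batch_size):
        vectors = embed_fn(batch)
        write_fn(batch, vectors)  # Hand off to the sink; nothing is accumulated here
        total += len(batch)
    return total


# Stand-ins for illustration: a fake embedder and a sink that only counts rows.
def fake_embed(batch: List[str]) -> List[Vector]:
    return [[float(len(text))] for text in batch]


written: List[int] = []


def counting_sink(batch: List[str], vectors: List[Vector]) -> None:
    written.append(len(vectors))


documents = (f"Document {i}" for i in range(250))  # Generator: never fully in memory
total = stream_embeddings(documents, fake_embed, counting_sink, batch_size=100)
print(total, written)  # 250 [100, 100, 50]
```

Only one batch of texts and vectors is held at a time, so memory use stays flat regardless of corpus size.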

Quality Monitoring: Is Your Embedding Pipeline Working?

Monitor embedding quality by sampling and manual review:

import random
from typing import List

import numpy as np


def evaluate_embedding_quality(embeddings: List[List[float]], texts: List[str], sample_size: int = 100):
    """Spot-check embedding quality by printing nearest neighbors for a random sample."""
    # Normalize once so the dot product below is cosine similarity
    matrix = np.array(embeddings, dtype=np.float32)
    matrix = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)

    sample_indices = random.sample(range(len(texts)), min(sample_size, len(texts)))

    for i in sample_indices:
        # Find nearest neighbors, excluding the query itself
        similarities = matrix @ matrix[i]
        ranked = [j for j in np.argsort(similarities)[::-1] if j != i]

        print(f"\nQuery: {texts[i]}")
        print("Top 3 neighbors:")
        for j in ranked[:3]:
            print(f"  {similarities[j]:.4f}: {texts[j]}")

# Usage: embeddings[i] must be the vector for texts[i]
evaluate_embedding_quality(embeddings, texts)

Key Takeaways

  • Embeddings convert text to dense vectors capturing semantic meaning; similar texts have high cosine similarity.
  • Cloud APIs (OpenAI, Cohere) offer high quality but charge per token (for example, $0.02 per 1M tokens for text-embedding-3-small); local models (Hugging Face) are free but slightly lower quality.
  • Cache embeddings in a database to avoid recomputing; the savings scale with the cache hit rate, so re-running an unchanged corpus costs nothing.
  • Batch requests (100 texts per call) to minimize API overhead; retry with exponential backoff on rate limits.
  • Distribute large-scale embedding across GPU clusters using Ray or Spark.

Frequently Asked Questions

Should I use OpenAI embeddings or local models?

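If you have a GPU cluster and no internet restrictions, use local models (free, fast, 384–1024 dims). If you need the highest quality or have variable compute, use OpenAI (costs money but best quality, 1536 dims). Hybrid: use OpenAI for critical docs and local models for bulk, but keep each model's vectors in a separate index, because vectors produced by different models are not comparable.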

How often should I re-embed documents if text changes?

Track document version hashes. If hash changes, re-embed that document. For slowly-changing data, monthly re-embedding is fine. For real-time systems, embed on-write (every document change).
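
A minimal sketch of hash-based change detection follows. It assumes documents are available as a mapping from ID to text and that the hash recorded at the last embed is stored alongside each ID; the function names and the sample data are hypothetical.

```python
import hashlib
from typing import Dict, List


def content_hash(text: str) -> str:
    """Return a stable SHA-256 fingerprint of the document text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def find_stale_documents(documents: Dict[str, str], stored_hashes: Dict[str, str]) -> List[str]:
    """Return IDs of documents that are new or whose text changed since the last embed."""
    return [
        doc_id
        for doc_id, text in documents.items()
        if stored_hashes.get(doc_id) != content_hash(text)
    ]


# Hashes recorded when the documents were last embedded (assumed sample data).
stored_hashes = {
    "doc-1": content_hash("Dogs are faithful pets."),
    "doc-2": content_hash("Cats are independent."),
}

# Current state: doc-2 was edited and doc-3 is new.
documents = {
    "doc-1": "Dogs are faithful pets.",
    "doc-2": "Cats are independent animals.",
    "doc-3": "Birds can fly.",
}

print(find_stale_documents(documents, stored_hashes))  # ['doc-2', 'doc-3']
```

Only the returned IDs need to be re-embedded; after embedding, update the stored hash for each.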

What if my embedding API quota is exhausted?

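Switch to local models immediately. They're free and only about 5–10% lower in quality, but their vectors cannot be mixed with existing API embeddings in the same index, so plan on a separate index or a full re-embed. Or split the dataset: use API for high-value documents, local models for the rest. Or queue for next month and use cached embeddings in the meantime.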

How do I detect embedding drift (model degradation)?

Periodically sample documents, compute embeddings with the current model, compare to historical embeddings. If cosine similarity <0.99, embeddings have drifted (model or input text changed). Alert and investigate.
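
The check described above can be sketched as follows, assuming historical and freshly computed embeddings are both available as mappings from document ID to vector. The 0.99 threshold comes from the answer above; the function names and sample vectors are hypothetical.

```python
import math
from typing import Dict, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def find_drifted(
    historical: Dict[str, Sequence[float]],
    current: Dict[str, Sequence[float]],
    threshold: float = 0.99,
) -> List[str]:
    """Return IDs whose current embedding falls below the similarity threshold."""
    drifted = []
    for doc_id, old_vector in historical.items():
        new_vector = current.get(doc_id)
        if new_vector is None:
            continue  # Document was not re-embedded in this sample
        if cosine_similarity(old_vector, new_vector) < threshold:
            drifted.append(doc_id)
    return drifted


# Toy 2-dimensional vectors for illustration only.
historical = {"a": [1.0, 0.0], "b": [0.6, 0.8]}
current = {"a": [1.0, 0.0], "b": [0.8, 0.6]}

print(find_drifted(historical, current))  # ['b']
```

Document "a" is unchanged (similarity 1.0), while "b" scores 0.96 and is flagged for investigation.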

Can I use the same embeddings across different vector databases?

Yes, embeddings are portable: generate once with OpenAI, store in Pinecone, copy to Weaviate, etc. But beware: Pinecone may normalize vectors, Weaviate may not. Test interoperability before migrating.
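
One way to reduce this risk is to normalize vectors to unit length yourself before uploading, so that dot-product and cosine rankings agree regardless of what the database does. A minimal sketch, with hypothetical helper names:

```python
import math
from typing import List, Sequence


def l2_normalize(vector: Sequence[float]) -> List[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in vector]


def is_unit_length(vector: Sequence[float], tolerance: float = 1e-6) -> bool:
    """Check whether a vector's length is within tolerance of 1.0."""
    return abs(math.sqrt(sum(x * x for x in vector)) - 1.0) <= tolerance


raw = [3.0, 4.0]  # Length 5.0, so not unit length
normalized = l2_normalize(raw)

print(is_unit_length(raw))         # False
print(is_unit_length(normalized))  # True
```

Running `is_unit_length` on a sample of vectors exported from each database is a quick interoperability test before migrating.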
