similarity-search-patterns

similarity-search-patterns

熱門

Implement efficient similarity search with vector databases. Use when building semantic search, implementing nearest neighbor queries, or optimizing retrieval performance.

26K星標
2.9K分支
更新於 1/22/2026
SKILL.md
readonlyread-only
name
similarity-search-patterns
description

Implement efficient similarity search with vector databases. Use when building semantic search, implementing nearest neighbor queries, or optimizing retrieval performance.

Similarity Search Patterns

Patterns for implementing efficient similarity search in production systems.

When to Use This Skill

  • Building semantic search systems
  • Implementing RAG retrieval
  • Creating recommendation engines
  • Optimizing search latency
  • Scaling to millions of vectors
  • Combining semantic and keyword search

Core Concepts

1. Distance Metrics

| Metric | Formula | Best For |
| ------------------ | ------------------ | --------------------- | --- | -------------- |
| Cosine | 1 - (A·B)/(‖A‖‖B‖) | Normalized embeddings |
| Euclidean (L2) | √Σ(a-b)² | Raw embeddings |
| Dot Product | A·B | Magnitude matters |
| Manhattan (L1) | Σ | a-b | | Sparse vectors |

2. Index Types

┌─────────────────────────────────────────────────┐
│                 Index Types                      │
├─────────────┬───────────────┬───────────────────┤
│    Flat     │     HNSW      │    IVF+PQ         │
│ (Exact)     │ (Graph-based) │ (Quantized)       │
├─────────────┼───────────────┼───────────────────┤
│ O(n) search │ O(log n)      │ O(√n)             │
│ 100% recall │ ~95-99%       │ ~90-95%           │
│ Small data  │ Medium-Large  │ Very Large        │
└─────────────┴───────────────┴───────────────────┘

Templates

Template 1: Pinecone Implementation

from pinecone import Pinecone, ServerlessSpec
from typing import List, Dict, Optional
import hashlib

class PineconeVectorStore:
    def __init__(
        self,
        api_key: str,
        index_name: str,
        dimension: int = 1536,
        metric: str = "cosine"
    ):
        self.pc = Pinecone(api_key=api_key)

        # Create index if not exists
        if index_name not in self.pc.list_indexes().names():
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )

        self.index = self.pc.Index(index_name)

    def upsert(
        self,
        vectors: List[Dict],
        namespace: str = ""
    ) -> int:
        """
        Upsert vectors.
        vectors: [{"id": str, "values": List[float], "metadata": dict}]
        """
        # Batch upsert
        batch_size = 100
        total = 0

        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch, namespace=namespace)
            total += len(batch)

        return total

    def search(
        self,
        query_vector: List[float],
        top_k: int = 10,
        namespace: str = "",
        filter: Optional[Dict] = None,
        include_metadata: bool = True
    ) -> List[Dict]:
        """Search for similar vectors."""
        results = self.index.query(
            vector=query_vector,
            top_k=top_k,
            namespace=namespace,
            filter=filter,
            include_metadata=include_metadata
        )

        return [
            {
                "id": match.id,
                "score": match.score,
                "metadata": match.metadata
            }
            for match in results.matches
        ]

    def search_with_rerank(
        self,
        query: str,
        query_vector: List[float],
        top_k: int = 10,
        rerank_top_n: int = 50,
        namespace: str = ""
    ) -> List[Dict]:
        """Search and rerank results."""
        # Over-fetch for reranking
        initial_results = self.search(
            query_vector,
            top_k=rerank_top_n,
            namespace=namespace
        )

        # Rerank with cross-encoder or LLM
        reranked = self._rerank(query, initial_results)

        return reranked[:top_k]

    def _rerank(self, query: str, results: List[Dict]) -> List[Dict]:
        """Rerank results using cross-encoder."""
        from sentence_transformers import CrossEncoder

        model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

        pairs = [(query, r["metadata"]["text"]) for r in results]
        scores = model.predict(pairs)

        for result, score in zip(results, scores):
            result["rerank_score"] = float(score)

        return sorted(results, key=lambda x: x["rerank_score"], reverse=True)

    def delete(self, ids: List[str], namespace: str = ""):
        """Delete vectors by ID."""
        self.index.delete(ids=ids, namespace=namespace)

    def delete_by_filter(self, filter: Dict, namespace: str = ""):
        """Delete vectors matching filter."""
        self.index.delete(filter=filter, namespace=namespace)

Template 2: Qdrant Implementation

from qdrant_client import QdrantClient
from qdrant_client.http import models
from typing import List, Dict, Optional

class QdrantVectorStore:
    def __init__(
        self,
        url: str = "localhost",
        port: int = 6333,
        collection_name: str = "documents",
        vector_size: int = 1536
    ):
        self.client = QdrantClient(url=url, port=port)
        self.collection_name = collection_name

        # Create collection if not exists
        collections = self.client.get_collections().collections
        if collection_name not in [c.name for c in collections]:
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=vector_size,
                    distance=models.Distance.COSINE
                ),
                # Optional: enable quantization for memory efficiency
                quantization_config=models.ScalarQuantization(
                    scalar=models.ScalarQuantizationConfig(
                        type=models.ScalarType.INT8,
                        quantile=0.99,
                        always_ram=True
                    )
                )
            )

    def upsert(self, points: List[Dict]) -> int:
        """
        Upsert points.
        points: [{"id": str/int, "vector": List[float], "payload": dict}]
        """
        qdrant_points = [
            models.PointStruct(
                id=p["id"],
                vector=p["vector"],
                payload=p.get("payload", {})
            )
            for p in points
        ]

        self.client.upsert(
            collection_name=self.collection_name,
            points=qdrant_points
        )
        return len(points)

    def search(
        self,
        query_vector: List[float],
        limit: int = 10,
        filter: Optional[models.Filter] = None,
        score_threshold: Optional[float] = None
    ) -> List[Dict]:
        """Search for similar vectors."""
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
            query_filter=filter,
            score_threshold=score_threshold
        )

        return [
            {
                "id": r.id,
                "score": r.score,
                "payload": r.payload
            }
            for r in results
        ]

    def search_with_filter(
        self,
        query_vector: List[float],
        must_conditions: List[Dict] = None,
        should_conditions: List[Dict] = None,
        must_not_conditions: List[Dict] = None,
        limit: int = 10
    ) -> List[Dict]:
        """Search with complex filters."""
        conditions = []

        if must_conditions:
            conditions.extend([
                models.FieldCondition(
                    key=c["key"],
                    match=models.MatchValue(value=c["value"])
                )
                for c in must_conditions
            ])

        filter = models.Filter(must=conditions) if conditions else None

        return self.search(query_vector, limit=limit, filter=filter)

    def search_with_sparse(
        self,
        dense_vector: List[float],
        sparse_vector: Dict[int, float],
        limit: int = 10,
        dense_weight: float = 0.7
    ) -> List[Dict]:
        """Hybrid search with dense and sparse vectors."""
        # Requires collection with named vectors
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=models.NamedVector(
                name="dense",
                vector=dense_vector
            ),
            limit=limit
        )
        return [{"id": r.id, "score": r.score, "payload": r.payload} for r in results]

Template 3: pgvector with PostgreSQL

import asyncpg
from typing import List, Dict, Optional
import numpy as np

class PgVectorStore:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string

    async def init(self):
        """Initialize connection pool and extension."""
        self.pool = await asyncpg.create_pool(self.connection_string)

        async with self.pool.acquire() as conn:
            # Enable extension
            await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")

            # Create table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    content TEXT,
                    metadata JSONB,
                    embedding vector(1536)
                )
            """)

            # Create index (HNSW for better performance)
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS documents_embedding_idx
                ON documents
                USING hnsw (embedding vector_cosine_ops)
                WITH (m = 16, ef_construction = 64)
            """)

    async def upsert(self, documents: List[Dict]):
        """Upsert documents with embeddings."""
        async with self.pool.acquire() as conn:
            await conn.executemany(
                """
                INSERT INTO documents (id, content, metadata, embedding)
                VALUES ($1, $2, $3, $4)
                ON CONFLICT (id) DO UPDATE SET
                    content = EXCLUDED.content,
                    metadata = EXCLUDED.metadata,
                    embedding = EXCLUDED.embedding
                """,
                [
                    (
                        doc["id"],
                        doc["content"],
                        doc.get("metadata", {}),
                        np.array(doc["embedding"]).tolist()
                    )
                    for doc in documents
                ]
            )

    async def search(
        self,
        query_embedding: List[float],
        limit: int = 10,
        filter_metadata: Optional[Dict] = None
    ) -> List[Dict]:
        """Search for similar documents."""
        query = """
            SELECT id, content, metadata,
                   1 - (embedding <=> $1::vector) as similarity
            FROM documents
        """

        params = [query_embedding]

        if filter_metadata:
            conditions = []
            for key, value in filter_metadata.items():
                params.append(value)
                conditions.append(f"metadata->>'{key}' = ${len(params)}")
            query += " WHERE " + " AND ".join(conditions)

        query += f" ORDER BY embedding <=> $1::vector LIMIT ${len(params) + 1}"
        params.append(limit)

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)

        return [
            {
                "id": row["id"],
                "content": row["content"],
                "metadata": row["metadata"],
                "score": row["similarity"]
            }
            for row in rows
        ]

    async def hybrid_search(
        self,
        query_embedding: List[float],
        query_text: str,
        limit: int = 10,
        vector_weight: float = 0.5
    ) -> List[Dict]:
        """Hybrid search combining vector and full-text."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                WITH vector_results AS (
                    SELECT id, content, metadata,
                           1 - (embedding <=> $1::vector) as vector_score
                    FROM documents
                    ORDER BY embedding <=> $1::vector
                    LIMIT $3 * 2
                ),
                text_results AS (
                    SELECT id, content, metadata,
                           ts_rank(to_tsvector('english', content),
                                   plainto_tsquery('english', $2)) as text_score
                    FROM documents
                    WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $2)
                    LIMIT $3 * 2
                )
                SELECT
                    COALESCE(v.id, t.id) as id,
                    COALESCE(v.content, t.content) as content,
                    COALESCE(v.metadata, t.metadata) as metadata,
                    COALESCE(v.vector_score, 0) * $4 +
                    COALESCE(t.text_score, 0) * (1 - $4) as combined_score
                FROM vector_results v
                FULL OUTER JOIN text_results t ON v.id = t.id
                ORDER BY combined_score DESC
                LIMIT $3
                """,
                query_embedding, query_text, limit, vector_weight
            )

        return [dict(row) for row in rows]

Template 4: Weaviate Implementation

import weaviate
from weaviate.util import generate_uuid5
from typing import List, Dict, Optional

class WeaviateVectorStore:
    def __init__(
        self,
        url: str = "http://localhost:8080",
        class_name: str = "Document"
    ):
        self.client = weaviate.Client(url=url)
        self.class_name = class_name
        self._ensure_schema()

    def _ensure_schema(self):
        """Create schema if not exists."""
        schema = {
            "class": self.class_name,
            "vectorizer": "none",  # We provide vectors
            "properties": [
                {"name": "content", "dataType": ["text"]},
                {"name": "source", "dataType": ["string"]},
                {"name": "chunk_id", "dataType": ["int"]}
            ]
        }

        if not self.client.schema.exists(self.class_name):
            self.client.schema.create_class(schema)

    def upsert(self, documents: List[Dict]):
        """Batch upsert documents."""
        with self.client.batch as batch:
            batch.batch_size = 100

            for doc in documents:
                batch.add_data_object(
                    data_object={
                        "content": doc["content"],
                        "source": doc.get("source", ""),
                        "chunk_id": doc.get("chunk_id", 0)
                    },
                    class_name=self.class_name,
                    uuid=generate_uuid5(doc["id"]),
                    vector=doc["embedding"]
                )

    def search(
        self,
        query_vector: List[float],
        limit: int = 10,
        where_filter: Optional[Dict] = None
    ) -> List[Dict]:
        """Vector search."""
        query = (
            self.client.query
            .get(self.class_name, ["content", "source", "chunk_id"])
            .with_near_vector({"vector": query_vector})
            .with_limit(limit)
            .with_additional(["distance", "id"])
        )

        if where_filter:
            query = query.with_where(where_filter)

        results = query.do()

        return [
            {
                "id": item["_additional"]["id"],
                "content": item["content"],
                "source": item["source"],
                "score": 1 - item["_additional"]["distance"]
            }
            for item in results["data"]["Get"][self.class_name]
        ]

    def hybrid_search(
        self,
        query: str,
        query_vector: List[float],
        limit: int = 10,
        alpha: float = 0.5  # 0 = keyword, 1 = vector
    ) -> List[Dict]:
        """Hybrid search combining BM25 and vector."""
        results = (
            self.client.query
            .get(self.class_name, ["content", "source"])
            .with_hybrid(query=query, vector=query_vector, alpha=alpha)
            .with_limit(limit)
            .with_additional(["score"])
            .do()
        )

        return [
            {
                "content": item["content"],
                "source": item["source"],
                "score": item["_additional"]["score"]
            }
            for item in results["data"]["Get"][self.class_name]
        ]

Best Practices

Do's

  • Use appropriate index - HNSW for most cases
  • Tune parameters - ef_search, nprobe for recall/speed
  • Implement hybrid search - Combine with keyword search
  • Monitor recall - Measure search quality
  • Pre-filter when possible - Reduce search space

Don'ts

  • Don't skip evaluation - Measure before optimizing
  • Don't over-index - Start with flat, scale up
  • Don't ignore latency - P99 matters for UX
  • Don't forget costs - Vector storage adds up

Resources

You Might Also Like

Related Skills

zig-system-calls

zig-system-calls

87Kdev-database

Guides using bun.sys for system calls and file I/O in Zig. Use when implementing file operations instead of std.fs or std.posix.

oven-sh avataroven-sh
獲取
bun-file-io

bun-file-io

86Kdev-database

Use this when you are working on file operations like reading, writing, scanning, or deleting files. It summarizes the preferred file APIs and patterns used in this repo. It also notes when to use filesystem helpers for directories.

anomalyco avataranomalyco
獲取
vector-index-tuning

vector-index-tuning

26Kdev-database

Optimize vector index performance for latency, recall, and memory. Use when tuning HNSW parameters, selecting quantization strategies, or scaling vector search infrastructure.

wshobson avatarwshobson
獲取

Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.

wshobson avatarwshobson
獲取
event-store-design

event-store-design

26Kdev-database

Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.

wshobson avatarwshobson
獲取
projection-patterns

projection-patterns

26Kdev-database

Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.

wshobson avatarwshobson
獲取