next-plaid-client

Python SDK for the NextPlaid ColBERT Search API. Provides synchronous and asynchronous clients for interacting with the next-plaid-api server.

Architecture Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                           next-plaid-client                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────────────────┐         ┌─────────────────────────────────┐    │
│  │   NextPlaidClient       │         │   AsyncNextPlaidClient          │    │
│  │     (Synchronous)       │         │     (Asynchronous)              │    │
│  ├─────────────────────────┤         ├─────────────────────────────────┤    │
│  │                         │         │                                 │    │
│  │  httpx.Client           │         │  httpx.AsyncClient              │    │
│  │         ↓               │         │         ↓                       │    │
│  │  Blocking I/O           │         │  asyncio I/O                    │    │
│  │                         │         │                                 │    │
│  └───────────┬─────────────┘         └───────────┬─────────────────────┘    │
│              │                                   │                          │
│              └───────────────┬───────────────────┘                          │
│                              ▼                                              │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                     BaseNextPlaidClient                               │  │
│  │                                                                       │  │
│  │  - URL construction          - Payload preparation                    │  │
│  │  - Response parsing          - Error handling                         │  │
│  │  - Input type detection      - Exception mapping                      │  │
│  │                                                                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  ┌─────────────────────────┐         ┌─────────────────────────────────┐    │
│  │       Models            │         │       Exceptions                │    │
│  ├─────────────────────────┤         ├─────────────────────────────────┤    │
│  │  IndexConfig            │         │  NextPlaidError (base)          │    │
│  │  IndexInfo              │         │  IndexNotFoundError             │    │
│  │  SearchParams           │         │  IndexExistsError               │    │
│  │  SearchResult           │         │  ValidationError                │    │
│  │  QueryResult            │         │  RateLimitError                 │    │
│  │  HealthResponse         │         │  ModelNotLoadedError            │    │
│  │  RerankResponse         │         │  ConnectionError                │    │
│  │  MetadataResponse       │         │  ServerError                    │    │
│  └─────────────────────────┘         └─────────────────────────────────┘    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Features

- Synchronous (NextPlaidClient) and asynchronous (AsyncNextPlaidClient) clients
- Index management: create, list, inspect, update configuration, delete
- Document add and delete with optional per-document metadata
- Accepts raw text or pre-computed ColBERT embeddings, with automatic input type detection
- Filtered search, subset search, and tunable SearchParams
- Text encoding and reranking endpoints (require a model on the server)
- Typed data models and a structured exception hierarchy

Installation

# From PyPI
pip install next-plaid-client

# From source
pip install git+https://github.com/lightonai/next-plaid.git#subdirectory=next-plaid-api/python-sdk

# For development
pip install -e "next-plaid-api/python-sdk[dev]"

Requirements

- Python >= 3.8
- httpx >= 0.24.0
- A running next-plaid-api server (text encoding and reranking require the server to have a model loaded)

Quick Start

from next_plaid_client import NextPlaidClient, IndexConfig, SearchParams

# Connect to the API
client = NextPlaidClient("http://localhost:8080")

# Check server health
health = client.health()
print(f"Server status: {health.status}")

# Create an index
client.create_index("my_index", IndexConfig(nbits=4))

# Add documents (text - requires model on server)
client.add(
    "my_index",
    ["Paris is the capital of France.", "Berlin is in Germany."],
    metadata=[{"country": "France"}, {"country": "Germany"}]
)

# Search with text queries
results = client.search("my_index", ["What is the capital of France?"])

# Print results
for result in results.results:
    for doc_id, score, meta in zip(result.document_ids, result.scores, result.metadata or []):
        print(f"Document {doc_id}: {score:.4f} - {meta}")

Client Initialization

Synchronous Client

from next_plaid_client import NextPlaidClient

client = NextPlaidClient(
    base_url="http://localhost:8080",  # API server URL
    timeout=30.0,                       # Request timeout in seconds
    headers={"Authorization": "..."}    # Optional headers
)

# Context manager usage (auto-closes connection)
with NextPlaidClient("http://localhost:8080") as client:
    health = client.health()

Async Client

import asyncio
from next_plaid_client import AsyncNextPlaidClient

async def main():
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        health = await client.health()
        print(f"Server status: {health.status}")

asyncio.run(main())

API Reference

Health Check

health = client.health()

Returns: HealthResponse

| Field | Type | Description |
| --- | --- | --- |
| status | str | Server status ("healthy") |
| version | str | API version |
| loaded_indices | int | Number of loaded indices |
| index_dir | str | Index storage directory |
| memory_usage_bytes | int | Memory usage in bytes |
| indices | List[IndexSummary] | Summary of each index |

Index Management

List Indices

indices: List[str] = client.list_indices()

Get Index Info

info: IndexInfo = client.get_index("my_index")

Returns: IndexInfo

| Field | Type | Description |
| --- | --- | --- |
| name | str | Index name |
| num_documents | int | Document count |
| num_embeddings | int | Total embeddings |
| num_partitions | int | IVF partitions |
| avg_doclen | float | Average tokens per document |
| dimension | int | Embedding dimension |
| has_metadata | bool | Has metadata DB |
| metadata_count | Optional[int] | Metadata entry count |
| max_documents | Optional[int] | Document limit |

Create Index

client.create_index("my_index", IndexConfig(
    nbits=4,                    # Quantization bits (2 or 4)
    batch_size=50000,           # Documents per chunk
    seed=42,                    # Random seed
    start_from_scratch=999,     # Rebuild threshold
    max_documents=10000         # Max documents (None = unlimited)
))

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| nbits | int | 4 | Quantization bits (2 or 4) |
| batch_size | int | 50000 | Documents per chunk |
| seed | Optional[int] | None | Random seed |
| start_from_scratch | int | 999 | Rebuild threshold |
| max_documents | Optional[int] | None | Max documents |

Update Index Config

client.update_index_config("my_index", max_documents=5000)

Delete Index

client.delete_index("my_index")

Document Operations

Add Documents

The add() method automatically detects input type (text vs embeddings).

# Text documents (requires model on server)
client.add(
    "my_index",
    ["Document 1 text", "Document 2 text"],
    metadata=[{"category": "science"}, {"category": "history"}]
)

# With token pooling (reduces embeddings by 2x)
client.add(
    "my_index",
    ["Long document text..."],
    pool_factor=2
)

# Pre-computed embeddings
client.add(
    "my_index",
    [{"embeddings": [[0.1, 0.2], [0.3, 0.4]]}],  # [num_tokens, dim]
    metadata=[{"title": "Doc 1"}]
)

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| index_name | str | Target index name |
| documents | Union[List[str], List[Dict]] | Text or embeddings |
| metadata | Optional[List[Dict]] | Metadata per document |
| pool_factor | Optional[int] | Token reduction factor |

Returns: str (status message; the server acknowledges the request with HTTP 202 and indexes the documents asynchronously)
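
Because the add is acknowledged with HTTP 202 and processed in the background, new documents may not be searchable immediately. A minimal polling sketch, using only get_index() from this SDK (the helper name, timeout, and interval are arbitrary and not part of the library):

import time

def wait_for_documents(client, index_name, expected_count, timeout=60.0, interval=0.5):
    # Poll get_index() until the index reports at least `expected_count`
    # documents, or raise once the timeout elapses.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        info = client.get_index(index_name)
        if info.num_documents >= expected_count:
            return info
        time.sleep(interval)
    raise TimeoutError(f"'{index_name}' did not reach {expected_count} documents within {timeout}s")

before = client.get_index("my_index").num_documents
client.add("my_index", ["New document text"])
wait_for_documents(client, "my_index", expected_count=before + 1)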

Delete Documents

client.delete(
    "my_index",
    condition="category = ? AND year < ?",
    parameters=["outdated", 2020]
)

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| index_name | str | Target index |
| condition | str | SQL WHERE clause |
| parameters | Optional[List[Any]] | Query parameters |

Search Operations

The search() method automatically detects query type (text vs embeddings).

Text Search (requires model)

results = client.search(
    "my_index",
    ["What is machine learning?", "Neural networks"],
    params=SearchParams(top_k=10)
)

Embedding Search

results = client.search(
    "my_index",
    [[[0.1, 0.2], [0.3, 0.4]]],  # [batch, num_tokens, dim]
    params=SearchParams(top_k=10)
)

Filtered Search

results = client.search(
    "my_index",
    ["machine learning"],
    filter_condition="category = ? AND year > ?",
    filter_parameters=["science", 2020]
)

Subset Search

results = client.search(
    "my_index",
    ["query"],
    subset=[0, 5, 10, 15]  # Only search these document IDs
)

Search Parameters:

SearchParams(
    top_k=10,                      # Results per query (default: 10)
    n_ivf_probe=8,                 # IVF cells to probe (default: 8)
    n_full_scores=4096,            # Re-ranking candidates (default: 4096)
    centroid_score_threshold=0.4   # Pruning threshold (default: 0.4, set to None to disable)
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| top_k | int | 10 | Results per query |
| n_ivf_probe | int | 8 | IVF cells to probe |
| n_full_scores | int | 4096 | Candidates for exact scoring |
| centroid_score_threshold | Optional[float] | 0.4 | Centroid pruning threshold (set to None to disable) |

Returns: SearchResult

@dataclass
class SearchResult:
    results: List[QueryResult]  # One per query
    num_queries: int

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict]]]

Metadata Operations

Get All Metadata

response: MetadataResponse = client.get_metadata("my_index")
# response.metadata: List[Dict]
# response.count: int

Get Metadata Count

result = client.get_metadata_count("my_index")
# result["count"]: int
# result["has_metadata"]: bool

Query Metadata

result = client.query_metadata(
    "my_index",
    condition="category = ? AND score > ?",
    parameters=["science", 0.5]
)
# result["document_ids"]: List[int]
# result["count"]: int

Get Metadata by IDs

response = client.get_metadata_by_ids(
    "my_index",
    document_ids=[0, 5, 10],
    limit=100
)

Check Document Existence

result: MetadataCheckResponse = client.check_metadata(
    "my_index",
    document_ids=[0, 1, 2, 999]
)
# result.existing_ids: List[int]
# result.missing_ids: List[int]
# result.existing_count: int
# result.missing_count: int

Text Encoding

Encode texts to ColBERT embeddings (requires model on server).

response: EncodeResponse = client.encode(
    texts=["Hello world", "Test document"],
    input_type="document",  # or "query"
    pool_factor=2           # Optional token reduction
)
# response.embeddings: List[List[List[float]]]  # [batch, num_tokens, dim]
# response.num_texts: int

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| texts | List[str] | required | Texts to encode |
| input_type | str | "document" | "document" or "query" |
| pool_factor | Optional[int] | None | Token reduction factor |
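
encode() pairs naturally with the pre-computed embedding form of add(). A sketch, assuming the {"embeddings": ...} payload shape shown in Add Documents:

# Encode texts once, then index the resulting embeddings directly.
texts = ["Paris is the capital of France.", "Berlin is in Germany."]
encoded = client.encode(texts=texts, input_type="document")

client.add(
    "my_index",
    [{"embeddings": emb} for emb in encoded.embeddings],
    metadata=[{"text": text} for text in texts],
)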

Reranking

Reorder documents by relevance using ColBERT's MaxSim scoring.

Text Reranking (requires model)

result = client.rerank(
    query="What is the capital of France?",
    documents=[
        "Berlin is the capital of Germany.",
        "Paris is the capital of France.",
        "Tokyo is the largest city in Japan.",
    ]
)

# Results sorted by score (descending)
for r in result.results:
    print(f"Document {r.index}: {r.score:.4f}")
# Document 1: 15.2341  (Paris - most relevant)
# Document 0: 8.1234   (Berlin - somewhat relevant)
# Document 2: 3.4567   (Tokyo - least relevant)

Embedding Reranking

result = client.rerank(
    query=[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],  # [num_tokens, dim]
    documents=[
        {"embeddings": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]},
        {"embeddings": [[0.7, 0.8, 0.9], [0.1, 0.2, 0.3]]},
    ]
)

Returns: RerankResponse

@dataclass
class RerankResponse:
    results: List[RerankResult]  # Sorted by score descending
    num_documents: int

@dataclass
class RerankResult:
    index: int    # Original document index
    score: float  # MaxSim score

Exception Handling

All exceptions inherit from NextPlaidError:

from next_plaid_client import (
    NextPlaidError,
    IndexNotFoundError,
    IndexExistsError,
    ValidationError,
    RateLimitError,
    ModelNotLoadedError,
    ConnectionError,
    ServerError,
)

try:
    client.get_index("nonexistent")
except IndexNotFoundError as e:
    print(f"Index not found: {e.message}")
    print(f"Error code: {e.code}")
    print(f"HTTP status: {e.status_code}")
except RateLimitError as e:
    print(f"Rate limited: {e.message}")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
except ModelNotLoadedError as e:
    print(f"Model required: {e.message}")
except NextPlaidError as e:
    print(f"API error: {e.message} (code: {e.code})")

Exception Hierarchy

| Exception | HTTP Status | Description |
| --- | --- | --- |
| NextPlaidError | - | Base exception |
| IndexNotFoundError | 404 | Index does not exist |
| IndexExistsError | 409 | Index already exists |
| ValidationError | 400 | Invalid request parameters |
| RateLimitError | 429 | Rate limit exceeded |
| ModelNotLoadedError | 503 | Encoding requires a loaded model |
| ConnectionError | - | Connection failed |
| ServerError | 5xx | Server error |

Exception Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| message | str | Human-readable error message |
| code | Optional[str] | Error code (e.g., INDEX_NOT_FOUND) |
| details | Optional[Any] | Additional error details |
| status_code | Optional[int] | HTTP status code |
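
If you need resilience against transient failures such as RateLimitError or ConnectionError, you can wrap calls yourself. A minimal backoff sketch (the helper and its timing values are illustrative, not part of the SDK, which is not documented to retry on its own):

import time

from next_plaid_client import ConnectionError, NextPlaidClient, RateLimitError

def with_retries(call, attempts=5, base_delay=0.5):
    # Retry rate-limited or dropped requests with exponential backoff;
    # all other NextPlaidError subclasses propagate immediately.
    for attempt in range(attempts):
        try:
            return call()
        except (RateLimitError, ConnectionError):
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

client = NextPlaidClient("http://localhost:8080")
results = with_retries(lambda: client.search("my_index", ["query"]))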

Data Models

IndexConfig

@dataclass
class IndexConfig:
    nbits: int = 4                       # Quantization bits
    batch_size: int = 50000              # Documents per chunk
    seed: Optional[int] = None           # Random seed
    start_from_scratch: int = 999        # Rebuild threshold
    max_documents: Optional[int] = None  # Max documents

IndexInfo

@dataclass
class IndexInfo:
    name: str
    num_documents: int
    num_embeddings: int
    num_partitions: int
    avg_doclen: float
    dimension: int
    has_metadata: bool
    metadata_count: Optional[int] = None
    max_documents: Optional[int] = None

SearchParams

@dataclass
class SearchParams:
    top_k: int = 10
    n_ivf_probe: int = 8
    n_full_scores: int = 4096
    centroid_score_threshold: Optional[float] = 0.4  # Default: 0.4, set to None to disable

SearchResult / QueryResult

@dataclass
class SearchResult:
    results: List[QueryResult]
    num_queries: int

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict[str, Any]]]] = None

HealthResponse

@dataclass
class HealthResponse:
    status: str
    version: str
    loaded_indices: int
    index_dir: str
    memory_usage_bytes: int
    indices: List[IndexSummary]

RerankResponse / RerankResult

@dataclass
class RerankResponse:
    results: List[RerankResult]
    num_documents: int

@dataclass
class RerankResult:
    index: int
    score: float

MetadataResponse

@dataclass
class MetadataResponse:
    metadata: List[Dict[str, Any]]
    count: int

EncodeResponse

@dataclass
class EncodeResponse:
    embeddings: List[List[List[float]]]  # [batch, num_tokens, dim]
    num_texts: int

Async Client

The async client exposes the same methods as the synchronous client; each call is simply awaited:

import asyncio
from next_plaid_client import AsyncNextPlaidClient, IndexConfig, SearchParams

async def main():
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        # Health check
        health = await client.health()
        print(f"Server status: {health.status}")

        # Create index
        await client.create_index("my_index", IndexConfig(nbits=4))

        # Add documents
        await client.add(
            "my_index",
            ["Paris is the capital of France."],
            metadata=[{"country": "France"}]
        )

        # Search
        results = await client.search(
            "my_index",
            ["What is the capital of France?"],
            params=SearchParams(top_k=5)
        )

        # Concurrent operations
        results = await asyncio.gather(
            client.search("index1", ["query1"]),
            client.search("index2", ["query2"]),
            client.search("index3", ["query3"]),
        )

asyncio.run(main())
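
For large query batches, an unbounded asyncio.gather() can overwhelm the server. A sketch that caps concurrency with a semaphore (the limit of 8 and the index name are arbitrary choices):

import asyncio

from next_plaid_client import AsyncNextPlaidClient

async def search_many(queries, limit=8):
    # Cap in-flight requests so a large batch of queries is issued
    # at most `limit` at a time.
    sem = asyncio.Semaphore(limit)
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        async def one(query):
            async with sem:
                return await client.search("my_index", [query])
        return await asyncio.gather(*(one(q) for q in queries))

all_results = asyncio.run(search_many(["query 1", "query 2", "query 3"]))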

Input Type Detection

The SDK automatically detects whether inputs are text or embeddings:

Documents

# Text input (first item is str) → uses /update_with_encoding
client.add("index", ["text 1", "text 2"])

# Embedding input (first item is dict with 'embeddings') → uses /update
client.add("index", [{"embeddings": [[0.1, 0.2]]}])

Queries

# Text queries (first item is str) → uses /search_with_encoding
client.search("index", ["query text"])

# Embedding queries (nested list) → uses /search
client.search("index", [[[0.1, 0.2], [0.3, 0.4]]])

Rerank

# Text (query is str) → uses /rerank_with_encoding
client.rerank(query="text", documents=["doc1", "doc2"])

# Embeddings (query is list) → uses /rerank
client.rerank(query=[[0.1, 0.2]], documents=[{"embeddings": [[...]]}])

Project Structure

next-plaid-api/python-sdk/
├── pyproject.toml                 # Package configuration
├── README.md                      # This file
├── next_plaid_client/
│   ├── __init__.py               # Public exports
│   ├── _base.py                  # Base client logic
│   ├── client.py                 # Synchronous client
│   ├── async_client.py           # Async client
│   ├── models.py                 # Data models
│   └── exceptions.py             # Exception classes
└── tests/
    └── test_*.py                 # Test files

Dependencies

| Package | Version | Purpose |
| --- | --- | --- |
| httpx | >= 0.24.0 | HTTP client (sync + async) |

Development Dependencies

| Package | Version | Purpose |
| --- | --- | --- |
| pytest | >= 7.0.0 | Testing framework |
| pytest-cov | >= 4.0.0 | Coverage reporting |
| pytest-asyncio | >= 0.21.0 | Async test support |

Version Compatibility

| SDK Version | API Version | Python |
| --- | --- | --- |
| 0.4.0 | 0.4.0 | >= 3.8 |

License

Apache-2.0