next-plaid-client

Python SDK for the NextPlaid ColBERT Search API. Provides synchronous and asynchronous clients for interacting with the next-plaid-api server.

Architecture Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                           next-plaid-client                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────────────────┐         ┌─────────────────────────────────┐    │
│  │   NextPlaidClient       │         │   AsyncNextPlaidClient          │    │
│  │     (Synchronous)       │         │     (Asynchronous)              │    │
│  ├─────────────────────────┤         ├─────────────────────────────────┤    │
│  │                         │         │                                 │    │
│  │  httpx.Client           │         │  httpx.AsyncClient              │    │
│  │         ↓               │         │         ↓                       │    │
│  │  Blocking I/O           │         │  asyncio I/O                    │    │
│  │                         │         │                                 │    │
│  └───────────┬─────────────┘         └───────────┬─────────────────────┘    │
│              │                                   │                          │
│              └───────────────┬───────────────────┘                          │
│                              ▼                                              │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                     BaseNextPlaidClient                               │  │
│  │                                                                       │  │
│  │  - URL construction          - Payload preparation                    │  │
│  │  - Response parsing          - Error handling                         │  │
│  │  - Input type detection      - Exception mapping                      │  │
│  │                                                                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  ┌─────────────────────────┐         ┌─────────────────────────────────┐    │
│  │       Models            │         │       Exceptions                │    │
│  ├─────────────────────────┤         ├─────────────────────────────────┤    │
│  │  IndexConfig            │         │  NextPlaidError (base)          │    │
│  │  IndexInfo              │         │  IndexNotFoundError             │    │
│  │  SearchParams           │         │  IndexExistsError               │    │
│  │  SearchResult           │         │  ValidationError                │    │
│  │  QueryResult            │         │  RateLimitError                 │    │
│  │  HealthResponse         │         │  ModelNotLoadedError            │    │
│  │  RerankResponse         │         │  ConnectionError                │    │
│  │  MetadataResponse       │         │  ServerError                    │    │
│  └─────────────────────────┘         └─────────────────────────────────┘    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Features


Installation

# From PyPI
pip install next-plaid-client

# From source
pip install git+https://github.com/lightonai/next-plaid.git#subdirectory=next-plaid-api/python-sdk

# For development
pip install -e "next-plaid-api/python-sdk[dev]"

Requirements


Quick Start

from next_plaid_client import NextPlaidClient, IndexConfig, SearchParams

# Connect to the API
client = NextPlaidClient("http://localhost:8080")

# Check server health
health = client.health()
print(f"Server status: {health.status}")

# Create an index
client.create_index("my_index", IndexConfig(nbits=4))

# Add documents (text - requires model on server)
client.add(
    "my_index",
    ["Paris is the capital of France.", "Berlin is in Germany."],
    metadata=[{"country": "France"}, {"country": "Germany"}]
)

# Semantic search
results = client.search("my_index", ["What is the capital of France?"])

# Hybrid search (semantic + keyword fused with RRF)
results = client.search("my_index", ["capital of France?"],
    text_query=["capital France"], alpha=0.75)

# Print results
for result in results.results:
    for doc_id, score, meta in zip(result.document_ids, result.scores, result.metadata or []):
        print(f"Document {doc_id}: {score:.4f} - {meta}")

Client Initialization

Synchronous Client

from next_plaid_client import NextPlaidClient

client = NextPlaidClient(
    base_url="http://localhost:8080",  # API server URL
    timeout=30.0,                       # Request timeout in seconds
    headers={"Authorization": "..."}    # Optional headers
)

# Context manager usage (auto-closes connection)
with NextPlaidClient("http://localhost:8080") as client:
    health = client.health()

Async Client

import asyncio
from next_plaid_client import AsyncNextPlaidClient

async def main():
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        health = await client.health()
        print(f"Server status: {health.status}")

asyncio.run(main())

API Reference

Health Check

health = client.health()

Returns: HealthResponse

Field                 Type                 Description
status                str                  Server status ("healthy")
version               str                  API version
loaded_indices        int                  Number of loaded indices
index_dir             str                  Index storage directory
memory_usage_bytes    int                  Memory usage
indices               List[IndexSummary]   Summary of each index

Index Management

List Indices

indices: List[str] = client.list_indices()

Get Index Info

info: IndexInfo = client.get_index("my_index")

Returns: IndexInfo

Field             Type             Description
name              str              Index name
num_documents     int              Document count
num_embeddings    int              Total embeddings
num_partitions    int              IVF partitions
avg_doclen        float            Average tokens per doc
dimension         int              Embedding dimension
has_metadata      bool             Has metadata DB
metadata_count    Optional[int]    Metadata entry count
max_documents     Optional[int]    Document limit

Create Index

client.create_index("my_index", IndexConfig(
    nbits=4,                    # Quantization bits (2 or 4)
    batch_size=50000,           # Documents per chunk
    seed=42,                    # Random seed
    start_from_scratch=999,     # Rebuild threshold
    max_documents=10000,        # Max documents (None = unlimited)
    fts_tokenizer="unicode61"   # FTS5 tokenizer: "unicode61" (words) or "trigram" (substrings)
))

Parameters:

Parameter           Type             Default    Description
nbits               int              4          Quantization bits (2 or 4)
batch_size          int              50000      Documents per chunk
seed                Optional[int]    None       Random seed
start_from_scratch  int              999        Rebuild threshold
max_documents       Optional[int]    None       Max documents
fts_tokenizer       Optional[str]    None       FTS5 tokenizer: "unicode61" (words) or "trigram" (substrings)

Update Index Config

client.update_index_config("my_index", max_documents=5000)

Delete Index

client.delete_index("my_index")

Document Operations

Add Documents

The add() method automatically detects input type (text vs embeddings).

# Text documents (requires model on server)
client.add(
    "my_index",
    ["Document 1 text", "Document 2 text"],
    metadata=[{"category": "science"}, {"category": "history"}]
)

# With token pooling (reduces embeddings by 2x)
client.add(
    "my_index",
    ["Long document text..."],
    pool_factor=2
)

# Pre-computed embeddings
client.add(
    "my_index",
    [{"embeddings": [[0.1, 0.2], [0.3, 0.4]]}],  # [num_tokens, dim]
    metadata=[{"title": "Doc 1"}]
)

Parameters:

Parameter     Type                           Description
index_name    str                            Target index name
documents     Union[List[str], List[Dict]]   Text or embeddings
metadata      Optional[List[Dict]]           Metadata per document
pool_factor   Optional[int]                  Token reduction factor

Returns: str (status message; the server accepts the request with HTTP 202 and indexes asynchronously)

Delete Documents

client.delete(
    "my_index",
    condition="category = ? AND year < ?",
    parameters=["outdated", 2020]
)

Parameters:

Parameter     Type                  Description
index_name    str                   Target index
condition     str                   SQL WHERE clause
parameters    Optional[List[Any]]   Query parameters

Search Operations

The search() method automatically detects query type (text vs embeddings).

Text Search (requires model)

results = client.search(
    "my_index",
    ["What is machine learning?", "Neural networks"],
    params=SearchParams(top_k=10)
)

Embedding Search

results = client.search(
    "my_index",
    [[[0.1, 0.2], [0.3, 0.4]]],  # [batch, num_tokens, dim]
    params=SearchParams(top_k=10)
)

Filtered Search

results = client.search(
    "my_index",
    ["machine learning"],
    filter_condition="category = ? AND year > ?",
    filter_parameters=["science", 2020]
)

Hybrid Search

Combine semantic and keyword search, fused with Reciprocal Rank Fusion (RRF):

results = client.search(
    "my_index",
    ["What is machine learning?", "How does AI work?"],  # Semantic queries
    text_query=["machine learning", "artificial intelligence"],  # Keyword queries (same length)
    alpha=0.75,                      # 0.0 = pure keyword, 1.0 = pure semantic
    fusion="rrf"                     # "rrf" (default) or "relative_score"
)
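Reciprocal rank fusion itself is easy to reason about. The sketch below is a local illustration of the general RRF formula with an alpha weighting; it assumes the common smoothing constant k=60, and the server's exact fusion may differ:

```python
def rrf_fuse(semantic_ids, keyword_ids, alpha=0.75, k=60):
    """Fuse two ranked document-ID lists with reciprocal rank fusion.

    alpha weights the semantic ranking, (1 - alpha) the keyword ranking.
    k is the conventional RRF smoothing constant (assumed here; not
    necessarily the server's value).
    """
    scores = {}
    for rank, doc_id in enumerate(semantic_ids, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + alpha / (k + rank)
    for rank, doc_id in enumerate(keyword_ids, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + (1 - alpha) / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Documents ranked well by both lists rise to the top:
print(rrf_fuse(semantic_ids=[1, 2, 3], keyword_ids=[3, 2, 4]))
```

Because RRF only uses ranks, not raw scores, it is robust to the different score scales of BM25 and MaxSim; "relative_score" fusion instead normalizes and mixes the scores themselves.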

Subset Search

results = client.search(
    "my_index",
    ["query"],
    subset=[0, 5, 10, 15]  # Only search these document IDs
)

Search Parameters:

SearchParams(
    top_k=10,                      # Results per query (default: 10)
    n_ivf_probe=8,                 # IVF cells to probe (default: 8)
    n_full_scores=4096,            # Re-ranking candidates (default: 4096)
    centroid_score_threshold=0.4   # Pruning threshold (default: 0.4, set to None to disable)
)

Parameter                 Type              Default   Description
top_k                     int               10        Results per query
n_ivf_probe               int               8         IVF cells to probe
n_full_scores             int               4096      Candidates for exact scoring
centroid_score_threshold  Optional[float]   0.4       Centroid pruning threshold (set to None to disable)

Hybrid Search Parameters (on search()):

Parameter    Type                  Default   Description
text_query   Optional[List[str]]   None      FTS5 keyword queries (must match queries length in hybrid mode)
alpha        Optional[float]       0.75      Balance: 0.0 = pure keyword, 1.0 = pure semantic
fusion       Optional[str]         "rrf"     "rrf" (reciprocal rank fusion) or "relative_score"

Returns: SearchResult

@dataclass
class SearchResult:
    results: List[QueryResult]  # One per query
    num_queries: int

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict]]]

Metadata Operations

Get All Metadata

response: MetadataResponse = client.get_metadata("my_index")
# response.metadata: List[Dict]
# response.count: int

Get Metadata Count

result = client.get_metadata_count("my_index")
# result["count"]: int
# result["has_metadata"]: bool

Query Metadata

result = client.query_metadata(
    "my_index",
    condition="category = ? AND score > ?",
    parameters=["science", 0.5]
)
# result["document_ids"]: List[int]
# result["count"]: int

Get Metadata by IDs

response = client.get_metadata_by_ids(
    "my_index",
    document_ids=[0, 5, 10],
    limit=100
)

Check Document Existence

result: MetadataCheckResponse = client.check_metadata(
    "my_index",
    document_ids=[0, 1, 2, 999]
)
# result.existing_ids: List[int]
# result.missing_ids: List[int]
# result.existing_count: int
# result.missing_count: int

Text Encoding

Encode texts to ColBERT embeddings (requires model on server).

response: EncodeResponse = client.encode(
    texts=["Hello world", "Test document"],
    input_type="document",  # or "query"
    pool_factor=2           # Optional token reduction
)
# response.embeddings: List[List[List[float]]]  # [batch, num_tokens, dim]
# response.num_texts: int

Parameters:

Parameter     Type            Default      Description
texts         List[str]       required     Texts to encode
input_type    str             "document"   "document" or "query"
pool_factor   Optional[int]   None         Token reduction factor
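Encoded embeddings can be fed straight back through the pre-computed-embeddings path of add(). A small, hypothetical helper (not part of the SDK) that wraps them in the expected dict form and sanity-checks the dimensions:

```python
def to_documents(embeddings):
    """Wrap [batch, num_tokens, dim] nested lists into the dict form
    that add() accepts for pre-computed embeddings."""
    docs = []
    for emb in embeddings:
        dims = {len(token_vec) for token_vec in emb}
        assert len(dims) == 1, "all token vectors must share one dimension"
        docs.append({"embeddings": emb})
    return docs

# resp = client.encode(texts=["Hello world"], input_type="document")
# client.add("my_index", to_documents(resp.embeddings))
```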

Reranking

Reorder documents by relevance using ColBERT's MaxSim scoring.

Text Reranking (requires model)

result = client.rerank(
    query="What is the capital of France?",
    documents=[
        "Berlin is the capital of Germany.",
        "Paris is the capital of France.",
        "Tokyo is the largest city in Japan.",
    ]
)

# Results sorted by score (descending)
for r in result.results:
    print(f"Document {r.index}: {r.score:.4f}")
# Document 1: 15.2341  (Paris - most relevant)
# Document 0: 8.1234   (Berlin - somewhat relevant)
# Document 2: 3.4567   (Tokyo - least relevant)

Embedding Reranking

result = client.rerank(
    query=[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],  # [num_tokens, dim]
    documents=[
        {"embeddings": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]},
        {"embeddings": [[0.7, 0.8, 0.9], [0.1, 0.2, 0.3]]},
    ]
)

Returns: RerankResponse

@dataclass
class RerankResponse:
    results: List[RerankResult]  # Sorted by score descending
    num_documents: int

@dataclass
class RerankResult:
    index: int    # Original document index
    score: float  # MaxSim score

Exception Handling

All exceptions inherit from NextPlaidError:

from next_plaid_client import (
    NextPlaidError,
    IndexNotFoundError,
    IndexExistsError,
    ValidationError,
    RateLimitError,
    ModelNotLoadedError,
    ConnectionError,
    ServerError,
)

try:
    client.get_index("nonexistent")
except IndexNotFoundError as e:
    print(f"Index not found: {e.message}")
    print(f"Error code: {e.code}")
    print(f"HTTP status: {e.status_code}")
except RateLimitError as e:
    print(f"Rate limited: {e.message}")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
except ModelNotLoadedError as e:
    print(f"Model required: {e.message}")
except NextPlaidError as e:
    print(f"API error: {e.message} (code: {e.code})")

Exception Hierarchy

Exception             HTTP Status   Description
NextPlaidError        -             Base exception
IndexNotFoundError    404           Index does not exist
IndexExistsError      409           Index already exists
ValidationError       400           Invalid request parameters
RateLimitError        429           Rate limit exceeded
ModelNotLoadedError   503           Encoding requires model
ConnectionError       -             Connection failed
ServerError           5xx           Server error

Exception Attributes

Attribute     Type            Description
message       str             Human-readable error message
code          Optional[str]   Error code (e.g., INDEX_NOT_FOUND)
details       Optional[Any]   Additional error details
status_code   Optional[int]   HTTP status code

Data Models

IndexConfig

@dataclass
class IndexConfig:
    nbits: int = 4                       # Quantization bits
    batch_size: int = 50000              # Documents per chunk
    seed: Optional[int] = None           # Random seed
    start_from_scratch: int = 999        # Rebuild threshold
    max_documents: Optional[int] = None  # Max documents
    fts_tokenizer: Optional[str] = None  # "unicode61" (words) or "trigram" (substrings)

IndexInfo

@dataclass
class IndexInfo:
    name: str
    num_documents: int
    num_embeddings: int
    num_partitions: int
    avg_doclen: float
    dimension: int
    has_metadata: bool
    metadata_count: Optional[int] = None
    max_documents: Optional[int] = None

SearchParams

@dataclass
class SearchParams:
    top_k: int = 10
    n_ivf_probe: int = 8
    n_full_scores: int = 4096
    centroid_score_threshold: Optional[float] = 0.4  # Default: 0.4, set to None to disable

SearchResult / QueryResult

@dataclass
class SearchResult:
    results: List[QueryResult]
    num_queries: int

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict[str, Any]]]] = None

HealthResponse

@dataclass
class HealthResponse:
    status: str
    version: str
    loaded_indices: int
    index_dir: str
    memory_usage_bytes: int
    indices: List[IndexSummary]

RerankResponse / RerankResult

@dataclass
class RerankResponse:
    results: List[RerankResult]
    num_documents: int

@dataclass
class RerankResult:
    index: int
    score: float

MetadataResponse

@dataclass
class MetadataResponse:
    metadata: List[Dict[str, Any]]
    count: int

EncodeResponse

@dataclass
class EncodeResponse:
    embeddings: List[List[List[float]]]  # [batch, num_tokens, dim]
    num_texts: int

Async Client

The async client provides identical methods with await:

import asyncio
from next_plaid_client import AsyncNextPlaidClient, IndexConfig, SearchParams

async def main():
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        # Health check
        health = await client.health()
        print(f"Server status: {health.status}")

        # Create index
        await client.create_index("my_index", IndexConfig(nbits=4))

        # Add documents
        await client.add(
            "my_index",
            ["Paris is the capital of France."],
            metadata=[{"country": "France"}]
        )

        # Search
        results = await client.search(
            "my_index",
            ["What is the capital of France?"],
            params=SearchParams(top_k=5)
        )

        # Concurrent operations
        results = await asyncio.gather(
            client.search("index1", ["query1"]),
            client.search("index2", ["query2"]),
            client.search("index3", ["query3"]),
        )

asyncio.run(main())
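asyncio.gather launches every request at once; when firing many queries at a single server it can help to cap concurrency. A small, generic sketch (not part of the SDK):

```python
import asyncio

async def gather_bounded(coros, limit=8):
    """Await the given coroutines with at most `limit` in flight at once,
    preserving input order in the results."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))

# e.g. inside main():
#     results = await gather_bounded(
#         [client.search("my_index", [q]) for q in queries], limit=8)
```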

Input Type Detection

The SDK automatically detects whether inputs are text or embeddings:

Documents

# Text input (first item is str) → uses /update_with_encoding
client.add("index", ["text 1", "text 2"])

# Embedding input (first item is dict with 'embeddings') → uses /update
client.add("index", [{"embeddings": [[0.1, 0.2]]}])

Queries

# Text queries (first item is str) → uses /search_with_encoding
client.search("index", ["query text"])

# Embedding queries (nested list) → uses /search
client.search("index", [[[0.1, 0.2], [0.3, 0.4]]])

Rerank

# Text (query is str) → uses /rerank_with_encoding
client.rerank(query="text", documents=["doc1", "doc2"])

# Embeddings (query is list) → uses /rerank
client.rerank(query=[[0.1, 0.2]], documents=[{"embeddings": [[...]]}])
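The check behind this detection is simple; an illustrative version (the SDK's actual logic may differ):

```python
def looks_like_text(items):
    """True when the first item is a raw string, meaning server-side
    encoding is required; embedding inputs arrive as dicts or nested
    lists of floats instead."""
    return bool(items) and isinstance(items[0], str)

print(looks_like_text(["query text"]))              # text path
print(looks_like_text([[[0.1, 0.2], [0.3, 0.4]]]))  # embedding path
```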

CLI

The package ships with a non-interactive CLI designed for scripts, pipelines, and AI coding agents.

Installation

pip install "next-plaid-client[cli]"

Global Options

next-plaid [OPTIONS] COMMAND

Option          Default                 Description
--url, -u       http://localhost:8080   Server URL (env: NEXT_PLAID_URL)
--timeout, -t   30.0                    Request timeout in seconds
--header, -H    (none)                  Extra HTTP header (Key: Value), repeatable
--json          off                     Output raw JSON instead of human-readable text

Every command and subcommand has --help with runnable examples.

health

next-plaid health
next-plaid health --json
next-plaid -u http://remote:8080 health

index

# Create
next-plaid index create my_index
next-plaid index create my_index --nbits 2 --max-documents 10000
next-plaid index create code_index --fts-tokenizer trigram

# Inspect
next-plaid index list
next-plaid index get my_index

# Update config
next-plaid index config my_index --max-documents 50000
next-plaid index config my_index --max-documents 0   # remove limit

# Delete (destructive — requires --yes or prompts)
next-plaid index delete my_index --yes
next-plaid index delete my_index --dry-run

Option            Default   Description
--nbits           4         Quantization bits (2 or 4)
--batch-size      50000     Documents per indexing batch
--seed            (none)    Random seed for K-means
--max-documents   (none)    Evict oldest when exceeded (0 to remove limit)
--fts-tokenizer   (none)    unicode61 (words) or trigram (substrings/code)

document

# Add — text arguments
next-plaid document add my_index --text "Paris is the capital of France"
next-plaid document add my_index --text "Doc 1" --text "Doc 2"

# Add — from file
next-plaid document add my_index --file texts.json        # ["text1", "text2"]
next-plaid document add my_index --file embeddings.json   # [{"embeddings": [[...]]}]

# Add — from stdin
echo '["doc one", "doc two"]' | next-plaid document add my_index --stdin
cat corpus.json | next-plaid document add my_index --stdin

# Add — with metadata sidecar
next-plaid document add my_index --text "Paris" --metadata-file meta.json  # [{"country": "France"}]

# Delete by metadata condition
next-plaid document delete my_index --condition "year < ?" --param 2020 --yes
next-plaid document delete my_index --condition "id IN (?, ?)" --param 1 --param 2 --dry-run

search

# Semantic
next-plaid search my_index "What is the capital of France?"
next-plaid search my_index "query one" "query two" --top-k 5

# Keyword (FTS5 BM25)
next-plaid search my_index --text-query "capital France"

# Hybrid
next-plaid search my_index "capital?" --text-query "capital France" --alpha 0.75 --fusion rrf

# Metadata filter
next-plaid search my_index "cities" --filter "country = ?" --filter-param France

# Restrict to document subset
next-plaid search my_index "query" --subset 1 --subset 2 --subset 5

# From stdin
echo '["query 1", "query 2"]' | next-plaid search my_index --stdin

Option                 Default   Description
--top-k, -k            10        Results per query
--n-ivf-probe          (none)    IVF cells to probe (server default: 8)
--n-full-scores        (none)    Re-ranking candidates (server default: 4096)
--centroid-threshold   (none)    Centroid pruning threshold (0 to disable)
--filter               (none)    SQL WHERE filter on metadata
--filter-param         (none)    Filter placeholder value, repeatable
--text-query           (none)    FTS5 keyword query, repeatable
--alpha                0.75      Hybrid balance: 0 = keyword, 1 = semantic
--fusion               rrf       rrf or relative_score
--subset               (none)    Restrict to document IDs, repeatable

metadata

next-plaid metadata list my_index
next-plaid metadata count my_index
next-plaid metadata check my_index --ids 1 --ids 2 --ids 3
next-plaid metadata query my_index --condition "category = ?" --param science
next-plaid metadata get my_index --ids 1 --ids 2
next-plaid metadata get my_index --condition "score > ?" --param 0.9 --limit 10
next-plaid metadata update my_index -c "status = ?" -p draft --set '{"status": "published"}' --yes

encode

next-plaid encode "Hello world" "Another text"
next-plaid encode --input-type query "What is AI?"
echo '["text1", "text2"]' | next-plaid encode --stdin

rerank

next-plaid rerank -q "capital of France" -d "Paris is in France" -d "Berlin is in Germany"
next-plaid rerank -q "machine learning" --file docs.json
echo '["doc1", "doc2"]' | next-plaid rerank -q "my query" --stdin

Scripting and Agents

The --json flag emits machine-readable JSON on every command:

# Extract document IDs from search results
next-plaid --json search my_index "France capital" | jq '.results[0].document_ids'

# Assert server is healthy before indexing
next-plaid --json health | jq -e '.status == "healthy"'

# Preview a destructive operation before running it
next-plaid document delete my_index --condition "draft = ?" --param true --dry-run

# Run against a remote server
NEXT_PLAID_URL=http://my-server:8080 next-plaid search my_index "query"

Project Structure

next-plaid-api/python-sdk/
├── pyproject.toml                 # Package configuration
├── README.md                      # This file
├── next_plaid_client/
│   ├── __init__.py               # Public exports
│   ├── _base.py                  # Base client logic
│   ├── client.py                 # Synchronous client
│   ├── async_client.py           # Async client
│   ├── cli.py                    # CLI (next-plaid command)
│   ├── models.py                 # Data models
│   └── exceptions.py             # Exception classes
└── tests/
    └── test_*.py                 # Test files

Dependencies

Package   Version     Purpose
httpx     >= 0.24.0   HTTP client (sync + async)
click     >= 8.0.0    CLI framework (optional, pip install "next-plaid-client[cli]")

Development Dependencies

Package          Version      Purpose
pytest           >= 7.0.0     Testing framework
pytest-cov       >= 4.0.0     Coverage reporting
pytest-asyncio   >= 0.21.0    Async test support

Version Compatibility

SDK Version   API Version   Python
0.4.0         0.4.0         >= 3.8

License

Apache-2.0