Summary
Add a PySpark-based embedding pipeline that reads JSON/JSONL documents, generates embeddings via HuggingFace sentence-transformers, and upserts to Turbopuffer with schema configuration.
Motivation
Enable users to process large document collections using their own Spark infrastructure, embedding documents with HuggingFace models and storing them in Turbopuffer with proper schema configuration (FTS, filterable properties).
Usage
spark-submit spark/pipeline.py --config config.json
Configuration
Users provide a JSON config file specifying:
{
  "turbopuffer": {
    "namespace": "my-namespace",
    "api_key_env": "TURBOPUFFER_API_KEY",
    "region": "aws-us-east-1"
  },
  "embedding": {
    "model_id": "sentence-transformers/all-MiniLM-L6-v2",
    "device": "cpu",
    "batch_size": 32
  },
  "input": {
    "path": "/path/to/documents",
    "format": "jsonl",
    "id_field": "id",
    "text_field": "content"
  },
  "schema": {
    "fields": {
      "title": {"type": "string", "filterable": true},
      "content": {"type": "string", "full_text_search": {"stemming": true}}
    },
    "include_fields": ["title", "content"]
  }
}
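As a starting point for the config validation step, here is a minimal sketch of what config/schema.py could look like with Pydantic (v2 assumed); the class names and defaults below mirror the example config but are illustrative, not a committed API:

```python
# config/schema.py (sketch) - validates the JSON config shown above.
# Assumes Pydantic v2; class and field names are illustrative.
import json
from typing import Dict, List, Optional

from pydantic import BaseModel, Field


class TurbopufferConfig(BaseModel):
    namespace: str
    api_key_env: str = "TURBOPUFFER_API_KEY"
    region: str


class EmbeddingConfig(BaseModel):
    model_id: str
    device: str = "cpu"
    batch_size: int = 32


class InputConfig(BaseModel):
    path: str
    format: str = "jsonl"       # "json" or "jsonl"
    id_field: str = "id"
    text_field: str = "content"


class SchemaConfig(BaseModel):
    fields: Dict[str, dict] = {}        # raw Turbopuffer field definitions
    include_fields: List[str] = []


class PipelineConfig(BaseModel):
    turbopuffer: TurbopufferConfig
    embedding: EmbeddingConfig
    input: InputConfig
    # "schema" shadows a BaseModel attribute, so expose it via an alias.
    schema_: Optional[SchemaConfig] = Field(default=None, alias="schema")


def load_config(path: str) -> PipelineConfig:
    """Read and validate the JSON config file."""
    with open(path) as f:
        return PipelineConfig.model_validate(json.load(f))
```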
Directory Structure
spark/
├── pipeline.py # Main Spark job entry point
├── config/
│ ├── schema.py # Pydantic config validation
│ └── example_config.json
├── core/
│ ├── embeddings.py # HuggingFace embedding generation
│ ├── turbopuffer.py # Python Turbopuffer client
│ └── batch.py # Batching utilities for upserts
├── utils/
│ └── retry.py # Retry with exponential backoff
├── requirements.txt
└── README.md
Key Design Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Model loading | Per-partition with caching | Avoids driver memory pressure from broadcasting large models |
| Upsert format | Row-based (upsert_rows) | Easier to track per-document success/failure |
| Schema handling | Pass with each write | Aligns with Turbopuffer's schema-on-write pattern |
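To make the per-partition model loading decision concrete, here is a minimal sketch of core/embeddings.py that caches a sentence-transformers model per executor process instead of broadcasting it from the driver; the cache layout and function names are assumptions:

```python
# core/embeddings.py (sketch) - load the HuggingFace model once per executor
# process and reuse it across partitions; names are illustrative.
from typing import List

from sentence_transformers import SentenceTransformer

_MODEL_CACHE: dict = {}  # lives in the executor's Python worker process


def get_model(model_id: str, device: str = "cpu") -> SentenceTransformer:
    """Return a cached model, loading it on first use in this process."""
    key = (model_id, device)
    if key not in _MODEL_CACHE:
        _MODEL_CACHE[key] = SentenceTransformer(model_id, device=device)
    return _MODEL_CACHE[key]


def embed_texts(texts: List[str], model_id: str, device: str = "cpu",
                batch_size: int = 32) -> List[List[float]]:
    """Embed a batch of texts and return plain Python lists of floats."""
    model = get_model(model_id, device)
    vectors = model.encode(texts, batch_size=batch_size, show_progress_bar=False)
    return [v.tolist() for v in vectors]
```

Because the cache is a module-level dict in the executor's Python worker, each process pays the model load cost once, rather than the driver serializing model weights into the job.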
Pipeline Flow
[Read JSON/JSONL] → [Select fields] → [Partition across executors]
↓
[Per partition: Load model (cached) → Embed batches → Accumulate upserts]
↓
[Flush batches to Turbopuffer with retry] → [Log results]
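A rough sketch of how pipeline.py could wire this flow together; it reuses the hypothetical helpers sketched above (load_config, embed_texts) and stops short of the Turbopuffer flush, which would live in core/batch.py and core/turbopuffer.py:

```python
# pipeline.py (sketch) - driver-side flow: read, select fields, partition, embed.
# The upsert/flush to Turbopuffer is intentionally left out here.
import argparse

from pyspark.sql import SparkSession

from config.schema import load_config
from core.embeddings import embed_texts


def run(config_path: str) -> None:
    cfg = load_config(config_path)
    spark = SparkSession.builder.appName("turbopuffer-embeddings").getOrCreate()

    # JSONL (one object per line) is Spark's default; multiLine handles a
    # single JSON array file instead.
    reader = spark.read.option("multiLine", cfg.input.format == "json")
    df = reader.json(cfg.input.path)

    # Keep only the id, the text to embed, and any schema include_fields.
    wanted = [cfg.input.id_field, cfg.input.text_field]
    if cfg.schema_:
        wanted += [f for f in cfg.schema_.include_fields if f not in wanted]
    df = df.select(*wanted)

    def process_partition(rows):
        rows = list(rows)
        if not rows:
            return
        texts = [row[cfg.input.text_field] for row in rows]
        vectors = embed_texts(texts, cfg.embedding.model_id,
                              cfg.embedding.device, cfg.embedding.batch_size)
        for row, vec in zip(rows, vectors):
            doc = {f: row[f] for f in wanted}
            doc["vector"] = vec
            yield doc

    # count() just forces execution for the sketch; the real job would hand
    # each partition's documents to the batch accumulator for upserting.
    prepared = df.rdd.mapPartitions(process_partition).count()
    print(f"prepared {prepared} documents")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", required=True)
    run(parser.parse_args().config)
```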
Implementation Tasks
- Create directory structure and requirements.txt
- Implement config validation (config/schema.py) with Pydantic
- Implement embedding generator (core/embeddings.py) with model caching
- Implement batch accumulator (core/batch.py) respecting 256MB limit
- Implement Turbopuffer Python client (core/turbopuffer.py)
- Implement retry utility (utils/retry.py); see the sketch after this list
- Implement main pipeline (pipeline.py)
- Create example config and README
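For the retry utility, a minimal sketch of utils/retry.py using exponential backoff with full jitter; the retryable exception types and backoff defaults are assumptions to be tuned against the real Turbopuffer client:

```python
# utils/retry.py (sketch) - retry a callable with exponential backoff + jitter.
# Parameter defaults are assumptions, not requirements from Turbopuffer.
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...] = (Exception,),
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Call fn(), retrying on retryable exceptions with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts:
                raise
            # Exponential backoff capped at max_delay, with full jitter.
            delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
            time.sleep(random.uniform(0, delay))
```

A batch flush in core/batch.py could then be wrapped as retry_with_backoff(lambda: flush_batch(batch)), where flush_batch is the (not yet written) Turbopuffer write call.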
Future Enhancements
- S3 input support (currently local folder only)
- Parquet input format support
- Progress metrics and monitoring