Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6,015 changes: 1,658 additions & 4,357 deletions Cargo.lock

Large diffs are not rendered by default.

105 changes: 105 additions & 0 deletions RAPTOR_LEANN_PLAN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# RAPTOR + leann-rs: Incremental HNSW Insertion Plan

## Current Approach (Two-Phase Batch Rebuild)

The spnl RAPTOR indexer uses a two-phase approach to work with leann-rs's
bulk-only `LeannBuilder`:

1. **Phase 1** -- Build the initial HNSW index from base document fragments
2. **Phase 2** -- For each base fragment, search the index for similar
passages, generate LLM summaries, collect all summaries
3. **Phase 3** -- Rebuild the entire HNSW index from scratch with all
original passages + all RAPTOR summaries

## Why This Is a Limitation

In the original RAPTOR implementation (and spnl's previous lancedb-based
approach), summaries were inserted into the vector database **incrementally**.
This meant:

- Summary S1 (generated from fragments A, B, C) gets inserted into the index
- When generating S2 from fragments D, E, F, the search might also find S1 as
a relevant neighbor
- This creates a richer, more interconnected knowledge graph where summaries
can reference and build upon other summaries

With the current batch-rebuild approach:

- All summaries are generated against the base fragments only
- No summary can influence the search results used to generate another summary
- The resulting knowledge graph is shallower (one level of summarization only)

## What leann-rs Would Need

### Option A: Append API on LeannBuilder

Add an `append_to_index()` method that takes an existing index path and adds
new passages to it:

```rust
impl LeannBuilder {
pub fn append_to_index(
&mut self,
index_path: &Path,
provider: &dyn EmbeddingProvider,
) -> Result<()> {
// 1. Load existing HNSW graph
// 2. Load existing passages
// 3. Compute embeddings for new chunks
// 4. Insert new nodes into the graph (incremental HNSW insertion)
// 5. Re-write the index files
}
}
```

### Option B: Incremental Insert on HnswGraph

Add a lower-level API for inserting individual vectors into an existing graph:

```rust
impl HnswGraph {
pub fn insert(&mut self, vector: &[f32], level: i32) -> usize {
// Standard HNSW insertion algorithm
// Returns the node ID of the inserted vector
}
}
```

This is more fundamental and would enable Option A, but also other use cases.

### Considerations

- HNSW supports incremental insertion by design (the original algorithm is
incremental), so this is architecturally sound
- The current `build_hnsw()` function already does incremental insertion
internally during construction; the API just doesn't expose single-node
insertion after the initial build
- CSR (compact) format would need to be re-computed after insertions, or
insertions would need to work on the standard format first
- The `VectorStorage` would need to support appending new vectors
- The passages JSONL file and offset index would need append support

## Performance Implications

- **Current approach**: O(N) embeddings computed twice (once for initial build,
once for rebuild with summaries). Total embedding calls = 2N + S where S is
the number of summaries.
- **Incremental approach**: O(N + S) embedding calls total. Each summary is
embedded and inserted once.
- For large corpora, the rebuild cost is significant because all original
embeddings must be recomputed during Phase 3.

### Mitigation: Pre-computed Embeddings

A partial mitigation would be to cache the embeddings from Phase 1 and use
`build_index_from_embeddings()` in Phase 3. This avoids recomputing base
embeddings but still requires a full graph rebuild. leann-rs already supports
this via `LeannBuilder::build_index_from_embeddings()`.

## Priority

Medium -- the two-phase approach is functionally correct and produces useful
RAPTOR summaries. The main loss is the depth of cross-referencing between
summaries, which may not significantly impact retrieval quality for most
document sizes. The performance overhead of double-embedding is more pressing
for large corpora.
4 changes: 2 additions & 2 deletions cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ pub struct Args {
#[arg(short = 'k', long)]
pub chunk_size: Option<usize>,

/// Vector DB Url
/// Directory where HNSW indexes are stored
#[cfg(feature = "rag")]
#[arg(long, default_value = "data/spnl")]
pub vecdb_uri: String,
pub index_dir: String,

/// Reverse order
#[arg(short, long, default_value_t = false)]
Expand Down
26 changes: 22 additions & 4 deletions cli/src/builtins/rag.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
use itertools::Itertools;

/// Parse JSONL lines, extracting the "text" field from each line.
fn parse_jsonl(s: &str) -> Vec<String> {
#[derive(serde::Deserialize)]
struct JsonlText {
text: String,
}
serde_json::Deserializer::from_str(s)
.into_iter::<JsonlText>()
.filter_map(|line| match line {
Ok(JsonlText { text }) => Some(text),
Err(e) => {
eprintln!("Error parsing jsonl line {e}");
None
}
})
.collect()
}

pub fn query(args: crate::args::Args) -> anyhow::Result<spnl::ir::Query> {
let crate::args::Args {
model,
Expand Down Expand Up @@ -38,13 +56,13 @@ pub fn query(args: crate::args::Args) -> anyhow::Result<spnl::ir::Query> {
.unwrap_or("none".to_string())
),
spnl::ir::Document::Text(
spnl::windowing::jsonl(include_str!("fiqa-first100lines.jsonl"))?
parse_jsonl(include_str!("fiqa-first100lines.jsonl"))
.into_iter()
.map(|line| {
.map(|line: String| {
if chunk_size.is_none() || line.len() == chunk_size.unwrap() {
line.to_string()
line
} else {
let width = chunk_size.unwrap_or(1000); // this could probably safely be `.unwrap()`
let width = chunk_size.unwrap_or(1000);
if line.len() < width {
format!(
"{}{}",
Expand Down
8 changes: 1 addition & 7 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,7 @@ async fn run(args: Args) -> Result<(), SpnlError> {
.verbose(args.verbose)
.max_aug(args.max_aug)
.shuffle(args.shuffle)
.vecdb_uri(args.vecdb_uri.clone())
.vecdb_table(
args.builtin
.clone()
.map(|builtin| format!("builtin.{builtin:?}"))
.unwrap_or_else(|| args.file.clone().unwrap_or("default".to_string())),
)
.index_dir(args.index_dir.clone())
.build()?,
};

Expand Down
15 changes: 3 additions & 12 deletions spnl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,7 @@ openssl-vendored = ["dep:openssl"]
pull = ["dep:reqwest","dep:tokio-util"]
ffi = []
pypi = ["ffi","tok","dep:pyo3","pyo3/extension-module","dep:thiserror"]
rag = ["run","dep:sha2", "dep:lancedb","dep:tracing","dep:arrow-schema","dep:arrow-array","dep:itertools","dep:pdf-extract","dep:async-recursion","dep:regex","dep:rand"]
# Cloud backend features (commented out to avoid --all-features pulling in heavyweight dependencies)
# Uncomment individually as needed, or move to separate crates in the future
# rag-aws = ["rag", "lancedb/aws"]
# rag-azure = ["rag", "lancedb/azure"]
# rag-gcs = ["rag", "lancedb/gcs"]
# rag-oss = ["rag", "lancedb/oss"]
# rag-dynamodb = ["rag", "lancedb/dynamodb"]
rag = ["run","dep:sha2","dep:leann-core","dep:ndarray","dep:itertools","dep:pdf-extract","dep:async-recursion","dep:regex","dep:rand"]
rag-deep-debug = []
run = ["dep:futures","dep:indicatif","dep:async-recursion","dep:thiserror","dep:tabled"]
run_py = ["run","pypi","pyo3/experimental-async","run","ollama","openai","gemini","dep:tokio","tokio/rt-multi-thread"]
Expand Down Expand Up @@ -73,10 +66,8 @@ tokio = { version = "1.44.1", features = ["io-std", "io-util", "signal"], option
tokio-stream = { version = "0.1.18", features = ["net"], optional = true }
tokio-util = { version = "0.7.16", optional = true }
anyhow = { version = "1.0.98" }
lancedb = { version = "0.26.0", default-features = false, optional = true }
tracing = { version = "0.1.41", optional = true }
arrow-schema = { version = "57.3", optional = true }
arrow-array = { version = "57.3", optional = true }
leann-core = { version = "0.1.1", optional = true }
ndarray = { version = "0.16", optional = true }
either = { version = "1.13", optional = true }
indexmap = { version = "2.7.0", optional = true }
itertools = { version = "0.14.0", optional = true }
Expand Down
50 changes: 50 additions & 0 deletions spnl/src/augment/embed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,53 @@ pub async fn embed(

Ok(embeddings.into_iter())
}

/// Adapter that implements leann_core's EmbeddingProvider trait by
/// delegating to spnl's async embed() function via block_on.
pub struct SpnlEmbeddingProvider {
pub model: String,
pub dimensions: usize,
}

impl leann_core::embedding::EmbeddingProvider for SpnlEmbeddingProvider {
fn compute_embeddings(&self, chunks: &[String]) -> anyhow::Result<ndarray::Array2<f32>> {
// Run spnl's async embed() in a fresh tokio runtime on a
// separate thread. This avoids deadlocking when called from
// within std::thread::scope (as LeannBuilder::build_index does)
// regardless of the outer tokio runtime flavor.
let model = self.model.clone();
let chunks_owned = chunks.to_vec();
let vecs: Vec<Vec<f32>> = std::thread::scope(|s| {
s.spawn(|| {
tokio::runtime::Runtime::new()
.expect("Failed to create tokio runtime for embedding")
.block_on(embed(&model, EmbedData::Vec(chunks_owned)))
})
.join()
.expect("Embedding thread panicked")
})?
.collect();

let nrows = vecs.len();
let ncols = self.dimensions;
let mut data = Vec::with_capacity(nrows * ncols);
for v in &vecs {
if v.len() < ncols {
data.extend_from_slice(v);
data.resize(data.len() + ncols - v.len(), 0.0);
} else {
data.extend_from_slice(&v[..ncols]);
}
}

Ok(ndarray::Array2::from_shape_vec((nrows, ncols), data)?)
}

fn dimensions(&self) -> usize {
self.dimensions
}

fn name(&self) -> &str {
&self.model
}
}
Loading
Loading