Skip to content
Open
2 changes: 0 additions & 2 deletions docs/schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ in_reply_to (Utf8, nullable) - Message-ID of parent email
subject (Utf8, NOT NULL) - Email subject line
references (Utf8, nullable) - Space-separated Message-IDs of thread ancestors
recipients (Utf8, NOT NULL) - Comma-separated To/Cc recipients
headers (Utf8, NOT NULL) - Full email headers
body (Utf8, NOT NULL) - Email body content
symbols (Utf8, NOT NULL) - JSON array of symbols found in patches/diffs
```
Expand All @@ -319,7 +318,6 @@ symbols (Utf8, NOT NULL) - JSON array of symbols found in patche
- BTree on `date` (chronological queries)
- BTree on `in_reply_to` (threading queries)
- BTree on `references` (threading queries)
- BTree on `headers` (header searches)
- **FTS (Full Text Search) on `from`** - Fast keyword search on sender
- **FTS on `subject`** - Fast keyword search on subject lines
- **FTS on `body`** - Fast keyword search on email bodies
Expand Down
Empty file modified scripts/direct_download.py
100644 → 100755
Empty file.
Empty file modified scripts/nomic2vec.py
100644 → 100755
Empty file.
114 changes: 52 additions & 62 deletions src/bin/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use anyhow::Result;
use clap::Parser;
use colored::Colorize;
use futures::stream::{self, StreamExt};
use semcode::indexer::{
list_shas_in_range, process_commits_pipeline, process_lore_commits_pipeline,
};
Expand Down Expand Up @@ -660,10 +659,10 @@ async fn index_lore_archive(
let total_commits = all_commit_shas.len();
info!("Found {} total commits in lore archive", total_commits);

// Get already indexed commits from database using efficient batched queries
// Get already indexed commits from database
println!("Checking for already-indexed commits...");
let existing_commits = db_manager
.filter_existing_lore_commits(&all_commit_shas)
.get_indexed_lore_commits()
.await?;

// Filter out already indexed commits
Expand Down Expand Up @@ -964,6 +963,8 @@ async fn main() -> Result<()> {
}

analysis_threads
} else if let Ok(env_jobs) = std::env::var("SEMCODE_JOBS") {
env_jobs.parse::<usize>().unwrap_or(0)
} else {
0
};
Expand Down Expand Up @@ -1081,15 +1082,9 @@ async fn main() -> Result<()> {
total_emails_all_archives
);

// Create FTS indices for lore table after data is inserted
// Optimize before creating FTS indices so that compaction
// does not orphan the index data that was just built.
if total_new_emails > 0 {
println!("\nCreating FTS indices for lore table...");
match db_manager.create_lore_fts_indices().await {
Ok(_) => println!("FTS indices created successfully"),
Err(e) => eprintln!("Warning: Failed to create FTS indices: {}", e),
}

// Check if optimization is needed after lore indexing
match db_manager.check_optimization_health().await {
Ok((needs_optimization, message)) => {
if needs_optimization {
Expand All @@ -1106,6 +1101,12 @@ async fn main() -> Result<()> {
error!("Failed to check database health: {}", e);
}
}

println!("\nCreating FTS indices for lore table...");
match db_manager.create_lore_fts_indices().await {
Ok(_) => println!("FTS indices created successfully"),
Err(e) => eprintln!("Warning: Failed to create FTS indices: {}", e),
}
}

println!("\nTo query this database, run:");
Expand Down Expand Up @@ -1173,51 +1174,40 @@ async fn main() -> Result<()> {
let db_threads = args.db_threads;
let total_archives = archives_with_names.len();

// Process archives in parallel (up to 4 concurrent fetches/indexes)
// This provides significant speedup when tracking multiple mailing lists
let concurrency = std::cmp::min(4, total_archives);
// Process archives sequentially. LanceDB merge_insert
// uses a shared DataFusion memory pool, and concurrent
// pipelines writing large lore emails exhaust it,
// causing both to stall indefinitely.
let mut results: Vec<Result<(String, LoreIndexResult), (String, anyhow::Error)>> =
Vec::with_capacity(total_archives);

println!(
"Processing {} archives with concurrency {}...",
total_archives, concurrency
);
for (archive_path, display_name) in archives_with_names {
println!("\n=== Refreshing lore archive: {} ===", display_name);
println!("[{}] Fetching updates from remote...", display_name);

let results: Vec<Result<(String, LoreIndexResult), (String, anyhow::Error)>> =
stream::iter(archives_with_names)
.map(|(archive_path, display_name)| {
let db_manager = db_manager.clone();
async move {
println!("\n=== Refreshing lore archive: {} ===", display_name);

// Fetch new commits from remote (async, runs on blocking thread pool)
println!("[{}] Fetching updates from remote...", display_name);
let lore_repo = match fetch_lore_archive(archive_path.clone()).await {
Ok(repo) => repo,
Err(e) => {
return Err((display_name, e));
}
};

// Index the archive using the shared function
match index_lore_archive(
lore_repo,
&archive_path,
&display_name,
&db_manager,
batch_size,
num_workers,
db_threads,
)
.await
{
Ok(result) => Ok((display_name, result)),
Err(e) => Err((display_name, e)),
}
}
})
.buffer_unordered(concurrency)
.collect()
.await;
let lore_repo = match fetch_lore_archive(archive_path.clone()).await {
Ok(repo) => repo,
Err(e) => {
results.push(Err((display_name, e)));
continue;
}
};

match index_lore_archive(
lore_repo,
&archive_path,
&display_name,
&db_manager,
batch_size,
num_workers,
db_threads,
)
.await
{
Ok(result) => results.push(Ok((display_name, result))),
Err(e) => results.push(Err((display_name, e))),
}
}

// Aggregate results
let mut total_new_emails = 0usize;
Expand Down Expand Up @@ -1260,15 +1250,9 @@ async fn main() -> Result<()> {
}
}

// Create FTS indices for lore table after data is inserted
// Optimize before creating FTS indices so that compaction
// does not orphan the index data that was just built.
if total_new_emails > 0 {
println!("\nCreating FTS indices for lore table...");
match db_manager.create_lore_fts_indices().await {
Ok(_) => println!("FTS indices created successfully"),
Err(e) => eprintln!("Warning: Failed to create FTS indices: {}", e),
}

// Check if optimization is needed
match db_manager.check_optimization_health().await {
Ok((needs_optimization, message)) => {
if needs_optimization {
Expand All @@ -1285,6 +1269,12 @@ async fn main() -> Result<()> {
error!("Failed to check database health: {}", e);
}
}

println!("\nCreating FTS indices for lore table...");
match db_manager.create_lore_fts_indices().await {
Ok(_) => println!("FTS indices created successfully"),
Err(e) => eprintln!("Warning: Failed to create FTS indices: {}", e),
}
}

// Check for new archives available on lore.kernel.org
Expand Down
Loading