diff --git a/docs/rebuild.md b/docs/rebuild.md new file mode 100644 index 000000000..be6b35640 --- /dev/null +++ b/docs/rebuild.md @@ -0,0 +1,124 @@ +# Index Rebuild + +Rebuild allows you to reconstruct an HNSW index graph with new configuration parameters (M, ef_construction) without re-uploading vector data. All vectors are re-indexed from MDBX storage — only the graph structure is rebuilt. + +## API Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/api/v1/index/{name}/rebuild` | Start async rebuild | +| GET | `/api/v1/index/{name}/rebuild/status` | Check rebuild progress | + +--- + +## Start Rebuild + +**POST** `/api/v1/index/{name}/rebuild` + +All parameters are optional. Omitted parameters retain their current values. + +```json +{ + "M": 32, + "ef_con": 256 +} +``` + +**Parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `M` | int | HNSW graph connectivity (4–512) | +| `ef_con` | int | Construction-time search quality (8–4096) | + +**Response 202:** +```json +{ + "status": "rebuilding", + "previous_config": { "M": 16, "ef_con": 128 }, + "new_config": { "M": 32, "ef_con": 256 }, + "total_vectors": 50000 +} +``` + +**Errors:** + +| Code | Condition | +|------|-----------| +| 400 | No changes specified, invalid parameters, or attempted to change `precision`/`space_type` | +| 404 | Index not found | +| 409 | Rebuild or backup already in progress for this user | + +--- + +## Check Progress + +**GET** `/api/v1/index/{name}/rebuild/status` + +**Status values:** + +| Status | Meaning | +|--------|---------| +| `idle` | No rebuild has run for this index (or querying a different index) | +| `in_progress` | Rebuild is currently running | +| `completed` | Rebuild finished successfully | +| `failed` | Rebuild failed (see `error` field) | + +**In progress:** +```json +{ + "status": "in_progress", + "vectors_processed": 45000, + "total_vectors": 100000, + "percent_complete": 45.0, + "started_at": "2026-03-25T10:30:00Z" +} +``` + +**Completed:** +```json +{ + "status": "completed", + "vectors_processed": 100000, + "total_vectors": 100000, + "percent_complete": 100.0, + "started_at": "2026-03-25T10:30:00Z", + "completed_at": "2026-03-25T10:32:15Z" +} +``` + +**Failed:** +```json +{ + "status": "failed", + "vectors_processed": 45000, + "total_vectors": 100000, + "percent_complete": 45.0, + "started_at": "2026-03-25T10:30:00Z", + "completed_at": "2026-03-25T10:31:05Z", + "error": "Out of memory" +} +``` + +Status is per-index. The `completed`/`failed` state persists until the next rebuild is started for that user. + +--- + +## Restrictions + +The following parameters **cannot** be changed via rebuild (returns 400): +- `precision` (quantization level) +- `space_type` + + +--- + +## Behavior + +- **All vectors are re-indexed** from MDBX storage into a new HNSW graph with the updated configuration. +- **Search continues** during rebuild — queries use the old index until the rebuild completes. +- **Write operations** (insert, delete, update) will block and timeout while the rebuild is running, same as during backup. +- **One rebuild at a time per user** — cannot start a rebuild on any index while another rebuild is in progress for the same user. Also cannot run concurrently with a backup. +- **Periodic checkpoints** — the in-progress graph is saved to a temp file at regular intervals. +- **On completion**, the new graph replaces `default.idx`. All temporary and intermediate files are cleaned up. +- **On server restart** during an incomplete rebuild, the old index loads normally. Temp files are cleaned up automatically. The rebuild must be restarted manually. diff --git a/src/core/ndd.hpp b/src/core/ndd.hpp index 86acc0f14..9f562a01f 100644 --- a/src/core/ndd.hpp +++ b/src/core/ndd.hpp @@ -197,6 +197,7 @@ struct PersistenceConfig { }; #include "../storage/backup_store.hpp" +#include "rebuild.hpp" class IndexManager { private: @@ -220,7 +221,10 @@ class IndexManager { std::thread autosave_thread_; std::atomic running_{true}; BackupStore backup_store_; + Rebuild rebuild_; void executeBackupJob(const std::string& index_id, const std::string& backup_name); + void executeRebuildJob(const std::string& index_id, const std::string& username, + size_t new_M, size_t new_ef_con); std::unique_ptr createWAL(const std::string& index_id) { const std::string wal_dir = data_dir_ + "/" + index_id; @@ -569,6 +573,7 @@ class IndexManager { backup_store_(data_dir) { std::filesystem::create_directories(data_dir); metadata_manager_ = std::make_unique(data_dir); + rebuild_.cleanupTempFiles(data_dir); // Start the autosave thread autosave_thread_ = std::thread(&IndexManager::autosaveLoop, this); } @@ -1878,6 +1883,54 @@ class IndexManager { std::pair validateBackupName(const std::string& backup_name) const { return backup_store_.validateBackupName(backup_name); } + + // Metadata access + std::optional getMetadata(const std::string& index_id) { + return metadata_manager_->getMetadata(index_id); + } + + // Index stats (safe to call from routes) + size_t getElementCount(const std::string& index_id) { + auto& entry = getIndexEntry(index_id); + return entry.alg->getElementsCount(); + } + + + // ========== Rebuild operations ========== + + // Orchestration method (defined below after class) + std::pair rebuildIndexAsync(const std::string& index_id, + size_t new_M, + size_t new_ef_con); + + bool hasActiveRebuild(const std::string& username) const { + return rebuild_.hasActiveRebuild(username); + } + + nlohmann::json getRebuildProgress(const std::string& username, + const std::string& index_id) const { + auto state = rebuild_.getActiveRebuild(username); + if (state && state->index_id == index_id) { + size_t processed = state->vectors_processed.load(); + size_t total = state->total_vectors.load(); + double percent = total > 0 ? (100.0 * processed / total) : 0.0; + nlohmann::json result = { + {"status", state->status}, + {"vectors_processed", processed}, + {"total_vectors", total}, + {"percent_complete", percent}, + {"started_at", Rebuild::formatTime(state->started_at)} + }; + if (state->status == "completed" || state->status == "failed") { + result["completed_at"] = Rebuild::formatTime(state->completed_at); + } + if (state->status == "failed" && !state->error_message.empty()) { + result["error"] = state->error_message; + } + return result; + } + return {{"status", "idle"}}; + } }; // ========== IndexManager backup implementations ========== @@ -2157,3 +2210,214 @@ inline std::pair IndexManager::createBackupAsync(const std::s return {true, backup_name}; } + +// ========== IndexManager rebuild implementations ========== + +inline std::pair IndexManager::rebuildIndexAsync(const std::string& index_id, + size_t new_M, + size_t new_ef_con) { + // Validate index exists + auto meta = metadata_manager_->getMetadata(index_id); + if (!meta) { + return {false, "Index not found"}; + } + + // Extract username for backup check + std::string username; + size_t pos = index_id.find('/'); + if (pos != std::string::npos) { + username = index_id.substr(0, pos); + } else { + return {false, "Invalid index ID format"}; + } + + // Check for active backup or rebuild + if (backup_store_.hasActiveBackup(username)) { + return {false, "Backup already in progress for user: " + username}; + } + if (rebuild_.hasActiveRebuild(username)) { + return {false, "Rebuild already in progress for user: " + username}; + } + + // Load entry to get current element count + auto& entry = getIndexEntry(index_id); + size_t current_count = entry.alg->getElementsCount(); + + // Ensure at least one parameter differs + if (new_M == meta->M && new_ef_con == meta->ef_con) { + return {false, "No configuration changes specified"}; + } + + // Set active rebuild state (per-user, one rebuild at a time) + rebuild_.setActiveRebuild(username, index_id, current_count); + + // Spawn background thread (same pattern as createBackupAsync) + std::thread([this, index_id, username, new_M, new_ef_con]() { + executeRebuildJob(index_id, username, new_M, new_ef_con); + }).detach(); + + LOG_INFO(2050, index_id, "Rebuild started: M=" << new_M + << " ef_con=" << new_ef_con); + + return {true, "Rebuild started"}; +} + +inline void IndexManager::executeRebuildJob(const std::string& index_id, + const std::string& username, + size_t new_M, size_t new_ef_con) { + std::string base_path = data_dir_ + "/" + index_id; + std::string temp_path = rebuild_.getTempPath(base_path); + std::string timestamped_path = rebuild_.getTimestampedPath(base_path); + std::string vector_storage_dir = base_path + "/vectors"; + std::string index_path = vector_storage_dir + "/" + settings::DEFAULT_SUBINDEX + ".idx"; + + try { + auto& entry = getIndexEntry(index_id); + + // Hold operation_mutex for entire rebuild — writes timeout, searches continue + std::lock_guard operation_lock(entry.operation_mutex); + + // Phase 1 — Save current state + saveIndexInternal(entry); + + // Read current config from the existing HNSW graph + auto space_type = entry.alg->getSpaceType(); + size_t dim = entry.alg->getDimension(); + auto quant_level = entry.alg->getQuantLevel(); + int32_t checksum = entry.alg->getChecksum(); + size_t max_elements = entry.alg->getMaxElements(); + + // Phase 2 — Build new HNSW (same max_elements as current index) + auto new_alg = std::make_unique>( + max_elements, space_type, dim, new_M, new_ef_con, + settings::RANDOM_SEED, quant_level, checksum); + + // Set vector fetcher BEFORE adding vectors — searchBaseLayer during + // graph construction needs this to compute distances for base-layer-only + // nodes (base layer doesn't store vector data inline) + new_alg->setVectorFetcher([vs = entry.vector_storage](ndd::idInt label, uint8_t* buffer) { + return vs->get_vector(label, buffer); + }); + + new_alg->setVectorFetcherBatch([vs = entry.vector_storage](const ndd::idInt* labels, + uint8_t* buffers, + bool* success, + size_t count) -> size_t { + return vs->get_vectors_batch_into(labels, buffers, success, count); + }); + + // Iterate VectorStore and re-insert all vectors + auto cursor = entry.vector_storage->getCursor(); + const size_t batch_size = settings::RECOVERY_BATCH_SIZE; + size_t total_processed = 0; + size_t batches_since_checkpoint = 0; + constexpr size_t CHECKPOINT_INTERVAL = 5; // Save temp every 5 batches + + while (cursor.hasNext()) { + // Collect batch + std::vector>> batch; + batch.reserve(batch_size); + while (cursor.hasNext() && batch.size() < batch_size) { + auto [label, vec_bytes] = cursor.next(); + if (!vec_bytes.empty()) { + batch.emplace_back(label, std::move(vec_bytes)); + } + } + + if (batch.empty()) { + break; + } + + // Multi-threaded insert (same pattern as addVectors and recoverIndex) + size_t num_threads = std::min(settings::NUM_RECOVERY_THREADS, batch.size()); + std::atomic next{0}; + std::vector threads; + + for (size_t t = 0; t < num_threads; ++t) { + threads.emplace_back([&]() { + size_t i; + while ((i = next.fetch_add(1)) < batch.size()) { + const auto& [label, vec_bytes] = batch[i]; + new_alg->addPoint(vec_bytes.data(), label); + } + }); + } + + for (auto& th : threads) { + th.join(); + } + + total_processed += batch.size(); + + // Update progress + auto state = rebuild_.getActiveRebuild(username); + if (state) { + state->vectors_processed.store(total_processed); + } + + // Periodic checkpoint save + batches_since_checkpoint++; + if (batches_since_checkpoint >= CHECKPOINT_INTERVAL) { + new_alg->saveIndex(temp_path); + batches_since_checkpoint = 0; + } + } + + // Phase 3 — Save final + Copy + Swap + + // Save new graph to timestamped file + new_alg->saveIndex(timestamped_path); + + // Copy to canonical name (overwrites old default.idx on disk) + std::filesystem::copy_file(timestamped_path, index_path, + std::filesystem::copy_options::overwrite_existing); + + // Load fresh from disk + swap pointer (reloadIndex pattern) + auto fresh_alg = std::make_unique>(index_path, 0); + + fresh_alg->setVectorFetcher([vs = entry.vector_storage](ndd::idInt label, uint8_t* buffer) { + return vs->get_vector(label, buffer); + }); + + fresh_alg->setVectorFetcherBatch([vs = entry.vector_storage](const ndd::idInt* labels, + uint8_t* buffers, + bool* success, + size_t count) -> size_t { + return vs->get_vectors_batch_into(labels, buffers, success, count); + }); + + entry.alg = std::move(fresh_alg); + + // Delete temp checkpoint and timestamped file + if (std::filesystem::exists(temp_path)) { + std::filesystem::remove(temp_path); + } + if (std::filesystem::exists(timestamped_path)) { + std::filesystem::remove(timestamped_path); + } + + // Update metadata with new config + auto meta = metadata_manager_->getMetadata(index_id); + if (meta) { + meta->M = new_M; + meta->ef_con = new_ef_con; + meta->total_elements = entry.alg->getElementsCount(); + metadata_manager_->storeMetadata(index_id, *meta); + } + + entry.markUpdated(); + entry.updated = false; // We just saved the new graph + + LOG_INFO(2051, index_id, "Rebuild completed: " << total_processed << " vectors rebuilt"); + rebuild_.completeActiveRebuild(username); + + } catch (const std::exception& e) { + LOG_ERROR(2052, index_id, "Rebuild failed: " << e.what()); + + // Cleanup temp file on error + if (std::filesystem::exists(temp_path)) { + std::filesystem::remove(temp_path); + } + rebuild_.failActiveRebuild(username, e.what()); + } +} diff --git a/src/core/rebuild.hpp b/src/core/rebuild.hpp new file mode 100644 index 000000000..27777fa57 --- /dev/null +++ b/src/core/rebuild.hpp @@ -0,0 +1,130 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "settings.hpp" +#include "log.hpp" + +struct ActiveRebuild { + std::string index_id; + std::string status{"in_progress"}; // "in_progress", "completed", "failed" + std::string error_message; + std::atomic vectors_processed{0}; + std::atomic total_vectors{0}; + std::chrono::system_clock::time_point started_at; + std::chrono::system_clock::time_point completed_at; +}; + +class Rebuild { +private: + // Keyed by username — one rebuild per user at a time + std::unordered_map> active_rebuilds_; + mutable std::mutex rebuild_state_mutex_; + + static std::string timeToISO8601(std::chrono::system_clock::time_point tp) { + auto time_t_val = std::chrono::system_clock::to_time_t(tp); + std::tm tm_val{}; + gmtime_r(&time_t_val, &tm_val); + std::ostringstream oss; + oss << std::put_time(&tm_val, "%Y-%m-%dT%H:%M:%SZ"); + return oss.str(); + } + +public: + Rebuild() = default; + + // Lifecycle — cleanup temp files from interrupted rebuilds on startup + void cleanupTempFiles(const std::string& data_dir) { + if (!std::filesystem::exists(data_dir)) { + return; + } + try { + std::string temp_filename = std::string(settings::DEFAULT_SUBINDEX) + ".idx.temp"; + for (const auto& entry : std::filesystem::recursive_directory_iterator(data_dir)) { + if (entry.is_regular_file() && + entry.path().filename().string() == temp_filename) { + std::filesystem::remove(entry.path()); + } + } + } catch (const std::exception&) { + // Silently ignore cleanup errors on startup + } + } + + // State tracking — per user + + void setActiveRebuild(const std::string& username, const std::string& index_id, + size_t total_vectors) { + std::lock_guard lock(rebuild_state_mutex_); + auto state = std::make_shared(); + state->index_id = index_id; + state->status = "in_progress"; + state->total_vectors.store(total_vectors); + state->vectors_processed.store(0); + state->started_at = std::chrono::system_clock::now(); + active_rebuilds_[username] = state; + } + + void completeActiveRebuild(const std::string& username) { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + it->second->status = "completed"; + it->second->completed_at = std::chrono::system_clock::now(); + } + } + + void failActiveRebuild(const std::string& username, const std::string& error) { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + it->second->status = "failed"; + it->second->error_message = error; + it->second->completed_at = std::chrono::system_clock::now(); + } + } + + bool hasActiveRebuild(const std::string& username) const { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + // Only "in_progress" blocks a new rebuild + return it != active_rebuilds_.end() && it->second->status == "in_progress"; + } + + std::shared_ptr getActiveRebuild(const std::string& username) const { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + return it->second; + } + return nullptr; + } + + // Format state as JSON fields + static std::string formatTime(std::chrono::system_clock::time_point tp) { + return timeToISO8601(tp); + } + + // Path helpers + + static std::string getTempPath(const std::string& index_dir) { + return index_dir + "/vectors/" + settings::DEFAULT_SUBINDEX + ".idx.temp"; + } + + static std::string getTimestampedPath(const std::string& index_dir) { + auto ts = std::to_string( + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch() + ).count() + ); + return index_dir + "/vectors/" + settings::DEFAULT_SUBINDEX + ".idx." + ts; + } +}; diff --git a/src/main.cpp b/src/main.cpp index f4b06fb01..e618bf810 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -739,6 +739,101 @@ int main(int argc, char** argv) { } }); + // ========== Rebuild operations ========== + + // Start index rebuild + CROW_ROUTE(app, "/api/v1/index//rebuild") + .CROW_MIDDLEWARES(app, AuthMiddleware) + .methods("POST"_method)([&index_manager, &app](const crow::request& req, + const std::string& index_name) { + auto& ctx = app.get_context(req); + std::string index_id = ctx.username + "/" + index_name; + + auto body = crow::json::load(req.body); + if (!body) { + return json_error(400, "Invalid JSON"); + } + + // Reject parameters that cannot be changed via rebuild + if (body.has("precision")) { + return json_error(400, "precision cannot be changed via rebuild"); + } + if (body.has("space_type")) { + return json_error(400, "space_type cannot be changed via rebuild"); + } + + // Get current metadata for defaults + auto meta = index_manager.getMetadata(index_id); + if (!meta) { + return json_error(404, "Index not found"); + } + + // Parse parameters with current values as defaults + size_t new_M = body.has("M") ? (size_t)body["M"].i() : meta->M; + size_t new_ef_con = body.has("ef_con") ? (size_t)body["ef_con"].i() : meta->ef_con; + + // Validate M + if (new_M < settings::MIN_M || new_M > settings::MAX_M) { + return json_error(400, + "M must be between " + std::to_string(settings::MIN_M) + + " and " + std::to_string(settings::MAX_M)); + } + + // Validate ef_con + if (new_ef_con < settings::MIN_EF_CONSTRUCT || new_ef_con > settings::MAX_EF_CONSTRUCT) { + return json_error(400, + "ef_con must be between " + std::to_string(settings::MIN_EF_CONSTRUCT) + + " and " + std::to_string(settings::MAX_EF_CONSTRUCT)); + } + + // Get actual vector count for response + size_t actual_element_count = 0; + try { + actual_element_count = index_manager.getElementCount(index_id); + } catch (...) {} + + try { + auto [success, message] = index_manager.rebuildIndexAsync( + index_id, new_M, new_ef_con); + + if (!success) { + int code = (message.find("already in progress") != std::string::npos) ? 409 : 400; + return json_error(code, message); + } + + crow::json::wvalue response; + response["status"] = "rebuilding"; + response["previous_config"]["M"] = meta->M; + response["previous_config"]["ef_con"] = meta->ef_con; + response["new_config"]["M"] = new_M; + response["new_config"]["ef_con"] = new_ef_con; + response["total_vectors"] = actual_element_count; + return crow::response(202, response.dump()); + } catch (const std::exception& e) { + return json_error_500(ctx.username, index_name, req.url, e.what()); + } + }); + + // Get rebuild status + CROW_ROUTE(app, "/api/v1/index//rebuild/status") + .CROW_MIDDLEWARES(app, AuthMiddleware) + .methods("GET"_method)([&index_manager, &app](const crow::request& req, + const std::string& index_name) { + auto& ctx = app.get_context(req); + std::string index_id = ctx.username + "/" + index_name; + + try { + auto progress = index_manager.getRebuildProgress(ctx.username, index_id); + crow::response res; + res.code = 200; + res.set_header("Content-Type", "application/json"); + res.body = progress.dump(); + return res; + } catch (const std::exception& e) { + return json_error_500(ctx.username, index_name, req.url, e.what()); + } + }); + // List indexes for current user CROW_ROUTE(app, "/api/v1/index/list") .CROW_MIDDLEWARES(app, AuthMiddleware)