diff --git a/CLAUDE.md b/CLAUDE.md index 7476598..1f62d2a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -99,14 +99,14 @@ Features are listed in dependency order. Each phase must be complete before the **Step 2 — Log primitives** -- [x] **Format** (`src/log/format.rs`): 32 KB blocks. Record header: `[crc32c: u32 LE][len: u16 LE][type: u8]` (7 +- [x] **Format** (`src/logfile/format.rs`): 32 KB blocks. Record header: `[crc32c: u32 LE][len: u16 LE][type: u8]` (7 bytes). Fragment types: `Zero=0` (pad), `Full=1`, `First=2`, `Middle=3`, `Last=4`. Constants: `BLOCK_SIZE = 32768`, `HEADER_SIZE = 7`. See `db/log_format.h`. -- [x] **`log::Writer`** (`src/log/writer.rs`): Wraps `BufWriter` directly — no `Env` abstraction yet (deferred to +- [x] **`logfile::Writer`** (`src/logfile/writer.rs`): Wraps `BufWriter` directly — no `Env` abstraction yet (deferred to Phase 9). `add_record(&[u8])` fragments the payload across block boundaries; pads trailing fewer-than-7-byte remnants with zeros; pre-computes per-type CRCs on construction. Constructor: `new(file: File, dest_length: u64)` where `dest_length` allows resuming an existing file. See `db/log_writer.h/cc`. -- [x] **`log::Reader`** (`src/log/reader.rs`): Wraps `File`. `read_record() -> Option>` reassembles +- [x] **`logfile::Reader`** (`src/logfile/reader.rs`): Wraps `File`. `read_record() -> Option>` reassembles multi-fragment records into a scratch buffer. Optional CRC verification (`checksum: bool`). Reports corruption via a `Reporter` trait (one method: `corruption(bytes: u64, reason: &str)`). Resynchronisation: when `initial_offset > 0`, skips `Middle`/`Last` fragments until a `Full` or `First` is found. See `db/log_reader.h/cc`. @@ -114,14 +114,14 @@ Features are listed in dependency order. Each phase must be complete before the **Step 3 — Integration** - [x] **`Db::open(path)`** (`src/lib.rs` or `src/db/mod.rs`): Creates the directory if absent; opens or creates - `/000001.log`; constructs `Db` with a live `log::Writer`. Returns `Result`. `Db::default()` retained + `/000001.log`; constructs `Db` with a live `logfile::Writer`. Returns `Result`. `Db::default()` retained for in-memory/test use (no WAL). Full MANIFEST-driven log-number tracking deferred to Phase 9. - [x] **`Db::write` wired to WAL**: Change signature to `fn write(&self, opts: &WriteOptions, batch: &WriteBatch) -> Result<(), Error>`. Under the write lock: read `start_seq`, call `batch.set_sequence(start_seq)`, call `log_writer.add_record(batch.contents())`, sync if `opts.sync`, iterate into memtable, advance `last_sequence`. The stamp must precede the log write so WAL records carry the correct embedded sequence for recovery replay. `Db::put`/`delete` updated to forward a default `WriteOptions`. -- [x] **Recovery** (`Db::open` calls this when WAL exists): Construct a `log::Reader` from offset 0; for each record +- [x] **Recovery** (`Db::open` calls this when WAL exists): Construct a `logfile::Reader` from offset 0; for each record decode it as a `WriteBatch` and replay via `batch.iterate(&mut inserter)` using the batch's embedded sequence; after all records restore `last_sequence` to the highest sequence seen. Incomplete trailing records (torn write on crash) are silently ignored. @@ -237,10 +237,13 @@ what remains is compaction, the full Iterator/Snapshot API, and operational hygi when no data exists at L2+). Writes output SSTables to L1 (new file per `max_file_size`). `install_compaction` (under lock) records a single `VersionEdit` deleting all inputs and adding all outputs, then calls `log_and_apply`. `log_and_apply` now sorts L1–L6 files by smallest user key after each edit. `maybe_compact` - orchestrates the three phases; called from `Db::write` in a loop (up to 32×) after every flush. Slow-write - throttle: sleeps 1 ms per iteration when L0 ≥ `L0_SLOWDOWN_WRITES_TRIGGER` (8). No background thread — runs - synchronously on the writing goroutine. `compact_pointer` round-robin not implemented (all L0 files always - selected). See `db/db_impl.cc: DBImpl::BackgroundCompaction`, `DoCompactionWork`, `MakeRoomForWrite`. + orchestrates the three phases; called by the background thread after each flush. Compaction is fully + asynchronous — a dedicated background thread runs flush and compaction while writers proceed. `Db` is split into + `Arc` (shared with the background thread) + `JoinHandle` (joined on drop). Write backpressure: + `make_room_for_write` sleeps 1 ms at L0 ≥ 8 (`L0_SLOWDOWN_WRITES_TRIGGER`), blocks at L0 ≥ 12 + (`L0_STOP_WRITES_TRIGGER`), and waits for an in-progress flush if `imm` is occupied. Level-score scheduling, + compact-pointer round-robin, seek-based compaction, trivial-move optimisation, and grandparent-overlap limiting + are all implemented. See `db/db_impl.cc: DBImpl::BackgroundCompaction`, `DoCompactionWork`, `MakeRoomForWrite`. - [x] **`Db::compact_range(begin, end)`**: Manual compaction of a user-key range (`Option<&[u8]>` for open bounds). Finds the deepest level with files overlapping `[begin, end]`, then calls `compact_level_range` for each level from 0 to `max_level - 1`. Each call uses the three-phase lock protocol; newly compacted files are visible to @@ -324,7 +327,7 @@ what remains is compaction, the full Iterator/Snapshot API, and operational hygi `verify_checksums = opts.verify_checksums || self.options.paranoid_checks` is computed in `Db::get_with_options` and `Db::new_iterator` and forwarded to `Version::get(verify_checksums)` and `Table::new_iterator(verify_checksums)` (which threads it into the `BlockFn` closure). `Options::paranoid_checks` is also passed to `VersionSet::recover` - and `recover_wal` as the `checksum` argument to `log::Reader::new`, so MANIFEST and WAL records are verified during + and `recover_wal` as the `checksum` argument to `logfile::Reader::new`, so MANIFEST and WAL records are verified during recovery. See `db/db_impl.cc: DBImpl::Get`, `table/table.cc: Table::InternalGet`. --- @@ -359,8 +362,10 @@ what remains is compaction, the full Iterator/Snapshot API, and operational hygi - **Custom comparator**: Wire `Options::comparator` through to all key-comparison sites (skip list, block seek, compaction). Currently hardcoded bytewise. Needed to use RoughDB as a sorted map on non-lexicographic keys. See `include/leveldb/comparator.h`. -- **`RepairDB(path, options)`**: Scans the database directory, recovers as many SSTables as possible from a corrupt - or partial MANIFEST, and rebuilds a valid MANIFEST. See `db/repair.cc`. +- ✅ **`Db::repair(path, options)`**: Scans the database directory for surviving `.ldb` and `.log` files, converts WALs + to SSTables via `LogReader` → `Memtable` → `TableBuilder`, extracts metadata from all SSTables, writes a fresh + `MANIFEST-000001` placing all files at L0, and writes `CURRENT`. Corrupt records/files are skipped and archived to + `lost/`. Port of `db/repair.cc: RepairDB`. - **`Options::reuse_logs`**: Experimental LevelDB flag — when set, the existing WAL and MANIFEST files are reused on `Db::open` rather than creating new ones after recovery, saving an `fsync` of `CURRENT`. Low priority. See `db/db_impl.cc: DBImpl::Recover`. diff --git a/README.md b/README.md index 52e27de..2bb4114 100644 --- a/README.md +++ b/README.md @@ -162,10 +162,15 @@ RoughDB is in active development. The on-disk format is LevelDB-compatible. - `get_property` — `leveldb.num-files-at-level`, `leveldb.stats`, `leveldb.sstables`, `leveldb.approximate-memory-usage` - `get_approximate_sizes` — byte-range estimation via index-block seeks +- `repair` — recovers a database from a corrupt or missing MANIFEST by scanning surviving SSTables + and WAL files, converting WALs to SSTables, and writing a fresh MANIFEST - `destroy` — safely removes a database directory - `LOCK` file — prevents concurrent opens by multiple processes - Table cache — LRU open-file-handle cache bounded by `Options::max_open_files` - Block cache — LRU byte-capacity cache with per-table IDs; `ReadOptions::fill_cache` +- `ForwardIter` — stdlib `Iterator` adapter via `DbIter::forward()` for ergonomic forward scans +- Info logging via the [`log`](https://crates.io/crates/log) crate — compaction progress, flush + lifecycle, recovery details, backpressure events, and errors **Known limitations:** @@ -180,9 +185,6 @@ RoughDB is in active development. The on-disk format is LevelDB-compatible. RoughDB. - **`Env` abstraction** — file I/O is hardcoded to the local POSIX filesystem. There is no way to inject a custom storage backend (in-memory, encrypted, cloud, etc.). -- **`RepairDB`** — recovery from a corrupt or partial MANIFEST by scanning surviving SSTables. -- **Info logging** — LevelDB writes compaction progress, recovery details, and errors to an - `info_log`. RoughDB produces no log output. ## License diff --git a/src/db/version_set.rs b/src/db/version_set.rs index 6490335..06334c0 100644 --- a/src/db/version_set.rs +++ b/src/db/version_set.rs @@ -27,7 +27,7 @@ pub(crate) fn manifest_filename(number: u64) -> String { format!("MANIFEST-{number:06}") } -fn write_current_file(path: &Path, manifest_number: u64) -> Result<(), Error> { +pub(crate) fn write_current_file(path: &Path, manifest_number: u64) -> Result<(), Error> { let content = format!("{}\n", manifest_filename(manifest_number)); std::fs::write(path.join("CURRENT"), content)?; Ok(()) diff --git a/src/lib.rs b/src/lib.rs index b8d0bfc..ea19a3c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1428,6 +1428,227 @@ impl Db { Ok(()) } + + /// Attempt to recover a database that cannot be opened normally. + /// + /// Scans the database directory for surviving `.ldb` (SSTable) and `.log` (WAL) files, + /// converts WAL files to SSTables, extracts metadata from all SSTables, and writes a fresh + /// `MANIFEST-000001` + `CURRENT` that places every recovered file at L0. + /// + /// Some data may be lost (corrupt WAL records and unreadable SSTables are skipped). + /// Processed WAL and corrupt files are moved to a `lost/` subdirectory. + /// + /// Does **not** acquire the database lock; the caller must ensure no other process has the + /// database open. Call [`Db::open`] after repair to use the recovered database. + /// + /// See `db/repair.cc: RepairDB`. + pub fn repair>(path: P, options: Options) -> Result<(), Error> { + use crate::iter::InternalIterator; + use crate::table::format::parse_internal_key; + + let path = path.as_ref(); + + // ── Phase 1: find_files ──────────────────────────────────────────────────── + let entries = std::fs::read_dir(path).map_err(|e| { + Error::IoError(std::io::Error::new( + e.kind(), + format!("repair: cannot read directory {}: {e}", path.display()), + )) + })?; + + let mut logs: Vec = Vec::new(); + let mut table_numbers: Vec = Vec::new(); + let mut manifests: Vec = Vec::new(); + let mut max_number: u64 = 0; + + for entry in entries.flatten() { + let name = entry.file_name(); + let name = name.to_string_lossy(); + if let Some((number, kind)) = parse_db_filename(&name) { + if number > max_number { + max_number = number; + } + match kind { + FileKind::Log => logs.push(number), + FileKind::Table => table_numbers.push(number), + FileKind::Manifest => manifests.push(name.to_string()), + _ => {} + } + } + } + let mut next_file_number = max_number.max(1) + 1; + + // ── Phase 2: convert_log_to_table ────────────────────────────────────────── + for log_num in &logs { + let log_path = path.join(format!("{log_num:06}.log")); + let file = match std::fs::File::open(&log_path) { + Ok(f) => f, + Err(e) => { + log::warn!("repair: cannot open WAL {}: {e}", log_path.display()); + continue; + } + }; + let mut reader = LogReader::new(file, None, false, 0); + let mem = Memtable::default(); + + while let Some(record) = reader.read_record() { + let batch = match WriteBatch::from_contents(record) { + Ok(b) => b, + Err(e) => { + log::warn!( + "repair: skipping corrupt WAL record in {}: {e}", + log_path.display() + ); + continue; + } + }; + let mut inserter = Inserter { + mem: &mem, + seq: batch.sequence(), + }; + if let Err(e) = batch.iterate(&mut inserter) { + log::warn!( + "repair: skipping WAL record in {} (iterate failed): {e}", + log_path.display() + ); + continue; + } + } + + if mem.approximate_memory_usage() > 0 { + let sst_number = next_file_number; + next_file_number += 1; + let sst_path = path.join(format!("{sst_number:06}.ldb")); + let sst_file = std::fs::File::create(&sst_path)?; + let mut builder = TableBuilder::new( + sst_file, + options.block_size, + options.block_restart_interval, + options.filter_policy.clone(), + options.compression, + ); + let mut it = mem.iter(); + it.seek_to_first(); + while it.valid() { + builder.add(it.key(), it.value())?; + it.advance(); + } + builder.finish()?; + table_numbers.push(sst_number); + log::info!("repair: converted WAL {log_num:06}.log → SSTable {sst_number:06}.ldb"); + } + + archive_file(path, &format!("{log_num:06}.log")); + } + + // ── Phase 3: scan_table (extract metadata) ──────────────────────────────── + struct TableInfo { + meta: Arc, + max_sequence: u64, + } + + let mut tables: Vec = Vec::new(); + + for &num in &table_numbers { + let sst_path = path.join(format!("{num:06}.ldb")); + let file_size = match std::fs::metadata(&sst_path) { + Ok(m) => m.len(), + Err(e) => { + log::warn!("repair: cannot stat {}: {e}", sst_path.display()); + continue; + } + }; + let file = match std::fs::File::open(&sst_path) { + Ok(f) => f, + Err(e) => { + log::warn!("repair: cannot open {}: {e}", sst_path.display()); + continue; + } + }; + let table = match Table::open(file, file_size, options.filter_policy.clone(), None) { + Ok(t) => t, + Err(e) => { + log::warn!("repair: cannot open SSTable {}: {e}", sst_path.display()); + continue; + } + }; + let mut iter = match table.new_iterator(options.paranoid_checks, false) { + Ok(it) => it, + Err(e) => { + log::warn!( + "repair: cannot create iterator for {}: {e}", + sst_path.display() + ); + continue; + } + }; + + let mut smallest: Vec = Vec::new(); + let mut largest: Vec = Vec::new(); + let mut max_seq: u64 = 0; + + iter.seek_to_first(); + while iter.valid() { + let ikey = iter.key(); + if smallest.is_empty() { + smallest = ikey.to_vec(); + } + largest = ikey.to_vec(); + if let Some((_user_key, seq, _vtype)) = parse_internal_key(ikey) { + if seq > max_seq { + max_seq = seq; + } + } + iter.next(); + } + + if let Some(e) = iter.status() { + log::warn!( + "repair: iterator error scanning {}: {e}", + sst_path.display() + ); + continue; + } + + if !smallest.is_empty() { + tables.push(TableInfo { + meta: FileMetaData::new(num, file_size, smallest, largest), + max_sequence: max_seq, + }); + } + } + + // ── Phase 4: write_descriptor ────────────────────────────────────────────── + let max_sequence = tables.iter().map(|t| t.max_sequence).max().unwrap_or(0); + + let mut edit = VersionEdit::new(); + edit.log_number = Some(0); + edit.next_file_number = Some(next_file_number); + edit.last_sequence = Some(max_sequence); + for t in &tables { + edit.new_files.push((0, Arc::clone(&t.meta))); + } + + let manifest_path = path.join(crate::db::version_set::manifest_filename(1)); + let manifest_file = std::fs::File::create(&manifest_path)?; + let mut manifest_writer = LogWriter::new(manifest_file, 0); + manifest_writer.add_record(&edit.encode())?; + + // Delete old manifests. + for name in &manifests { + let _ = std::fs::remove_file(path.join(name)); + } + + // Write CURRENT pointing at MANIFEST-000001. + crate::db::version_set::write_current_file(path, 1)?; + + log::info!( + "repair: wrote new manifest with {} tables, max_sequence={max_sequence}", + tables.len() + ); + + Ok(()) + } } // ── Background scheduling helpers ──────────────────────────────────────────── @@ -2861,6 +3082,16 @@ fn parse_db_filename(name: &str) -> Option<(u64, FileKind)> { None } +/// Move a file from the database directory into a `lost/` subdirectory. +/// +/// Used by [`Db::repair`] to archive processed WAL files and corrupt SSTables +/// so they are not re-processed on a subsequent repair attempt. +fn archive_file(db_path: &std::path::Path, filename: &str) { + let lost_dir = db_path.join("lost"); + let _ = std::fs::create_dir_all(&lost_dir); + let _ = std::fs::rename(db_path.join(filename), lost_dir.join(filename)); +} + /// Delete database files that are no longer referenced by any live `Version`. /// /// Matches LevelDB's `DBImpl::RemoveObsoleteFiles`: @@ -5383,4 +5614,76 @@ mod tests { let second: Vec<_> = it.forward().map(|r| r.unwrap().0).collect(); assert_eq!(second, vec![b"b".to_vec()]); } + + // ── Db::repair tests ─────────────────────────────────────────────────────── + + #[serial(fd)] + #[test] + fn repair_recovers_flushed_data() { + let dir = tempfile::tempdir().unwrap(); + // Write enough data to trigger a flush (small write_buffer_size). + { + let mut opts = create_options(); + opts.write_buffer_size = 512; + let db = Db::open(dir.path(), opts).unwrap(); + for i in 0..20u32 { + db.put( + format!("key{i:03}").as_bytes(), + format!("val{i:03}").as_bytes(), + ) + .unwrap(); + } + } + // Delete MANIFEST and CURRENT to simulate corruption. + for entry in std::fs::read_dir(dir.path()).unwrap() { + let entry = entry.unwrap(); + let name = entry.file_name().to_string_lossy().to_string(); + if name.starts_with("MANIFEST-") || name == "CURRENT" { + std::fs::remove_file(entry.path()).unwrap(); + } + } + // Repair should succeed. + Db::repair(dir.path(), create_options()).unwrap(); + // Reopen and verify data. + let db = Db::open(dir.path(), Options::default()).unwrap(); + // At least some keys should be recoverable from SSTables. + let mut found = 0; + for i in 0..20u32 { + if db.get(format!("key{i:03}").as_bytes()).is_ok() { + found += 1; + } + } + assert!(found > 0, "expected at least some keys to survive repair"); + } + + #[serial(fd)] + #[test] + fn repair_converts_wal_to_table() { + let dir = tempfile::tempdir().unwrap(); + // Write data but don't flush (large write_buffer_size). + { + let db = Db::open(dir.path(), create_options()).unwrap(); + db.put(b"from_wal", b"yes").unwrap(); + } + // Delete MANIFEST and CURRENT. + for entry in std::fs::read_dir(dir.path()).unwrap() { + let entry = entry.unwrap(); + let name = entry.file_name().to_string_lossy().to_string(); + if name.starts_with("MANIFEST-") || name == "CURRENT" { + std::fs::remove_file(entry.path()).unwrap(); + } + } + Db::repair(dir.path(), create_options()).unwrap(); + let db = Db::open(dir.path(), Options::default()).unwrap(); + assert_eq!(db.get(b"from_wal").unwrap(), b"yes"); + } + + #[test] + fn repair_nonexistent_path_errors() { + let err = Db::repair( + "/tmp/roughdb_nonexistent_repair_test_xyz", + Options::default(), + ); + assert!(err.is_err()); + } }