Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ keywords = ["database", "levelDB", "persistence"]
bumpalo = "3.20.2"
crc32c = "0.6.8"
libc = "0.2"
log = "0.4"
snap = "1"
zstd = "0.13"

Expand Down
4 changes: 2 additions & 2 deletions src/db/version_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::db::table_cache::TableCache;
use crate::db::version::Version;
use crate::db::version_edit::{FileMetaData, VersionEdit};
use crate::error::Error;
use crate::log::reader::Reader as LogReader;
use crate::log::writer::Writer as LogWriter;
use crate::logfile::reader::Reader as LogReader;
use crate::logfile::writer::Writer as LogWriter;
use std::collections::{HashMap, HashSet};
use std::fs::{File, OpenOptions};
use std::path::Path;
Expand Down
85 changes: 76 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@
use crate::db::table_cache::TableCache;
use crate::db::version_edit::{FileMetaData, VersionEdit};
use crate::db::version_set::VersionSet;
use crate::log::reader::Reader as LogReader;
use crate::log::writer::Writer as LogWriter;
use crate::logfile::reader::Reader as LogReader;
use crate::logfile::writer::Writer as LogWriter;
use crate::memtable::{ArcMemTableIter, Memtable, MemtableResult};
use crate::table::builder::TableBuilder;
use crate::table::reader::{LookupResult, Table};
Expand All @@ -132,7 +132,7 @@ pub use options::{CompressionType, FlushOptions, Options, WriteOptions};
pub(crate) mod coding;
pub(crate) mod db;
pub(crate) mod iter;
pub(crate) mod log;
pub(crate) mod logfile;
pub(crate) mod memtable;
pub(crate) mod table;
pub mod write_batch;
Expand Down Expand Up @@ -565,6 +565,7 @@ impl Default for Db {
impl Drop for Db {
fn drop(&mut self) {
if self.inner.persistence.is_some() {
log::info!("shutting down database");
self.inner.shutting_down.store(true, Ordering::Release);
self.inner.bg_condvar.notify_one();
if let Some(t) = self.bg_thread.take() {
Expand Down Expand Up @@ -643,6 +644,7 @@ impl Db {

let (version_set, mem, last_sequence) = if db_exists {
// ── Existing database: MANIFEST-driven recovery ──────────────────────
log::info!("opening existing database at {}", path.display());
let mut vs = VersionSet::recover(path, options.paranoid_checks)?;
let manifest_last_seq = vs.last_sequence();
let mem = Arc::new(Memtable::default());
Expand All @@ -654,7 +656,14 @@ impl Db {
let file = std::fs::File::open(&log_path)?;
let file_len = file.metadata()?.len();
if file_len > 0 {
Self::recover_wal(file, &mem, manifest_last_seq, options.paranoid_checks)?
log::info!(
"replaying WAL {:06}.log (manifest_last_seq={})",
vs.log_number(),
manifest_last_seq
);
let seq = Self::recover_wal(file, &mem, manifest_last_seq, options.paranoid_checks)?;
log::info!("WAL replay complete: max_sequence={seq}");
seq
} else {
manifest_last_seq
}
Expand All @@ -669,6 +678,7 @@ impl Db {
(Some(vs), mem, last_seq)
} else {
// ── New database: create MANIFEST and WAL ────────────────────────────
log::info!("creating new database at {}", path.display());
let vs = VersionSet::create(path)?;
// Initial WAL is always 000001.log (log_number = 1 from VersionSet::create).
std::fs::File::create(path.join("000001.log"))?;
Expand Down Expand Up @@ -1392,7 +1402,10 @@ impl Db {
state.pending_flush = Some(prep);
true
}
Err(_) => false,
Err(e) => {
log::warn!("begin_flush after write failed: {e}, flush deferred");
false
}
}
} else {
false
Expand Down Expand Up @@ -1466,6 +1479,7 @@ fn make_room_for_write<'a>(
.map_or(0, |vs| vs.current().files[0].len());
if allow_delay && l0 >= L0_SLOWDOWN_WRITES_TRIGGER {
// Slow down at most once per write call.
log::debug!("L0 file count ({l0}) ≥ {L0_SLOWDOWN_WRITES_TRIGGER}: delaying writes 1ms");
allow_delay = false;
drop(g);
std::thread::sleep(std::time::Duration::from_millis(1));
Expand All @@ -1474,12 +1488,21 @@ fn make_room_for_write<'a>(
break; // There is room in the current memtable.
} else if g.imm.is_some() || g.pending_flush.is_some() {
// A flush is already in progress; wait for the background thread.
log::debug!("waiting for in-progress flush to complete");
g = inner.write_condvar.wait(g).unwrap();
} else if l0 >= L0_STOP_WRITES_TRIGGER {
// Too many L0 files; wait for the background thread to drain them.
log::warn!(
"L0 file count ({l0}) ≥ {L0_STOP_WRITES_TRIGGER}: stopping writes until compaction drains L0"
);
g = inner.write_condvar.wait(g).unwrap();
} else {
// Rotate mem → imm, schedule background flush.
log::info!(
"memtable full ({} bytes ≥ {}): rotating to immutable",
g.mem.approximate_memory_usage(),
inner.options.write_buffer_size,
);
let path = inner.persistence.as_ref().unwrap().dir.as_path();
let prep = begin_flush(path, &mut g)?;
g.imm = Some(Arc::clone(&prep.old_mem));
Expand Down Expand Up @@ -1524,16 +1547,19 @@ fn bg_worker(inner: Arc<DbInner>) {

// ── Flush if pending ──────────────────────────────────────────────────────
if let Some(prep) = g.pending_flush.take() {
log::info!("bg: flushing memtable to SSTable {}", prep.sst_number);
drop(g);
let result = write_flush(prep, &inner.options);
g = inner.state.lock().unwrap();
match result {
Ok(res) => {
if let Err(e) = finish_flush(&mut g, res, &inner.options, &tc) {
log::error!("bg: finish_flush failed: {e}, stopping writes");
g.background_error = Some(e);
}
}
Err(e) => {
log::error!("bg: write_flush failed: {e}, stopping writes");
g.background_error = Some(e);
}
}
Expand Down Expand Up @@ -1604,6 +1630,7 @@ fn begin_flush(path: &std::path::Path, state: &mut DbState) -> Result<FlushPrep,
let old_log_number = vs.log_number();
let sst_number = vs.next_file_number();
let new_log_number = vs.next_file_number();
log::debug!("begin_flush: sst={sst_number}, new_log={new_log_number}, old_log={old_log_number}");
let new_log_file = std::fs::File::create(path.join(format!("{new_log_number:06}.log")))?;
let new_log = LogWriter::new(new_log_file, 0);
// Activate the new WAL immediately; preserve the old WAL so finish_flush
Expand Down Expand Up @@ -1742,6 +1769,11 @@ fn finish_flush(
vs.log_and_apply(&mut edit, tc)?;
// state.log was already swapped to the new WAL in begin_flush — no swap needed here.
state.imm = None;
log::info!(
"flush complete: file {} ({} bytes) at L{output_level}",
result.file_number,
result.file_size,
);
// Best-effort delete — ignore errors (e.g. the path never existed on new DB).
let _ = std::fs::remove_file(&result.old_log_path);
Ok(())
Expand Down Expand Up @@ -2318,6 +2350,15 @@ fn do_compaction(
use crate::iter::InternalIterator;

let _output_level = spec.level + 1;
log::info!(
"compaction L{}→L{}: {} + {} files ({} + {} bytes)",
spec.level,
spec.level + 1,
spec.inputs[0].len(),
spec.inputs[1].len(),
spec.inputs[0].iter().map(|f| f.file_size).sum::<u64>(),
spec.inputs[1].iter().map(|f| f.file_size).sum::<u64>(),
);

let mut children: Vec<Box<dyn InternalIterator>> = Vec::new();
for meta in spec.all_input_files() {
Expand Down Expand Up @@ -2441,6 +2482,13 @@ fn do_compaction(
)?;
}

log::info!(
"compaction L{}→L{} complete: {} output files ({} bytes)",
spec.level,
spec.level + 1,
outputs.len(),
outputs.iter().map(|o| o.file_size).sum::<u64>(),
);
Ok(outputs)
}

Expand Down Expand Up @@ -2732,8 +2780,18 @@ fn maybe_compact(
// Trivial move: single file, no L+1 overlap, acceptable grandparent overlap.
// No I/O needed — just a MANIFEST update.
if is_trivial_move(&spec, opts) {
let file = &spec.inputs[0][0];
log::info!(
"trivial move: file {} ({} bytes) L{}→L{}",
file.number,
file.file_size,
spec.level,
spec.level + 1,
);
let mut g = state.lock().unwrap();
let _ = install_trivial_move(&mut g, &spec, tc);
if let Err(e) = install_trivial_move(&mut g, &spec, tc) {
log::warn!("trivial move failed: {e}");
}
drop(g);
delete_obsolete_files(path, state);
return;
Expand All @@ -2742,13 +2800,21 @@ fn maybe_compact(
// Phase 2: I/O (no lock).
let outputs = match do_compaction(path, state, &mut spec, oldest_snapshot, opts, tc) {
Ok(o) => o,
Err(_) => return,
Err(e) => {
log::warn!("compaction L{}→L{} failed: {e}", spec.level, spec.level + 1);
return;
}
};

// Phase 3: install.
{
let mut g = state.lock().unwrap();
if install_compaction(&mut g, &spec, outputs, tc).is_err() {
if let Err(e) = install_compaction(&mut g, &spec, outputs, tc) {
log::warn!(
"install_compaction L{}→L{} failed: {e}",
spec.level,
spec.level + 1
);
return;
}
}
Expand Down Expand Up @@ -2835,6 +2901,7 @@ fn delete_obsolete_files(path: &std::path::Path, state: &Mutex<DbState>) {
FileKind::Current | FileKind::Lock => true,
};
if !keep {
log::debug!("deleting obsolete file: {name}");
let _ = std::fs::remove_file(entry.path());
}
}
Expand Down Expand Up @@ -5272,7 +5339,7 @@ mod tests {
fn forward_iter_from_seek() {
let db = Db::default();
for c in b'a'..=b'e' {
db.put(&[c], &[c]).unwrap();
db.put([c], [c]).unwrap();
}

let mut it = db.new_iterator(&ReadOptions::default()).unwrap();
Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions src/log/reader.rs → src/logfile/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// limitations under the License.

use crate::coding::{crc32c, crc32c_extend, unmask_crc};
use crate::log::format::{RecordType, BLOCK_SIZE, HEADER_SIZE};
use crate::logfile::format::{RecordType, BLOCK_SIZE, HEADER_SIZE};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

Expand Down Expand Up @@ -345,7 +345,7 @@ impl Reader {
#[cfg(test)]
mod tests {
use super::*;
use crate::log::writer::Writer;
use crate::logfile::writer::Writer;
use std::io::{Seek, SeekFrom, Write};

struct CorruptionLog(Vec<(u64, String)>);
Expand Down
4 changes: 2 additions & 2 deletions src/log/writer.rs → src/logfile/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use crate::coding::{crc32c, crc32c_extend, mask_crc};
use crate::error::Error;
use crate::log::format::{RecordType, BLOCK_SIZE, HEADER_SIZE};
use crate::logfile::format::{RecordType, BLOCK_SIZE, HEADER_SIZE};
use std::fs::File;
use std::io::{BufWriter, Write};

Expand Down Expand Up @@ -121,7 +121,7 @@ impl Writer {
#[cfg(test)]
mod tests {
use super::*;
use crate::log::format::{RecordType, BLOCK_SIZE, HEADER_SIZE};
use crate::logfile::format::{RecordType, BLOCK_SIZE, HEADER_SIZE};
use std::io::{Read, Seek, SeekFrom};

/// Write records to a temp file via Writer, then read back the raw bytes for
Expand Down
Loading