Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ cargo add roughdb
```rust
use roughdb::{Db, Options};

let mut opts = Options::default();
opts.create_if_missing = true;
let opts = roughdb::Options {
create_if_missing: true,
..Default::default()
};

let db = Db::open("/tmp/my_db", opts)?;
```
Expand Down
156 changes: 148 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
//! batch.put(b"key2", b"val2");
//! batch.delete(b"old_key");
//!
//! db.write(&WriteOptions::default(), &batch)?;
//! db.write(&WriteOptions::default(), batch)?;
//! # Ok::<(), roughdb::Error>(())
//! ```
//!
Expand Down Expand Up @@ -414,6 +414,70 @@ impl DbIter {
pub fn status(&self) -> Option<&Error> {
self.inner.status()
}

/// Return a standard [`Iterator`] that yields `(key, value)` pairs by advancing forward from
/// the current position.
///
/// Position the `DbIter` first (via [`seek_to_first`], [`seek`], etc.), then call `forward()`
/// to obtain an adapter that implements Rust's [`Iterator`] trait. Each call to
/// [`Iterator::next`] reads the current key and value, advances the cursor, and returns owned
/// copies.
///
/// The adapter yields `Some(Err(..))` exactly once if the underlying iterator encounters
/// corruption, then `None` on all subsequent calls.
///
/// [`seek_to_first`]: DbIter::seek_to_first
/// [`seek`]: DbIter::seek
///
/// # Examples
///
/// ```
/// # let db = roughdb::Db::default();
/// # db.put(b"a", b"1").unwrap();
/// use roughdb::ReadOptions;
///
/// let mut it = db.new_iterator(&ReadOptions::default())?;
/// it.seek_to_first();
///
/// for result in it.forward() {
/// let (key, value) = result?;
/// println!("{:?} = {:?}", key, value);
/// }
/// # Ok::<(), roughdb::Error>(())
/// ```
pub fn forward(&mut self) -> ForwardIter<'_> {
ForwardIter {
inner: self,
done: false,
}
}
}

/// Adapter returned by [`DbIter::forward`] that implements [`Iterator`].
///
/// Yields `Ok((key, value))` pairs as owned `Vec<u8>`s, advancing forward from
/// wherever the parent `DbIter` was positioned when `forward()` was called.
pub struct ForwardIter<'a> {
inner: &'a mut DbIter,
done: bool,
}

impl Iterator for ForwardIter<'_> {
type Item = Result<(Vec<u8>, Vec<u8>), Error>;

fn next(&mut self) -> Option<Self::Item> {
if self.done {
return None;
}
if !self.inner.valid() {
self.done = true;
return self.inner.status().map(|e| Err(e.clone()));
}
let key = self.inner.key().to_vec();
let value = self.inner.value().to_vec();
self.inner.next();
Some(Ok((key, value)))
}
}

// ── LOCK file ────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -918,7 +982,7 @@ impl Db {
{
let mut batch = WriteBatch::new();
batch.put(key.as_ref(), value.as_ref());
self.write(&WriteOptions::default(), &batch)
self.write(&WriteOptions::default(), batch)
}

pub fn delete<K>(&self, key: K) -> Result<(), Error>
Expand All @@ -927,7 +991,7 @@ impl Db {
{
let mut batch = WriteBatch::new();
batch.delete(key.as_ref());
self.write(&WriteOptions::default(), &batch)
self.write(&WriteOptions::default(), batch)
}

/// Create a forward iterator over a consistent snapshot of the database.
Expand Down Expand Up @@ -1194,7 +1258,7 @@ impl Db {
/// the OS page cache provides durability — data survives crashes of the process but not the OS.
///
/// See `db/db_impl.cc: DBImpl::Write`.
pub fn write(&self, opts: &WriteOptions, batch: &WriteBatch) -> Result<(), Error> {
pub fn write(&self, opts: &WriteOptions, batch: WriteBatch) -> Result<(), Error> {
// ── Phase 1: Enqueue this write request ──────────────────────────────────
//
// Every caller pushes a `WriterSlot` and waits until it is either at the
Expand All @@ -1206,7 +1270,7 @@ impl Db {
state.next_writer_id += 1;
state.writers.push_back(WriterSlot {
id,
batch: batch.clone(),
batch,
sync: opts.sync,
});
id
Expand Down Expand Up @@ -1337,9 +1401,14 @@ impl Db {
// If we triggered a flush, wait for the background thread to complete it.
// This preserves the original synchronous-flush behaviour seen by callers.
if triggered_flush {
while state.imm.is_some() || state.pending_flush.is_some() {
while (state.imm.is_some() || state.pending_flush.is_some())
&& state.background_error.is_none()
{
state = self.inner.write_condvar.wait(state).unwrap();
}
// A background error means the flush failed. The write itself already
// succeeded (WAL + memtable), so we don't propagate the error here —
// the next write will see it in make_room_for_write.
}
}
drop(state);
Expand Down Expand Up @@ -3023,7 +3092,7 @@ mod tests {
batch.put("a", "1");
batch.put("b", "2");
batch.put("c", "3");
db.write(&WriteOptions::default(), &batch).unwrap();
db.write(&WriteOptions::default(), batch).unwrap();
}
let db = Db::open(dir.path(), Options::default()).unwrap();
assert_eq!(db.get("a").unwrap(), b"1");
Expand Down Expand Up @@ -4339,7 +4408,7 @@ mod tests {
let key = format!("skey{i}");
let mut batch = WriteBatch::new();
batch.put(key.as_bytes(), b"v");
db.write(&opts, &batch).unwrap();
db.write(&opts, batch).unwrap();
})
})
.collect();
Expand Down Expand Up @@ -5176,4 +5245,75 @@ mod tests {
// Data must be readable regardless of whether the flush has completed yet.
assert_eq!(db.get(b"k0").unwrap(), b"v");
}

// ── ForwardIter tests ────────────────────────────────────────────────────

#[test]
fn forward_iter_full_scan() {
let db = Db::default();
db.put(b"a", b"1").unwrap();
db.put(b"b", b"2").unwrap();
db.put(b"c", b"3").unwrap();

let mut it = db.new_iterator(&ReadOptions::default()).unwrap();
it.seek_to_first();
let pairs: Vec<_> = it.forward().map(|r| r.unwrap()).collect();
assert_eq!(
pairs,
vec![
(b"a".to_vec(), b"1".to_vec()),
(b"b".to_vec(), b"2".to_vec()),
(b"c".to_vec(), b"3".to_vec()),
]
);
}

#[test]
fn forward_iter_from_seek() {
let db = Db::default();
for c in b'a'..=b'e' {
db.put(&[c], &[c]).unwrap();
}

let mut it = db.new_iterator(&ReadOptions::default()).unwrap();
it.seek(b"c");
let keys: Vec<_> = it.forward().map(|r| r.unwrap().0).collect();
assert_eq!(keys, vec![b"c".to_vec(), b"d".to_vec(), b"e".to_vec()]);
}

#[test]
fn forward_iter_empty() {
let db = Db::default();
let mut it = db.new_iterator(&ReadOptions::default()).unwrap();
it.seek_to_first();
assert_eq!(it.forward().count(), 0);
}

#[test]
fn forward_iter_unpositioned_yields_nothing() {
let db = Db::default();
db.put(b"k", b"v").unwrap();
let mut it = db.new_iterator(&ReadOptions::default()).unwrap();
// No seek — iterator is unpositioned, valid() is false.
assert_eq!(it.forward().count(), 0);
}

#[test]
fn forward_iter_reusable_after_drop() {
let db = Db::default();
db.put(b"a", b"1").unwrap();
db.put(b"b", b"2").unwrap();

let mut it = db.new_iterator(&ReadOptions::default()).unwrap();

// First pass: scan from "a".
it.seek_to_first();
let first: Vec<_> = it.forward().map(|r| r.unwrap().0).collect();
assert_eq!(first, vec![b"a".to_vec(), b"b".to_vec()]);

// ForwardIter dropped — reposition and scan again.
it.seek(b"b");
let second: Vec<_> = it.forward().map(|r| r.unwrap().0).collect();
assert_eq!(second, vec![b"b".to_vec()]);
}
}
2 changes: 1 addition & 1 deletion src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ mod tests {
batch.put(b"foo", b"bar");
batch.put(b"baz", b"qux");
batch.delete(b"foo");
db.write(&WriteOptions::default(), &batch).unwrap();
db.write(&WriteOptions::default(), batch).unwrap();
assert!(db
.get_with_options(&ReadOptions::default(), b"foo")
.unwrap_err()
Expand Down
Loading