Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchtop/src/nomt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl NomtDB {
} else {
session_params.overlay(overlay_window.iter()).unwrap()
};
let session = self.nomt.begin_session(session_params);
let session = self.nomt.begin_session(session_params).unwrap();

let mut transaction = Tx {
session: &session,
Expand Down Expand Up @@ -147,7 +147,7 @@ impl NomtDB {
} else {
session_params.overlay(overlay_window.iter()).unwrap()
};
let session = self.nomt.begin_session(session_params);
let session = self.nomt.begin_session(session_params).unwrap();
let mut results: Vec<Option<_>> = (0..workloads.len()).map(|_| None).collect();

let use_timer = timer.is_some();
Expand Down
2 changes: 1 addition & 1 deletion examples/commit_batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl NomtDB {
// Writes do not occur immediately, instead,
// they are cached and applied all at once later on
let session =
nomt.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()));
nomt.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()))?;

// Here we will move the data saved under b"key1" to b"key2" and deletes it
//
Expand Down
2 changes: 1 addition & 1 deletion examples/read_value/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn main() -> Result<()> {
// Instantiate a new Session object to handle read and write operations
// and generate a Witness later on
let session =
nomt.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()));
nomt.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()))?;

// Reading a key from the database
let key_path = sha2::Sha256::digest(b"key").into();
Expand Down
6 changes: 3 additions & 3 deletions fuzz/fuzz_targets/api_surface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ fuzz_target!(|run: Run| {
for call in run.calls.calls {
match call {
NomtCall::BeginSession { session_calls } => {
let session = db.begin_session(
SessionParams::default().witness_mode(WitnessMode::read_write()),
);
let session = db
.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()))
.unwrap();
for session_call in session_calls {
match session_call {
SessionCall::TentativeRead {
Expand Down
9 changes: 5 additions & 4 deletions nomt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,9 @@ impl<T: HashAlgorithm> Nomt<T> {
/// prevent writes to the database. Sessions are the main way of reading to the database,
/// and permit a changeset to be committed either directly to the database or into an
/// in-memory [`Overlay`].
pub fn begin_session(&self, params: SessionParams) -> Session<T> {
pub fn begin_session(&self, params: SessionParams) -> anyhow::Result<Session<T>> {
let live_overlay = params.overlay;
live_overlay.ensure_base_root(self.root())?;

// We must take the access guard before instantiating the rollback delta,
// because it creates a read transaction and any commits or rollbacks will block
Expand All @@ -326,7 +327,7 @@ impl<T: HashAlgorithm> Nomt<T> {
.parent_root()
.unwrap_or_else(|| self.root().into_inner());

Session {
Ok(Session {
store,
merkle_updater: self.merkle_update_pool.begin::<T>(
self.page_cache.clone(),
Expand All @@ -342,7 +343,7 @@ impl<T: HashAlgorithm> Nomt<T> {
access_guard,
prev_root: Root(prev_root),
_marker: std::marker::PhantomData,
}
})
}

/// Perform a rollback of the last `n` commits.
Expand Down Expand Up @@ -373,7 +374,7 @@ impl<T: HashAlgorithm> Nomt<T> {

// We hold a write guard and don't need the session to take any other.
session_params.take_global_guard = false;
let sess = self.begin_session(session_params);
let sess = self.begin_session(session_params)?;

// Convert the traceback into a series of write commands.
let mut actuals = Vec::new();
Expand Down
46 changes: 31 additions & 15 deletions nomt/src/merkle/seek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ impl SeekRequest {

fn continue_leaf_fetch<H: HashAlgorithm>(&mut self, leaf: Option<LeafNodeRef>) {
let RequestState::FetchingLeaf {
ref mut overlay_deletions,
ref mut beatree_iterator,
..
} = self.state
Expand All @@ -203,13 +204,24 @@ impl SeekRequest {
beatree_iterator.provide_leaf(leaf);
}

let (key, value_hash) = match beatree_iterator.next() {
None => panic!("leaf must exist position={}", self.position.path()),
Some(IterOutput::Blocked) => return,
Some(IterOutput::Item(key, value)) => {
(key, H::hash_value(&value)) // hash
let mut deletions_idx = 0;
let (key, value_hash) = loop {
match beatree_iterator.next() {
None => panic!("leaf must exist position={}", self.position.path()),
Some(IterOutput::Blocked) => {
overlay_deletions.drain(..deletions_idx);
return;
}
Some(IterOutput::Item(key, _value))
if deletions_idx < overlay_deletions.len()
&& overlay_deletions[deletions_idx] == key =>
{
deletions_idx += 1;
continue;
}
Some(IterOutput::Item(key, value)) => break (key, H::hash_value(&value)),
Some(IterOutput::OverflowItem(key, value_hash, _)) => break (key, value_hash),
}
Some(IterOutput::OverflowItem(key, value_hash, _)) => (key, value_hash),
};

self.state = RequestState::Completed(Some(trie::LeafData {
Expand Down Expand Up @@ -351,6 +363,7 @@ enum RequestState {
Seeking,
// Fetching one leaf
FetchingLeaf {
overlay_deletions: Vec<KeyPath>,
beatree_iterator: BeatreeIterator,
needed_leaves: NeededLeavesIter,
},
Expand All @@ -373,16 +386,18 @@ impl RequestState {
) -> Self {
let (start, end) = range_bounds(pos.raw_path(), pos.depth() as usize);

// First see if the item is present within the overlay.
let overlay_item = overlay
.value_iter(start, end)
.filter(|(_, v)| v.as_option().is_some())
.next();
let overlay_items = overlay.value_iter(start, end);
let mut overlay_deletions = vec![];

if let Some((key_path, overlay_leaf)) = overlay_item {
let value_hash = match overlay_leaf {
// PANIC: we filtered out all deletions above.
ValueChange::Delete => panic!(),
for (key_path, overlay_change) in overlay_items {
let value_hash = match overlay_change {
ValueChange::Delete => {
// All deletes must be collected to filter out from the beatree iterator.
overlay_deletions.push(key_path);
continue;
}
// If an insertion is found within the overlay, it is expected to be
// the item associated with the leaf that is being fetched.
ValueChange::Insert(value) => H::hash_value(value),
ValueChange::InsertOverflow(_, value_hash) => value_hash.clone(),
};
Expand All @@ -397,6 +412,7 @@ impl RequestState {
let beatree_iterator = read_transaction.iterator(start, end);
let needed_leaves = beatree_iterator.needed_leaves();
RequestState::FetchingLeaf {
overlay_deletions,
beatree_iterator,
needed_leaves,
}
Expand Down
18 changes: 17 additions & 1 deletion nomt/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ pub enum InvalidAncestors {
#[derive(Clone)]
pub(super) struct LiveOverlay {
parent: Option<Arc<OverlayInner>>,
base_root: Option<Root>,
ancestor_data: Vec<Arc<Data>>,
min_seqn: u64,
}
Expand All @@ -250,12 +251,14 @@ impl LiveOverlay {
let Some(parent) = live_ancestors.next().map(|p| p.inner.clone()) else {
return Ok(LiveOverlay {
parent: None,
base_root: None,
ancestor_data: Vec::new(),
min_seqn: 0,
});
};

let mut ancestor_data = Vec::new();
let mut base_root = Some(Root(parent.prev_root));
for (supposed_ancestor, actual_ancestor) in live_ancestors.zip(parent.ancestor_data.iter())
{
let Some(actual_ancestor) = actual_ancestor.upgrade() else {
Expand All @@ -266,7 +269,8 @@ impl LiveOverlay {
return Err(InvalidAncestors::NotAncestor);
}

ancestor_data.push(actual_ancestor);
base_root = Some(supposed_ancestor.prev_root());
ancestor_data.push(actual_ancestor.clone());
}

// verify that the chain is complete. The last ancestor's parent must either be `None` or
Expand All @@ -286,6 +290,7 @@ impl LiveOverlay {
Ok(LiveOverlay {
parent: Some(parent),
ancestor_data,
base_root,
min_seqn,
})
}
Expand Down Expand Up @@ -418,6 +423,17 @@ impl LiveOverlay {
pub(super) fn parent_root(&self) -> Option<Node> {
self.parent.as_ref().map(|p| p.root)
}

/// Ensure that the oldest overlay's previous root matches
/// the specified current state root.
pub fn ensure_base_root(&self, state_root: Root) -> anyhow::Result<()> {
self.base_root
.map_or(true, |base_root| base_root == state_root)
.then(|| ())
.ok_or(anyhow::anyhow!(
"State root and oldest overlay prev root do not match."
))
}
}

#[cfg(test)]
Expand Down
21 changes: 15 additions & 6 deletions nomt/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ impl Test {
o.hashtable_buckets(hashtable_buckets);
o.commit_concurrency(commit_concurrency);
let nomt = Nomt::open(o).unwrap();
let session =
nomt.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()));
let session = nomt
.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()))
.unwrap();
Self {
nomt,
session: Some(session),
Expand Down Expand Up @@ -128,7 +129,8 @@ impl Test {
finished.commit(&self.nomt).unwrap();
self.session = Some(
self.nomt
.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write())),
.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()))
.unwrap(),
);
(root, witness)
}
Expand All @@ -142,7 +144,8 @@ impl Test {

self.session = Some(
self.nomt
.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write())),
.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()))
.unwrap(),
);

(finished.into_overlay(), witness)
Expand All @@ -155,10 +158,16 @@ impl Test {
overlay.commit(&self.nomt).unwrap();
self.session = Some(
self.nomt
.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write())),
.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write()))
.unwrap(),
);
}

pub fn try_begin_session(&mut self, params: SessionParams) -> anyhow::Result<()> {
self.session = Some(self.nomt.begin_session(params)?);
Ok(())
}

pub fn start_overlay_session<'a>(&mut self, ancestors: impl IntoIterator<Item = &'a Overlay>) {
// force drop of live session before creating a new one.
self.access.clear();
Expand All @@ -167,7 +176,7 @@ impl Test {
.witness_mode(WitnessMode::read_write())
.overlay(ancestors)
.unwrap();
self.session = Some(self.nomt.begin_session(params));
self.session = Some(self.nomt.begin_session(params).unwrap());
}

pub fn prove(&self, key: KeyPath) -> PathProof {
Expand Down
Loading
Loading