diff --git a/CHANGELOG.md b/CHANGELOG.md index f1484dc..96caf33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,22 @@ versioning follows [Semantic Versioning](https://semver.org/). For design background see [ARCHITECTURE.md](ARCHITECTURE.md); fine-grained per-commit history is in `git log`. +## [0.7.2] — 2026-06-18 + +### Fixed + +- Fixed nightly DB/crash soak failures where stale cross-blob routes could reach + a delete-fenced blob. `DB` no longer runs GUID-only background auto-merge; + DB-wide merge stays rooted in live trees through explicit compaction. The + route cache is now restricted to root-child crossings, and walkers restart + from the root when they encounter a delete-fenced child instead of treating it + as `NotFound`. +- `DB::view` now uses the same fenced snapshot capture path as `Tree::view`, + so multi-tree views cannot capture parent/child topology from mixed write + generations. +- Merge eligibility now rejects snapshot-shared child blobs, preventing + maintenance from deleting a blob still referenced by a live snapshot. + ## [0.7.1] — 2026-06-12 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index a889739..6517846 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,7 +213,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "holt" -version = "0.7.1" +version = "0.7.2" dependencies = [ "crc32fast", "crossbeam-channel", diff --git a/Cargo.toml b/Cargo.toml index 1b70e99..6c47057 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "holt" -version = "0.7.1" +version = "0.7.2" edition = "2021" rust-version = "1.82" autobenches = false @@ -8,8 +8,8 @@ description = "An adaptive-radix-tree metadata storage engine for path-shaped ke license = "MIT" authors = ["the holt contributors"] readme = "README.md" -repository = "https://github.com/feichai0017/holt" -homepage = "https://github.com/feichai0017/holt" +repository = "https://github.com/NoKV-Lab/holt" +homepage = "https://github.com/NoKV-Lab/holt" documentation = "https://docs.rs/holt" keywords = ["art", "radix-tree", "metadata", "storage", "embedded"] categories = ["database-implementations", "data-structures", "filesystem"] diff --git a/benches/Cargo.lock b/benches/Cargo.lock index 67a524b..c331ab2 100644 --- a/benches/Cargo.lock +++ b/benches/Cargo.lock @@ -567,7 +567,7 @@ dependencies = [ [[package]] name = "holt" -version = "0.7.1" +version = "0.7.2" dependencies = [ "crc32fast", "crossbeam-channel", diff --git a/src/api/db.rs b/src/api/db.rs index 62fdb23..b58adbd 100644 --- a/src/api/db.rs +++ b/src/api/db.rs @@ -86,7 +86,15 @@ impl std::fmt::Debug for DB { impl DB { /// Open a multi-tree database using the supplied configuration. - pub fn open(cfg: TreeConfig) -> Result { + pub fn open(mut cfg: TreeConfig) -> Result { + // The background merge queue is keyed only by blob GUID. In a + // multi-tree DB, a queued parent may become unreachable from all + // live roots while still sharing children with a live tree or a + // snapshot. DB-wide merge therefore runs through `DB::compact`, + // which walks from live roots; the background checkpointer only + // drains dirty bytes and pending deletes. + cfg.checkpoint.auto_merge = false; + let bm = Tree::open_buffer_manager(&cfg)?; let mut open_stats = OpenStats::default(); @@ -332,7 +340,7 @@ impl DB { .collect::>(); let mut trees = HashMap::with_capacity(scoped.len()); for (_, name, prefix, tree) in scoped { - trees.insert(name, tree.snapshot_unlocked_unfenced(prefix)?); + trees.insert(name, tree.snapshot_unlocked(prefix)?); } DBView { trees } }; diff --git a/src/engine/route_cache.rs b/src/engine/route_cache.rs index 3489f31..302a022 100644 --- a/src/engine/route_cache.rs +++ b/src/engine/route_cache.rs @@ -1,10 +1,10 @@ -//! Small parent-validated route cache for path-shaped metadata keys. +//! Small root-validated route cache for path-shaped metadata keys. //! //! A hit is only a candidate. Callers must pin the cached parent, //! hold its shared latch, verify the parent content version, and -//! then pin the child before using the shortcut. That keeps the -//! cached parent->child edge stable while still allowing deeper -//! prefix anchors than the root-only path. +//! then pin the child before using the shortcut. The cache only keeps +//! root-child crossings: deeper parent edges can remain internally +//! stable even after the parent becomes unreachable from the live root. use std::collections::HashMap; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; @@ -118,6 +118,9 @@ impl RouteCache { for &len in &entries.lengths { if let Some(prefix) = key.user_prefix(len) { if let Some(entry) = entries.map.get(prefix) { + if entry.parent_depth != 0 { + continue; + } self.hits.fetch_add(1, Ordering::Relaxed); return Some(RouteHit { parent_guid: entry.parent_guid, @@ -157,6 +160,17 @@ impl RouteCache { rebuild_prefix_lengths(&mut entries); } + /// Drop every cached route. Used when a deeper lock-coupled + /// walker discovers a delete-fenced child that is not represented + /// by the top-level route candidate it entered through. + pub(crate) fn clear(&self) { + self.invalidations.fetch_add(1, Ordering::Relaxed); + let mut entries = self.entries.write().unwrap(); + entries.map.clear(); + entries.order.clear(); + entries.lengths.clear(); + } + /// Refresh the parent version after the caller revalidated that /// the cached parent edge still points at the same child. pub(crate) fn refresh_parent_version( @@ -195,6 +209,9 @@ impl RouteCache { child_guid: BlobGuid, child_depth: usize, ) { + if parent_depth != 0 { + return; + } let Some(prefix) = key.user_prefix(child_depth) else { return; }; @@ -591,7 +608,7 @@ mod tests { } #[test] - fn different_parent_depth_does_not_prune_longer_route() { + fn non_root_parent_routes_are_not_cached() { let cache = RouteCache::new(); cache.learn( SearchKey::user(b"bucket-00/path/deeper/file"), @@ -611,6 +628,7 @@ mod tests { ); let stats = cache.stats(); - assert_eq!(stats.entries, 2); + assert_eq!(stats.entries, 1); + assert_eq!(stats.learns, 1); } } diff --git a/src/engine/walker/erase.rs b/src/engine/walker/erase.rs index f51ae90..102d647 100644 --- a/src/engine/walker/erase.rs +++ b/src/engine/walker/erase.rs @@ -14,6 +14,7 @@ use super::readers::{ read_prefix, }; use super::route::{pin_route_parent, validate_route_edge}; +use super::types::{is_stale_blob_crossing, stale_blob_crossing}; use super::types::{EraseCondition, EraseOutcome, EraseReturn, EraseSignal, LookupResult}; use super::writers::{ finish_inner_with_sorted, inner_find_child, inner_update_child, set_prefix_child, @@ -72,6 +73,35 @@ pub fn erase_multi_conditional( key: SearchKey<'_>, seq: u64, condition: EraseCondition, +) -> Result { + let mut restarts = 0u32; + loop { + match erase_multi_conditional_once(bm, root_pin, route_cache, key, seq, condition) { + Err(e) if is_stale_blob_crossing(&e) => { + if let Some(cache) = route_cache { + cache.clear(); + } + bm.note_optimistic_restart(); + restarts = restarts.saturating_add(1); + if restarts >= 128 { + return Err(e); + } + if restarts % 16 == 0 { + std::thread::yield_now(); + } + } + out => return out, + } + } +} + +fn erase_multi_conditional_once( + bm: &BufferManager, + root_pin: &Arc, + route_cache: Option<&RouteCache>, + key: SearchKey<'_>, + seq: u64, + condition: EraseCondition, ) -> Result { // The caller (typically `Tree`) keeps `root_pin` alive across // every op so we skip `BufferManager`'s pin-Mutex on the hot @@ -343,7 +373,16 @@ fn lock_coupled_erase_in_blob( let r = match step { EraseStep::Done(r) => r, EraseStep::Crossing(crossing) => { - let child_pin = bm.pin(crossing.child_guid)?; + let child_pin = match bm.pin(crossing.child_guid) { + Ok(pin) => pin, + Err(e) + if is_blob_store_not_found(&e) && bm.has_delete_fence(crossing.child_guid) => + { + drop(guard); + return Err(stale_blob_crossing("stale blob crossing: erase deep child")); + } + Err(e) => return Err(e.with_blob_guid(crossing.child_guid)), + }; child_pin.prefetch_header(); let child_guard = child_pin.write(); diff --git a/src/engine/walker/insert.rs b/src/engine/walker/insert.rs index 6d278ec..ef08b6b 100644 --- a/src/engine/walker/insert.rs +++ b/src/engine/walker/insert.rs @@ -12,6 +12,7 @@ use super::migrate::blob_needs_compaction; use super::readers::{child_offset, ntype_of, read_leaf_key_ref, read_prefix}; use super::route::{pin_route_parent, validate_route_edge}; use super::spillover::{compact_blob, spillover_blob}; +use super::types::{is_stale_blob_crossing, stale_blob_crossing}; use super::types::{InsertCondition, InsertOutcome, InsertReturn, LookupResult}; use super::writers::{ inner_add_child, inner_find_child, inner_update_child, set_prefix_child, write_leaf, @@ -107,7 +108,37 @@ pub fn insert_multi_conditional( if value.len() > u16::MAX as usize { return Err(Error::ValueTooLong { len: value.len() }); } + let mut restarts = 0u32; + loop { + match insert_multi_conditional_once(bm, root_pin, route_cache, key, value, seq, condition) { + Err(e) if is_stale_blob_crossing(&e) => { + if let Some(cache) = route_cache { + cache.clear(); + } + bm.note_optimistic_restart(); + restarts = restarts.saturating_add(1); + if restarts >= 128 { + return Err(e); + } + if restarts % 16 == 0 { + std::thread::yield_now(); + } + } + out => return out, + } + } +} +#[allow(clippy::too_many_arguments)] +fn insert_multi_conditional_once( + bm: &BufferManager, + root_pin: &Arc, + route_cache: Option<&RouteCache>, + key: SearchKey<'_>, + value: &[u8], + seq: u64, + condition: InsertCondition, +) -> Result { let mut blob_hops = 0u64; let mut max_cross_blob_depth = 0usize; @@ -179,6 +210,12 @@ fn try_insert_from_root_router( }; let child_pin = match bm.pin(crossing.child_guid) { Ok(pin) => pin, + Err(e) if is_blob_store_not_found(&e) && bm.has_delete_fence(crossing.child_guid) => { + drop(root_read); + return Err(stale_blob_crossing( + "stale blob crossing: insert root router", + )); + } Err(e) if is_blob_store_not_found(&e) => return Ok(None), Err(e) => return Err(e), }; @@ -302,6 +339,11 @@ fn try_insert_from_optimistic_route( } let child_pin = match bm.pin(route.child_guid) { Ok(pin) => pin, + Err(e) if is_blob_store_not_found(&e) && bm.has_delete_fence(route.child_guid) => { + drop(parent_guard); + cache.invalidate(key, route); + return Err(stale_blob_crossing("stale blob crossing: insert route")); + } Err(e) if is_blob_store_not_found(&e) => { drop(parent_guard); cache.invalidate(key, route); @@ -402,7 +444,33 @@ pub(crate) fn insert_multi_batch_conditional( }); } } + let mut restarts = 0u32; + loop { + match insert_multi_batch_conditional_once(bm, root_pin, route_cache, items) { + Err(e) if is_stale_blob_crossing(&e) => { + if let Some(cache) = route_cache { + cache.clear(); + } + bm.note_optimistic_restart(); + restarts = restarts.saturating_add(1); + if restarts >= 128 { + return Err(e); + } + if restarts % 16 == 0 { + std::thread::yield_now(); + } + } + out => return out, + } + } +} +pub(crate) fn insert_multi_batch_conditional_once( + bm: &BufferManager, + root_pin: &Arc, + route_cache: Option<&RouteCache>, + items: &[InsertBatchItem<'_>], +) -> Result { let batched = try_insert_batch_from_first_blob(bm, root_pin, route_cache, items)?; if batched.applied != 0 { return Ok(batched); @@ -470,6 +538,14 @@ fn try_insert_batch_from_first_blob( let run_len = same_child_prefix_run_len(items, crossing.child_depth); let child_pin = match bm.pin(crossing.child_guid) { Ok(pin) => pin, + Err(e) + if is_blob_store_not_found(&e) && bm.has_delete_fence(crossing.child_guid) => + { + drop(root_read); + return Err(stale_blob_crossing( + "stale blob crossing: insert batch root router", + )); + } Err(e) if is_blob_store_not_found(&e) => { drop(root_read); return insert_batch_from_root(bm, root_pin, items); @@ -544,6 +620,13 @@ fn try_insert_batch_from_route( } let child_pin = match bm.pin(route.child_guid) { Ok(pin) => pin, + Err(e) if is_blob_store_not_found(&e) && bm.has_delete_fence(route.child_guid) => { + drop(parent_guard); + cache.invalidate(first_key, route); + return Err(stale_blob_crossing( + "stale blob crossing: insert batch route", + )); + } Err(e) if is_blob_store_not_found(&e) => { drop(parent_guard); cache.invalidate(first_key, route); @@ -803,7 +886,19 @@ fn cross_and_insert( blob_hops: &mut u64, max_cross_blob_depth: &mut usize, ) -> Result { - let child_pin = bm.pin(crossing.child_guid)?; + let child_pin = match bm.pin(crossing.child_guid) { + Ok(pin) => pin, + Err(e) if is_blob_store_not_found(&e) && bm.has_delete_fence(crossing.child_guid) => { + drop(parent_guard); + if parent_dirty { + bm.mark_dirty_cached(current_guid, seq, current_entry); + } + return Err(stale_blob_crossing( + "stale blob crossing: insert deep child", + )); + } + Err(e) => return Err(e.with_blob_guid(crossing.child_guid)), + }; child_pin.prefetch_header(); let child_guard = child_pin.write(); diff --git a/src/engine/walker/lookup.rs b/src/engine/walker/lookup.rs index 9b36bb5..bf772fb 100644 --- a/src/engine/walker/lookup.rs +++ b/src/engine/walker/lookup.rs @@ -151,6 +151,13 @@ where let (child_pin, child_depth) = match cold_lookup_or_pin(bm, key, crossing, &mut consume)? { ColdLookupOrPin::Done(result) => return Ok(result), ColdLookupOrPin::Pin { pin, depth } => (pin, depth), + ColdLookupOrPin::Restart => { + if let Some(cache) = route_cache { + cache.clear(); + } + bm.note_optimistic_restart(); + continue 'restart; + } }; // Cross-blob hops. Same pattern; on a torn read we restart @@ -217,6 +224,11 @@ where ) { Ok(ColdLookupOrPin::Done(result)) => return Ok(RouteLookup::Done(result)), Ok(ColdLookupOrPin::Pin { pin, .. }) => pin, + Ok(ColdLookupOrPin::Restart) => { + drop(parent_guard); + cache.clear(); + return Ok(RouteLookup::Restart); + } Err(e) if is_blob_store_not_found(&e) => { drop(parent_guard); cache.invalidate(key, route); @@ -257,6 +269,10 @@ where let (next_pin, next_depth) = match cold_lookup_or_pin(bm, key, crossing, consume)? { ColdLookupOrPin::Done(result) => return Ok(RouteLookup::Done(result)), ColdLookupOrPin::Pin { pin, depth } => (pin, depth), + ColdLookupOrPin::Restart => { + cache.clear(); + return Ok(RouteLookup::Restart); + } }; match lookup_from_pinned_blob(bm, Some(cache), key, next_pin, next_depth, consume)? { CrossBlobLookup::Done(result) => Ok(RouteLookup::Done(result)), @@ -303,6 +319,12 @@ where }; match cold_lookup_or_pin(bm, key, crossing, consume)? { ColdLookupOrPin::Done(result) => return Ok(CrossBlobLookup::Done(result)), + ColdLookupOrPin::Restart => { + if let Some(cache) = route_cache { + cache.clear(); + } + return Ok(CrossBlobLookup::Restart); + } ColdLookupOrPin::Pin { pin: child_pin, depth: child_depth, @@ -356,6 +378,7 @@ fn validate_child_crossing( enum ColdLookupOrPin { Done(Option), Pin { pin: Arc, depth: usize }, + Restart, } fn cold_lookup_or_pin( @@ -370,7 +393,13 @@ where // Only exact point lookups (a user-style key) take the cold path; // range/prefix/non-exact searches pin directly. if key.user_bytes().is_none() { - let pin = bm.pin(crossing.child_guid)?; + let pin = match bm.pin(crossing.child_guid) { + Ok(pin) => pin, + Err(e) if is_blob_store_not_found(&e) && bm.has_delete_fence(crossing.child_guid) => { + return Ok(ColdLookupOrPin::Restart); + } + Err(e) => return Err(e), + }; pin.prefetch_header(); return Ok(ColdLookupOrPin::Pin { pin, @@ -385,7 +414,13 @@ where // uncertainty falls back to the authoritative full pin. match cold_read_routed(bm, child_guid, key, child_depth) { ColdBlobLookup::Unknown => { - let pin = bm.pin(child_guid)?; + let pin = match bm.pin(child_guid) { + Ok(pin) => pin, + Err(e) if is_blob_store_not_found(&e) && bm.has_delete_fence(child_guid) => { + return Ok(ColdLookupOrPin::Restart); + } + Err(e) => return Err(e), + }; pin.prefetch_header(); return Ok(ColdLookupOrPin::Pin { pin, diff --git a/src/engine/walker/migrate.rs b/src/engine/walker/migrate.rs index a687762..ce6d427 100644 --- a/src/engine/walker/migrate.rs +++ b/src/engine/walker/migrate.rs @@ -29,6 +29,7 @@ use crate::store::{ }; use super::cast; +use super::cow::child_is_snapshot_shared; use super::readers::child_offset; use super::types::MakeBlobOutcome; use super::writers::{write_prefix_chain, write_struct_at}; @@ -372,6 +373,9 @@ pub fn is_mergeable( ) -> Result { let bn = read_blob_node(parent_frame, parent_bn_off)?; let child_pin = bm.pin(bn.child_blob_guid)?; + if child_is_snapshot_shared(bm, child_pin.as_ref()) { + return Ok(false); + } let guard = child_pin.read(); let child_frame = BlobFrameRef::wrap(guard.as_slice()); diff --git a/src/engine/walker/types.rs b/src/engine/walker/types.rs index 765126a..5e5c4d7 100644 --- a/src/engine/walker/types.rs +++ b/src/engine/walker/types.rs @@ -1,5 +1,6 @@ //! Walker types — public outcomes + internal signals. +use crate::api::errors::Error; use crate::layout::{BlobGuid, NodeType}; use crate::store::blob_store::AlignedBlobBuf; @@ -128,6 +129,16 @@ pub(super) struct EraseReturn { pub(super) mutated: bool, } +pub(super) const STALE_BLOB_CROSSING: &str = "stale blob crossing"; + +pub(super) const fn stale_blob_crossing(where_: &'static str) -> Error { + Error::Internal(where_) +} + +pub(super) fn is_stale_blob_crossing(error: &Error) -> bool { + matches!(error, Error::Internal(msg) if msg.starts_with(STALE_BLOB_CROSSING)) +} + /// What kind of edge the parent of a victim subtree has. #[derive(Debug, Clone, Copy)] pub(super) enum VictimEdgeKind { diff --git a/src/store/buffer_manager/mod.rs b/src/store/buffer_manager/mod.rs index 6fda8db..40792db 100644 --- a/src/store/buffer_manager/mod.rs +++ b/src/store/buffer_manager/mod.rs @@ -838,6 +838,12 @@ impl BufferManager { .has_delete_fence(&guid) } + /// True while `guid` is logically unlinked from the live tree but + /// still fenced by the deferred-delete protocol. + pub(crate) fn has_delete_fence(&self, guid: BlobGuid) -> bool { + self.is_pending_delete(guid) + } + fn pending_delete_not_found(guid: BlobGuid) -> Error { Error::BlobStoreIo(std::io::Error::new( std::io::ErrorKind::NotFound, diff --git a/tools/soak/Cargo.lock b/tools/soak/Cargo.lock index 154a92d..7666793 100644 --- a/tools/soak/Cargo.lock +++ b/tools/soak/Cargo.lock @@ -60,7 +60,7 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" [[package]] name = "holt" -version = "0.7.1" +version = "0.7.2" dependencies = [ "crc32fast", "crossbeam-channel",