From 6dbc5a1993f4e264ccd8faad6c882e757424891d Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 6 May 2026 13:15:01 +0200 Subject: [PATCH 1/5] Vortex-datafusion segment cache Signed-off-by: Christoph Schulze --- vortex-datafusion/public-api.lock | 2 ++ vortex-datafusion/src/persistent/opener.rs | 13 +++++++++++++ vortex-datafusion/src/persistent/source.rs | 14 ++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index 9d558c1de48..be04e369343 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -200,6 +200,8 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, enabled: pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, scan_concurrency: usize) -> Self +pub fn vortex_datafusion::VortexSource::with_segment_cache(self, segment_cache: alloc::sync::Arc) -> Self + pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc) -> Self impl core::clone::Clone for vortex_datafusion::VortexSource diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 0719b023881..1b6f18cf907 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -41,6 +41,7 @@ use vortex::file::OpenOptionsSessionExt; use vortex::io::InstrumentedReadAt; use vortex::layout::LayoutReader; use vortex::layout::scan::scan_builder::ScanBuilder; +use vortex::layout::segments::SegmentCache; use vortex::layout::scan::split_by::SplitBy; use vortex::metrics::Label; use vortex::metrics::MetricsRegistry; @@ -97,6 +98,7 @@ pub(crate) struct VortexOpener { pub expression_convertor: Arc, pub file_metadata_cache: Option>, + pub segment_cache: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. pub projection_pushdown: bool, pub scan_concurrency: Option, @@ -122,6 +124,7 @@ impl FileOpener for VortexOpener { let file_pruning_predicate = self.file_pruning_predicate.clone(); let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory); let file_metadata_cache = self.file_metadata_cache.clone(); + let segment_cache = self.segment_cache.clone(); let unified_file_schema = Arc::clone(self.table_schema.file_schema()); let batch_size = self.batch_size; @@ -200,6 +203,10 @@ impl FileOpener for VortexOpener { open_opts = open_opts.with_footer(vortex_metadata.footer().clone()); } + if let Some(segment_cache) = segment_cache { + open_opts = open_opts.with_segment_cache(segment_cache); + } + let vxf = open_opts .open_read(reader) .await @@ -656,6 +663,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, } @@ -788,6 +796,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, }; @@ -875,6 +884,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, }; @@ -1032,6 +1042,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, }; @@ -1092,6 +1103,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, } @@ -1296,6 +1308,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache: None, projection_pushdown: false, scan_concurrency: None, }; diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 211170c6607..c3fe481574e 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -15,6 +15,7 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_expr::PhysicalExprRef; +use vortex::layout::segments::SegmentCache; use datafusion_physical_expr::conjunction; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; @@ -192,6 +193,7 @@ pub struct VortexSource { pub(crate) vortex_reader_factory: Option>, vx_metrics_registry: Arc, file_metadata_cache: Option>, + segment_cache: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. options: VortexTableOptions, } @@ -224,6 +226,7 @@ impl VortexSource { vortex_reader_factory: None, vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, + segment_cache: None, options: VortexTableOptions::default(), } } @@ -283,6 +286,16 @@ impl VortexSource { self } + /// Sets a [`SegmentCache`] to reuse decoded segment bytes across scans of the same file. + /// + /// Without a cache every query re-reads zone map and data segments from object storage. + /// Providing a shared [`MokaSegmentCache`](vortex::layout::segments::MokaSegmentCache) + /// eliminates those redundant reads for repeated queries on the same files. + pub fn with_segment_cache(mut self, segment_cache: Arc) -> Self { + self.segment_cache = Some(segment_cache); + self + } + /// Sets the per-file Vortex scan concurrency. /// /// This is separate from DataFusion's partition-level parallelism. @@ -341,6 +354,7 @@ impl FileSource for VortexSource { has_output_ordering: !base_config.output_ordering.is_empty(), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: self.file_metadata_cache.clone(), + segment_cache: self.segment_cache.clone(), projection_pushdown: self.options.projection_pushdown, scan_concurrency: self.options.scan_concurrency, }; From 60445c229a4fed96409862eed118d3bba947d40f Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 6 May 2026 13:47:37 +0200 Subject: [PATCH 2/5] update API lock Signed-off-by: Christoph Schulze --- vortex-datafusion/public-api.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index 7cb0b81458d..a2e5e7bb837 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -200,7 +200,7 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, bool) -> pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, usize) -> Self -pub fn vortex_datafusion::VortexSource::with_segment_cache(self, segment_cache: alloc::sync::Arc) -> Self +pub fn vortex_datafusion::VortexSource::with_segment_cache(self, alloc::sync::Arc) -> Self pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, alloc::sync::Arc) -> Self From d9f9d68251b8377db09a78f43f9f66e179bbb110 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 6 May 2026 13:48:00 +0200 Subject: [PATCH 3/5] fmt Signed-off-by: Christoph Schulze --- vortex-datafusion/src/persistent/opener.rs | 2 +- vortex-datafusion/src/persistent/source.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index a635ebc399e..ba33ae9ef22 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -41,8 +41,8 @@ use vortex::file::OpenOptionsSessionExt; use vortex::io::InstrumentedReadAt; use vortex::layout::LayoutReader; use vortex::layout::scan::scan_builder::ScanBuilder; -use vortex::layout::segments::SegmentCache; use vortex::layout::scan::split_by::SplitBy; +use vortex::layout::segments::SegmentCache; use vortex::metrics::Label; use vortex::metrics::MetricsRegistry; use vortex::session::VortexSession; diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 9af9997d93e..54a085e5ec3 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -15,7 +15,6 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_expr::PhysicalExprRef; -use vortex::layout::segments::SegmentCache; use datafusion_physical_expr::conjunction; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; @@ -31,6 +30,7 @@ use object_store::path::Path; use vortex::error::VortexExpect; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::layout::LayoutReader; +use vortex::layout::segments::SegmentCache; use vortex::metrics::DefaultMetricsRegistry; use vortex::metrics::MetricsRegistry; use vortex::session::VortexSession; From ee4dfb78d364812111674d206bb0f7b13391bc5d Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Fri, 8 May 2026 12:16:08 +0200 Subject: [PATCH 4/5] namespace segment cache Signed-off-by: Christoph Schulze --- vortex-datafusion/public-api.lock | 2 +- vortex-datafusion/src/persistent/opener.rs | 39 +++-- vortex-datafusion/src/persistent/source.rs | 29 ++-- vortex-file/public-api.lock | 4 +- vortex-file/src/open.rs | 17 +- vortex-file/src/segments/cache.rs | 5 +- vortex-layout/public-api.lock | 104 +++++++++++- vortex-layout/src/segments/cache.rs | 183 ++++++++++++++++++++- 8 files changed, 350 insertions(+), 33 deletions(-) diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index a2e5e7bb837..f0985084367 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -200,7 +200,7 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, bool) -> pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, usize) -> Self -pub fn vortex_datafusion::VortexSource::with_segment_cache(self, alloc::sync::Arc) -> Self +pub fn vortex_datafusion::VortexSource::with_segment_cache_builder(self, alloc::sync::Arc) -> Self pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, alloc::sync::Arc) -> Self diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index ba33ae9ef22..f13142b43b4 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -30,6 +30,7 @@ use futures::StreamExt; use futures::TryStreamExt; use futures::stream; use itertools::Itertools; +use object_store::ObjectMeta; use object_store::path::Path; use tracing::Instrument; use vortex::array::VortexSessionExecute; @@ -42,7 +43,6 @@ use vortex::io::InstrumentedReadAt; use vortex::layout::LayoutReader; use vortex::layout::scan::scan_builder::ScanBuilder; use vortex::layout::scan::split_by::SplitBy; -use vortex::layout::segments::SegmentCache; use vortex::metrics::Label; use vortex::metrics::MetricsRegistry; use vortex::session::VortexSession; @@ -56,6 +56,10 @@ use crate::convert::exprs::make_vortex_predicate; use crate::convert::schema::calculate_physical_schema; use crate::metrics::PARTITION_LABEL; use crate::metrics::PATH_LABEL; +use vortex::layout::segments::FileIdentity; +use vortex::layout::segments::FileVersion; +use vortex::layout::segments::SegmentCacheBuilder; + use crate::persistent::cache::CachedVortexMetadata; use crate::persistent::reader::VortexReaderFactory; use crate::persistent::stream::PrunableStream; @@ -98,7 +102,7 @@ pub(crate) struct VortexOpener { pub expression_convertor: Arc, pub file_metadata_cache: Option>, - pub segment_cache: Option>, + pub segment_cache_builder: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. pub projection_pushdown: bool, pub scan_concurrency: Option, @@ -124,7 +128,7 @@ impl FileOpener for VortexOpener { let file_pruning_predicate = self.file_pruning_predicate.clone(); let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory); let file_metadata_cache = self.file_metadata_cache.clone(); - let segment_cache = self.segment_cache.clone(); + let segment_cache_builder = self.segment_cache_builder.clone(); let unified_file_schema = Arc::clone(self.table_schema.file_schema()); let batch_size = self.batch_size; @@ -203,8 +207,9 @@ impl FileOpener for VortexOpener { open_opts = open_opts.with_footer(vortex_metadata.footer().clone()); } - if let Some(segment_cache) = segment_cache { - open_opts = open_opts.with_segment_cache(segment_cache); + if let Some(builder) = segment_cache_builder { + let identity = file_identity(&file.object_meta); + open_opts = open_opts.with_segment_cache(builder.cache_for(&identity)); } let vxf = open_opts @@ -452,6 +457,18 @@ impl FileOpener for VortexOpener { } } +/// Build a [`FileIdentity`] from object store metadata, preferring the etag and falling +/// back to `(size, last_modified)` when no etag is available. +fn file_identity(meta: &ObjectMeta) -> FileIdentity { + let path = Arc::from(meta.location.as_ref()); + let version = if let Some(etag) = meta.e_tag.as_deref() { + FileVersion::Etag(Arc::from(etag)) + } else { + FileVersion::SizeMtime(meta.size, meta.last_modified.timestamp()) + }; + FileIdentity { path, version } +} + fn natural_split_ranges_for_file( natural_split_ranges: &DashMap]>>, path: &Path, @@ -687,7 +704,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, - segment_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, } @@ -820,7 +837,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, - segment_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, }; @@ -908,7 +925,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, - segment_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, }; @@ -1066,7 +1083,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, - segment_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, }; @@ -1127,7 +1144,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, - segment_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, } @@ -1332,7 +1349,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, - segment_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, }; diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 54a085e5ec3..1866ada114a 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -30,7 +30,7 @@ use object_store::path::Path; use vortex::error::VortexExpect; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::layout::LayoutReader; -use vortex::layout::segments::SegmentCache; +use vortex::layout::segments::SegmentCacheBuilder; use vortex::metrics::DefaultMetricsRegistry; use vortex::metrics::MetricsRegistry; use vortex::session::VortexSession; @@ -193,7 +193,7 @@ pub struct VortexSource { pub(crate) vortex_reader_factory: Option>, vx_metrics_registry: Arc, file_metadata_cache: Option>, - segment_cache: Option>, + segment_cache_builder: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. options: VortexTableOptions, } @@ -226,7 +226,7 @@ impl VortexSource { vortex_reader_factory: None, vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, - segment_cache: None, + segment_cache_builder: None, options: VortexTableOptions::default(), } } @@ -286,13 +286,22 @@ impl VortexSource { self } - /// Sets a [`SegmentCache`] to reuse decoded segment bytes across scans of the same file. + /// Sets a [`SegmentCacheBuilder`] to reuse segment bytes across scans of the same files. /// - /// Without a cache every query re-reads zone map and data segments from object storage. - /// Providing a shared [`MokaSegmentCache`](vortex::layout::segments::MokaSegmentCache) - /// eliminates those redundant reads for repeated queries on the same files. - pub fn with_segment_cache(mut self, segment_cache: Arc) -> Self { - self.segment_cache = Some(segment_cache); + /// Without a builder every query re-reads zone map and data segments from object storage. + /// The builder is invoked once per opened file with that file's + /// [`FileIdentity`](vortex::layout::segments::FileIdentity); the returned per-file + /// [`SegmentCache`](vortex::layout::segments::SegmentCache) is wired into the file open + /// path. Use + /// [`NamespacedMokaSegmentCacheBuilder`](vortex::layout::segments::NamespacedMokaSegmentCacheBuilder) + /// for cross-query reuse with a global memory budget, optionally wrapped in + /// [`InstrumentedSegmentCacheBuilder`](vortex::layout::segments::InstrumentedSegmentCacheBuilder) + /// for hit/miss metrics. + pub fn with_segment_cache_builder( + mut self, + builder: Arc, + ) -> Self { + self.segment_cache_builder = Some(builder); self } @@ -352,7 +361,7 @@ impl VortexSource { has_output_ordering: !base_config.output_ordering.is_empty(), expression_convertor: Arc::clone(&self.expression_convertor), file_metadata_cache: self.file_metadata_cache.clone(), - segment_cache: self.segment_cache.clone(), + segment_cache_builder: self.segment_cache_builder.clone(), projection_pushdown: self.options.projection_pushdown, scan_concurrency: self.options.scan_concurrency, }; diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index a20ab092751..e80a60246a8 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -40,7 +40,7 @@ pub fn vortex_file::segments::FileSegmentSource::request(&self, vortex_layout::s pub struct vortex_file::segments::InitialReadSegmentCache -pub vortex_file::segments::InitialReadSegmentCache::fallback: alloc::sync::Arc +pub vortex_file::segments::InitialReadSegmentCache::fallback: vortex_layout::segments::cache::SharedSegmentCache pub vortex_file::segments::InitialReadSegmentCache::initial: parking_lot::rwlock::RwLock> @@ -312,7 +312,7 @@ pub fn vortex_file::VortexOpenOptions::with_labels(self, alloc::vec::Vec) -> Self -pub fn vortex_file::VortexOpenOptions::with_segment_cache(self, alloc::sync::Arc) -> Self +pub fn vortex_file::VortexOpenOptions::with_segment_cache(self, vortex_layout::segments::cache::SharedSegmentCache) -> Self pub fn vortex_file::VortexOpenOptions::with_some_file_size(self, core::option::Option) -> Self diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 3a5d3cf90e5..8db573492d2 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -17,9 +17,9 @@ use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; use vortex_layout::segments::InstrumentedSegmentCache; use vortex_layout::segments::NoOpSegmentCache; -use vortex_layout::segments::SegmentCache; use vortex_layout::segments::SegmentCacheSourceAdapter; use vortex_layout::segments::SegmentId; +use vortex_layout::segments::SharedSegmentCache; use vortex_layout::segments::SharedSegmentSource; use vortex_layout::session::LayoutSessionExt; use vortex_metrics::DefaultMetricsRegistry; @@ -45,7 +45,7 @@ pub struct VortexOpenOptions { /// The session to use for opening the file. session: VortexSession, /// Cache to use for file segments. - segment_cache: Option>, + segment_cache: Option, /// The number of bytes to read when parsing the footer. initial_read_size: usize, /// An optional, externally provided, file size. @@ -93,7 +93,18 @@ impl VortexOpenOptions { } /// Configure a custom [`SegmentCache`]. - pub fn with_segment_cache(mut self, segment_cache: Arc) -> Self { + /// + /// The supplied cache must be **scoped to a single file**: [`SegmentId`] is a + /// file-local index, so reusing one cache across multiple files will alias entries + /// from different files onto the same key. For cross-file sharing use a + /// [`SegmentCacheBuilder`] (e.g. + /// [`NamespacedMokaSegmentCacheBuilder`](vortex_layout::segments::NamespacedMokaSegmentCacheBuilder)) + /// at the layer that opens files, and pass the per-file [`SegmentCache`] it returns + /// here. + /// + /// [`SegmentId`]: vortex_layout::segments::SegmentId + /// [`SegmentCacheBuilder`]: vortex_layout::segments::SegmentCacheBuilder + pub fn with_segment_cache(mut self, segment_cache: SharedSegmentCache) -> Self { self.segment_cache = Some(segment_cache); self } diff --git a/vortex-file/src/segments/cache.rs b/vortex-file/src/segments/cache.rs index a17e3866bf4..9d465864eb1 100644 --- a/vortex-file/src/segments/cache.rs +++ b/vortex-file/src/segments/cache.rs @@ -1,20 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::sync::Arc; - use async_trait::async_trait; use parking_lot::RwLock; use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; use vortex_layout::segments::SegmentCache; use vortex_layout::segments::SegmentId; +use vortex_layout::segments::SharedSegmentCache; use vortex_utils::aliases::hash_map::HashMap; /// Segment cache containing the initial read segments. pub struct InitialReadSegmentCache { pub initial: RwLock>, - pub fallback: Arc, + pub fallback: SharedSegmentCache, } #[async_trait] diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index c0f3acec787..4a9c4a2230a 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -1138,6 +1138,58 @@ impl core::marker::Copy for vortex_layout::scan::split_by::SplitBy pub mod vortex_layout::segments +pub enum vortex_layout::segments::FileVersion + +pub vortex_layout::segments::FileVersion::Etag(alloc::sync::Arc) + +pub vortex_layout::segments::FileVersion::SizeMtime(u64, i64) + +impl core::clone::Clone for vortex_layout::segments::FileVersion + +pub fn vortex_layout::segments::FileVersion::clone(&self) -> vortex_layout::segments::FileVersion + +impl core::cmp::Eq for vortex_layout::segments::FileVersion + +impl core::cmp::PartialEq for vortex_layout::segments::FileVersion + +pub fn vortex_layout::segments::FileVersion::eq(&self, &vortex_layout::segments::FileVersion) -> bool + +impl core::fmt::Debug for vortex_layout::segments::FileVersion + +pub fn vortex_layout::segments::FileVersion::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_layout::segments::FileVersion + +pub fn vortex_layout::segments::FileVersion::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::StructuralPartialEq for vortex_layout::segments::FileVersion + +pub struct vortex_layout::segments::FileIdentity + +pub vortex_layout::segments::FileIdentity::path: alloc::sync::Arc + +pub vortex_layout::segments::FileIdentity::version: vortex_layout::segments::FileVersion + +impl core::clone::Clone for vortex_layout::segments::FileIdentity + +pub fn vortex_layout::segments::FileIdentity::clone(&self) -> vortex_layout::segments::FileIdentity + +impl core::cmp::Eq for vortex_layout::segments::FileIdentity + +impl core::cmp::PartialEq for vortex_layout::segments::FileIdentity + +pub fn vortex_layout::segments::FileIdentity::eq(&self, &vortex_layout::segments::FileIdentity) -> bool + +impl core::fmt::Debug for vortex_layout::segments::FileIdentity + +pub fn vortex_layout::segments::FileIdentity::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_layout::segments::FileIdentity + +pub fn vortex_layout::segments::FileIdentity::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::StructuralPartialEq for vortex_layout::segments::FileIdentity + pub struct vortex_layout::segments::InstrumentedSegmentCache impl vortex_layout::segments::InstrumentedSegmentCache @@ -1150,6 +1202,16 @@ pub fn vortex_layout::segments::InstrumentedSegmentCache::get<'life0, 'async_ pub fn vortex_layout::segments::InstrumentedSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub struct vortex_layout::segments::InstrumentedSegmentCacheBuilder + +impl vortex_layout::segments::InstrumentedSegmentCacheBuilder + +pub fn vortex_layout::segments::InstrumentedSegmentCacheBuilder::new(B, alloc::sync::Arc, alloc::vec::Vec) -> Self + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::InstrumentedSegmentCacheBuilder + +pub fn vortex_layout::segments::InstrumentedSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + pub struct vortex_layout::segments::MokaSegmentCache(_) impl vortex_layout::segments::MokaSegmentCache @@ -1162,6 +1224,16 @@ pub fn vortex_layout::segments::MokaSegmentCache::get<'life0, 'async_trait>(&'li pub fn vortex_layout::segments::MokaSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub struct vortex_layout::segments::NamespacedMokaSegmentCacheBuilder + +impl vortex_layout::segments::NamespacedMokaSegmentCacheBuilder + +pub fn vortex_layout::segments::NamespacedMokaSegmentCacheBuilder::new(u64) -> Self + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::NamespacedMokaSegmentCacheBuilder + +pub fn vortex_layout::segments::NamespacedMokaSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + pub struct vortex_layout::segments::NoOpSegmentCache impl vortex_layout::segments::SegmentCache for vortex_layout::segments::NoOpSegmentCache @@ -1170,11 +1242,17 @@ pub fn vortex_layout::segments::NoOpSegmentCache::get<'life0, 'async_trait>(&'li pub fn vortex_layout::segments::NoOpSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub struct vortex_layout::segments::NoOpSegmentCacheBuilder + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::NoOpSegmentCacheBuilder + +pub fn vortex_layout::segments::NoOpSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + pub struct vortex_layout::segments::SegmentCacheSourceAdapter impl vortex_layout::segments::SegmentCacheSourceAdapter -pub fn vortex_layout::segments::SegmentCacheSourceAdapter::new(alloc::sync::Arc, alloc::sync::Arc) -> Self +pub fn vortex_layout::segments::SegmentCacheSourceAdapter::new(vortex_layout::segments::SharedSegmentCache, alloc::sync::Arc) -> Self impl vortex_layout::segments::SegmentSource for vortex_layout::segments::SegmentCacheSourceAdapter @@ -1264,12 +1342,34 @@ pub fn vortex_layout::segments::NoOpSegmentCache::get<'life0, 'async_trait>(&'li pub fn vortex_layout::segments::NoOpSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +impl vortex_layout::segments::SegmentCache for alloc::sync::Arc + +pub fn alloc::sync::Arc::get<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait + +pub fn alloc::sync::Arc::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait + impl vortex_layout::segments::SegmentCache for vortex_layout::segments::InstrumentedSegmentCache pub fn vortex_layout::segments::InstrumentedSegmentCache::get<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait pub fn vortex_layout::segments::InstrumentedSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub trait vortex_layout::segments::SegmentCacheBuilder: core::marker::Send + core::marker::Sync + +pub fn vortex_layout::segments::SegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::NamespacedMokaSegmentCacheBuilder + +pub fn vortex_layout::segments::NamespacedMokaSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::NoOpSegmentCacheBuilder + +pub fn vortex_layout::segments::NoOpSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::InstrumentedSegmentCacheBuilder + +pub fn vortex_layout::segments::InstrumentedSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + pub trait vortex_layout::segments::SegmentSink: core::marker::Send + core::marker::Sync pub fn vortex_layout::segments::SegmentSink::write<'life0, 'async_trait>(&'life0 self, vortex_layout::sequence::SequenceId, alloc::vec::Vec) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait @@ -1290,6 +1390,8 @@ pub type vortex_layout::segments::SegmentFuture = futures_core::future::BoxFutur pub type vortex_layout::segments::SegmentSinkRef = alloc::sync::Arc +pub type vortex_layout::segments::SharedSegmentCache = alloc::sync::Arc + pub mod vortex_layout::sequence pub struct vortex_layout::sequence::SequenceId diff --git a/vortex-layout/src/segments/cache.rs b/vortex-layout/src/segments/cache.rs index 37675c19d2a..ee85d210993 100644 --- a/vortex-layout/src/segments/cache.rs +++ b/vortex-layout/src/segments/cache.rs @@ -2,6 +2,8 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::sync::Arc; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; use async_trait::async_trait; use futures::FutureExt; @@ -17,6 +19,7 @@ use vortex_metrics::Counter; use vortex_metrics::Label; use vortex_metrics::MetricBuilder; use vortex_metrics::MetricsRegistry; +use vortex_utils::aliases::dash_map::DashMap; use crate::segments::SegmentFuture; use crate::segments::SegmentId; @@ -29,6 +32,25 @@ pub trait SegmentCache: Send + Sync { async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>; } +/// Shared, type-erased reference to a [`SegmentCache`]. +/// +/// This is the form used at almost every API boundary that hands off a [`SegmentCache`] +/// (builder outputs, file open options, source adapters). The alias exists primarily so +/// that IDE "find references" can locate every shared cache hand-off without matching +/// every `Arc` in the codebase. +pub type SharedSegmentCache = Arc; + +#[async_trait] +impl SegmentCache for Arc { + async fn get(&self, id: SegmentId) -> VortexResult> { + (**self).get(id).await + } + + async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> { + (**self).put(id, buffer).await + } +} + pub struct NoOpSegmentCache; #[async_trait] @@ -124,13 +146,53 @@ impl SegmentCache for InstrumentedSegmentCache { } } +/// Decorator [`SegmentCacheBuilder`] that wraps each per-file cache with an +/// [`InstrumentedSegmentCache`] for hit/miss/store metrics. +/// +/// # Example +/// +/// ```ignore +/// let cache = Arc::new(InstrumentedSegmentCacheBuilder::new( +/// NamespacedMokaSegmentCacheBuilder::new(2 << 30), +/// metrics_registry, +/// vec![], +/// )); +/// ``` +pub struct InstrumentedSegmentCacheBuilder { + inner: B, + metrics_registry: Arc, + labels: Vec