From ea1342c972fa2ea9006a8d49dd4ed9875bd2a3b8 Mon Sep 17 00:00:00 2001 From: yanghua Date: Sat, 6 Jun 2026 14:36:45 +0800 Subject: [PATCH] feat: support cleanup explain API --- rust/lance/src/dataset.rs | 16 +- rust/lance/src/dataset/cleanup.rs | 773 ++++++++++++++++++++++++------ 2 files changed, 645 insertions(+), 144 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 2e448dfa828..340e91f3b52 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -24,8 +24,7 @@ use lance_core::datatypes::{OnMissing, OnTypeMismatch, Projectable, Projection}; use lance_core::traits::DatasetTakeRows; use lance_core::utils::address::RowAddress; use lance_core::utils::tracing::{ - DATASET_CLEANING_EVENT, DATASET_DELETING_EVENT, DATASET_DROPPING_COLUMN_EVENT, - TRACE_DATASET_EVENTS, + DATASET_DELETING_EVENT, DATASET_DROPPING_COLUMN_EVENT, TRACE_DATASET_EVENTS, }; use lance_datafusion::projection::ProjectionPlan; use lance_file::datatypes::populate_schema_dictionary; @@ -104,7 +103,7 @@ use self::scanner::{DatasetRecordBatchStream, Scanner}; use self::transaction::{Operation, Transaction, TransactionBuilder, UpdateMapEntry}; use self::write::{cleanup_data_fragments, write_fragments_internal}; use crate::dataset::branch_location::BranchLocation; -use crate::dataset::cleanup::{CleanupPolicy, CleanupPolicyBuilder}; +use crate::dataset::cleanup::{CleanupOperation, CleanupPolicy, CleanupPolicyBuilder}; use crate::dataset::refs::{BranchContents, BranchIdentifier, Branches, Tags}; use crate::dataset::sql::SqlQueryBuilder; use crate::datatypes::Schema; @@ -1201,8 +1200,15 @@ impl Dataset { &self, policy: CleanupPolicy, ) -> BoxFuture<'_, Result> { - info!(target: TRACE_DATASET_EVENTS, event=DATASET_CLEANING_EVENT, uri=&self.uri); - cleanup::cleanup_old_versions(self, policy).boxed() + async move { self.cleanup(policy).execute().await }.boxed() + } + + /// Creates a cleanup operation for this dataset. + /// + /// The returned operation can be explained without deleting files, or + /// executed to re-evaluate the current dataset state and remove files. + pub fn cleanup(&self, policy: CleanupPolicy) -> CleanupOperation<'_> { + CleanupOperation::new(self, policy) } #[allow(clippy::too_many_arguments)] diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index b3ca60cfa0f..65928038cea 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -46,7 +46,8 @@ use lance_core::{ Error, Result, utils::tracing::{ AUDIT_MODE_DELETE, AUDIT_MODE_DELETE_UNVERIFIED, AUDIT_TYPE_DATA, AUDIT_TYPE_DELETION, - AUDIT_TYPE_INDEX, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT, + AUDIT_TYPE_INDEX, AUDIT_TYPE_MANIFEST, DATASET_CLEANING_EVENT, TRACE_DATASET_EVENTS, + TRACE_FILE_AUDIT, }, }; use lance_table::{ @@ -78,7 +79,7 @@ struct ReferencedFiles { index_uuids: HashSet, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct RemovalStats { pub bytes_removed: u64, pub old_versions: u64, @@ -88,12 +89,194 @@ pub struct RemovalStats { pub deletion_files_removed: u64, } -#[derive(Clone, Copy, Debug)] -enum RemovedFileType { +/// A read-only explanation of what a cleanup operation would remove. +/// +/// This is an explanation, not a deletion plan. Calling +/// [`CleanupOperation::execute`] re-evaluates the current dataset and reference +/// state before deleting files. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CleanupExplanation { + /// Dataset version observed when the explanation was produced. + pub read_version: u64, + /// Aggregate statistics for files that would be removed. + pub stats: RemovalStats, + /// Candidate files that would be removed, capped by `candidate_file_limit`. + pub candidate_files: Vec, + /// True if more candidate files were found than are included. + pub candidate_files_truncated: bool, + /// Maximum number of candidate files included in this explanation. + pub candidate_file_limit: usize, + /// Referenced child branches and whether cleanup would cascade into them. + pub referenced_branches: Vec, + /// Non-fatal warnings about the explanation. + pub warnings: Vec, +} + +/// A file that cleanup identified as removable. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CleanupCandidateFile { + /// Dataset-relative or storage path for the candidate file. + pub path: String, + /// Kind of file identified by cleanup. + pub kind: CleanupFileKind, + /// True if the file is removable only because it aged past the unverified + /// retention threshold or `delete_unverified` is enabled. + pub unverified: bool, + /// Candidate file size in bytes. + pub size_bytes: u64, +} + +/// A branch that references the current branch lineage. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CleanupReferencedBranch { + /// Branch name. + pub name: String, + /// Version of the current lineage referenced by this branch. + pub referenced_version: u64, + /// True if this branch would be cleaned when cascading cleanup is enabled. + pub cleanup_candidate: bool, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum CleanupFileKind { + Manifest, Data, Transaction, Index, Deletion, + /// A leftover `_versions/.tmp` manifest from a failed transaction. These + /// are deleted but excluded from per-kind `RemovalStats` counts and audit + /// logs to match the long-standing cleanup behavior. Their bytes + /// are still included in `bytes_removed`. + TemporaryManifest, +} + +impl CleanupCandidateFile { + fn from_cleanup_file(file: &CleanupFile) -> Self { + Self { + path: file.path.to_string(), + kind: file.kind, + unverified: file.unverified, + size_bytes: file.size_bytes, + } + } +} + +fn cleanup_file( + path: Path, + kind: CleanupFileKind, + unverified: bool, + size_bytes: u64, +) -> Option { + Some(CleanupFile { + path, + kind, + unverified, + size_bytes, + }) +} + +#[derive(Clone, Debug)] +struct CleanupFile { + path: Path, + kind: CleanupFileKind, + /// True when the file was kept on disk past its referenced lifetime + /// because we could not verify it was safe to remove (e.g. produced by an + /// unfinished commit) and is being deleted only because it has aged past + /// the unverified-retention threshold or `delete_unverified` is set. + unverified: bool, + size_bytes: u64, +} + +impl RemovalStats { + fn record_file(&mut self, file: &CleanupFile) { + self.bytes_removed += file.size_bytes; + match file.kind { + CleanupFileKind::Manifest => self.old_versions += 1, + CleanupFileKind::Data => self.data_files_removed += 1, + CleanupFileKind::Transaction => self.transaction_files_removed += 1, + CleanupFileKind::Index => self.index_files_removed += 1, + CleanupFileKind::Deletion => self.deletion_files_removed += 1, + CleanupFileKind::TemporaryManifest => {} + } + } + + fn merge(&mut self, other: &Self) { + self.bytes_removed += other.bytes_removed; + self.old_versions += other.old_versions; + self.data_files_removed += other.data_files_removed; + self.transaction_files_removed += other.transaction_files_removed; + self.index_files_removed += other.index_files_removed; + self.deletion_files_removed += other.deletion_files_removed; + } +} + +#[derive(Debug, Default)] +struct CleanupRunResult { + stats: RemovalStats, + removed_manifests: HashSet, + candidate_files: Vec, + candidate_files_truncated: bool, + referenced_branches: Vec, +} + +impl CleanupRunResult { + fn record_file( + &mut self, + file: &CleanupFile, + candidate_file_limit: Option, + track_removed_manifests: bool, + ) { + self.stats.record_file(file); + if track_removed_manifests && matches!(file.kind, CleanupFileKind::Manifest) { + self.removed_manifests.insert(file.path.clone()); + } + if let Some(limit) = candidate_file_limit { + if self.candidate_files.len() < limit { + self.candidate_files + .push(CleanupCandidateFile::from_cleanup_file(file)); + } else { + self.candidate_files_truncated = true; + } + } + } + + fn merge(&mut self, other: Self, candidate_file_limit: Option) { + self.stats.merge(&other.stats); + self.removed_manifests.extend(other.removed_manifests); + self.referenced_branches.extend(other.referenced_branches); + if let Some(limit) = candidate_file_limit { + for file in other.candidate_files { + if self.candidate_files.len() < limit { + self.candidate_files.push(file); + } else { + self.candidate_files_truncated = true; + } + } + self.candidate_files_truncated |= other.candidate_files_truncated; + } + } +} + +#[derive(Clone, Copy, Debug)] +enum CleanupAction { + Execute, + Explain { max_candidate_files: usize }, +} + +impl CleanupAction { + fn deletes_files(self) -> bool { + matches!(self, Self::Execute) + } + + fn candidate_file_limit(self) -> Option { + match self { + Self::Execute => None, + Self::Explain { + max_candidate_files, + } => Some(max_candidate_files), + } + } } fn remove_prefix(path: &Path, prefix: &Path) -> Path { @@ -108,6 +291,11 @@ fn remove_prefix(path: &Path, prefix: &Path) -> Path { struct CleanupTask<'a> { dataset: &'a Dataset, policy: CleanupPolicy, + action: CleanupAction, + read_version: u64, + ignored_manifests: HashSet, + track_removed_manifests: bool, + include_referenced_branches: bool, } /// Information about the dataset that we learn by inspecting all of the manifests @@ -131,21 +319,131 @@ struct CleanupInspection { const UNVERIFIED_THRESHOLD_DAYS: i64 = 7; const S3_DELETE_STREAM_BATCH_SIZE: u64 = 1_000; const AZURE_DELETE_STREAM_BATCH_SIZE: u64 = 256; +const DEFAULT_EXPLANATION_MAX_CANDIDATE_FILES: usize = 1_000; + +/// Builder-style cleanup operation. +/// +/// Call [`Self::explain`] for a read-only explanation of what cleanup would +/// remove, or [`Self::execute`] to re-evaluate the current dataset state and +/// delete files. +pub struct CleanupOperation<'a> { + dataset: &'a Dataset, + policy: CleanupPolicy, + max_candidate_files: usize, +} + +impl<'a> CleanupOperation<'a> { + pub(crate) fn new(dataset: &'a Dataset, policy: CleanupPolicy) -> Self { + Self { + dataset, + policy, + max_candidate_files: DEFAULT_EXPLANATION_MAX_CANDIDATE_FILES, + } + } + + /// Set the maximum number of candidate files included in explanations. + /// + /// The aggregate [`RemovalStats`] in [`CleanupExplanation`] still include + /// all files that would be removed. + pub fn with_max_candidate_files(mut self, max_candidate_files: usize) -> Self { + self.max_candidate_files = max_candidate_files; + self + } + + /// Explain what cleanup would remove without deleting files. + pub async fn explain(&self) -> Result { + let cleanup = CleanupTask::new( + self.dataset, + self.policy.clone(), + CleanupAction::Explain { + max_candidate_files: self.max_candidate_files, + }, + ); + let read_version = cleanup.read_version; + let result = cleanup.run().await?; + let warnings = if result.candidate_files_truncated { + vec![format!( + "candidate_files truncated to {} entries", + self.max_candidate_files + )] + } else { + Vec::new() + }; + Ok(CleanupExplanation { + read_version, + stats: result.stats, + candidate_files: result.candidate_files, + candidate_files_truncated: result.candidate_files_truncated, + candidate_file_limit: self.max_candidate_files, + referenced_branches: result.referenced_branches, + warnings, + }) + } + + /// Execute cleanup by re-evaluating the current dataset state. + pub async fn execute(&self) -> Result { + info!(target: TRACE_DATASET_EVENTS, event=DATASET_CLEANING_EVENT, uri=&self.dataset.uri); + let cleanup = CleanupTask::new(self.dataset, self.policy.clone(), CleanupAction::Execute); + Ok(cleanup.run().await?.stats) + } +} impl<'a> CleanupTask<'a> { - fn new(dataset: &'a Dataset, policy: CleanupPolicy) -> Self { - Self { dataset, policy } + fn new(dataset: &'a Dataset, policy: CleanupPolicy, action: CleanupAction) -> Self { + let track_removed_manifests = policy.clean_referenced_branches; + let include_referenced_branches = action.candidate_file_limit().is_some(); + Self::new_with_ignored_manifests( + dataset, + policy, + action, + HashSet::new(), + track_removed_manifests, + include_referenced_branches, + ) + } + + fn new_with_ignored_manifests( + dataset: &'a Dataset, + policy: CleanupPolicy, + action: CleanupAction, + ignored_manifests: HashSet, + track_removed_manifests: bool, + include_referenced_branches: bool, + ) -> Self { + Self { + dataset, + policy, + action, + read_version: dataset.version().version, + ignored_manifests, + track_removed_manifests, + include_referenced_branches, + } } - async fn run(self) -> Result { - let mut final_stats = RemovalStats::default(); + async fn run(self) -> Result { + let mut final_result = CleanupRunResult::default(); + let candidate_file_limit = self.action.candidate_file_limit(); // First check if we need to clean referenced branches // For cases that referenced branches never clean and the current cleanup cannot clean anything // This must happen before cleaning the current branch if the setting is enabled. let referenced_branches: Vec<(String, u64)> = self.find_referenced_branches().await?; + if self.include_referenced_branches { + final_result.referenced_branches = referenced_branches + .iter() + .map(|(name, referenced_version)| CleanupReferencedBranch { + name: name.clone(), + referenced_version: *referenced_version, + cleanup_candidate: self.policy.clean_referenced_branches, + }) + .collect(); + } if self.policy.clean_referenced_branches { - self.clean_referenced_branches(&referenced_branches).await?; + final_result.merge( + self.clean_referenced_branches(&referenced_branches).await?, + candidate_file_limit, + ); } // we process all manifest files in parallel to figure @@ -179,19 +477,21 @@ impl<'a> CleanupTask<'a> { } if !referenced_branches.is_empty() { + let ignored_manifests: HashSet<_> = final_result + .removed_manifests + .union(&self.ignored_manifests) + .cloned() + .collect(); inspection = self - .retain_branch_lineage_files(inspection, &referenced_branches) + .retain_branch_lineage_files(inspection, &referenced_branches, &ignored_manifests) .await? }; - let stats = self.delete_unreferenced_files(inspection).await?; - final_stats.bytes_removed += stats.bytes_removed; - final_stats.old_versions += stats.old_versions; - final_stats.data_files_removed += stats.data_files_removed; - final_stats.transaction_files_removed += stats.transaction_files_removed; - final_stats.index_files_removed += stats.index_files_removed; - final_stats.deletion_files_removed += stats.deletion_files_removed; - Ok(final_stats) + final_result.merge( + self.delete_unreferenced_files(inspection).await?, + candidate_file_limit, + ); + Ok(final_result) } #[instrument(level = "debug", skip_all)] @@ -203,6 +503,7 @@ impl<'a> CleanupTask<'a> { self.dataset .commit_handler .list_manifest_locations(&self.dataset.base, &self.dataset.object_store, false) + .try_filter(|location| future::ready(!self.ignored_manifests.contains(&location.path))) .try_for_each_concurrent(self.dataset.object_store.io_parallelism(), |location| { self.process_manifest_file(location, &inspection, tagged_versions) }) @@ -224,12 +525,10 @@ impl<'a> CleanupTask<'a> { let manifest = read_manifest(&self.dataset.object_store, &location.path, location.size).await?; - let dataset_version = self.dataset.version().version; - // Don't delete the latest version, even if it is old. Don't delete tagged versions, // regardless of age. Don't delete manifests if their version is newer than the dataset // version. These are either in-progress or newly added since we started. - let is_latest = dataset_version <= manifest.version; + let is_latest = self.read_version <= manifest.version; let is_tagged = tagged_versions.contains(&manifest.version); let in_working_set = is_latest || !self.policy.should_clean(&manifest) || is_tagged; let indexes = @@ -319,8 +618,10 @@ impl<'a> CleanupTask<'a> { async fn delete_unreferenced_files( &self, inspection: CleanupInspection, - ) -> Result { - let removal_stats = Mutex::new(RemovalStats::default()); + ) -> Result { + let cleanup_result = Mutex::new(CleanupRunResult::default()); + let deletes_files = self.action.deletes_files(); + let candidate_file_limit = self.action.candidate_file_limit(); let verification_threshold = utc_now() - TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS).expect("TimeDelta::try_days"); @@ -335,9 +636,8 @@ impl<'a> CleanupTask<'a> { ) }; // Build stream for a managed subtree - let build_listing_stream = |dir: Path, file_type: Option| { + let build_listing_stream = |dir: Path| { let inspection_ref = &inspection; - let removal_stats_ref = &removal_stats; self.dataset .object_store .read_dir_all(&dir, inspection.earliest_retained_manifest_time) @@ -356,118 +656,133 @@ impl<'a> CleanupTask<'a> { // delete it if we can verify it is part of an old version. let maybe_in_progress = !self.policy.delete_unverified && obj_meta.last_modified >= verification_threshold; - let path_to_remove = self.path_if_not_referenced( - obj_meta.location, + let file_to_remove = self.cleanup_file_if_not_referenced( + obj_meta, maybe_in_progress, inspection_ref, ); - if matches!(path_to_remove, Ok(Some(..))) { - let mut stats = removal_stats_ref.lock().unwrap(); - stats.bytes_removed += obj_meta.size; - if let Some(file_type) = file_type { - match file_type { - RemovedFileType::Data => stats.data_files_removed += 1, - RemovedFileType::Transaction => { - stats.transaction_files_removed += 1 - } - RemovedFileType::Index => stats.index_files_removed += 1, - RemovedFileType::Deletion => stats.deletion_files_removed += 1, - } - } - } - future::ready(path_to_remove) + future::ready(file_to_remove) }) .boxed() }; // Restrict scanning to Lance-managed subtrees for safety and performance. let streams = vec![ - build_listing_stream(self.dataset.versions_dir(), None), - build_listing_stream( - self.dataset.transactions_dir(), - Some(RemovedFileType::Transaction), - ), - build_listing_stream(self.dataset.data_dir(), Some(RemovedFileType::Data)), - build_listing_stream(self.dataset.indices_dir(), Some(RemovedFileType::Index)), - build_listing_stream( - self.dataset.deletions_dir(), - Some(RemovedFileType::Deletion), - ), + build_listing_stream(self.dataset.versions_dir()), + build_listing_stream(self.dataset.transactions_dir()), + build_listing_stream(self.dataset.data_dir()), + build_listing_stream(self.dataset.indices_dir()), + build_listing_stream(self.dataset.deletions_dir()), ]; - let unreferenced_paths = stream::iter(streams).flatten().boxed(); + let unreferenced_files = stream::iter(streams).flatten().boxed(); let old_manifests = inspection.old_manifests.clone(); - let num_old_manifests = old_manifests.len(); - - // Ideally this collect shouldn't be needed here but it seems necessary - // to avoid https://github.com/rust-lang/rust/issues/102211 - let manifest_bytes_removed = stream::iter(old_manifests.keys()) - .map(|path| self.dataset.object_store.size(path)) - .collect::>() - .await; - let manifest_bytes_removed = stream::iter(manifest_bytes_removed) - .buffer_unordered(self.dataset.object_store.io_parallelism()) - .try_fold(0, |acc, size| async move { Ok(acc + (size)) }) - .await; - - let old_manifests_stream = stream::iter(old_manifests.into_keys()) - .map(|path| { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = path.as_ref()); - Ok(path) + let manifest_files = stream::iter(old_manifests) + .map(|(path, _version)| async move { + let size_bytes = self.dataset.object_store.size(&path).await?; + Ok::(CleanupFile { + path, + kind: CleanupFileKind::Manifest, + unverified: false, + size_bytes, + }) }) + .buffer_unordered(self.dataset.object_store.io_parallelism()) .boxed(); - let all_paths_to_remove = - stream::iter(vec![unreferenced_paths, old_manifests_stream]).flatten(); - - let paths_to_delete: BoxStream> = if let Some(rate) = - self.policy.delete_rate_limit - { - let duration = calculate_duration(self.dataset.object_store.scheme().to_string(), rate); - let mut ticker = interval(duration); - ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); - IntervalStream::new(ticker) - .zip(all_paths_to_remove) - .map(|(_, path)| path) - .boxed() - } else { - all_paths_to_remove.boxed() - }; - let delete_fut = self - .dataset - .object_store - .remove_stream(paths_to_delete) - .try_for_each(|_| future::ready(Ok(()))); + let all_files = stream::iter(vec![unreferenced_files, manifest_files]).flatten(); + let all_paths_to_remove = all_files.map(|file| { + let file = file?; + if deletes_files { + let mode = if file.unverified { + AUDIT_MODE_DELETE_UNVERIFIED + } else { + AUDIT_MODE_DELETE + }; + let path_str = file.path.as_ref(); + match file.kind { + CleanupFileKind::Manifest => { + info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = path_str); + } + CleanupFileKind::Data => { + info!(target: TRACE_FILE_AUDIT, mode=mode, r#type=AUDIT_TYPE_DATA, path = path_str); + } + CleanupFileKind::Deletion => { + info!(target: TRACE_FILE_AUDIT, mode=mode, r#type=AUDIT_TYPE_DELETION, path = path_str); + } + CleanupFileKind::Index => { + info!(target: TRACE_FILE_AUDIT, mode=mode, r#type=AUDIT_TYPE_INDEX, path = path_str); + } + CleanupFileKind::Transaction | CleanupFileKind::TemporaryManifest => {} + } + } + cleanup_result + .lock() + .unwrap() + .record_file(&file, candidate_file_limit, self.track_removed_manifests); + Ok(file.path) + }); + + if deletes_files { + let paths_to_delete: BoxStream> = + if let Some(rate) = self.policy.delete_rate_limit { + let duration = + calculate_duration(self.dataset.object_store.scheme().to_string(), rate); + let mut ticker = interval(duration); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + IntervalStream::new(ticker) + .zip(all_paths_to_remove) + .map(|(_, path)| path) + .boxed() + } else { + all_paths_to_remove.boxed() + }; - delete_fut.await?; + self.dataset + .object_store + .remove_stream(paths_to_delete) + .try_for_each(|_| future::ready(Ok(()))) + .await?; + } else { + // Drain the stream to populate stats, but do not call remove_stream. + all_paths_to_remove + .try_for_each(|_| future::ready(Ok(()))) + .await?; + } - let mut removal_stats = removal_stats.into_inner().unwrap(); - removal_stats.old_versions = num_old_manifests as u64; - removal_stats.bytes_removed += manifest_bytes_removed?; + let cleanup_result = cleanup_result.into_inner().unwrap(); let span = Span::current(); - span.record("bytes_removed", removal_stats.bytes_removed); - span.record("data_files_removed", removal_stats.data_files_removed); + span.record("bytes_removed", cleanup_result.stats.bytes_removed); + span.record( + "data_files_removed", + cleanup_result.stats.data_files_removed, + ); span.record( "transaction_files_removed", - removal_stats.transaction_files_removed, + cleanup_result.stats.transaction_files_removed, + ); + span.record( + "index_files_removed", + cleanup_result.stats.index_files_removed, ); - span.record("index_files_removed", removal_stats.index_files_removed); span.record( "deletion_files_removed", - removal_stats.deletion_files_removed, + cleanup_result.stats.deletion_files_removed, ); - Ok(removal_stats) + Ok(cleanup_result) } - fn path_if_not_referenced( + fn cleanup_file_if_not_referenced( &self, - path: Path, + obj_meta: ObjectMeta, maybe_in_progress: bool, inspection: &CleanupInspection, - ) -> Result> { + ) -> Result> { + let path = obj_meta.location; let relative_path = remove_prefix(&path, &self.dataset.base); + let size_bytes = obj_meta.size; if relative_path.as_ref().starts_with("_versions/.tmp") { // This is a temporary manifest file. // @@ -476,7 +791,12 @@ impl<'a> CleanupTask<'a> { if maybe_in_progress { return Ok(None); } else { - return Ok(Some(path)); + return Ok(cleanup_file( + path, + CleanupFileKind::TemporaryManifest, + true, + size_bytes, + )); } } if relative_path.as_ref().starts_with("_indices") { @@ -490,15 +810,18 @@ impl<'a> CleanupTask<'a> { { return Ok(None); } else if !maybe_in_progress { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_INDEX, path = path.to_string()); - return Ok(Some(path)); + return Ok(cleanup_file(path, CleanupFileKind::Index, true, size_bytes)); } else if inspection .verified_files .index_uuids .contains(uuid.as_ref()) { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_INDEX, path = path.to_string()); - return Ok(Some(path)); + return Ok(cleanup_file( + path, + CleanupFileKind::Index, + false, + size_bytes, + )); } } else { return Ok(None); @@ -514,15 +837,13 @@ impl<'a> CleanupTask<'a> { { Ok(None) } else if !maybe_in_progress { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DATA, path = path.to_string()); - Ok(Some(path)) + Ok(cleanup_file(path, CleanupFileKind::Data, true, size_bytes)) } else if inspection .verified_files .data_paths .contains(&relative_path) { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DATA, path = path.to_string()); - Ok(Some(path)) + Ok(cleanup_file(path, CleanupFileKind::Data, false, size_bytes)) } else { Ok(None) } @@ -587,15 +908,13 @@ impl<'a> CleanupTask<'a> { { Ok(None) } else if !maybe_in_progress { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DATA, path = path.to_string()); - Ok(Some(path)) + Ok(cleanup_file(path, CleanupFileKind::Data, true, size_bytes)) } else if inspection .verified_files .data_paths .contains(&parent_data_path) { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DATA, path = path.to_string()); - Ok(Some(path)) + Ok(cleanup_file(path, CleanupFileKind::Data, false, size_bytes)) } else { Ok(None) } @@ -613,15 +932,23 @@ impl<'a> CleanupTask<'a> { { Ok(None) } else if !maybe_in_progress { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DELETION, path = path.to_string()); - Ok(Some(path)) + Ok(cleanup_file( + path, + CleanupFileKind::Deletion, + true, + size_bytes, + )) } else if inspection .verified_files .delete_paths .contains(&relative_path) { - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DELETION, path = path.to_string()); - Ok(Some(path)) + Ok(cleanup_file( + path, + CleanupFileKind::Deletion, + false, + size_bytes, + )) } else { Ok(None) } @@ -640,7 +967,14 @@ impl<'a> CleanupTask<'a> { } else if !maybe_in_progress || inspection.verified_files.tx_paths.contains(&relative_path) { - Ok(Some(path)) + let unverified = + !inspection.verified_files.tx_paths.contains(&relative_path); + Ok(cleanup_file( + path, + CleanupFileKind::Transaction, + unverified, + size_bytes, + )) } else { Ok(None) } @@ -709,8 +1043,8 @@ impl<'a> CleanupTask<'a> { async fn clean_referenced_branches( &self, referenced_branches: &[(String, u64)], - ) -> Result { - let final_stats = Mutex::new(RemovalStats::default()); + ) -> Result { + let final_result = Mutex::new(CleanupRunResult::default()); // Group branches by their lineage identifier (BranchIdentifier). // Branches with the same identifier share a lineage and must be cleaned sequentially @@ -722,30 +1056,32 @@ impl<'a> CleanupTask<'a> { .or_insert_with(Vec::new) .push(branch.clone()); } + let action = self.action; + let candidate_file_limit = self.action.candidate_file_limit(); let tasks: Vec<_> = branches_chains .values() .map(|branch_chain| { - let final_stats = &final_stats; + let final_result = &final_result; async move { for branch in branch_chain { let branch_dataset = self .dataset .checkout_version((branch.as_str(), None)) .await?; - if let Some(stats) = cleanup_cascade_branch( + let ignored_manifests = + final_result.lock().unwrap().removed_manifests.clone(); + if let Some(result) = cleanup_cascade_branch_run( &branch_dataset, branch_dataset.manifest.as_ref(), + action, + ignored_manifests, ) .await? { - let mut stats_guard = final_stats.lock().unwrap(); - stats_guard.bytes_removed += stats.bytes_removed; - stats_guard.old_versions += stats.old_versions; - stats_guard.data_files_removed += stats.data_files_removed; - stats_guard.transaction_files_removed += - stats.transaction_files_removed; - stats_guard.index_files_removed += stats.index_files_removed; - stats_guard.deletion_files_removed += stats.deletion_files_removed; + final_result + .lock() + .unwrap() + .merge(result, candidate_file_limit); } } Ok::<(), Error>(()) @@ -753,7 +1089,7 @@ impl<'a> CleanupTask<'a> { }) .collect(); try_join_all(tasks).await?; - Ok(final_stats.into_inner().unwrap()) + Ok(final_result.into_inner().unwrap()) } // Retain manifests containing files referenced by descendant branches. @@ -762,6 +1098,7 @@ impl<'a> CleanupTask<'a> { &self, inspection: CleanupInspection, referenced_branches: &[(String, u64)], + removed_branch_manifests: &HashSet, ) -> Result { let inspection = Mutex::new(inspection); for (branch, root_version_number) in referenced_branches { @@ -772,6 +1109,9 @@ impl<'a> CleanupTask<'a> { self.dataset .commit_handler .list_manifest_locations(&branch_location.path, &self.dataset.object_store, false) + .try_filter(|location| { + future::ready(!removed_branch_manifests.contains(&location.path)) + }) .try_for_each_concurrent(self.dataset.object_store.io_parallelism(), |location| { self.process_branch_referenced_manifests( location, @@ -1020,8 +1360,7 @@ pub async fn cleanup_old_versions( dataset: &Dataset, policy: CleanupPolicy, ) -> Result { - let cleanup = CleanupTask::new(dataset, policy); - cleanup.run().await + CleanupOperation::new(dataset, policy).execute().await } /// If the dataset config has `lance.auto_cleanup` parameters set, @@ -1048,11 +1387,35 @@ pub async fn cleanup_cascade_branch( dataset: &Dataset, manifest: &Manifest, ) -> Result> { + Ok( + cleanup_cascade_branch_run(dataset, manifest, CleanupAction::Execute, HashSet::new()) + .await? + .map(|result| result.stats), + ) +} + +async fn cleanup_cascade_branch_run( + dataset: &Dataset, + manifest: &Manifest, + action: CleanupAction, + ignored_manifests: HashSet, +) -> Result> { let policy = build_cleanup_policy(dataset, manifest).await?; if let Some(mut policy) = policy { policy.clean_referenced_branches = false; policy.error_if_tagged_old_versions = false; - Ok(Some(dataset.cleanup_with_policy(policy).await?)) + if action.deletes_files() { + info!(target: TRACE_DATASET_EVENTS, event=DATASET_CLEANING_EVENT, uri=&dataset.uri); + } + let cleanup = CleanupTask::new_with_ignored_manifests( + dataset, + policy, + action, + ignored_manifests, + true, + false, + ); + Ok(Some(cleanup.run().await?)) } else { Ok(None) } @@ -1443,6 +1806,14 @@ mod tests { cleanup_old_versions(&db, policy).await } + async fn explain_cleanup_with_policy( + &self, + policy: CleanupPolicy, + ) -> Result { + let db = self.open().await?; + db.cleanup(policy).explain().await + } + async fn run_cleanup_with_override( &self, before: DateTime, @@ -1670,6 +2041,51 @@ mod tests { assert_gt!(after_count.num_tx_files, 0); } + #[tokio::test] + async fn explain_cleanup_does_not_delete_files() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + MockClock::set_system_time(TimeDelta::try_seconds(1).unwrap().to_std().unwrap()); + fixture.overwrite_some_data().await.unwrap(); + + let before_count = fixture.count_files().await.unwrap(); + let policy = CleanupPolicyBuilder::default() + .before_timestamp(utc_now()) + .build(); + + let explanation = fixture + .explain_cleanup_with_policy(policy.clone()) + .await + .unwrap(); + let after_preview_count = fixture.count_files().await.unwrap(); + + // Files are not actually removed when explaining cleanup. + assert_eq!(before_count, after_preview_count); + assert_eq!(explanation.read_version, 2); + assert_eq!(explanation.stats.old_versions, 1); + assert_eq!(explanation.stats.data_files_removed, 1); + assert_eq!(explanation.stats.transaction_files_removed, 1); + assert_gt!(explanation.stats.bytes_removed, 0); + assert!(!explanation.candidate_files.is_empty()); + assert!(!explanation.candidate_files_truncated); + + // Running cleanup with the same policy should remove the same files the + // explanation reported for this unchanged dataset. + let removed = fixture.run_cleanup_with_policy(policy).await.unwrap(); + let after_cleanup_count = fixture.count_files().await.unwrap(); + + assert_eq!( + removed.bytes_removed, + before_count.num_bytes - after_cleanup_count.num_bytes + ); + assert_eq!(removed.old_versions, explanation.stats.old_versions); + assert_eq!( + removed.data_files_removed, + explanation.stats.data_files_removed + ); + assert_eq!(removed.bytes_removed, explanation.stats.bytes_removed); + } + #[tokio::test] async fn cleanup_blob_v2_sidecar_files() { let fixture = MockDatasetFixture::try_new().unwrap(); @@ -3073,6 +3489,17 @@ mod tests { self.run_cleanup_inner(policy).await } + async fn explain_cleanup_with_referenced_branches(&mut self) -> Result { + let policy = CleanupPolicyBuilder::default() + .error_if_tagged_old_versions(false) + .clean_referenced_branches(true) + .retain_n_versions(&self.dataset, 1) + .await? + .build(); + self.dataset.checkout_latest().await?; + self.dataset.cleanup(policy).explain().await + } + async fn run_cleanup_inner(&mut self, policy: CleanupPolicy) -> Result { let pre_count = self.count_data().await?; self.dataset.checkout_latest().await?; @@ -3653,6 +4080,74 @@ mod tests { setup.assert_unchanged(&["branch4"]).await; } + #[tokio::test] + async fn explain_cleanup_with_referenced_branches_matches_cleanup() { + let mut setup = build_lineage_datasets().await.unwrap(); + + setup.enable_auto_cleanup().await.unwrap(); + setup.main.write_data().await.unwrap(); + setup.main.compact().await.unwrap(); + setup.branch4.compact().await.unwrap(); + setup.branch1.write_data().await.unwrap(); + setup.branch1.compact().await.unwrap(); + setup.branch2.write_data().await.unwrap(); + setup.branch2.compact().await.unwrap(); + setup.branch3.write_data().await.unwrap(); + setup.branch3.compact().await.unwrap(); + + setup.main.refresh().await.unwrap(); + setup.branch1.refresh().await.unwrap(); + setup.branch2.refresh().await.unwrap(); + setup.branch3.refresh().await.unwrap(); + setup.branch4.refresh().await.unwrap(); + let main_counts_before = setup.main.counts; + let branch1_counts_before = setup.branch1.counts; + let branch2_counts_before = setup.branch2.counts; + let branch3_counts_before = setup.branch3.counts; + let branch4_counts_before = setup.branch4.counts; + + let explanation = setup + .main + .explain_cleanup_with_referenced_branches() + .await + .unwrap(); + + setup.main.refresh().await.unwrap(); + setup.branch1.refresh().await.unwrap(); + setup.branch2.refresh().await.unwrap(); + setup.branch3.refresh().await.unwrap(); + setup.branch4.refresh().await.unwrap(); + assert_eq!(setup.main.counts, main_counts_before); + assert_eq!(setup.branch1.counts, branch1_counts_before); + assert_eq!(setup.branch2.counts, branch2_counts_before); + assert_eq!(setup.branch3.counts, branch3_counts_before); + assert_eq!(setup.branch4.counts, branch4_counts_before); + + let removed = setup + .main + .run_cleanup_with_referenced_branches() + .await + .unwrap(); + + assert!(!explanation.referenced_branches.is_empty()); + assert!( + explanation + .referenced_branches + .iter() + .any(|branch| branch.cleanup_candidate) + ); + assert_eq!(explanation.stats, removed); + setup.branch1.refresh().await.unwrap(); + setup.branch2.refresh().await.unwrap(); + setup.branch3.refresh().await.unwrap(); + setup.branch4.refresh().await.unwrap(); + assert_eq!(setup.main.counts.num_manifest_files, 1); + assert_eq!(setup.branch1.counts.num_manifest_files, 1); + assert_eq!(setup.branch2.counts.num_manifest_files, 1); + assert_eq!(setup.branch3.counts.num_manifest_files, 1); + assert_eq!(setup.branch4.counts.num_manifest_files, 1); + } + #[tokio::test] async fn auto_clean_referenced_branches_with_tags() { let mut setup = build_lineage_datasets().await.unwrap();