diff --git a/crates/core/dump/src/check.rs b/crates/core/dump/src/check.rs
index 3e31c53be..c22ef349c 100644
--- a/crates/core/dump/src/check.rs
+++ b/crates/core/dump/src/check.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::BTreeSet;
 
 use common::{catalog::physical::PhysicalTable, query_context::Error as QueryError};
 use futures::TryStreamExt as _;
@@ -7,26 +7,22 @@ use object_store::ObjectMeta;
 
 /// Validates consistency between metadata database and object store for a table
 ///
-/// This function performs a comprehensive consistency check to ensure that the metadata
-/// database and object store are in sync. It detects and attempts to repair consistency
-/// issues where possible, or returns errors for corruption that requires manual intervention.
+/// This function performs a consistency check to ensure that all files registered
+/// in the metadata database exist in the object store.
 ///
 /// ## Consistency Checks Performed
 ///
-/// 1. **Orphaned Files Detection**
-///    - Verifies all files in object store are registered in metadata DB
-///    - **Action on failure**: Automatically deletes orphaned files to restore consistency
-///    - Orphaned files typically result from failed dump operations
+/// **Missing Registered Files Detection**
+/// - Verifies all metadata DB entries have corresponding files in object store
+/// - **Action on failure**: Returns [`ConsistencyError::MissingRegisteredFile`]
+/// - Indicates data corruption requiring manual intervention
 ///
-/// 2. **Missing Registered Files Detection**
-///    - Verifies all metadata DB entries have corresponding files in object store
-///    - **Action on failure**: Returns [`ConsistencyError::MissingRegisteredFile`]
-///    - Indicates data corruption requiring manual intervention
+/// ## Note on Orphaned Files
 ///
-/// ## Side Effects
-///
-/// ⚠️ **Warning**: This function has side effects - it deletes orphaned files from object store.
-/// These deletions are logged at `WARN` level before execution.
+/// Files that exist in the object store but are not registered in the metadata DB
+/// are intentionally NOT deleted by this function. These files may be pending
+/// garbage collection via the `gc_manifest` table. The garbage collector handles
+/// cleanup of such files after their expiration period.
 pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyError> {
     // See also: metadata-consistency
@@ -45,7 +41,7 @@ pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyE
     let store = table.object_store();
     let path = table.path();
 
-    let stored_files: BTreeMap<String, ObjectMeta> = store
+    let stored_files: BTreeSet<String> = store
         .list(Some(table.path()))
         .try_collect::<Vec<_>>()
         .await
@@ -54,29 +50,12 @@ pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyE
             source: err,
         })?
         .into_iter()
-        .filter_map(|object| Some((object.location.filename()?.to_string(), object)))
+        .filter_map(|object| object.location.filename().map(|s| s.to_string()))
        .collect();
 
-    // TODO: Move the orphaned files deletion logic out of this check function. This side effect
-    // should be handled somewhere else (e.g., by the garbage collector).
-    for (filename, object_meta) in &stored_files {
-        if !registered_files.contains(filename) {
-            // This file was written by a dump job, but it is not present in the metadata DB,
-            // so it is an orphaned file. Delete it.
- tracing::warn!("Deleting orphaned file: {}", object_meta.location); - store.delete(&object_meta.location).await.map_err(|err| { - ConsistencyError::DeleteOrphanedFile { - location_id, - filename: filename.clone(), - source: err, - } - })?; - } - } - // Check for files in the metadata DB that do not exist in the store. for filename in registered_files { - if !stored_files.contains_key(&filename) { + if !stored_files.contains(&filename) { return Err(ConsistencyError::MissingRegisteredFile { location_id, filename, @@ -127,31 +106,6 @@ pub enum ConsistencyError { source: object_store::Error, }, - /// Failed to delete orphaned file from object store - /// - /// This occurs when attempting to clean up a file that exists in object store - /// but is not registered in metadata DB. The file is considered orphaned - /// (likely from a failed dump operation) and should be deleted to restore - /// consistency. - /// - /// Possible causes: - /// - Object store connectivity issues during delete - /// - File already deleted by concurrent process - /// - Permission/authentication issues - /// - Object store service unavailable - /// - /// This is a critical error - orphaned files indicate incomplete operations - /// and should be cleaned up to prevent storage bloat. - #[error( - "Failed to delete orphaned file '{filename}' from object store for table location {location_id}" - )] - DeleteOrphanedFile { - location_id: LocationId, - filename: String, - #[source] - source: object_store::Error, - }, - /// Registered file missing from object store (data corruption) /// /// This occurs when a file is registered in the metadata database but does diff --git a/crates/core/dump/src/compaction/collector.rs b/crates/core/dump/src/compaction/collector.rs index 79592f332..be2eefbf7 100644 --- a/crates/core/dump/src/compaction/collector.rs +++ b/crates/core/dump/src/compaction/collector.rs @@ -104,17 +104,20 @@ impl Collector { ); } - let paths_to_remove = metadata_db - .delete_file_ids(found_file_ids_to_paths.keys()) + // Delete from footer_cache (file_metadata was already deleted during compaction) + metadata_db + .delete_footer_cache(found_file_ids_to_paths.keys()) .await - .map_err(CollectorError::file_metadata_delete( + .map_err(CollectorError::footer_cache_delete( found_file_ids_to_paths.keys(), - ))? 
-            .into_iter()
-            .filter_map(|file_id| found_file_ids_to_paths.get(&file_id).cloned())
-            .collect::<BTreeSet<_>>();
+            ))?;
 
-        tracing::debug!("Metadata entries deleted: {}", paths_to_remove.len());
+        tracing::debug!(
+            "Footer cache entries deleted: {}",
+            found_file_ids_to_paths.len()
+        );
+
+        let paths_to_remove: BTreeSet<_> = found_file_ids_to_paths.values().cloned().collect();
 
         if let Some(metrics) = &self.metrics {
             metrics.inc_expired_entries_deleted(
@@ -161,6 +164,19 @@ impl Collector {
         tracing::debug!("Expired files deleted: {}", files_deleted);
         tracing::debug!("Expired files not found: {}", files_not_found);
 
+        // Delete from gc_manifest after physical files have been deleted
+        metadata_db
+            .delete_gc_manifest(found_file_ids_to_paths.keys())
+            .await
+            .map_err(CollectorError::gc_manifest_delete(
+                found_file_ids_to_paths.keys(),
+            ))?;
+
+        tracing::debug!(
+            "GC manifest entries deleted: {}",
+            found_file_ids_to_paths.len()
+        );
+
         if let Some(metrics) = self.metrics.as_ref() {
             metrics.inc_successful_collections(table_name.to_string());
         }
diff --git a/crates/core/dump/src/compaction/error.rs b/crates/core/dump/src/compaction/error.rs
index 9fb9dea11..4f624bfe0 100644
--- a/crates/core/dump/src/compaction/error.rs
+++ b/crates/core/dump/src/compaction/error.rs
@@ -309,11 +309,31 @@ pub enum CollectorError {
         file_ids: Vec<FileId>,
     },
 
-    /// Failed to delete manifest record
+    /// Failed to delete footer cache records
     ///
-    /// This occurs when removing a file entry from the garbage collection manifest
-    /// after the file has been successfully deleted. The GC manifest tracks files
-    /// eligible for cleanup.
+    /// This occurs when removing footer cache entries from the database after
+    /// the gc_manifest entries have expired. Footer cache entries are kept until
+    /// the Collector runs to allow reads during the grace period.
+    ///
+    /// Common causes:
+    /// - Database connection lost during delete
+    /// - Transaction conflicts with concurrent operations
+    /// - Insufficient database permissions for DELETE queries
+    ///
+    /// This leaves orphaned footer cache entries. They should be cleaned up
+    /// manually or via retry.
+    #[error("failed to delete footer cache for files {file_ids:?}: {err}")]
+    FooterCacheDelete {
+        #[source]
+        err: metadata_db::Error,
+        file_ids: Vec<FileId>,
+    },
+
+    /// Failed to delete gc_manifest records
+    ///
+    /// This occurs when removing entries from the garbage collection manifest
+    /// after the files have been successfully deleted from storage. The GC manifest
+    /// tracks files eligible for cleanup.
     ///
     /// Common causes:
     /// - Database connection lost during delete
@@ -321,13 +341,13 @@
     /// - Transaction conflicts with concurrent operations
     /// - Record already deleted by another process
     /// - Insufficient database permissions
     ///
-    /// This leaves a stale entry in the GC manifest. The entry should be cleaned
+    /// This leaves stale entries in the GC manifest. They should be cleaned
     /// up to prevent repeated deletion attempts.
- #[error("failed to delete file {file_id} from gc manifest: {err}")] - ManifestDelete { + #[error("failed to delete from gc manifest for files {file_ids:?}: {err}")] + GcManifestDelete { #[source] err: metadata_db::Error, - file_id: FileId, + file_ids: Vec, }, /// Failed to parse URL for file @@ -408,8 +428,22 @@ impl CollectorError { } } - pub fn gc_manifest_delete(file_id: FileId) -> impl FnOnce(metadata_db::Error) -> Self { - move |err| Self::ManifestDelete { err, file_id } + pub fn footer_cache_delete<'a>( + file_ids: impl Iterator, + ) -> impl FnOnce(metadata_db::Error) -> Self { + move |err| Self::FooterCacheDelete { + err, + file_ids: file_ids.cloned().collect(), + } + } + + pub fn gc_manifest_delete<'a>( + file_ids: impl Iterator, + ) -> impl FnOnce(metadata_db::Error) -> Self { + move |err| Self::GcManifestDelete { + err, + file_ids: file_ids.cloned().collect(), + } } pub fn parse_error(file_id: FileId) -> impl FnOnce(url::ParseError) -> Self { diff --git a/crates/core/metadata-db/Cargo.toml b/crates/core/metadata-db/Cargo.toml index b5b2a0741..183ea9cf1 100644 --- a/crates/core/metadata-db/Cargo.toml +++ b/crates/core/metadata-db/Cargo.toml @@ -9,6 +9,8 @@ license-file.workspace = true default = [] # Temporary database support for testing environments temp-db = ["pgtemp"] +# Test utilities for downstream crates +test-utils = [] [dependencies] backon.workspace = true diff --git a/crates/core/metadata-db/migrations/20251204195315_remove_file_metadata_footer_col.sql b/crates/core/metadata-db/migrations/20251204195315_remove_file_metadata_footer_col.sql new file mode 100644 index 000000000..6bf1a0ce2 --- /dev/null +++ b/crates/core/metadata-db/migrations/20251204195315_remove_file_metadata_footer_col.sql @@ -0,0 +1,16 @@ +-- Drop the footer column from file_metadata +ALTER TABLE file_metadata DROP COLUMN footer; + +-- Remove the foreign key constraint from footer_cache entirely +-- This allows us to delete from file_metadata eagerly during compaction +-- while keeping footer_cache entries until the Collector runs +ALTER TABLE footer_cache DROP CONSTRAINT footer_cache_file_id_fkey; + +-- Remove the foreign key constraint from gc_manifest to file_metadata +-- This allows us to delete from file_metadata while keeping gc_manifest entries +-- for garbage collection tracking +ALTER TABLE gc_manifest DROP CONSTRAINT gc_manifest_file_id_fkey; + +-- Remove the CHECK constraint on gc_manifest.expiration +-- This allows inserting entries with any expiration time +ALTER TABLE gc_manifest DROP CONSTRAINT gc_manifest_expiration_check; diff --git a/crates/core/metadata-db/src/files.rs b/crates/core/metadata-db/src/files.rs index 664dd9996..2e069b4c3 100644 --- a/crates/core/metadata-db/src/files.rs +++ b/crates/core/metadata-db/src/files.rs @@ -37,8 +37,8 @@ where { let query = indoc::indoc! {r#" WITH inserted AS ( - INSERT INTO file_metadata (location_id, url, file_name, object_size, object_e_tag, object_version, metadata, footer) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + INSERT INTO file_metadata (location_id, url, file_name, object_size, object_e_tag, object_version, metadata) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING RETURNING id ) @@ -135,12 +135,20 @@ where } /// Delete file metadata record by ID +/// +/// Also deletes the corresponding footer_cache entry. 
 #[instrument(skip(executor))]
 pub async fn delete<'e, E>(executor: E, id: FileId) -> Result<bool>
 where
     E: sqlx::Executor<'e, Database = sqlx::Postgres>,
 {
-    let query = "DELETE FROM file_metadata WHERE id = $1";
+    // Delete from footer_cache first (no FK constraint), then file_metadata
+    let query = indoc::indoc! {r#"
+        WITH deleted_footer AS (
+            DELETE FROM footer_cache WHERE file_id = $1
+        )
+        DELETE FROM file_metadata WHERE id = $1
+    "#};
 
     let result = sqlx::query(query).bind(id).execute(executor).await?;
     Ok(result.rows_affected() > 0)
diff --git a/crates/core/metadata-db/src/lib.rs b/crates/core/metadata-db/src/lib.rs
index 6190510db..c36509e32 100644
--- a/crates/core/metadata-db/src/lib.rs
+++ b/crates/core/metadata-db/src/lib.rs
@@ -302,6 +302,7 @@ impl MetadataDb {
 
     /// Delete file metadata record by ID
     ///
+    /// Also deletes the corresponding footer_cache entry.
     /// Returns `true` if a file was deleted, `false` if no file was found.
     pub async fn delete_file(&self, file_id: FileId) -> Result<bool, Error> {
         files::delete(&*self.pool, file_id)
@@ -356,10 +357,12 @@
             .map_err(Error::Database)
     }
 
-    /// Inserts or updates the GC manifest for the given file IDs.
-    /// If a file ID already exists, it updates the expiration time.
+    /// Deletes file metadata entries and inserts them into the GC manifest.
+    ///
+    /// This eagerly removes files from file_metadata so they won't be listed,
+    /// while keeping footer_cache entries until the Collector runs.
     /// The expiration time is set to the current time plus the given duration.
-    /// If the file ID does not exist, it inserts a new row.
+    /// If the file ID already exists in gc_manifest, it updates the expiration time.
     pub async fn upsert_gc_manifest(
         &self,
         location_id: LocationId,
@@ -371,16 +374,20 @@
             ..Default::default()
         };
-        let sql = "
+        let sql = indoc::indoc! {r#"
+            WITH deleted_files AS (
+                DELETE FROM file_metadata
+                WHERE id = ANY($2)
+                RETURNING id, file_name
+            )
             INSERT INTO gc_manifest (location_id, file_id, file_path, expiration)
             SELECT $1
-                 , file.id
-                 , file_metadata.file_name
+                 , deleted_files.id
+                 , deleted_files.file_name
                  , CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + $3
-            FROM UNNEST ($2) AS file(id)
-            INNER JOIN file_metadata ON file_metadata.id = file.id
-            ON CONFLICT (file_id) DO UPDATE SET expiration = EXCLUDED.expiration;
-        ";
+            FROM deleted_files
+            ON CONFLICT (file_id) DO UPDATE SET expiration = EXCLUDED.expiration
+        "#};
 
         sqlx::query(sql)
             .bind(location_id)
             .bind(file_ids.iter().map(|id| **id).collect::<Vec<_>>())
@@ -412,6 +419,69 @@
             .map_err(Error::Database)
             .boxed()
     }
+
+    /// Deletes footer cache entries for the given file IDs.
+    ///
+    /// Called by the Collector when gc_manifest entries have expired
+    /// and physical files are ready to be deleted.
+    pub async fn delete_footer_cache(
+        &self,
+        file_ids: impl Iterator<Item = &FileId>,
+    ) -> Result<(), Error> {
+        let sql = "DELETE FROM footer_cache WHERE file_id = ANY($1)";
+
+        sqlx::query(sql)
+            .bind(file_ids.map(|id| **id).collect::<Vec<_>>())
+            .execute(&*self.pool)
+            .await
+            .map_err(Error::Database)?;
+
+        Ok(())
+    }
+
+    /// Deletes gc_manifest entries for the given file IDs.
+    ///
+    /// Called by the Collector after physical files have been deleted.
+    pub async fn delete_gc_manifest(
+        &self,
+        file_ids: impl Iterator<Item = &FileId>,
+    ) -> Result<(), Error> {
+        let sql = "DELETE FROM gc_manifest WHERE file_id = ANY($1)";
+
+        sqlx::query(sql)
+            .bind(file_ids.map(|id| **id).collect::<Vec<_>>())
+            .execute(&*self.pool)
+            .await
+            .map_err(Error::Database)?;
+
+        Ok(())
+    }
+
+    /// Counts footer_cache entries for a location.
+    ///
+    /// This counts footer_cache entries that belong to files associated with
+    /// this location, whether they are still in file_metadata (current files)
+    /// or in gc_manifest (files pending garbage collection).
+    #[cfg(feature = "test-utils")]
+    pub async fn count_footer_cache_by_location(
+        &self,
+        location_id: LocationId,
+    ) -> Result<i64, Error> {
+        let sql = indoc::indoc! {r#"
+            SELECT COUNT(DISTINCT fc.file_id) FROM footer_cache fc
+            WHERE fc.file_id IN (
+                SELECT id FROM file_metadata WHERE location_id = $1
+                UNION
+                SELECT file_id FROM gc_manifest WHERE location_id = $1
+            )
+        "#};
+
+        sqlx::query_scalar(sql)
+            .bind(location_id)
+            .fetch_one(&*self.pool)
+            .await
+            .map_err(Error::Database)
+    }
 }
 
 /// Private module for sealed trait pattern
diff --git a/crates/core/metadata-db/src/physical_table.rs b/crates/core/metadata-db/src/physical_table.rs
index 91492cf86..29d86fed4 100644
--- a/crates/core/metadata-db/src/physical_table.rs
+++ b/crates/core/metadata-db/src/physical_table.rs
@@ -180,12 +180,14 @@ where
 
 /// Delete a physical table location by its ID
 ///
-/// This will also delete all associated file_metadata entries due to CASCADE constraints.
+/// This will also delete all associated entries across multiple tables.
 ///
 /// # Cascade Effects
 ///
 /// Deleting a location will also delete:
-/// - All file_metadata entries associated with this location
+/// - All file_metadata entries associated with this location (CASCADE)
+/// - All gc_manifest entries for those files (CASCADE from file_metadata)
+/// - All footer_cache entries for those files (explicit delete, no FK constraint)
 #[tracing::instrument(skip(exe), err)]
 pub async fn delete_by_id<'c, E>(
     exe: E,
diff --git a/crates/core/metadata-db/src/physical_table/sql.rs b/crates/core/metadata-db/src/physical_table/sql.rs
index e6a1b1a47..edc664d8b 100644
--- a/crates/core/metadata-db/src/physical_table/sql.rs
+++ b/crates/core/metadata-db/src/physical_table/sql.rs
@@ -273,13 +273,25 @@ where
 
 /// Delete a location by its ID
 ///
-/// This will also delete all associated file_metadata entries due to CASCADE.
+/// This will also delete all associated entries:
+/// - file_metadata entries (via CASCADE on location_id FK)
+/// - gc_manifest entries (via CASCADE on file_id FK from file_metadata)
+/// - footer_cache entries (explicitly deleted since no FK constraint)
+///
 /// Returns true if the location was deleted, false if it didn't exist.
 pub async fn delete_by_id<'c, E>(exe: E, id: LocationId) -> Result<bool>
 where
     E: Executor<'c, Database = Postgres>,
 {
+    // Delete footer_cache entries first (no FK constraint), then physical_tables
+    // The physical_tables delete will cascade to file_metadata and gc_manifest
     let query = indoc::indoc! {"
{" + WITH file_ids AS ( + SELECT id FROM file_metadata WHERE location_id = $1 + ), + deleted_footer_cache AS ( + DELETE FROM footer_cache WHERE file_id IN (SELECT id FROM file_ids) + ) DELETE FROM physical_tables WHERE id = $1 "}; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 2202fb58a..65cb52c85 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -30,7 +30,7 @@ firehose-datasets = { path = "../crates/extractors/firehose" } fs-err.workspace = true futures.workspace = true indoc.workspace = true -metadata-db = { path = "../crates/core/metadata-db" } +metadata-db = { path = "../crates/core/metadata-db", features = ["test-utils"] } monitoring = { path = "../crates/core/monitoring" } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["testing"] } diff --git a/tests/src/tests/it_sql_dataset_batch_size.rs b/tests/src/tests/it_sql_dataset_batch_size.rs index 76d77e6bc..2c09f2014 100644 --- a/tests/src/tests/it_sql_dataset_batch_size.rs +++ b/tests/src/tests/it_sql_dataset_batch_size.rs @@ -40,29 +40,74 @@ async fn sql_dataset_input_batch_size() { .await; // 4. Get catalog and count files - let file_count = test.file_count("sql_stream_ds", "even_blocks").await; + let physical_file_count = test.file_count("sql_stream_ds", "even_blocks").await; + let file_metadata_count = test + .file_metadata_count("sql_stream_ds", "even_blocks") + .await; // 5. With batch size 1 and 4 blocks, we expect 4 files to be dumped (even if some are empty) // since microbatch_max_interval=1 should create one file per block even_blocks only includes // even block numbers, so we expect 2 files with data for blocks 15000000 and 15000002, plus // empty files for odd blocks. - assert_eq!(file_count, 4); + assert_eq!(physical_file_count, 4); + assert_eq!(file_metadata_count, 4); test.spawn_compaction_and_await_completion("sql_stream_ds", "even_blocks") .await; - // 6. After compaction, we expect an additional file to be created, with all data in it. - let file_count_after = test.file_count("sql_stream_ds", "even_blocks").await; + // 6. After compaction: + // - file_metadata: only 1 entry (the compacted file) since old files are eagerly deleted + // - physical files: 5 (4 original + 1 compacted, waiting for garbage collection) + // - footer_cache: 5 (preserved for all files until garbage collection) + let physical_file_count_after = test.file_count("sql_stream_ds", "even_blocks").await; + let file_metadata_count_after = test + .file_metadata_count("sql_stream_ds", "even_blocks") + .await; + let footer_cache_count_after = test + .footer_cache_count("sql_stream_ds", "even_blocks") + .await; - assert_eq!(file_count_after, 5); + assert_eq!( + file_metadata_count_after, 1, + "file_metadata should only have compacted file" + ); + assert_eq!( + physical_file_count_after, 5, + "physical files should include old + compacted" + ); + assert_eq!( + footer_cache_count_after, physical_file_count_after, + "footer_cache should match physical files" + ); test.spawn_collection_and_await_completion("sql_stream_ds", "even_blocks") .await; - // 7. After collection, we expect the original 4 files to be deleted, - // leaving only the compacted file. - let file_count_final = test.file_count("sql_stream_ds", "even_blocks").await; - assert_eq!(file_count_final, 1); + // 7. After collection, garbage collector deletes: + // - physical files (4 old ones) + // - footer_cache entries (4 old ones) + // - gc_manifest entries (4 old ones) + // Leaving only the compacted file everywhere. 
+    let physical_file_count_final = test.file_count("sql_stream_ds", "even_blocks").await;
+    let file_metadata_count_final = test
+        .file_metadata_count("sql_stream_ds", "even_blocks")
+        .await;
+    let footer_cache_count_final = test
+        .footer_cache_count("sql_stream_ds", "even_blocks")
+        .await;
+
+    assert_eq!(
+        physical_file_count_final, 1,
+        "only compacted file should remain"
+    );
+    assert_eq!(
+        file_metadata_count_final, 1,
+        "only compacted file in metadata"
+    );
+    assert_eq!(
+        footer_cache_count_final, 1,
+        "only compacted file in footer_cache"
+    );
 
     let mut test_client = test.new_flight_client().await.unwrap();
     let (res, _batch_count) = test_client
@@ -233,4 +278,38 @@ impl TestCtx {
     async fn file_count(&self, dataset: &str, table: &str) -> usize {
         self.files(dataset, table).await.len()
     }
+
+    /// Count file_metadata entries for a table (registered files)
+    async fn file_metadata_count(&self, dataset: &str, table: &str) -> usize {
+        let catalog = self.catalog_for_dataset(dataset).await.unwrap();
+        let table = catalog
+            .tables()
+            .iter()
+            .find(|t| t.table_name() == table)
+            .unwrap();
+
+        table.files().await.map(|files| files.len()).unwrap_or(0)
+    }
+
+    /// Count footer_cache entries for a table
+    ///
+    /// This counts footer_cache entries that belong to this location via either:
+    /// - file_metadata (current files)
+    /// - gc_manifest (files pending garbage collection)
+    async fn footer_cache_count(&self, dataset: &str, table: &str) -> usize {
+        let catalog = self.catalog_for_dataset(dataset).await.unwrap();
+        let table = catalog
+            .tables()
+            .iter()
+            .find(|t| t.table_name() == table)
+            .unwrap();
+
+        let location_id = table.location_id();
+        let metadata_db = self.ctx.daemon_worker().metadata_db();
+
+        metadata_db
+            .count_footer_cache_by_location(location_id)
+            .await
+            .unwrap_or(0) as usize
+    }
 }
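Taken together, the patch splits deletion into two phases: compaction eagerly unregisters files via `upsert_gc_manifest`, and the Collector later removes caches, physical objects, and manifest entries. A minimal end-to-end sketch of that lifecycle, assuming `upsert_gc_manifest` takes a `&[FileId]` plus a grace-period duration and that these types are re-exported at the crate root (neither is fully shown in the diff); only the three `MetadataDb` calls are APIs from this change, the rest is illustrative glue:

```rust
use std::time::Duration;

use metadata_db::{Error, FileId, LocationId, MetadataDb};

// Hypothetical wrapper tying the phases together; in the real code they run
// in the compaction job and the Collector respectively.
async fn retire_files(
    db: &MetadataDb,
    location_id: LocationId,
    old_files: &[FileId],
    grace_period: Duration,
) -> Result<(), Error> {
    // Phase 1 (compaction): a single CTE removes the rows from file_metadata
    // and registers them in gc_manifest with expiration = now + grace_period.
    // Readers stop listing the files immediately, but their footer_cache rows
    // survive so in-flight reads keep working during the grace period.
    db.upsert_gc_manifest(location_id, old_files, grace_period)
        .await?;

    // ...later, once the Collector observes the gc_manifest entries expire:

    // 1. Drop the footer cache rows that were kept alive for the grace period.
    db.delete_footer_cache(old_files.iter()).await?;

    // 2. Delete the physical objects from the object store (elided here; see
    //    compaction/collector.rs).

    // 3. Clear gc_manifest last, so a crash between steps leaves the entries
    //    in place and the deletion is retried on the next run.
    db.delete_gc_manifest(old_files.iter()).await?;

    Ok(())
}
```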
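The relaxed `consistency_check` contract likewise changes what callers should treat as fatal: surplus objects in the store are now expected while garbage collection is pending, and only a registered-but-missing file is an error. A caller-side sketch, with the import paths for `consistency_check` and `ConsistencyError` assumed:

```rust
use common::catalog::physical::PhysicalTable;

// `consistency_check` and `ConsistencyError` are the items from check.rs above;
// their module path within the dump crate is assumed here.
async fn verify(table: &PhysicalTable) -> Result<(), ConsistencyError> {
    match consistency_check(table).await {
        // Every registered file exists; any orphaned objects in the store are
        // left alone for the garbage collector.
        Ok(()) => Ok(()),
        // A file registered in metadata is gone from the object store: data
        // corruption that needs manual intervention, so surface it loudly.
        Err(err @ ConsistencyError::MissingRegisteredFile { .. }) => {
            tracing::error!("consistency check failed: {err}");
            Err(err)
        }
        Err(other) => Err(other),
    }
}
```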