From 3ad7aa26ff4568ce1af3055d7b9bc63f2d708087 Mon Sep 17 00:00:00 2001
From: John Swanson
Date: Thu, 4 Dec 2025 11:42:27 -0800
Subject: [PATCH 1/5] feat(metadata-db): add footer_cache table and denormalize url into file_metadata

Migrations:
- Add footer_cache table to store footer bytes separately from file_metadata
- Add url column to file_metadata, denormalized from physical_tables

metadata-db:
- Update insert to use CTE that writes to both file_metadata and footer_cache
- Add url parameter to insert/register_file functions
- Update get_footer_bytes_by_id to read from footer_cache instead of file_metadata
- Update select queries to use denormalized fm.url instead of joining physical_tables

dump:
- Add url field to ParquetFileWriterOutput struct
- Add url parameter to commit_metadata function
- Update all call sites in raw_dataset_writer, derived_dataset, and compactor

common:
- Update register_file call in physical.rs to pass url parameter

Signed-off-by: John Swanson
---
 crates/core/common/src/catalog/physical.rs    |  1 +
 crates/core/dump/src/compaction/compactor.rs  |  1 +
 crates/core/dump/src/derived_dataset.rs       |  2 ++
 crates/core/dump/src/parquet_writer.rs        |  5 +++++
 crates/core/dump/src/raw_dataset_writer.rs    |  4 ++++
 .../20251204183330_footer_cache.sql           | 10 +++++++++
 ...251204183842_denormalize_file_metadata.sql | 12 ++++++++++
 crates/core/metadata-db/src/files.rs          | 22 ++++++++++++-------
 crates/core/metadata-db/src/lib.rs            |  4 ++++
 9 files changed, 53 insertions(+), 8 deletions(-)
 create mode 100644 crates/core/metadata-db/migrations/20251204183330_footer_cache.sql
 create mode 100644 crates/core/metadata-db/migrations/20251204183842_denormalize_file_metadata.sql

diff --git a/crates/core/common/src/catalog/physical.rs b/crates/core/common/src/catalog/physical.rs
index ca2cb2dfb..309f71e9a 100644
--- a/crates/core/common/src/catalog/physical.rs
+++ b/crates/core/common/src/catalog/physical.rs
@@ -460,6 +460,7 @@ impl PhysicalTable {
         metadata_db
             .register_file(
                 location_id,
+                url,
                 file_name,
                 object_size,
                 object_e_tag,
diff --git a/crates/core/dump/src/compaction/compactor.rs b/crates/core/dump/src/compaction/compactor.rs
index e1c957ba2..8654f7200 100644
--- a/crates/core/dump/src/compaction/compactor.rs
+++ b/crates/core/dump/src/compaction/compactor.rs
@@ -334,6 +334,7 @@ impl ParquetFileWriterOutput {
         metadata_db
             .register_file(
                 location_id,
+                &self.url,
                 file_name,
                 object_size,
                 object_e_tag,
diff --git a/crates/core/dump/src/derived_dataset.rs b/crates/core/dump/src/derived_dataset.rs
index 0b0e98380..ab4c73f76 100644
--- a/crates/core/dump/src/derived_dataset.rs
+++ b/crates/core/dump/src/derived_dataset.rs
@@ -526,6 +526,7 @@ async fn dump_sql_query(
         parquet_meta,
         object_meta,
         footer,
+        url,
         ..
     } = writer.close(range, vec![], Generation::default()).await?;
@@ -534,6 +535,7 @@ async fn dump_sql_query(
         parquet_meta,
         object_meta,
         physical_table.location_id(),
+        &url,
         footer,
     )
     .await?;
diff --git a/crates/core/dump/src/parquet_writer.rs b/crates/core/dump/src/parquet_writer.rs
index 4335d9f22..eb54a9437 100644
--- a/crates/core/dump/src/parquet_writer.rs
+++ b/crates/core/dump/src/parquet_writer.rs
@@ -31,6 +31,7 @@ pub async fn commit_metadata(
         ..
     }: ObjectMeta,
     location_id: LocationId,
+    url: &Url,
     footer: FooterBytes,
 ) -> Result<(), BoxError> {
     let file_name = parquet_meta.filename.clone();
@@ -38,6 +39,7 @@ pub async fn commit_metadata(
     metadata_db
         .register_file(
             location_id,
+            url,
             file_name,
             object_size,
             object_e_tag,
@@ -156,11 +158,13 @@ impl ParquetFileWriter {
             extract_footer_bytes_from_file(&object_meta, self.table.object_store()).await?;

         let location_id = self.table.location_id();
+        let url = self.table.url().clone();

         Ok(ParquetFileWriterOutput {
             parquet_meta,
             object_meta,
             location_id,
+            url,
             parent_ids,
             footer,
         })
@@ -177,6 +181,7 @@ pub struct ParquetFileWriterOutput {
     pub(crate) parquet_meta: ParquetMeta,
     pub(crate) object_meta: ObjectMeta,
     pub(crate) location_id: LocationId,
+    pub(crate) url: Url,
     pub(crate) parent_ids: Vec,
     pub(crate) footer: FooterBytes,
 }
diff --git a/crates/core/dump/src/raw_dataset_writer.rs b/crates/core/dump/src/raw_dataset_writer.rs
index def524791..54094b0bd 100644
--- a/crates/core/dump/src/raw_dataset_writer.rs
+++ b/crates/core/dump/src/raw_dataset_writer.rs
@@ -64,6 +64,7 @@ impl RawDatasetWriter {
             parquet_meta,
             object_meta,
             footer,
+            url,
             ..
         }) = writer.write(table_rows).await?
         {
@@ -73,6 +74,7 @@ impl RawDatasetWriter {
                 parquet_meta,
                 object_meta,
                 location_id,
+                &url,
                 footer,
             )
             .await?;
@@ -89,6 +91,7 @@ impl RawDatasetWriter {
             parquet_meta,
             object_meta,
             footer,
+            url,
             ..
         }) = writer.close().await?
         {
@@ -97,6 +100,7 @@ impl RawDatasetWriter {
                 parquet_meta,
                 object_meta,
                 location_id,
+                &url,
                 footer,
             )
             .await?
diff --git a/crates/core/metadata-db/migrations/20251204183330_footer_cache.sql b/crates/core/metadata-db/migrations/20251204183330_footer_cache.sql
new file mode 100644
index 000000000..85749eab0
--- /dev/null
+++ b/crates/core/metadata-db/migrations/20251204183330_footer_cache.sql
@@ -0,0 +1,10 @@
+
+CREATE TABLE IF NOT EXISTS footer_cache (
+    file_id BIGINT REFERENCES file_metadata(id) ON DELETE CASCADE NOT NULL,
+    footer BYTEA NOT NULL,
+    PRIMARY KEY (file_id)
+);
+
+INSERT INTO footer_cache (file_id, footer)
+SELECT id, footer FROM file_metadata
+WHERE footer IS NOT NULL;
\ No newline at end of file
diff --git a/crates/core/metadata-db/migrations/20251204183842_denormalize_file_metadata.sql b/crates/core/metadata-db/migrations/20251204183842_denormalize_file_metadata.sql
new file mode 100644
index 000000000..933fe5295
--- /dev/null
+++ b/crates/core/metadata-db/migrations/20251204183842_denormalize_file_metadata.sql
@@ -0,0 +1,12 @@
+-- Add url column to file_metadata table
+ALTER TABLE file_metadata ADD COLUMN url TEXT;
+
+-- Backfill url from physical_tables (file_metadata.location_id = physical_tables.id)
+UPDATE file_metadata
+SET url = pt.url
+FROM physical_tables pt
+WHERE file_metadata.location_id = pt.id;
+
+-- Drop any rows without a url, then make the column NOT NULL
+DELETE FROM file_metadata WHERE url IS NULL;
+ALTER TABLE file_metadata ALTER COLUMN url SET NOT NULL;
diff --git a/crates/core/metadata-db/src/files.rs b/crates/core/metadata-db/src/files.rs
index 81d2989b1..664dd9996 100644
--- a/crates/core/metadata-db/src/files.rs
+++ b/crates/core/metadata-db/src/files.rs
@@ -18,11 +18,13 @@ pub type FooterBytes = Vec<u8>;
 /// Insert new file metadata record
 ///
 /// Creates a new file metadata entry. Uses ON CONFLICT DO NOTHING for idempotency.
+/// Also inserts the footer into the footer_cache table.
 #[instrument(skip(executor, footer))]
 #[expect(clippy::too_many_arguments)]
 pub async fn insert<'e, E>(
     executor: E,
     location_id: LocationId,
+    url: &Url,
     file_name: String,
     object_size: u64,
     object_e_tag: Option<String>,
@@ -34,13 +36,19 @@ where
     E: sqlx::Executor<'e, Database = sqlx::Postgres>,
 {
     let query = indoc::indoc! {r#"
-        INSERT INTO file_metadata (location_id, file_name, object_size, object_e_tag, object_version, metadata, footer)
-        VALUES ($1, $2, $3, $4, $5, $6, $7)
-        ON CONFLICT DO NOTHING
+        WITH inserted AS (
+            INSERT INTO file_metadata (location_id, url, file_name, object_size, object_e_tag, object_version, metadata, footer)
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+            ON CONFLICT DO NOTHING
+            RETURNING id
+        )
+        INSERT INTO footer_cache (file_id, footer)
+        SELECT id, $8 FROM inserted
     "#};

     sqlx::query(query)
         .bind(location_id)
+        .bind(url.as_str())
         .bind(file_name)
         .bind(object_size as i64)
         .bind(object_e_tag)
@@ -70,13 +78,12 @@ where
         SELECT fm.id,
                fm.location_id,
                fm.file_name,
-               l.url,
+               fm.url,
                fm.object_size,
                fm.object_e_tag,
                fm.object_version,
                fm.metadata
         FROM file_metadata fm
-        JOIN physical_tables l ON fm.location_id = l.id
         WHERE fm.id = $1
     "#};
@@ -101,13 +108,12 @@ where
         SELECT fm.id,
                fm.location_id,
                fm.file_name,
-               l.url,
+               fm.url,
                fm.object_size,
                fm.object_e_tag,
                fm.object_version,
                fm.metadata
         FROM file_metadata fm
-        JOIN physical_tables l ON fm.location_id = l.id
         WHERE location_id = $1
     "#};
@@ -123,7 +129,7 @@ pub async fn get_footer_bytes_by_id<'e, E>(
 where
     E: sqlx::Executor<'e, Database = sqlx::Postgres>,
 {
-    let query = "SELECT footer FROM file_metadata WHERE id = $1";
+    let query = "SELECT footer FROM footer_cache WHERE file_id = $1";

     sqlx::query_scalar(query).bind(id).fetch_one(executor).await
 }
diff --git a/crates/core/metadata-db/src/lib.rs b/crates/core/metadata-db/src/lib.rs
index 44b80d0ba..8a53a6382 100644
--- a/crates/core/metadata-db/src/lib.rs
+++ b/crates/core/metadata-db/src/lib.rs
@@ -7,6 +7,7 @@ use futures::{
 };
 use sqlx::{postgres::types::PgInterval, types::chrono::NaiveDateTime};
 use tracing::instrument;
+use url::Url;

 pub mod datasets;
 mod db;
@@ -211,10 +212,12 @@ impl MetadataDb {
     ///
     /// Creates a new file metadata entry with the provided information. Uses
     /// ON CONFLICT DO NOTHING to make the operation idempotent.
+    /// Also inserts the footer into the footer_cache table.
     #[expect(clippy::too_many_arguments)]
     pub async fn register_file(
         &self,
         location_id: LocationId,
+        url: &Url,
         file_name: String,
         object_size: u64,
         object_e_tag: Option<String>,
@@ -225,6 +228,7 @@ impl MetadataDb {
         files::insert(
             &*self.pool,
             location_id,
+            url,
             file_name,
             object_size,
             object_e_tag,

From b3a77ec32b78fdea1de6199837cab351b51d4d03 Mon Sep 17 00:00:00 2001
From: John Swanson
Date: Thu, 4 Dec 2025 13:47:46 -0800
Subject: [PATCH 2/5] feat(metadata-db): eager file_metadata deletion during compaction

Introduces a new garbage collection pattern where file_metadata entries
are eagerly deleted during compaction while preserving footer_cache
entries until the Collector runs. This allows immediate query visibility
changes while maintaining footer data for in-flight reads.
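In SQL terms, the intended lifecycle looks roughly like this (a sketch of the
statements introduced further down in metadata-db; parameter numbers follow
the queries there):

    -- Compaction: move the replaced files out of file_metadata and into
    -- gc_manifest in one statement, leaving footer_cache untouched so that
    -- in-flight readers can still fetch footers.
    WITH deleted_files AS (
        DELETE FROM file_metadata
        WHERE id = ANY($2)          -- $2: ids of the files replaced by compaction
        RETURNING id, file_name
    )
    INSERT INTO gc_manifest (location_id, file_id, file_path, expiration)
    SELECT $1, id, file_name, CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + $3
    FROM deleted_files
    ON CONFLICT (file_id) DO UPDATE SET expiration = EXCLUDED.expiration;

    -- Collection: once gc_manifest entries have expired and the physical
    -- files are gone, drop the remaining bookkeeping.
    DELETE FROM footer_cache WHERE file_id = ANY($1);
    DELETE FROM gc_manifest WHERE file_id = ANY($1);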
**Migration changes:**
- Drop `footer` column from `file_metadata` (now only in `footer_cache`)
- Remove FK constraint from `footer_cache` to `file_metadata`
- Remove FK constraint from `gc_manifest` to `file_metadata`
- Remove CHECK constraint on `gc_manifest.expiration`

**Compaction flow:**
- `upsert_gc_manifest` now deletes from `file_metadata` via CTE and inserts into `gc_manifest`
- File metadata disappears from queries immediately after compaction
- Footer cache entries preserved for reads during grace period

**Collector changes:**
- No longer deletes from `file_metadata` (already done during compaction)
- Now deletes from `footer_cache` after gc_manifest entries expire
- Now deletes from `gc_manifest` after physical files are removed

**Consistency check:**
- Remove orphan file deletion logic (now handled by garbage collector)
- Orphaned files may be pending GC via `gc_manifest`

**Test utilities:**
- Add `test-utils` feature flag to `metadata-db`
- Add `count_footer_cache_by_location` method for test assertions
- Update `sql_dataset_input_batch_size` test to verify new behavior

Signed-off-by: John Swanson
---
 crates/core/dump/src/check.rs                 | 76 +++-------
 crates/core/dump/src/compaction/collector.rs  | 32 ++++--
 crates/core/dump/src/compaction/error.rs      | 54 +++++++++--
 crates/core/metadata-db/Cargo.toml            |  2 +
 ...195315_remove_file_metadata_footer_col.sql | 16 +++
 crates/core/metadata-db/src/files.rs          | 14 ++-
 crates/core/metadata-db/src/lib.rs            | 90 +++++++++++++++--
 crates/core/metadata-db/src/physical_table.rs |  6 +-
 .../metadata-db/src/physical_table/sql.rs     | 14 ++-
 tests/Cargo.toml                              |  2 +-
 tests/src/tests/it_sql_dataset_batch_size.rs  | 97 +++++++++++++++++--
 11 files changed, 298 insertions(+), 105 deletions(-)
 create mode 100644 crates/core/metadata-db/migrations/20251204195315_remove_file_metadata_footer_col.sql

diff --git a/crates/core/dump/src/check.rs b/crates/core/dump/src/check.rs
index 3e31c53be..c22ef349c 100644
--- a/crates/core/dump/src/check.rs
+++ b/crates/core/dump/src/check.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::BTreeSet;
 use common::{catalog::physical::PhysicalTable, query_context::Error as QueryError};
 use futures::TryStreamExt as _;
 use object_store::ObjectMeta;
 /// Validates consistency between metadata database and object store for a table
 ///
-/// This function performs a comprehensive consistency check to ensure that the metadata
-/// database and object store are in sync. It detects and attempts to repair consistency
-/// issues where possible, or returns errors for corruption that requires manual intervention.
+/// This function performs a consistency check to ensure that all files registered
+/// in the metadata database exist in the object store.
 ///
 /// ## Consistency Checks Performed
 ///
-/// 1. **Orphaned Files Detection**
-///    - Verifies all files in object store are registered in metadata DB
-///    - **Action on failure**: Automatically deletes orphaned files to restore consistency
-///    - Orphaned files typically result from failed dump operations
+/// **Missing Registered Files Detection**
+/// - Verifies all metadata DB entries have corresponding files in object store
+/// - **Action on failure**: Returns [`ConsistencyError::MissingRegisteredFile`]
+/// - Indicates data corruption requiring manual intervention
 ///
-/// 2. **Missing Registered Files Detection**
-///    - Verifies all metadata DB entries have corresponding files in object store
-///    - **Action on failure**: Returns [`ConsistencyError::MissingRegisteredFile`]
-///    - Indicates data corruption requiring manual intervention
+/// ## Note on Orphaned Files
 ///
-/// ## Side Effects
-///
-/// ⚠️ **Warning**: This function has side effects - it deletes orphaned files from object store.
-/// These deletions are logged at `WARN` level before execution.
+/// Files that exist in the object store but are not registered in the metadata DB
+/// are intentionally NOT deleted by this function. These files may be pending
+/// garbage collection via the `gc_manifest` table. The garbage collector handles
+/// cleanup of such files after their expiration period.
 pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyError> {
     // See also: metadata-consistency
@@ -45,7 +41,7 @@ pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyE
     let store = table.object_store();
     let path = table.path();

-    let stored_files: BTreeMap<String, ObjectMeta> = store
+    let stored_files: BTreeSet<String> = store
         .list(Some(table.path()))
         .try_collect::<Vec<_>>()
         .await
@@ -54,29 +50,12 @@ pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyE
            source: err,
        })?
        .into_iter()
-       .filter_map(|object| Some((object.location.filename()?.to_string(), object)))
+       .filter_map(|object| object.location.filename().map(|s| s.to_string()))
        .collect();

-    // TODO: Move the orphaned files deletion logic out of this check function. This side effect
-    // should be handled somewhere else (e.g., by the garbage collector).
-    for (filename, object_meta) in &stored_files {
-        if !registered_files.contains(filename) {
-            // This file was written by a dump job, but it is not present in the metadata DB,
-            // so it is an orphaned file. Delete it.
-            tracing::warn!("Deleting orphaned file: {}", object_meta.location);
-            store.delete(&object_meta.location).await.map_err(|err| {
-                ConsistencyError::DeleteOrphanedFile {
-                    location_id,
-                    filename: filename.clone(),
-                    source: err,
-                }
-            })?;
-        }
-    }
-
     // Check for files in the metadata DB that do not exist in the store.
     for filename in registered_files {
-        if !stored_files.contains_key(&filename) {
+        if !stored_files.contains(&filename) {
             return Err(ConsistencyError::MissingRegisteredFile {
                 location_id,
                 filename,
@@ -127,31 +106,6 @@ pub enum ConsistencyError {
         source: object_store::Error,
     },

-    /// Failed to delete orphaned file from object store
-    ///
-    /// This occurs when attempting to clean up a file that exists in object store
-    /// but is not registered in metadata DB. The file is considered orphaned
-    /// (likely from a failed dump operation) and should be deleted to restore
-    /// consistency.
-    ///
-    /// Possible causes:
-    /// - Object store connectivity issues during delete
-    /// - File already deleted by concurrent process
-    /// - Permission/authentication issues
-    /// - Object store service unavailable
-    ///
-    /// This is a critical error - orphaned files indicate incomplete operations
-    /// and should be cleaned up to prevent storage bloat.
-    #[error(
-        "Failed to delete orphaned file '{filename}' from object store for table location {location_id}"
-    )]
-    DeleteOrphanedFile {
-        location_id: LocationId,
-        filename: String,
-        #[source]
-        source: object_store::Error,
-    },
-
     /// Registered file missing from object store (data corruption)
     ///
     /// This occurs when a file is registered in the metadata database but does
diff --git a/crates/core/dump/src/compaction/collector.rs b/crates/core/dump/src/compaction/collector.rs
index 79592f332..be2eefbf7 100644
--- a/crates/core/dump/src/compaction/collector.rs
+++ b/crates/core/dump/src/compaction/collector.rs
@@ -104,17 +104,20 @@ impl Collector {
             );
         }

-        let paths_to_remove = metadata_db
-            .delete_file_ids(found_file_ids_to_paths.keys())
+        // Delete from footer_cache (file_metadata was already deleted during compaction)
+        metadata_db
+            .delete_footer_cache(found_file_ids_to_paths.keys())
             .await
-            .map_err(CollectorError::file_metadata_delete(
+            .map_err(CollectorError::footer_cache_delete(
                 found_file_ids_to_paths.keys(),
-            ))?
-            .into_iter()
-            .filter_map(|file_id| found_file_ids_to_paths.get(&file_id).cloned())
-            .collect::>();
+            ))?;

-        tracing::debug!("Metadata entries deleted: {}", paths_to_remove.len());
+        tracing::debug!(
+            "Footer cache entries deleted: {}",
+            found_file_ids_to_paths.len()
+        );
+
+        let paths_to_remove: BTreeSet<_> = found_file_ids_to_paths.values().cloned().collect();

         if let Some(metrics) = &self.metrics {
             metrics.inc_expired_entries_deleted(
@@ -161,6 +164,19 @@ impl Collector {
         tracing::debug!("Expired files deleted: {}", files_deleted);
         tracing::debug!("Expired files not found: {}", files_not_found);

+        // Delete from gc_manifest after physical files have been deleted
+        metadata_db
+            .delete_gc_manifest(found_file_ids_to_paths.keys())
+            .await
+            .map_err(CollectorError::gc_manifest_delete(
+                found_file_ids_to_paths.keys(),
+            ))?;
+
+        tracing::debug!(
+            "GC manifest entries deleted: {}",
+            found_file_ids_to_paths.len()
+        );
+
         if let Some(metrics) = self.metrics.as_ref() {
             metrics.inc_successful_collections(table_name.to_string());
         }
diff --git a/crates/core/dump/src/compaction/error.rs b/crates/core/dump/src/compaction/error.rs
index 9fb9dea11..4f624bfe0 100644
--- a/crates/core/dump/src/compaction/error.rs
+++ b/crates/core/dump/src/compaction/error.rs
@@ -309,11 +309,31 @@ pub enum CollectorError {
         file_ids: Vec<FileId>,
     },

-    /// Failed to delete manifest record
     ///
-    /// This occurs when removing a file entry from the garbage collection manifest
-    /// after the file has been successfully deleted. The GC manifest tracks files
-    /// eligible for cleanup.
+    /// Failed to delete footer cache records
+    ///
+    /// This occurs when removing footer cache entries from the database after
+    /// the gc_manifest entries have expired. Footer cache entries are kept until
+    /// the Collector runs to allow reads during the grace period.
+    ///
+    /// Common causes:
+    /// - Database connection lost during delete
+    /// - Transaction conflicts with concurrent operations
+    /// - Insufficient database permissions for DELETE queries
+    ///
+    /// This leaves orphaned footer cache entries. They should be cleaned up
+    /// manually or via retry.
+    #[error("failed to delete footer cache for files {file_ids:?}: {err}")]
+    FooterCacheDelete {
+        #[source]
+        err: metadata_db::Error,
+        file_ids: Vec<FileId>,
+    },
+
+    /// Failed to delete gc_manifest records
+    ///
+    /// This occurs when removing entries from the garbage collection manifest
+    /// after the files have been successfully deleted from storage. The GC manifest
+    /// tracks files eligible for cleanup.
     ///
     /// Common causes:
     /// - Database connection lost during delete
     /// - Transaction conflicts with concurrent operations
     /// - Record already deleted by another process
     /// - Insufficient database permissions
     ///
-    /// This leaves a stale entry in the GC manifest. The entry should be cleaned
+    /// This leaves stale entries in the GC manifest. They should be cleaned
     /// up to prevent repeated deletion attempts.
-    #[error("failed to delete file {file_id} from gc manifest: {err}")]
-    ManifestDelete {
+    #[error("failed to delete from gc manifest for files {file_ids:?}: {err}")]
+    GcManifestDelete {
         #[source]
         err: metadata_db::Error,
-        file_id: FileId,
+        file_ids: Vec<FileId>,
     },

     /// Failed to parse URL for file
@@ -408,8 +428,22 @@ impl CollectorError {
         }
     }

-    pub fn gc_manifest_delete(file_id: FileId) -> impl FnOnce(metadata_db::Error) -> Self {
-        move |err| Self::ManifestDelete { err, file_id }
+    pub fn footer_cache_delete<'a>(
+        file_ids: impl Iterator<Item = &'a FileId>,
+    ) -> impl FnOnce(metadata_db::Error) -> Self {
+        move |err| Self::FooterCacheDelete {
+            err,
+            file_ids: file_ids.cloned().collect(),
+        }
+    }
+
+    pub fn gc_manifest_delete<'a>(
+        file_ids: impl Iterator<Item = &'a FileId>,
+    ) -> impl FnOnce(metadata_db::Error) -> Self {
+        move |err| Self::GcManifestDelete {
+            err,
+            file_ids: file_ids.cloned().collect(),
+        }
     }

     pub fn parse_error(file_id: FileId) -> impl FnOnce(url::ParseError) -> Self {
diff --git a/crates/core/metadata-db/Cargo.toml b/crates/core/metadata-db/Cargo.toml
index b5b2a0741..183ea9cf1 100644
--- a/crates/core/metadata-db/Cargo.toml
+++ b/crates/core/metadata-db/Cargo.toml
@@ -9,6 +9,8 @@ license-file.workspace = true
 default = []
 # Temporary database support for testing environments
 temp-db = ["pgtemp"]
+# Test utilities for downstream crates
+test-utils = []

 [dependencies]
 backon.workspace = true
diff --git a/crates/core/metadata-db/migrations/20251204195315_remove_file_metadata_footer_col.sql b/crates/core/metadata-db/migrations/20251204195315_remove_file_metadata_footer_col.sql
new file mode 100644
index 000000000..6bf1a0ce2
--- /dev/null
+++ b/crates/core/metadata-db/migrations/20251204195315_remove_file_metadata_footer_col.sql
@@ -0,0 +1,16 @@
+-- Drop the footer column from file_metadata
+ALTER TABLE file_metadata DROP COLUMN footer;
+
+-- Remove the foreign key constraint from footer_cache entirely
+-- This allows us to delete from file_metadata eagerly during compaction
+-- while keeping footer_cache entries until the Collector runs
+ALTER TABLE footer_cache DROP CONSTRAINT footer_cache_file_id_fkey;
+
+-- Remove the foreign key constraint from gc_manifest to file_metadata
+-- This allows us to delete from file_metadata while keeping gc_manifest entries
+-- for garbage collection tracking
+ALTER TABLE gc_manifest DROP CONSTRAINT gc_manifest_file_id_fkey;
+
+-- Remove the CHECK constraint on gc_manifest.expiration
+-- This allows inserting entries with any expiration time
+ALTER TABLE gc_manifest DROP CONSTRAINT gc_manifest_expiration_check;
diff --git a/crates/core/metadata-db/src/files.rs b/crates/core/metadata-db/src/files.rs
index 664dd9996..2e069b4c3 100644
--- a/crates/core/metadata-db/src/files.rs
+++ b/crates/core/metadata-db/src/files.rs
@@ -37,8 +37,8 @@ where
 {
     let query = indoc::indoc! {r#"
         WITH inserted AS (
-            INSERT INTO file_metadata (location_id, url, file_name, object_size, object_e_tag, object_version, metadata, footer)
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+            INSERT INTO file_metadata (location_id, url, file_name, object_size, object_e_tag, object_version, metadata)
+            VALUES ($1, $2, $3, $4, $5, $6, $7)
             ON CONFLICT DO NOTHING
             RETURNING id
         )
@@ -135,12 +135,20 @@ where
 }

 /// Delete file metadata record by ID
+///
+/// Also deletes the corresponding footer_cache entry.
 #[instrument(skip(executor))]
 pub async fn delete<'e, E>(executor: E, id: FileId) -> Result
 where
     E: sqlx::Executor<'e, Database = sqlx::Postgres>,
 {
-    let query = "DELETE FROM file_metadata WHERE id = $1";
+    // Delete from footer_cache first (no FK constraint), then file_metadata
+    let query = indoc::indoc! {r#"
+        WITH deleted_footer AS (
+            DELETE FROM footer_cache WHERE file_id = $1
+        )
+        DELETE FROM file_metadata WHERE id = $1
+    "#};

     let result = sqlx::query(query).bind(id).execute(executor).await?;
     Ok(result.rows_affected() > 0)
diff --git a/crates/core/metadata-db/src/lib.rs b/crates/core/metadata-db/src/lib.rs
index 8a53a6382..ad3b034d9 100644
--- a/crates/core/metadata-db/src/lib.rs
+++ b/crates/core/metadata-db/src/lib.rs
@@ -301,6 +301,7 @@ impl MetadataDb {
     /// Delete file metadata record by ID
     ///
+    /// Also deletes the corresponding footer_cache entry.
     /// Returns `true` if a file was deleted, `false` if no file was found.
     pub async fn delete_file(&self, file_id: FileId) -> Result {
         files::delete(&*self.pool, file_id)
@@ -355,10 +356,12 @@ impl MetadataDb {
             .map_err(Error::Database)
     }

-    /// Inserts or updates the GC manifest for the given file IDs.
-    /// If a file ID already exists, it updates the expiration time.
+    /// Deletes file metadata entries and inserts them into the GC manifest.
+    ///
+    /// This eagerly removes files from file_metadata so they won't be listed,
+    /// while keeping footer_cache entries until the Collector runs.
     /// The expiration time is set to the current time plus the given duration.
-    /// If the file ID does not exist, it inserts a new row.
+    /// If the file ID already exists in gc_manifest, it updates the expiration time.
     pub async fn upsert_gc_manifest(
         &self,
         location_id: LocationId,
@@ -370,16 +373,20 @@ impl MetadataDb {
             ..Default::default()
         };

-        let sql = "
+        let sql = indoc::indoc! {r#"
+            WITH deleted_files AS (
+                DELETE FROM file_metadata
+                WHERE id = ANY($2)
+                RETURNING id, file_name
+            )
             INSERT INTO gc_manifest (location_id, file_id, file_path, expiration)
             SELECT $1
-                 , file.id
-                 , file_metadata.file_name
+                 , deleted_files.id
+                 , deleted_files.file_name
                  , CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + $3
-            FROM UNNEST ($2) AS file(id)
-            INNER JOIN file_metadata ON file_metadata.id = file.id
-            ON CONFLICT (file_id) DO UPDATE SET expiration = EXCLUDED.expiration;
-        ";
+            FROM deleted_files
+            ON CONFLICT (file_id) DO UPDATE SET expiration = EXCLUDED.expiration
+        "#};
         sqlx::query(sql)
             .bind(location_id)
             .bind(file_ids.iter().map(|id| **id).collect::<Vec<_>>())
@@ -411,6 +418,69 @@ impl MetadataDb {
             .map_err(Error::Database)
             .boxed()
     }
+
+    /// Deletes footer cache entries for the given file IDs.
+    ///
+    /// Called by the Collector when gc_manifest entries have expired
+    /// and physical files are ready to be deleted.
+    pub async fn delete_footer_cache(
+        &self,
+        file_ids: impl Iterator<Item = &FileId>,
+    ) -> Result<(), Error> {
+        let sql = "DELETE FROM footer_cache WHERE file_id = ANY($1)";
+
+        sqlx::query(sql)
+            .bind(file_ids.map(|id| **id).collect::<Vec<_>>())
+            .execute(&*self.pool)
+            .await
+            .map_err(Error::Database)?;
+
+        Ok(())
+    }
+
+    /// Deletes gc_manifest entries for the given file IDs.
+    ///
+    /// Called by the Collector after physical files have been deleted.
+    pub async fn delete_gc_manifest(
+        &self,
+        file_ids: impl Iterator<Item = &FileId>,
+    ) -> Result<(), Error> {
+        let sql = "DELETE FROM gc_manifest WHERE file_id = ANY($1)";
+
+        sqlx::query(sql)
+            .bind(file_ids.map(|id| **id).collect::<Vec<_>>())
+            .execute(&*self.pool)
+            .await
+            .map_err(Error::Database)?;
+
+        Ok(())
+    }
+
+    /// Counts footer_cache entries for a location.
+    ///
+    /// This counts footer_cache entries that belong to files associated with
+    /// this location, whether they are still in file_metadata (current files)
+    /// or in gc_manifest (files pending garbage collection).
+    #[cfg(feature = "test-utils")]
+    pub async fn count_footer_cache_by_location(
+        &self,
+        location_id: LocationId,
+    ) -> Result<i64, Error> {
+        let sql = indoc::indoc! {r#"
+            SELECT COUNT(DISTINCT fc.file_id) FROM footer_cache fc
+            WHERE fc.file_id IN (
+                SELECT id FROM file_metadata WHERE location_id = $1
+                UNION
+                SELECT file_id FROM gc_manifest WHERE location_id = $1
+            )
+        "#};
+
+        sqlx::query_scalar(sql)
+            .bind(location_id)
+            .fetch_one(&*self.pool)
+            .await
+            .map_err(Error::Database)
+    }
 }

 /// Private module for sealed trait pattern
diff --git a/crates/core/metadata-db/src/physical_table.rs b/crates/core/metadata-db/src/physical_table.rs
index 2ff690b8a..d027b541d 100644
--- a/crates/core/metadata-db/src/physical_table.rs
+++ b/crates/core/metadata-db/src/physical_table.rs
@@ -194,12 +194,14 @@ where
 /// Delete a physical table location by its ID
 ///
-/// This will also delete all associated file_metadata entries due to CASCADE constraints.
+/// This will also delete all associated entries across multiple tables.
 ///
 /// # Cascade Effects
 ///
 /// Deleting a location will also delete:
-/// - All file_metadata entries associated with this location
+/// - All file_metadata entries associated with this location (CASCADE)
+/// - All gc_manifest entries for those files (CASCADE from file_metadata)
+/// - All footer_cache entries for those files (explicit delete, no FK constraint)
 #[tracing::instrument(skip(exe), err)]
 pub async fn delete_by_id<'c, E>(
     exe: E,
diff --git a/crates/core/metadata-db/src/physical_table/sql.rs b/crates/core/metadata-db/src/physical_table/sql.rs
index 2b99b79e3..a694051dc 100644
--- a/crates/core/metadata-db/src/physical_table/sql.rs
+++ b/crates/core/metadata-db/src/physical_table/sql.rs
@@ -288,13 +288,25 @@ where
 /// Delete a location by its ID
 ///
-/// This will also delete all associated file_metadata entries due to CASCADE.
+/// This will also delete all associated entries:
+/// - file_metadata entries (via CASCADE on location_id FK)
+/// - gc_manifest entries (via CASCADE on file_id FK from file_metadata)
+/// - footer_cache entries (explicitly deleted since no FK constraint)
+///
 /// Returns true if the location was deleted, false if it didn't exist.
 pub async fn delete_by_id<'c, E>(exe: E, id: LocationId) -> Result
 where
     E: Executor<'c, Database = Postgres>,
 {
+    // Delete footer_cache entries first (no FK constraint), then physical_tables
+    // The physical_tables delete will cascade to file_metadata and gc_manifest
     let query = indoc::indoc! {"
+        WITH file_ids AS (
+            SELECT id FROM file_metadata WHERE location_id = $1
+        ),
+        deleted_footer_cache AS (
+            DELETE FROM footer_cache WHERE file_id IN (SELECT id FROM file_ids)
+        )
         DELETE FROM physical_tables WHERE id = $1
     "};
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index 2202fb58a..65cb52c85 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -30,7 +30,7 @@ firehose-datasets = { path = "../crates/extractors/firehose" }
 fs-err.workspace = true
 futures.workspace = true
 indoc.workspace = true
-metadata-db = { path = "../crates/core/metadata-db" }
+metadata-db = { path = "../crates/core/metadata-db", features = ["test-utils"] }
 monitoring = { path = "../crates/core/monitoring" }
 opentelemetry = { workspace = true }
 opentelemetry_sdk = { workspace = true, features = ["testing"] }
diff --git a/tests/src/tests/it_sql_dataset_batch_size.rs b/tests/src/tests/it_sql_dataset_batch_size.rs
index 76d77e6bc..2c09f2014 100644
--- a/tests/src/tests/it_sql_dataset_batch_size.rs
+++ b/tests/src/tests/it_sql_dataset_batch_size.rs
@@ -40,29 +40,74 @@ async fn sql_dataset_input_batch_size() {
         .await;

     // 4. Get catalog and count files
-    let file_count = test.file_count("sql_stream_ds", "even_blocks").await;
+    let physical_file_count = test.file_count("sql_stream_ds", "even_blocks").await;
+    let file_metadata_count = test
+        .file_metadata_count("sql_stream_ds", "even_blocks")
+        .await;

     // 5. With batch size 1 and 4 blocks, we expect 4 files to be dumped (even if some are empty)
     // since microbatch_max_interval=1 should create one file per block even_blocks only includes
     // even block numbers, so we expect 2 files with data for blocks 15000000 and 15000002, plus
     // empty files for odd blocks.
-    assert_eq!(file_count, 4);
+    assert_eq!(physical_file_count, 4);
+    assert_eq!(file_metadata_count, 4);

     test.spawn_compaction_and_await_completion("sql_stream_ds", "even_blocks")
         .await;

-    // 6. After compaction, we expect an additional file to be created, with all data in it.
-    let file_count_after = test.file_count("sql_stream_ds", "even_blocks").await;
+    // 6. After compaction:
+    //    - file_metadata: only 1 entry (the compacted file) since old files are eagerly deleted
+    //    - physical files: 5 (4 original + 1 compacted, waiting for garbage collection)
+    //    - footer_cache: 5 (preserved for all files until garbage collection)
+    let physical_file_count_after = test.file_count("sql_stream_ds", "even_blocks").await;
+    let file_metadata_count_after = test
+        .file_metadata_count("sql_stream_ds", "even_blocks")
+        .await;
+    let footer_cache_count_after = test
+        .footer_cache_count("sql_stream_ds", "even_blocks")
+        .await;

-    assert_eq!(file_count_after, 5);
+    assert_eq!(
+        file_metadata_count_after, 1,
+        "file_metadata should only have compacted file"
+    );
+    assert_eq!(
+        physical_file_count_after, 5,
+        "physical files should include old + compacted"
+    );
+    assert_eq!(
+        footer_cache_count_after, physical_file_count_after,
+        "footer_cache should match physical files"
+    );

     test.spawn_collection_and_await_completion("sql_stream_ds", "even_blocks")
         .await;

-    // 7. After collection, we expect the original 4 files to be deleted,
-    // leaving only the compacted file.
-    let file_count_final = test.file_count("sql_stream_ds", "even_blocks").await;
-    assert_eq!(file_count_final, 1);
+    // 7. After collection, garbage collector deletes:
+    //    - physical files (4 old ones)
+    //    - footer_cache entries (4 old ones)
+    //    - gc_manifest entries (4 old ones)
+    //    Leaving only the compacted file everywhere.
+    let physical_file_count_final = test.file_count("sql_stream_ds", "even_blocks").await;
+    let file_metadata_count_final = test
+        .file_metadata_count("sql_stream_ds", "even_blocks")
+        .await;
+    let footer_cache_count_final = test
+        .footer_cache_count("sql_stream_ds", "even_blocks")
+        .await;
+
+    assert_eq!(
+        physical_file_count_final, 1,
+        "only compacted file should remain"
+    );
+    assert_eq!(
+        file_metadata_count_final, 1,
+        "only compacted file in metadata"
+    );
+    assert_eq!(
+        footer_cache_count_final, 1,
+        "only compacted file in footer_cache"
+    );

     let mut test_client = test.new_flight_client().await.unwrap();
     let (res, _batch_count) = test_client
@@ -233,4 +278,38 @@ impl TestCtx {
     async fn file_count(&self, dataset: &str, table: &str) -> usize {
         self.files(dataset, table).await.len()
     }
+
+    /// Count file_metadata entries for a table (registered files)
+    async fn file_metadata_count(&self, dataset: &str, table: &str) -> usize {
+        let catalog = self.catalog_for_dataset(dataset).await.unwrap();
+        let table = catalog
+            .tables()
+            .iter()
+            .find(|t| t.table_name() == table)
+            .unwrap();
+
+        table.files().await.map(|files| files.len()).unwrap_or(0)
+    }
+
+    /// Count footer_cache entries for a table
+    ///
+    /// This counts footer_cache entries that belong to this location via either:
+    /// - file_metadata (current files)
+    /// - gc_manifest (files pending garbage collection)
+    async fn footer_cache_count(&self, dataset: &str, table: &str) -> usize {
+        let catalog = self.catalog_for_dataset(dataset).await.unwrap();
+        let table = catalog
+            .tables()
+            .iter()
+            .find(|t| t.table_name() == table)
+            .unwrap();
+
+        let location_id = table.location_id();
+        let metadata_db = self.ctx.daemon_worker().metadata_db();
+
+        metadata_db
+            .count_footer_cache_by_location(location_id)
+            .await
+            .unwrap_or(0) as usize
+    }
 }

From b72bd93420b2e2b92c27d09d3334477178696733 Mon Sep 17 00:00:00 2001
From: John Swanson
Date: Thu, 4 Dec 2025 14:00:32 -0800
Subject: [PATCH 3/5] fix(metadata-db): remove duplicate lines from merge

Remove duplicate doc comment and `url` parameter in `files::insert` that
were accidentally introduced during merge from main.

Signed-off-by: John Swanson
---
 crates/core/metadata-db/src/files.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/crates/core/metadata-db/src/files.rs b/crates/core/metadata-db/src/files.rs
index 8d5aad97c..2e069b4c3 100644
--- a/crates/core/metadata-db/src/files.rs
+++ b/crates/core/metadata-db/src/files.rs
@@ -19,14 +19,12 @@ pub type FooterBytes = Vec<u8>;
 ///
 /// Creates a new file metadata entry. Uses ON CONFLICT DO NOTHING for idempotency.
 /// Also inserts the footer into the footer_cache table.
-/// Also inserts the footer into the footer_cache table.
 #[instrument(skip(executor, footer))]
 #[expect(clippy::too_many_arguments)]
 pub async fn insert<'e, E>(
     executor: E,
     location_id: LocationId,
     url: &Url,
#[instrument(skip(executor, footer))] #[expect(clippy::too_many_arguments)] pub async fn insert<'e, E>( executor: E, location_id: LocationId, url: &Url, - url: &Url, file_name: String, object_size: u64, object_e_tag: Option, From aa913c4626f2043d5ff95fb7e835c0572994c18d Mon Sep 17 00:00:00 2001 From: JohnSwan1503 <82048481+JohnSwan1503@users.noreply.github.com> Date: Fri, 5 Dec 2025 08:15:30 -0800 Subject: [PATCH 4/5] Update crates/core/metadata-db/src/lib.rs Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com> Signed-off-by: JohnSwan1503 <82048481+JohnSwan1503@users.noreply.github.com> --- crates/core/metadata-db/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/metadata-db/src/lib.rs b/crates/core/metadata-db/src/lib.rs index c36509e32..4af860197 100644 --- a/crates/core/metadata-db/src/lib.rs +++ b/crates/core/metadata-db/src/lib.rs @@ -466,12 +466,12 @@ impl MetadataDb { pub async fn count_footer_cache_by_location( &self, location_id: LocationId, - ) -> Result { - let sql = indoc::indoc! {r#" SELECT COUNT(DISTINCT fc.file_id) FROM footer_cache fc WHERE fc.file_id IN ( SELECT id FROM file_metadata WHERE location_id = $1 - UNION + UNION ALL + SELECT file_id FROM gc_manifest WHERE location_id = $1 + ) SELECT file_id FROM gc_manifest WHERE location_id = $1 ) "#}; From 74da03ca9d97b3896c0f0906443319cef94eafa9 Mon Sep 17 00:00:00 2001 From: John Swanson Date: Fri, 5 Dec 2025 08:18:28 -0800 Subject: [PATCH 5/5] Revert "Update crates/core/metadata-db/src/lib.rs" This reverts commit aa913c4626f2043d5ff95fb7e835c0572994c18d. Bad bot! Signed-off-by: John Swanson --- crates/core/metadata-db/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/metadata-db/src/lib.rs b/crates/core/metadata-db/src/lib.rs index 4af860197..c36509e32 100644 --- a/crates/core/metadata-db/src/lib.rs +++ b/crates/core/metadata-db/src/lib.rs @@ -466,12 +466,12 @@ impl MetadataDb { pub async fn count_footer_cache_by_location( &self, location_id: LocationId, + ) -> Result { + let sql = indoc::indoc! {r#" SELECT COUNT(DISTINCT fc.file_id) FROM footer_cache fc WHERE fc.file_id IN ( SELECT id FROM file_metadata WHERE location_id = $1 - UNION ALL - SELECT file_id FROM gc_manifest WHERE location_id = $1 - ) + UNION SELECT file_id FROM gc_manifest WHERE location_id = $1 ) "#};