Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 15 additions & 61 deletions crates/core/dump/src/check.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeSet;

use common::{catalog::physical::PhysicalTable, query_context::Error as QueryError};
use futures::TryStreamExt as _;
Expand All @@ -7,26 +7,22 @@ use object_store::ObjectMeta;

/// Validates consistency between metadata database and object store for a table
///
/// This function performs a comprehensive consistency check to ensure that the metadata
/// database and object store are in sync. It detects and attempts to repair consistency
/// issues where possible, or returns errors for corruption that requires manual intervention.
/// This function performs a consistency check to ensure that all files registered
/// in the metadata database exist in the object store.
///
/// ## Consistency Checks Performed
///
/// 1. **Orphaned Files Detection**
/// - Verifies all files in object store are registered in metadata DB
/// - **Action on failure**: Automatically deletes orphaned files to restore consistency
/// - Orphaned files typically result from failed dump operations
/// **Missing Registered Files Detection**
/// - Verifies all metadata DB entries have corresponding files in object store
/// - **Action on failure**: Returns [`ConsistencyError::MissingRegisteredFile`]
/// - Indicates data corruption requiring manual intervention
///
/// 2. **Missing Registered Files Detection**
/// - Verifies all metadata DB entries have corresponding files in object store
/// - **Action on failure**: Returns [`ConsistencyError::MissingRegisteredFile`]
/// - Indicates data corruption requiring manual intervention
/// ## Note on Orphaned Files
///
/// ## Side Effects
///
/// ⚠️ **Warning**: This function has side effects - it deletes orphaned files from object store.
/// These deletions are logged at `WARN` level before execution.
/// Files that exist in the object store but are not registered in the metadata DB
/// are intentionally NOT deleted by this function. These files may be pending
/// garbage collection via the `gc_manifest` table. The garbage collector handles
/// cleanup of such files after their expiration period.
pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyError> {
// See also: metadata-consistency

Expand All @@ -45,7 +41,7 @@ pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyE
let store = table.object_store();
let path = table.path();

let stored_files: BTreeMap<String, ObjectMeta> = store
let stored_files: BTreeSet<String> = store
.list(Some(table.path()))
.try_collect::<Vec<ObjectMeta>>()
.await
Expand All @@ -54,29 +50,12 @@ pub async fn consistency_check(table: &PhysicalTable) -> Result<(), ConsistencyE
source: err,
})?
.into_iter()
.filter_map(|object| Some((object.location.filename()?.to_string(), object)))
.filter_map(|object| object.location.filename().map(|s| s.to_string()))
.collect();

// TODO: Move the orphaned files deletion logic out of this check function. This side effect
// should be handled somewhere else (e.g., by the garbage collector).
for (filename, object_meta) in &stored_files {
if !registered_files.contains(filename) {
// This file was written by a dump job, but it is not present in the metadata DB,
// so it is an orphaned file. Delete it.
tracing::warn!("Deleting orphaned file: {}", object_meta.location);
store.delete(&object_meta.location).await.map_err(|err| {
ConsistencyError::DeleteOrphanedFile {
location_id,
filename: filename.clone(),
source: err,
}
})?;
}
}

// Check for files in the metadata DB that do not exist in the store.
for filename in registered_files {
if !stored_files.contains_key(&filename) {
if !stored_files.contains(&filename) {
return Err(ConsistencyError::MissingRegisteredFile {
location_id,
filename,
Expand Down Expand Up @@ -127,31 +106,6 @@ pub enum ConsistencyError {
source: object_store::Error,
},

/// Failed to delete orphaned file from object store
///
/// This occurs when attempting to clean up a file that exists in object store
/// but is not registered in metadata DB. The file is considered orphaned
/// (likely from a failed dump operation) and should be deleted to restore
/// consistency.
///
/// Possible causes:
/// - Object store connectivity issues during delete
/// - File already deleted by concurrent process
/// - Permission/authentication issues
/// - Object store service unavailable
///
/// This is a critical error - orphaned files indicate incomplete operations
/// and should be cleaned up to prevent storage bloat.
#[error(
"Failed to delete orphaned file '{filename}' from object store for table location {location_id}"
)]
DeleteOrphanedFile {
location_id: LocationId,
filename: String,
#[source]
source: object_store::Error,
},

/// Registered file missing from object store (data corruption)
///
/// This occurs when a file is registered in the metadata database but does
Expand Down
32 changes: 24 additions & 8 deletions crates/core/dump/src/compaction/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,20 @@ impl Collector {
);
}

let paths_to_remove = metadata_db
.delete_file_ids(found_file_ids_to_paths.keys())
// Delete from footer_cache (file_metadata was already deleted during compaction)
metadata_db
.delete_footer_cache(found_file_ids_to_paths.keys())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling or logging if the number of deleted entries doesn't match expectations. For example, if delete_footer_cache or delete_gc_manifest operations partially succeed, it might be worth logging the discrepancy.

.await
.map_err(CollectorError::file_metadata_delete(
.map_err(CollectorError::footer_cache_delete(
found_file_ids_to_paths.keys(),
))?
.into_iter()
.filter_map(|file_id| found_file_ids_to_paths.get(&file_id).cloned())
.collect::<BTreeSet<_>>();
))?;

tracing::debug!("Metadata entries deleted: {}", paths_to_remove.len());
tracing::debug!(
"Footer cache entries deleted: {}",
found_file_ids_to_paths.len()
);

let paths_to_remove: BTreeSet<_> = found_file_ids_to_paths.values().cloned().collect();

if let Some(metrics) = &self.metrics {
metrics.inc_expired_entries_deleted(
Expand Down Expand Up @@ -161,6 +164,19 @@ impl Collector {
tracing::debug!("Expired files deleted: {}", files_deleted);
tracing::debug!("Expired files not found: {}", files_not_found);

// Delete from gc_manifest after physical files have been deleted
metadata_db
.delete_gc_manifest(found_file_ids_to_paths.keys())
.await
.map_err(CollectorError::gc_manifest_delete(
found_file_ids_to_paths.keys(),
))?;

tracing::debug!(
"GC manifest entries deleted: {}",
found_file_ids_to_paths.len()
);

if let Some(metrics) = self.metrics.as_ref() {
metrics.inc_successful_collections(table_name.to_string());
}
Expand Down
54 changes: 44 additions & 10 deletions crates/core/dump/src/compaction/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,25 +309,45 @@ pub enum CollectorError {
file_ids: Vec<FileId>,
},

/// Failed to delete manifest record
/// Failed to delete footer cache records
///
/// This occurs when removing a file entry from the garbage collection manifest
/// after the file has been successfully deleted. The GC manifest tracks files
/// eligible for cleanup.
/// This occurs when removing footer cache entries from the database after
/// the gc_manifest entries have expired. Footer cache entries are kept until
/// the Collector runs to allow reads during the grace period.
///
/// Common causes:
/// - Database connection lost during delete
/// - Transaction conflicts with concurrent operations
/// - Insufficient database permissions for DELETE queries
///
/// This leaves orphaned footer cache entries. They should be cleaned up
/// manually or via retry.
#[error("failed to delete footer cache for files {file_ids:?}: {err}")]
FooterCacheDelete {
#[source]
err: metadata_db::Error,
file_ids: Vec<FileId>,
},

/// Failed to delete gc_manifest records
///
/// This occurs when removing entries from the garbage collection manifest
/// after the files have been successfully deleted from storage. The GC manifest
/// tracks files eligible for cleanup.
///
/// Common causes:
/// - Database connection lost during delete
/// - Transaction conflicts with concurrent operations
/// - Record already deleted by another process
/// - Insufficient database permissions
///
/// This leaves a stale entry in the GC manifest. The entry should be cleaned
/// This leaves stale entries in the GC manifest. They should be cleaned
/// up to prevent repeated deletion attempts.
#[error("failed to delete file {file_id} from gc manifest: {err}")]
ManifestDelete {
#[error("failed to delete from gc manifest for files {file_ids:?}: {err}")]
GcManifestDelete {
#[source]
err: metadata_db::Error,
file_id: FileId,
file_ids: Vec<FileId>,
},

/// Failed to parse URL for file
Expand Down Expand Up @@ -408,8 +428,22 @@ impl CollectorError {
}
}

pub fn gc_manifest_delete(file_id: FileId) -> impl FnOnce(metadata_db::Error) -> Self {
move |err| Self::ManifestDelete { err, file_id }
/// Builds a `map_err` adapter that wraps a [`metadata_db::Error`] in
/// [`CollectorError::FooterCacheDelete`].
///
/// The ids yielded by `file_ids` are cloned into a `Vec` only when the returned
/// closure is invoked, so nothing is collected unless an error is being mapped.
pub fn footer_cache_delete<'a>(
    file_ids: impl Iterator<Item = &'a FileId>,
) -> impl FnOnce(metadata_db::Error) -> Self {
    move |err| {
        // Snapshot the affected ids so the error value owns its data.
        let ids: Vec<FileId> = file_ids.cloned().collect();
        Self::FooterCacheDelete { err, file_ids: ids }
    }
}

/// Builds a `map_err` adapter that wraps a [`metadata_db::Error`] in
/// [`CollectorError::GcManifestDelete`].
///
/// `file_ids` is consumed lazily: the ids are cloned into a `Vec` only if the
/// returned closure is actually called with an error.
pub fn gc_manifest_delete<'a>(
    file_ids: impl Iterator<Item = &'a FileId>,
) -> impl FnOnce(metadata_db::Error) -> Self {
    move |err| {
        // Record which ids the failed delete was issued for.
        let ids: Vec<FileId> = file_ids.cloned().collect();
        Self::GcManifestDelete { err, file_ids: ids }
    }
}

pub fn parse_error(file_id: FileId) -> impl FnOnce(url::ParseError) -> Self {
Expand Down
2 changes: 2 additions & 0 deletions crates/core/metadata-db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ license-file.workspace = true
default = []
# Temporary database support for testing environments
temp-db = ["pgtemp"]
# Test utilities for downstream crates
test-utils = []

[dependencies]
backon.workspace = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Drop the footer column from file_metadata
-- NOTE: DROP COLUMN is destructive; the values stored in this column are discarded
ALTER TABLE file_metadata DROP COLUMN footer;

-- Remove the foreign key constraint from footer_cache entirely
-- This allows us to delete from file_metadata eagerly during compaction
-- while keeping footer_cache entries until the Collector runs
-- NOTE: once dropped, the database no longer enforces this relationship, so
-- cleanup of footer_cache rows must be done explicitly by application code
ALTER TABLE footer_cache DROP CONSTRAINT footer_cache_file_id_fkey;

-- Remove the foreign key constraint from gc_manifest to file_metadata
-- This allows us to delete from file_metadata while keeping gc_manifest entries
-- for garbage collection tracking
-- NOTE: gc_manifest rows may now refer to file ids that no longer exist in file_metadata
ALTER TABLE gc_manifest DROP CONSTRAINT gc_manifest_file_id_fkey;

-- Remove the CHECK constraint on gc_manifest.expiration
-- This allows inserting entries with any expiration time
ALTER TABLE gc_manifest DROP CONSTRAINT gc_manifest_expiration_check;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important: This migration makes breaking schema changes by dropping constraints and columns. Ensure you have:

  1. A rollback plan if needed
  2. Verified that no running processes depend on these constraints
  3. Considered the impact on any concurrent operations during deployment

The removal of FK constraints enables the new eager deletion pattern, but make sure to coordinate the deployment carefully.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1403 prepares the metadata DB for the schema changes made by this PR's migrations.

14 changes: 11 additions & 3 deletions crates/core/metadata-db/src/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ where
{
let query = indoc::indoc! {r#"
WITH inserted AS (
INSERT INTO file_metadata (location_id, url, file_name, object_size, object_e_tag, object_version, metadata, footer)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
INSERT INTO file_metadata (location_id, url, file_name, object_size, object_e_tag, object_version, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT DO NOTHING
RETURNING id
)
Expand Down Expand Up @@ -135,12 +135,20 @@ where
}

/// Delete file metadata record by ID
///
/// Also deletes the corresponding footer_cache entry.
#[instrument(skip(executor))]
pub async fn delete<'e, E>(executor: E, id: FileId) -> Result<bool, sqlx::Error>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
let query = "DELETE FROM file_metadata WHERE id = $1";
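// Delete the footer_cache and file_metadata rows in a single statement. The two
// deletes in a data-modifying CTE run in no guaranteed order, which is fine here
// because no FK constraint links the tables.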
let query = indoc::indoc! {r#"
WITH deleted_footer AS (
DELETE FROM footer_cache WHERE file_id = $1
)
DELETE FROM file_metadata WHERE id = $1
"#};

let result = sqlx::query(query).bind(id).execute(executor).await?;
Ok(result.rows_affected() > 0)
Expand Down
Loading