Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,8 @@ impl core::default::Default for iceberg::spec::Operation
pub fn iceberg::spec::Operation::default() -> iceberg::spec::Operation
impl core::fmt::Debug for iceberg::spec::Operation
pub fn iceberg::spec::Operation::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl core::hash::Hash for iceberg::spec::Operation
pub fn iceberg::spec::Operation::hash<__H: core::hash::Hasher>(&self, state: &mut __H)
impl core::marker::StructuralPartialEq for iceberg::spec::Operation
impl serde_core::ser::Serialize for iceberg::spec::Operation
pub fn iceberg::spec::Operation::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub const MAIN_BRANCH: &str = "main";

/// Reference to [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "lowercase")]
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
pub enum Operation {
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::table::Table;
use crate::transaction::snapshot::{
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
};
use crate::transaction::validate::SnapshotValidator;
use crate::transaction::{ActionCommit, TransactionAction};

/// FastAppendAction is a transaction action for fast append data files to the table.
Expand Down Expand Up @@ -108,6 +109,8 @@ impl TransactionAction for FastAppendAction {

struct FastAppendOperation;

impl SnapshotValidator for FastAppendOperation {}

impl SnapshotProduceOperation for FastAppendOperation {
fn operation(&self) -> Operation {
Operation::Append
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ mod update_properties;
mod update_schema;
mod update_statistics;
mod upgrade_format_version;
mod validate;

use std::sync::Arc;
use std::time::Duration;
Expand Down
9 changes: 8 additions & 1 deletion crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::spec::{
};
use crate::table::Table;
use crate::transaction::ActionCommit;
use crate::transaction::validate::SnapshotValidator;
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};

const META_ROOT_PATH: &str = "metadata";
Expand Down Expand Up @@ -61,7 +62,7 @@ const META_ROOT_PATH: &str = "metadata";
///
/// 3. **Delete Entry Processing**: The `delete_entries()` method is intended for future delete
/// operations to specify which manifest entries should be marked as deleted.
pub(crate) trait SnapshotProduceOperation: Send + Sync {
pub(crate) trait SnapshotProduceOperation: Send + Sync + SnapshotValidator {
/// Returns the operation type that will be recorded in the snapshot summary.
///
/// This determines what kind of operation is being performed (e.g., `Append`, `Overwrite`),
Expand Down Expand Up @@ -437,6 +438,12 @@ impl<'a> SnapshotProducer<'a> {
snapshot_produce_operation: OP,
process: MP,
) -> Result<ActionCommit> {
// Validate the operation against the (refreshed) base table to detect conflicts
// before writing the new snapshot.
snapshot_produce_operation
.validate(self.table, self.table.metadata().current_snapshot_id())
.await?;

let manifest_list_path = self.generate_manifest_list_file_path(0);
let next_seq_num = self.table.metadata().next_sequence_number();
let first_row_id = self.table.metadata().next_row_id();
Expand Down
261 changes: 261 additions & 0 deletions crates/iceberg/src/transaction/validate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;

use futures::SinkExt;
use futures::future::try_join_all;
use once_cell::sync::Lazy;

use crate::delete_file_index::DeleteFileIndex;
use crate::error::Result;
use crate::scan::DeleteFileContext;
use crate::spec::{
DataContentType, DataFile, FormatVersion, INITIAL_SEQUENCE_NUMBER, ManifestContentType,
ManifestFile, Operation,
};
use crate::table::Table;
use crate::util::snapshot::ancestors_between;
use crate::{Error, ErrorKind};

/// Operations whose snapshots may add delete files.
static VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Lazy<HashSet<Operation>> =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The price for pay for defining hash set is the changes in public-api.txt etc. No strong opinion but it seems that we could avoid it by defining

/// Operations whose snapshots may add delete files.
fn adds_delete_files(op: &Operation) -> bool {
    matches!(op, Operation::Overwrite | Operation::Delete)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, I think having a reference to that function improves readability anyway.

Maybe not worth it since I don't expect a new operation type anytime soon, but I was thinking along the lines of this.

/// Operations whose snapshots may add delete files.
fn adds_delete_files(op: &Operation) -> bool {
    match op {
        Operation::Append | Operation::Replace => false,
        Operation::Overwrite | Operation::Delete => true,
    }
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this mirrors the Java implementation, however I find it confusing as a replace operation will add delete files, when it rewrites the delete files to retain deletes it could not apply.

What we're really checking for here is that no new deletes apply to the data files we've rewritten.

Is adds_deletes(op: &Operation) better here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I fully understand, if we add multiple APIs to Operation like this, how do we tell which API to call in SnapshotValidator?

Right now we only need one implementation of validation_history across multiple table actions, and I think this is convenient. But would be happy to switch to the cleaner design if it doesn't really complicate the implementation

Lazy::new(|| HashSet::from([Operation::Overwrite, Operation::Delete]));

/// A trait for validating a snapshot before it is committed.
///
/// Each [`SnapshotProduceOperation`](super::snapshot::SnapshotProduceOperation) is also a
/// `SnapshotValidator`. The [`validate`](SnapshotValidator::validate) method is invoked by
/// [`SnapshotProducer::commit`](super::snapshot::SnapshotProducer) before the new snapshot is
/// written, giving each operation the chance to detect conflicts against the (refreshed) base
/// table. Operations that need no validation (e.g. fast append) rely on the default no-op
/// implementation.
///
/// The reusable checks ([`validation_history`](SnapshotValidator::validation_history),
/// [`validate_no_new_deletes_for_data_files`](SnapshotValidator::validate_no_new_deletes_for_data_files))
/// are provided as default methods so concrete operations can compose the subset they require.
pub(crate) trait SnapshotValidator {
/// Validates a snapshot against a table.
///
/// # Arguments
///
/// * `base` - The base table to validate against.
/// * `parent_snapshot_id` - The ID of the parent snapshot, if any. This is usually
/// the latest snapshot of the base table, unless it's a non-main branch
/// (note: writing to branches is not currently supported).
///
/// # Returns
///
/// A `Result` indicating success or an error if validation fails.
async fn validate(&self, _base: &Table, _parent_snapshot_id: Option<i64>) -> Result<()> {
Ok(())
}
Comment on lines +52 to +66

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not leave this abstract, such that append is documented as requiring no validation in its implementation?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would also work. I don't really have a preference here


/// Retrieves the history of snapshots between two points with matching operations and content
/// type.
///
/// # Arguments
///
/// * `base` - The base table to retrieve history from.
/// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the
/// beginning.
/// * `to_snapshot_id` - The ending snapshot ID (inclusive).
/// * `matching_operations` - Set of operations to match when collecting snapshots.
/// * `manifest_content_type` - The content type of manifests to collect.
///
/// # Returns
///
/// A tuple containing:
/// * A vector of manifest files matching the criteria.
/// * A set of snapshot IDs that were collected.
///
/// # Errors
///
/// Returns an error if the history between the snapshots cannot be determined.
#[allow(dead_code)]

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

marking them as dead_code for now and they will be used by delete or rewrite actions later on

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some unit tests?

async fn validation_history(
&self,
base: &Table,
from_snapshot_id: Option<i64>,
to_snapshot_id: i64,
matching_operations: &HashSet<Operation>,
manifest_content_type: ManifestContentType,
) -> Result<(Vec<ManifestFile>, HashSet<i64>)> {
let mut manifests: Vec<ManifestFile> = vec![];
let mut new_snapshots = HashSet::new();
let mut last_snapshot = None;

let metadata = base.metadata_ref();
let snapshots = ancestors_between(&metadata, to_snapshot_id, from_snapshot_id);

for current_snapshot in snapshots {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and other for-loops have opportunity to be parallelized using runtime, we can revisit this once we are aligned on the general design or as a follow-up

last_snapshot = Some(current_snapshot.clone());

// Find all snapshots with the matching operations
// and their manifest files with the matching content type
if matching_operations.contains(&current_snapshot.summary().operation) {
new_snapshots.insert(current_snapshot.snapshot_id());

let manifest_list = base.manifest_list_reader(&current_snapshot).load().await?;

for manifest in manifest_list.entries() {
if manifest.content == manifest_content_type
&& manifest.added_snapshot_id == current_snapshot.snapshot_id()
{
manifests.push(manifest.clone());
}
}
}
}

if let Some(last_snapshot) = last_snapshot
&& last_snapshot.parent_snapshot_id() != from_snapshot_id
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot determine history between starting snapshot {} and the last known ancestor {}",
from_snapshot_id.unwrap_or(-1),
last_snapshot.snapshot_id()
),
));
}

Ok((manifests, new_snapshots))
}

/// Validates that there are no new delete files for the given data files.
///
/// # Arguments
///
/// * `base` - The base table to validate against.
/// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the
/// beginning.
/// * `to_snapshot_id` - The ending snapshot ID (inclusive), or None if there is no current
/// table state.
/// * `data_files` - The data files to check for conflicting delete files.
/// * `ignore_equality_deletes` - Whether to ignore equality deletes and only check for
/// positional deletes.
///
/// # Returns
///
/// A `Result` indicating success or an error if validation fails.
///
/// # Errors
///
/// Returns an error if new delete files are found for any of the data files.
#[allow(dead_code)]
async fn validate_no_new_deletes_for_data_files(
&self,
base: &Table,
from_snapshot_id: Option<i64>,
to_snapshot_id: Option<i64>,
data_files: &[DataFile],
ignore_equality_deletes: bool,
) -> Result<()> {
// Delete files only exist in format version 2 and above. If there is no current table
// state, or the table is V1, there cannot be any new delete files to conflict with.
let Some(to_snapshot_id) = to_snapshot_id else {
return Ok(());
};
if base.metadata().format_version() == FormatVersion::V1 {
return Ok(());
}

// Get matching delete files that have been added since the from_snapshot_id
let (delete_manifests, _) = self
.validation_history(
base,
from_snapshot_id,
to_snapshot_id,
&VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
ManifestContentType::Deletes,
)
.await?;

// Build the delete file index from the matching delete manifests.
//
// `DeleteFileIndex::new` spawns a background task that populates the index by
// collecting from the channel until *all* senders are dropped; queries on the
// returned index (`get_deletes_for_data_file`) block until that population
// completes. We therefore scope the sender to this block so it is dropped as
// soon as every delete file has been sent, allowing the index to finish
// populating before we query it below. Holding the sender past this point
// would deadlock.
let delete_file_index = {
let (delete_file_index, mut delete_file_tx) =
DeleteFileIndex::new(base.runtime().clone());
let manifests = try_join_all(
delete_manifests
.iter()
.map(|f| f.load_manifest(base.file_io())),
)
.await?;
for entry in manifests.iter().flat_map(|manifest| manifest.entries()) {
let delete_file_ctx = DeleteFileContext {
manifest_entry: entry.clone(),
partition_spec_id: entry.data_file().partition_spec_id,
};
delete_file_tx.send(delete_file_ctx).await?;
}
// `delete_file_tx` is dropped here as the block ends, closing the channel.
delete_file_index
};

// Get starting seq num from starting snapshot if available
let starting_sequence_number = if let Some(from_snapshot_id) = from_snapshot_id {
match base.metadata().snapshot_by_id(from_snapshot_id) {
Some(snapshot) => snapshot.sequence_number(),
None => INITIAL_SEQUENCE_NUMBER,
}
Comment on lines +221 to +224

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it not an error if we cannot find the snapshot associated with from_snapshot_id? Is it safe to default to INITIAL_SEQUENCE_NUMBER here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't hurt if we validate against the entire snapshot history. the conflict rate would be higher if we were trying to pass an invalid starting snapshot id

} else {
INITIAL_SEQUENCE_NUMBER
};

// Validate if there are deletes using delete file index
for data_file in data_files {
let delete_files = delete_file_index
.get_deletes_for_data_file(data_file, Some(starting_sequence_number))
.await;

if ignore_equality_deletes {
if delete_files
.iter()
.any(|delete_file| delete_file.file_type == DataContentType::PositionDeletes)
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot commit, found new positional delete for added data file: {}",
data_file.file_path
),
));
Comment on lines +240 to +246

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data files are not 'added', right?

I was reviewing the Java implementation, the wording is: "Cannot commit, found new position delete for replaced data file: ". Shall we align with that?

https://github.com/apache/iceberg/blob/bead8dfab3249cbc6ba7713e862ff2698fd2122c/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L538-L549

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think "replaced data file" is more precise

}
} else if !delete_files.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot commit, found new delete for added data file: {}",
data_file.file_path
),
));
}
}

Ok(())
}
}
Loading