Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ mockall = { workspace = true }
pretty_assertions = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
rstest.workspace = true
serde_arrow = { version = "0.14", features = ["arrow-58"] }
tempfile = { workspace = true }

Expand Down
34 changes: 33 additions & 1 deletion crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,7 @@ pub iceberg::spec::Transform::Void
pub iceberg::spec::Transform::Year
impl iceberg::spec::Transform
pub fn iceberg::spec::Transform::dedup_name(&self) -> alloc::string::String
pub fn iceberg::spec::Transform::is_time_transform(&self) -> bool
pub fn iceberg::spec::Transform::preserves_order(&self) -> bool
pub fn iceberg::spec::Transform::project(&self, name: &str, predicate: &iceberg::expr::BoundPredicate) -> iceberg::Result<core::option::Option<iceberg::expr::Predicate>>
pub fn iceberg::spec::Transform::result_type(&self, input_type: &iceberg::spec::Type) -> iceberg::Result<iceberg::spec::Type>
Expand Down Expand Up @@ -2213,6 +2214,7 @@ pub iceberg::spec::PartitionField::source_id: i32
pub iceberg::spec::PartitionField::transform: iceberg::spec::Transform
impl iceberg::spec::PartitionField
pub fn iceberg::spec::PartitionField::into_unbound(self) -> iceberg::spec::UnboundPartitionField
pub fn iceberg::spec::PartitionField::new(source_id: i32, field_id: i32, name: impl core::convert::Into<alloc::string::String>, transform: iceberg::spec::Transform) -> Self
impl core::clone::Clone for iceberg::spec::PartitionField
pub fn iceberg::spec::PartitionField::clone(&self) -> iceberg::spec::PartitionField
impl core::cmp::Eq for iceberg::spec::PartitionField
Expand Down Expand Up @@ -3059,23 +3061,52 @@ pub fn iceberg::transaction::AddColumn::optional(name: impl alloc::string::ToStr
pub fn iceberg::transaction::AddColumn::required(name: impl alloc::string::ToString, field_type: iceberg::spec::Type, initial_default: iceberg::spec::Literal) -> Self
impl iceberg::transaction::AddColumn
pub fn iceberg::transaction::AddColumn::builder() -> AddColumnBuilder<((), (), (), (), (), (), ())>
pub struct iceberg::transaction::Term
impl iceberg::transaction::Term
pub fn iceberg::transaction::Term::bucket(source_name: impl core::convert::Into<alloc::string::String>, num_buckets: u32) -> Self
pub fn iceberg::transaction::Term::day(source_name: impl core::convert::Into<alloc::string::String>) -> Self
pub fn iceberg::transaction::Term::hour(source_name: impl core::convert::Into<alloc::string::String>) -> Self
pub fn iceberg::transaction::Term::identity(source_name: impl core::convert::Into<alloc::string::String>) -> Self
pub fn iceberg::transaction::Term::month(source_name: impl core::convert::Into<alloc::string::String>) -> Self
pub fn iceberg::transaction::Term::new(source_name: impl core::convert::Into<alloc::string::String>, transform: iceberg::spec::Transform) -> Self
pub fn iceberg::transaction::Term::truncate(source_name: impl core::convert::Into<alloc::string::String>, width: u32) -> Self
pub fn iceberg::transaction::Term::year(source_name: impl core::convert::Into<alloc::string::String>) -> Self
impl core::clone::Clone for iceberg::transaction::Term
pub fn iceberg::transaction::Term::clone(&self) -> iceberg::transaction::Term
impl core::cmp::Eq for iceberg::transaction::Term
impl core::cmp::PartialEq for iceberg::transaction::Term
pub fn iceberg::transaction::Term::eq(&self, other: &iceberg::transaction::Term) -> bool
impl core::fmt::Debug for iceberg::transaction::Term
pub fn iceberg::transaction::Term::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl core::marker::StructuralPartialEq for iceberg::transaction::Term
pub struct iceberg::transaction::Transaction
impl iceberg::transaction::Transaction
pub async fn iceberg::transaction::Transaction::commit(self, catalog: &dyn iceberg::Catalog) -> iceberg::Result<iceberg::table::Table>
pub fn iceberg::transaction::Transaction::fast_append(&self) -> iceberg::transaction::append::FastAppendAction
pub fn iceberg::transaction::Transaction::new(table: &iceberg::table::Table) -> Self
pub fn iceberg::transaction::Transaction::replace_sort_order(&self) -> iceberg::transaction::sort_order::ReplaceSortOrderAction
pub fn iceberg::transaction::Transaction::update_location(&self) -> iceberg::transaction::update_location::UpdateLocationAction
pub fn iceberg::transaction::Transaction::update_partition(&self) -> iceberg::transaction::UpdatePartitionSpecAction
pub fn iceberg::transaction::Transaction::update_partition_spec(&self) -> iceberg::transaction::UpdatePartitionSpecAction
pub fn iceberg::transaction::Transaction::update_schema(&self) -> iceberg::transaction::update_schema::UpdateSchemaAction
pub fn iceberg::transaction::Transaction::update_statistics(&self) -> iceberg::transaction::update_statistics::UpdateStatisticsAction
pub fn iceberg::transaction::Transaction::update_table_properties(&self) -> iceberg::transaction::update_properties::UpdatePropertiesAction
pub fn iceberg::transaction::Transaction::upgrade_table_version(&self) -> iceberg::transaction::upgrade_format_version::UpgradeFormatVersionAction
impl core::clone::Clone for iceberg::transaction::Transaction
pub fn iceberg::transaction::Transaction::clone(&self) -> iceberg::transaction::Transaction
pub struct iceberg::transaction::UpdatePartitionSpecAction
impl iceberg::transaction::UpdatePartitionSpecAction
pub fn iceberg::transaction::UpdatePartitionSpecAction::add_field(self, name: core::option::Option<alloc::string::String>, term: iceberg::transaction::Term) -> iceberg::Result<Self>
pub fn iceberg::transaction::UpdatePartitionSpecAction::add_non_default_spec(self) -> Self
pub fn iceberg::transaction::UpdatePartitionSpecAction::case_sensitive(self, case_sensitive: bool) -> Self
pub fn iceberg::transaction::UpdatePartitionSpecAction::new(metadata: &iceberg::spec::TableMetadata) -> Self
pub fn iceberg::transaction::UpdatePartitionSpecAction::remove_field_by_name(self, name: impl core::convert::Into<alloc::string::String>) -> iceberg::Result<Self>
pub fn iceberg::transaction::UpdatePartitionSpecAction::remove_field_by_term(self, term: iceberg::transaction::Term) -> iceberg::Result<Self>
pub fn iceberg::transaction::UpdatePartitionSpecAction::rename_field(self, name: impl core::convert::Into<alloc::string::String>, new_name: impl core::convert::Into<alloc::string::String>) -> iceberg::Result<Self>
pub trait iceberg::transaction::ApplyTransactionAction
pub fn iceberg::transaction::ApplyTransactionAction::apply(self, tx: iceberg::transaction::Transaction) -> iceberg::Result<iceberg::transaction::Transaction>
impl<T: TransactionAction + 'static> iceberg::transaction::ApplyTransactionAction for T
pub fn T::apply(self, tx: iceberg::transaction::Transaction) -> iceberg::Result<iceberg::transaction::Transaction> where Self: core::marker::Sized
pub fn T::apply(self, tx: iceberg::transaction::Transaction) -> core::result::Result<iceberg::transaction::Transaction, iceberg::Error>
pub mod iceberg::transform
pub trait iceberg::transform::TransformFunction: core::marker::Send + core::marker::Sync + core::fmt::Debug
pub fn iceberg::transform::TransformFunction::transform(&self, input: arrow_array::array::ArrayRef) -> iceberg::Result<arrow_array::array::ArrayRef>
Expand Down Expand Up @@ -3274,6 +3305,7 @@ impl<B, L, F> iceberg::writer::IcebergWriterBuilder for iceberg::writer::base_wr
pub type iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder<B, L, F>::R = iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriter<B, L, F>
pub fn iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder<B, L, F>::build<'life0, 'async_trait>(&'life0 self, partition_key: core::option::Option<iceberg::spec::PartitionKey>) -> core::pin::Pin<alloc::boxed::Box<(dyn core::future::future::Future<Output = iceberg::Result<Self::R>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait
pub macro iceberg::ensure_data_valid!
pub macro iceberg::ensure_precondition!
#[non_exhaustive] pub enum iceberg::ErrorKind
pub iceberg::ErrorKind::CatalogCommitConflicts
pub iceberg::ErrorKind::DataInvalid
Expand Down
10 changes: 10 additions & 0 deletions crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,16 @@ macro_rules! ensure_data_valid {
};
}

/// Helper macro to check preconditions.
///
/// Evaluates `$cond`; when it is `false`, the enclosing function returns early with
/// `Err(Error)` whose kind is `ErrorKind::PreconditionFailed` and whose message is
/// built with `format!` from the given literal and optional arguments.
///
/// The format arguments are optional, so both forms are accepted:
///
/// ```ignore
/// ensure_precondition!(a > 0, "{} is not positive.", a);
/// ensure_precondition!(is_ready, "table is not ready");
/// ```
#[macro_export]
macro_rules! ensure_precondition {
    // The `$(, ...)?` tail makes the comma and the format arguments optional, so a
    // plain message literal without arguments matches as well.
    ($cond:expr, $fmt:literal $(, $($arg:tt)*)?) => {
        if !$cond {
            return Err($crate::error::Error::new(
                $crate::error::ErrorKind::PreconditionFailed,
                format!($fmt $(, $($arg)*)?),
            ));
        }
    };
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
Expand Down
28 changes: 23 additions & 5 deletions crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ pub struct PartitionField {
}

impl PartitionField {
/// Creates a partition field from its source id, field id, name and transform.
///
/// `name` accepts anything convertible into a `String` and is stored as an owned value.
pub fn new(
    source_id: i32,
    field_id: i32,
    name: impl Into<String>,
    transform: Transform,
) -> Self {
    // Convert once up front so the struct literal can use field shorthand throughout.
    let name: String = name.into();
    PartitionField {
        source_id,
        field_id,
        name,
        transform,
    }
}

/// Converts this partition field into an [`UnboundPartitionField`].
pub fn into_unbound(self) -> UnboundPartitionField {
self.into()
Expand Down Expand Up @@ -599,7 +614,9 @@ impl PartitionSpecBuilder {
) -> Result<()> {
match schema.field_by_name(field.name.as_str()) {
Some(schema_collision) => {
if field.transform == Transform::Identity {
if field.transform == Transform::Void {
Ok(())
} else if field.transform == Transform::Identity {
if schema_collision.id == field.source_id {
Ok(())
} else {
Expand Down Expand Up @@ -690,19 +707,20 @@ trait CorePartitionSpecValidator {
}

/// For a single source-column transformations must be unique.
/// Void transforms are excluded from this check because they are placeholders
/// for deleted fields in V1 specs, and multiple Void fields on the same source
/// column are valid.
fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> {
let collision = self.fields().iter().find(|f| {
f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name()
f.source_id == source_id && *transform != Transform::Void && f.transform == *transform
});

if let Some(collision) = collision {
Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`",
source_id,
transform.dedup_name(),
collision.name
source_id, transform, collision.name
),
))
} else {
Expand Down
20 changes: 11 additions & 9 deletions crates/iceberg/src/spec/table_metadata_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,12 +765,12 @@ impl TableMetadataBuilder {
/// Validate partition field names against schema field names across all historical schemas.
///
/// Due to Iceberg's multi-version property, partition fields can share names with schema fields
/// if they meet specific requirements (identity transform + matching source field ID).
/// if they meet specific requirements (identity/void transform + matching source field ID).
/// This validation enforces those rules across all historical schema versions.
///
/// # Errors
/// - Partition field name conflicts with schema field name but doesn't use identity transform.
/// - Partition field uses identity transform but references wrong source field ID.
/// - Partition field name conflicts with schema field name but doesn't use identity/void transform.
/// - Partition field uses identity/void transform but references wrong source field ID.
fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
if self.metadata.schemas.is_empty() {
return Ok(());
Expand All @@ -789,15 +789,17 @@ impl TableMetadataBuilder {

// If name exists in schemas, validate against current schema rules
if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
let is_identity_transform =
partition_field.transform == crate::spec::Transform::Identity;
let is_schema_conflict_allowed = matches!(
partition_field.transform,
crate::spec::Transform::Identity | crate::spec::Transform::Void
);
let has_matching_source_id = schema_field.id == partition_field.source_id;

if !is_identity_transform {
if !is_schema_conflict_allowed {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
"Cannot create partition with name '{}' that conflicts with schema field and is not an identity/void transform.",
partition_field.name
),
));
Expand All @@ -807,7 +809,7 @@ impl TableMetadataBuilder {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot create identity partition sourced from different field in schema. \
"Cannot create identity/void partition sourced from different field in schema. \
Field name '{}' has id `{}` in schema but partition source id is `{}`",
partition_field.name, schema_field.id, partition_field.source_id
),
Expand Down Expand Up @@ -2929,7 +2931,7 @@ mod tests {
assert!(error_message.contains(
"Cannot create partition with name 'existing_field' that conflicts with schema field"
));
assert!(error_message.contains("and is not an identity transform"));
assert!(error_message.contains("and is not an identity/void transform"));
}

#[test]
Expand Down
8 changes: 8 additions & 0 deletions crates/iceberg/src/spec/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ impl Transform {
)
}

/// Returns true if this is a temporal transform (year, month, day, hour).
pub fn is_time_transform(&self) -> bool {
matches!(
self,
Transform::Year | Transform::Month | Transform::Day | Transform::Hour
)
}

/// Return the unique transform name to check if similar transforms for the same source field
/// are added multiple times in partition spec builder.
pub fn dedup_name(&self) -> String {
Expand Down
13 changes: 13 additions & 0 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@
mod action;

pub use action::*;
pub use update_partition::{Term, UpdatePartitionSpecAction};
mod append;
mod snapshot;
mod sort_order;
mod update_location;
mod update_partition;
mod update_properties;
mod update_schema;
mod update_statistics;
Expand Down Expand Up @@ -154,6 +156,17 @@ impl Transaction {
ReplaceSortOrderAction::new()
}

/// Creates an action that updates the default partition spec of the table.
///
/// The action is built from the metadata of this transaction's table.
pub fn update_partition_spec(&self) -> UpdatePartitionSpecAction {
    UpdatePartitionSpecAction::new(self.table.metadata())
}

/// Update the default partition spec of table.
pub fn update_partition(&self) -> UpdatePartitionSpecAction {
self.update_partition_spec()
}

/// Set the location of table
pub fn update_location(&self) -> UpdateLocationAction {
UpdateLocationAction::new()
Expand Down
Loading
Loading