From 3fb4847f227b069ea15da8dd150c20d786c76d85 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sat, 29 Nov 2025 19:22:35 +0800 Subject: [PATCH 1/2] feat: update partition spec --- src/iceberg/CMakeLists.txt | 3 +- src/iceberg/meson.build | 1 + src/iceberg/partition_spec.cc | 5 + src/iceberg/partition_spec.h | 4 + src/iceberg/table.cc | 10 + src/iceberg/table.h | 4 + src/iceberg/table_metadata.cc | 84 +- src/iceberg/table_update.cc | 4 +- src/iceberg/test/CMakeLists.txt | 1 + .../test/update_partition_spec_test.cc | 822 ++++++++++++++++++ src/iceberg/transaction.cc | 19 + src/iceberg/transaction.h | 4 + src/iceberg/transform.cc | 23 + src/iceberg/transform.h | 5 + src/iceberg/transform_function.h | 3 + src/iceberg/type_fwd.h | 7 + src/iceberg/update/meson.build | 7 +- src/iceberg/update/pending_update.h | 1 + src/iceberg/update/update_partition_spec.cc | 476 ++++++++++ src/iceberg/update/update_partition_spec.h | 206 +++++ 20 files changed, 1681 insertions(+), 8 deletions(-) create mode 100644 src/iceberg/test/update_partition_spec_test.cc create mode 100644 src/iceberg/update/update_partition_spec.cc create mode 100644 src/iceberg/update/update_partition_spec.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index dcc9372f5..4fbdfc7c7 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -77,8 +77,9 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/pending_update.cc - update/update_sort_order.cc + update/update_partition_spec.cc update/update_properties.cc + update/update_sort_order.cc util/bucket_util.cc util/conversions.cc util/decimal.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 1596a2ead..c10061072 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -99,6 +99,7 @@ iceberg_sources = files( 'transform_function.cc', 'type.cc', 'update/pending_update.cc', + 'update/update_partition_spec.cc', 'update/update_properties.cc', 'update/update_sort_order.cc', 'util/bucket_util.cc', diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index b0f1144c1..1a7bde3f0 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -95,6 +95,11 @@ Result> PartitionSpec::PartitionType( return std::make_unique(std::move(partition_fields)); } +bool PartitionSpec::SameSpec(const PartitionSpec& other) const { + return fields_ == other.fields_ && + last_assigned_field_id_ == other.last_assigned_field_id_; +} + std::string PartitionSpec::ToString() const { std::string repr = std::format("partition_spec[spec_id<{}>,\n", spec_id_); for (const auto& field : fields_) { diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index 0d1a78f16..66a7933d8 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -64,6 +64,10 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \brief Get the partition type binding to the input schema. Result> PartitionType(const Schema& schema) const; + /// \brief Checks whether this partition spec is equivalent to another partition spec + /// while ignoring the spec id. + bool SameSpec(const PartitionSpec& other) const; + std::string ToString() const override; int32_t last_assigned_field_id() const { return last_assigned_field_id_; } diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 38afdedd0..6b4d317b8 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -19,6 +19,8 @@ #include "iceberg/table.h" +#include + #include "iceberg/catalog.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" @@ -28,6 +30,7 @@ #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" #include "iceberg/transaction.h" +#include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" @@ -147,6 +150,13 @@ Result> Table::NewTransaction() { /*auto_commit=*/false); } +Result> Table::NewUpdatePartitionSpec() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewUpdatePartitionSpec(); +} + Result> Table::NewUpdateProperties() { ICEBERG_ASSIGN_OR_RAISE( auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 317aea018..30ad14c1b 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -128,6 +128,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new Transaction to commit multiple table operations at once. virtual Result> NewTransaction(); + /// \brief Create a new UpdatePartitionSpec to update the partition spec of this table + /// and commit the changes. + virtual Result> NewUpdatePartitionSpec(); + /// \brief Create a new UpdateProperties to update table properties and commit the /// changes. virtual Result> NewUpdateProperties(); diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 2d77aef43..0fe25881a 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -428,7 +428,8 @@ class TableMetadataBuilder::Impl { Result AddSortOrder(const SortOrder& order); Status SetProperties(const std::unordered_map& updated); Status RemoveProperties(const std::unordered_set& removed); - + Status SetDefaultPartitionSpec(int32_t spec_id); + Result AddPartitionSpec(const PartitionSpec& spec); std::unique_ptr Build(); private: @@ -438,6 +439,12 @@ class TableMetadataBuilder::Impl { /// \return The ID to use for this sort order (reused if exists, new otherwise) int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order); + /// \brief Internal method to check for existing partition spec and reuse its ID or + /// create a new one + /// \param new_spec The partition spec to check + /// \return The ID to use for this partition spec (reused if exists, new otherwise) + int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec); + private: // Base metadata (nullptr for new tables) const TableMetadata* base_; @@ -572,6 +579,58 @@ Result TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order) return new_order_id; } +Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) { + if (spec_id == -1) { + if (!last_added_spec_id_.has_value()) { + return InvalidArgument( + "Cannot set last added partition spec: no partition spec has been added"); + } + return SetDefaultPartitionSpec(last_added_spec_id_.value()); + } + + if (spec_id == metadata_.default_spec_id) { + return {}; + } + + metadata_.default_spec_id = spec_id; + + changes_.push_back(std::make_unique(spec_id)); + return {}; +} + +Result TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec& spec) { + int32_t new_spec_id = ReuseOrCreateNewPartitionSpecId(spec); + + if (specs_by_id_.find(new_spec_id) != specs_by_id_.end()) { + // update last_added_spec_id if the spec was added in this set of changes (since it + // is now the last) + bool is_new_spec = last_added_spec_id_.has_value() && + std::ranges::find_if(changes_, [new_spec_id](const auto& change) { + auto* add_spec = + dynamic_cast(change.get()); + return add_spec && add_spec->spec()->spec_id() == new_spec_id; + }) != changes_.cend(); + last_added_spec_id_ = is_new_spec ? std::make_optional(new_spec_id) : std::nullopt; + return new_spec_id; + } + + // Get current schema and validate the partition spec against it + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_.Schema()); + ICEBERG_RETURN_UNEXPECTED(spec.Validate(*schema, /*allow_missing_fields=*/false)); + + std::shared_ptr new_spec; + ICEBERG_ASSIGN_OR_RAISE( + new_spec, + PartitionSpec::Make(new_spec_id, std::vector(spec.fields().begin(), + spec.fields().end()))); + metadata_.partition_specs.push_back(new_spec); + specs_by_id_.emplace(new_spec_id, new_spec); + + changes_.push_back(std::make_unique(new_spec)); + last_added_spec_id_ = new_spec_id; + return new_spec_id; +} + Status TableMetadataBuilder::Impl::SetProperties( const std::unordered_map& updated) { // If updated is empty, return early (no-op) @@ -653,6 +712,20 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId( return new_order_id; } +int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId( + const PartitionSpec& new_spec) { + // determine the next spec id + int32_t new_spec_id = PartitionSpec::kInitialSpecId; + for (const auto& spec : metadata_.partition_specs) { + if (spec->SameSpec(new_spec)) { + return spec->spec_id(); + } else if (new_spec_id <= spec->spec_id()) { + new_spec_id = spec->spec_id() + 1; + } + } + return new_spec_id; +} + TableMetadataBuilder::TableMetadataBuilder(int8_t format_version) : impl_(std::make_unique(format_version)) {} @@ -723,16 +796,19 @@ TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr sc TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec( std::shared_ptr spec) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec)); + return SetDefaultPartitionSpec(spec_id); } TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(int32_t spec_id) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetDefaultPartitionSpec(spec_id)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec( std::shared_ptr spec) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs( diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 87fcd1349..0fc275322 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -72,7 +72,7 @@ void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const { // AddPartitionSpec void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.AddPartitionSpec(spec_); } void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const { @@ -82,7 +82,7 @@ void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const { // SetDefaultPartitionSpec void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.SetDefaultPartitionSpec(spec_id_); } void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const { diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 0d36f64cb..6d47b6038 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -154,6 +154,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES transaction_test.cc + update_partition_spec_test.cc update_properties_test.cc update_sort_order_test.cc) diff --git a/src/iceberg/test/update_partition_spec_test.cc b/src/iceberg/test/update_partition_spec_test.cc new file mode 100644 index 000000000..85d9cc52a --- /dev/null +++ b/src/iceberg/test/update_partition_spec_test.cc @@ -0,0 +1,822 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_partition_spec.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/catalog/memory/in_memory_catalog.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/sort_order.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transaction.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" +#include "iceberg/util/uuid.h" + +namespace iceberg { + +class UpdatePartitionSpecTest : public ::testing::TestWithParam { + protected: + void SetUp() override { + file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + catalog_ = InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", {}); + format_version_ = GetParam(); + test_schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int64()), + SchemaField::MakeRequired(2, "ts", timestamp_tz()), + SchemaField::MakeRequired(3, "category", string()), + SchemaField::MakeOptional(4, "data", string())}, + 0); + + // Create unpartitioned and partitioned specs matching Java test + ICEBERG_UNWRAP_OR_FAIL( + auto unpartitioned_spec, + PartitionSpec::Make(PartitionSpec::kInitialSpecId, std::vector{}, + PartitionSpec::kLegacyPartitionDataIdStart - 1)); + ICEBERG_UNWRAP_OR_FAIL( + partitioned_spec_, + PartitionSpec::Make( + PartitionSpec::kInitialSpecId, + std::vector{ + PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002)); + + auto partitioned_metadata = + CreateBaseMetadata(format_version_, test_schema_, partitioned_spec_); + auto unpartitioned_metadata = + CreateBaseMetadata(format_version_, test_schema_, std::move(unpartitioned_spec)); + + // Write metadata files + partitioned_metadata->location = partitioned_table_location_; + unpartitioned_metadata->location = unpartitioned_table_location_; + + // Arrow MockFS cannot automatically create directories. + auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>( + static_cast(*file_io_).fs()); + ASSERT_TRUE(arrow_fs != nullptr); + ASSERT_TRUE(arrow_fs->CreateDir(partitioned_table_location_ + "/metadata").ok()); + ASSERT_TRUE(arrow_fs->CreateDir(unpartitioned_table_location_ + "/metadata").ok()); + + // Write table metadata to the table location. + std::string partitioned_metadata_location = + std::format("{}/metadata/00001-{}.metadata.json", partitioned_table_location_, + Uuid::GenerateV7().ToString()); + std::string unpartitioned_metadata_location = + std::format("{}/metadata/00001-{}.metadata.json", unpartitioned_table_location_, + Uuid::GenerateV7().ToString()); + + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, partitioned_metadata_location, + *partitioned_metadata), + IsOk()); + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, unpartitioned_metadata_location, + *unpartitioned_metadata), + IsOk()); + + // Register the tables in the catalog. + ICEBERG_UNWRAP_OR_FAIL( + partitioned_table_, + catalog_->RegisterTable(partitioned_table_ident_, partitioned_metadata_location)); + ICEBERG_UNWRAP_OR_FAIL(unpartitioned_table_, + catalog_->RegisterTable(unpartitioned_table_ident_, + unpartitioned_metadata_location)); + } + + // Helper to create base metadata with a specific partition spec + std::unique_ptr CreateBaseMetadata(int8_t format_version, + std::shared_ptr schema, + std::shared_ptr spec) { + auto metadata = std::make_unique(); + metadata->format_version = format_version; + metadata->table_uuid = "test-uuid-1234"; + metadata->location = "/warehouse/test_table"; + metadata->last_sequence_number = 0; + metadata->last_updated_ms = TimePointMs{std::chrono::milliseconds(1000)}; + metadata->last_column_id = 4; + metadata->current_schema_id = 0; + metadata->schemas.push_back(std::move(schema)); + metadata->default_spec_id = spec->spec_id(); + metadata->last_partition_id = spec->last_assigned_field_id(); + metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId; + metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->sort_orders.push_back(SortOrder::Unsorted()); + metadata->next_row_id = TableMetadata::kInitialRowId; + metadata->properties = TableProperties::default_properties(); + metadata->partition_specs.push_back(std::move(spec)); + return metadata; + } + + // Helper to create UpdatePartitionSpec with a specific partition spec + std::shared_ptr CreateUpdatePartitionSpec(bool partitioned) { + if (partitioned) { + auto update_result = partitioned_table_->NewUpdatePartitionSpec(); + if (!update_result.has_value()) { + ADD_FAILURE() << "Failed to create update: " << update_result.error().message; + return nullptr; + } + return update_result.value(); + } else { + auto update_result = unpartitioned_table_->NewUpdatePartitionSpec(); + if (!update_result.has_value()) { + ADD_FAILURE() << "Failed to create update: " << update_result.error().message; + return nullptr; + } + return update_result.value(); + } + } + + // Helper to create an expected partition spec + std::shared_ptr MakeExpectedSpec( + const std::vector& fields, int32_t last_assigned_field_id) { + auto spec_result = PartitionSpec::Make(PartitionSpec::kInitialSpecId, fields, + last_assigned_field_id); + if (!spec_result.has_value()) { + ADD_FAILURE() << "Failed to create expected spec: " << spec_result.error().message; + return nullptr; + } + return std::shared_ptr(spec_result.value().release()); + } + + // Helper to apply update and get the resulting spec + std::shared_ptr ApplyUpdateAndGetSpec( + std::shared_ptr update) { + auto result = update->Apply(); + if (!result.has_value()) { + ADD_FAILURE() << "Failed to apply update: " << result.error().message; + return nullptr; + } + return result.value().spec; + } + + // Helper to apply update and assert spec equality + void ApplyUpdateAndAssertSpec(std::shared_ptr update, + const std::vector& expected_fields, + int32_t last_assigned_field_id) { + auto updated_spec = ApplyUpdateAndGetSpec(update); + auto expected_spec = MakeExpectedSpec(expected_fields, last_assigned_field_id); + AssertPartitionSpecEquals(*expected_spec, *updated_spec); + } + + // Helper to assert partition spec equality + void AssertPartitionSpecEquals(const PartitionSpec& expected, + const PartitionSpec& actual) { + ASSERT_EQ(expected.fields().size(), actual.fields().size()); + for (size_t i = 0; i < expected.fields().size(); ++i) { + const auto& expected_field = expected.fields()[i]; + const auto& actual_field = actual.fields()[i]; + EXPECT_EQ(expected_field.source_id(), actual_field.source_id()); + EXPECT_EQ(expected_field.field_id(), actual_field.field_id()); + EXPECT_EQ(expected_field.name(), actual_field.name()); + EXPECT_EQ(*expected_field.transform(), *actual_field.transform()); + } + } + + // Helper to expect an error with a specific message + void ExpectError(std::shared_ptr update, ErrorKind expected_kind, + const std::string& expected_message) { + auto result = update->Apply(); + ASSERT_THAT(result, IsError(expected_kind)); + ASSERT_THAT(result, HasErrorMessage(expected_message)); + } + + // Helper to create a table with a custom partition spec + std::shared_ptr
CreateTableWithSpec(std::shared_ptr spec, + const std::string& table_name) { + auto metadata = CreateBaseMetadata(format_version_, test_schema_, spec); + TableIdentifier identifier{.ns = Namespace{.levels = {"test"}}, .name = table_name}; + std::string metadata_location = + std::format("/warehouse/{}/metadata/00000-{}.metadata.json", table_name, + Uuid::GenerateV7().ToString()); + auto table_result = + Table::Make(identifier, std::shared_ptr(metadata.release()), + metadata_location, file_io_, catalog_); + if (!table_result.has_value()) { + ADD_FAILURE() << "Failed to create table: " << table_result.error().message; + return nullptr; + } + return table_result.value(); + } + + // Helper to create UpdatePartitionSpec from a table + std::shared_ptr CreateUpdateFromTable( + std::shared_ptr
table) { + auto transaction_result = + Transaction::Make(table, Transaction::Kind::kUpdate, /*auto_commit=*/false); + if (!transaction_result.has_value()) { + ADD_FAILURE() << "Failed to create transaction: " + << transaction_result.error().message; + return nullptr; + } + auto update_result = UpdatePartitionSpec::Make(transaction_result.value()); + if (!update_result.has_value()) { + ADD_FAILURE() << "Failed to create UpdatePartitionSpec: " + << update_result.error().message; + return nullptr; + } + return update_result.value(); + } + + const TableIdentifier partitioned_table_ident_{.name = "partitioned_table"}; + const TableIdentifier unpartitioned_table_ident_{.name = "unpartitioned_table"}; + const std::string partitioned_table_location_{"/warehouse/partitioned_table"}; + const std::string unpartitioned_table_location_{"/warehouse/unpartitioned_table"}; + std::shared_ptr file_io_; + std::shared_ptr catalog_; + std::shared_ptr test_schema_; + std::shared_ptr partitioned_spec_; + std::shared_ptr
partitioned_table_; + std::shared_ptr
unpartitioned_table_; + int8_t format_version_; +}; + +INSTANTIATE_TEST_SUITE_P(FormatVersions, UpdatePartitionSpecTest, ::testing::Values(1, 2), + [](const ::testing::TestParamInfo& info) { + return std::format("V{}", info.param); + }); + +TEST_P(UpdatePartitionSpecTest, TestAddIdentityByName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField("category"); + ApplyUpdateAndAssertSpec( + update, {PartitionField(3, 1000, "category", Transform::Identity())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddIdentityByTerm) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Ref("category")); + ApplyUpdateAndAssertSpec( + update, {PartitionField(3, 1000, "category", Transform::Identity())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddYear) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Year("ts")); + ApplyUpdateAndAssertSpec(update, + {PartitionField(2, 1000, "ts_year", Transform::Year())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddMonth) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Month("ts")); + ApplyUpdateAndAssertSpec( + update, {PartitionField(2, 1000, "ts_month", Transform::Month())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddDay) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Day("ts")); + ApplyUpdateAndAssertSpec(update, {PartitionField(2, 1000, "ts_day", Transform::Day())}, + 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddHour) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Hour("ts")); + ApplyUpdateAndAssertSpec(update, + {PartitionField(2, 1000, "ts_hour", Transform::Hour())}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddBucket) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Bucket("id", 16)); + ApplyUpdateAndAssertSpec( + update, {PartitionField(1, 1000, "id_bucket_16", Transform::Bucket(16))}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddTruncate) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Truncate("data", 4)); + ApplyUpdateAndAssertSpec( + update, {PartitionField(4, 1000, "data_trunc_4", Transform::Truncate(4))}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddNamedPartition) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Bucket("id", 16), "shard"); + ApplyUpdateAndAssertSpec( + update, {PartitionField(1, 1000, "shard", Transform::Bucket(16))}, 1000); +} + +TEST_P(UpdatePartitionSpecTest, TestAddToExisting) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Truncate("data", 4)); + ApplyUpdateAndAssertSpec( + update, + {PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16)), + PartitionField(4, 1003, "data_trunc_4", Transform::Truncate(4))}, + 1003); +} + +TEST_P(UpdatePartitionSpecTest, TestMultipleAdds) { + ICEBERG_UNWRAP_OR_FAIL(auto update, unpartitioned_table_->NewUpdatePartitionSpec()); + update->AddField("category") + .AddField(Expressions::Day("ts")) + .AddField(Expressions::Bucket("id", 16), "shard") + .AddField(Expressions::Truncate("data", 4), "prefix"); + ApplyUpdateAndAssertSpec(update, + {PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16)), + PartitionField(4, 1003, "prefix", Transform::Truncate(4))}, + 1003); +} + +TEST_P(UpdatePartitionSpecTest, TestAddHourToDay) { + // First add day partition + ICEBERG_UNWRAP_OR_FAIL(auto update1, unpartitioned_table_->NewUpdatePartitionSpec()); + update1->AddField(Expressions::Day("ts")); + auto by_day_spec = ApplyUpdateAndGetSpec(update1); + + // Then add hour partition + auto table = CreateTableWithSpec(by_day_spec, "test_table"); + auto update2 = CreateUpdateFromTable(table); + update2->AddField(Expressions::Hour("ts")); + auto by_hour_spec = ApplyUpdateAndGetSpec(update2); + + ASSERT_EQ(by_hour_spec->fields().size(), 2); + EXPECT_EQ(by_hour_spec->fields()[0].source_id(), 2); + EXPECT_EQ(by_hour_spec->fields()[0].name(), "ts_day"); + EXPECT_EQ(*by_hour_spec->fields()[0].transform(), *Transform::Day()); + EXPECT_EQ(by_hour_spec->fields()[1].source_id(), 2); + EXPECT_EQ(by_hour_spec->fields()[1].name(), "ts_hour"); + EXPECT_EQ(*by_hour_spec->fields()[1].transform(), *Transform::Hour()); +} + +TEST_P(UpdatePartitionSpecTest, TestAddMultipleBuckets) { + // First add bucket 16 + ICEBERG_UNWRAP_OR_FAIL(auto update1, unpartitioned_table_->NewUpdatePartitionSpec()); + update1->AddField(Expressions::Bucket("id", 16)); + auto bucket16_spec = ApplyUpdateAndGetSpec(update1); + + // Then add bucket 8 + auto table = CreateTableWithSpec(bucket16_spec, "test_table"); + auto update2 = CreateUpdateFromTable(table); + update2->AddField(Expressions::Bucket("id", 8)); + ApplyUpdateAndAssertSpec( + update2, + {PartitionField(1, 1000, "id_bucket_16", Transform::Bucket(16)), + PartitionField(1, 1001, "id_bucket_8", Transform::Bucket(8))}, + 1001); +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveIdentityByName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField("category"); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Void()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveBucketByName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField("shard"); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Void())}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day())}, + 1001); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveIdentityByEquivalent) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField(Expressions::Ref("category")); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Void()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveDayByEquivalent) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField(Expressions::Day("ts")); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Void()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(1, 1002, "shard", Transform::Bucket(16))}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveBucketByEquivalent) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField(Expressions::Bucket("id", 16)); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Void())}, + 1002); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day())}, + 1001); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRename) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RenameField("shard", "id_bucket"); + ApplyUpdateAndAssertSpec(update, + {PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "id_bucket", Transform::Bucket(16))}, + 1002); +} + +TEST_P(UpdatePartitionSpecTest, TestMultipleChanges) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RenameField("shard", "id_bucket") + .RemoveField(Expressions::Day("ts")) + .AddField(Expressions::Truncate("data", 4), "prefix"); + auto updated_spec = ApplyUpdateAndGetSpec(update); + if (format_version_ == 1) { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Void()), + PartitionField(1, 1002, "id_bucket", Transform::Bucket(16)), + PartitionField(4, 1003, "prefix", Transform::Truncate(4))}, + 1003); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + auto expected = + MakeExpectedSpec({PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(1, 1002, "id_bucket", Transform::Bucket(16)), + PartitionField(4, 1003, "prefix", Transform::Truncate(4))}, + 1003); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestAddDeletedName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField(Expressions::Bucket("id", 16)); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + auto updated_spec = result.spec; + + if (format_version_ == 1) { + ICEBERG_UNWRAP_OR_FAIL( + auto expected_spec, + PartitionSpec::Make( + PartitionSpec::kInitialSpecId, + std::vector{ + PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day()), + PartitionField(1, 1002, "shard", Transform::Void())}, + 1002)); + auto expected = std::shared_ptr(expected_spec.release()); + AssertPartitionSpecEquals(*expected, *updated_spec); + } else { + ICEBERG_UNWRAP_OR_FAIL( + auto expected_spec, + PartitionSpec::Make( + PartitionSpec::kInitialSpecId, + std::vector{ + PartitionField(3, 1000, "category", Transform::Identity()), + PartitionField(2, 1001, "ts_day", Transform::Day())}, + 1001)); + auto expected = std::shared_ptr(expected_spec.release()); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveNewlyAddedFieldByName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + update->RemoveField("prefix"); + ExpectError(update, ErrorKind::kValidationFailed, "Cannot delete newly added field"); +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveNewlyAddedFieldByTransform) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + update->RemoveField(Expressions::Truncate("data", 4)); + ExpectError(update, ErrorKind::kValidationFailed, "Cannot delete newly added field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddAlreadyAddedFieldByTransform) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + update->AddField(Expressions::Truncate("data", 4)); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddAlreadyAddedFieldByName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + update->AddField(Expressions::Truncate("data", 6), "prefix"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddRedundantTimePartition) { + // Test day + hour conflict + ICEBERG_UNWRAP_OR_FAIL(auto update1, unpartitioned_table_->NewUpdatePartitionSpec()); + update1->AddField(Expressions::Day("ts")); + update1->AddField(Expressions::Hour("ts")); + ExpectError(update1, ErrorKind::kValidationFailed, + "Cannot add redundant partition field"); + + // Test hour + month conflict after adding hour to existing day + ICEBERG_UNWRAP_OR_FAIL(auto update2, partitioned_table_->NewUpdatePartitionSpec()); + update2->AddField(Expressions::Hour("ts")); // day already exists, so hour is OK + update2->AddField(Expressions::Month("ts")); // conflicts with hour + ExpectError(update2, ErrorKind::kValidationFailed, + "Cannot add redundant partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestNoEffectAddDeletedSameFieldWithSameName) { + ICEBERG_UNWRAP_OR_FAIL(auto update1, partitioned_table_->NewUpdatePartitionSpec()); + update1->RemoveField("shard"); + update1->AddField(Expressions::Bucket("id", 16), "shard"); + ICEBERG_UNWRAP_OR_FAIL(auto result1, update1->Apply()); + auto spec1 = result1.spec; + AssertPartitionSpecEquals(*partitioned_spec_, *spec1); + + ICEBERG_UNWRAP_OR_FAIL(auto update2, partitioned_table_->NewUpdatePartitionSpec()); + update2->RemoveField("shard"); + update2->AddField(Expressions::Bucket("id", 16)); + ICEBERG_UNWRAP_OR_FAIL(auto result2, update2->Apply()); + auto spec2 = result2.spec; + AssertPartitionSpecEquals(*partitioned_spec_, *spec2); +} + +TEST_P(UpdatePartitionSpecTest, TestGenerateNewSpecAddDeletedSameFieldWithDifferentName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField("shard"); + update->AddField(Expressions::Bucket("id", 16), "new_shard"); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + auto updated_spec = result.spec; + + ASSERT_EQ(updated_spec->fields().size(), 3); + EXPECT_EQ(updated_spec->fields()[0].name(), "category"); + EXPECT_EQ(updated_spec->fields()[1].name(), "ts_day"); + EXPECT_EQ(updated_spec->fields()[2].name(), "new_shard"); + EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Identity()); + EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Day()); + EXPECT_EQ(*updated_spec->fields()[2].transform(), *Transform::Bucket(16)); +} + +TEST_P(UpdatePartitionSpecTest, TestAddDuplicateByName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField("category"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddDuplicateByRef) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Ref("category")); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddDuplicateTransform) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Bucket("id", 16)); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestAddNamedDuplicate) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Bucket("id", 16), "b16"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot add duplicate partition field"); +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveUnknownFieldByName) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField("moon"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot find partition field to remove"); +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveUnknownFieldByEquivalent) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField(Expressions::Hour("ts")); // day(ts) exists, not hour + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot find partition field to remove"); +} + +TEST_P(UpdatePartitionSpecTest, TestRenameUnknownField) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RenameField("shake", "seal"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot find partition field to rename: shake"); +} + +TEST_P(UpdatePartitionSpecTest, TestRenameAfterAdd) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->AddField(Expressions::Truncate("data", 4), "data_trunc"); + update->RenameField("data_trunc", "prefix"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot rename newly added partition field: data_trunc"); +} + +TEST_P(UpdatePartitionSpecTest, TestRenameAndDelete) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RenameField("shard", "id_bucket"); + update->RemoveField(Expressions::Bucket("id", 16)); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot rename and delete partition field: shard"); +} + +TEST_P(UpdatePartitionSpecTest, TestDeleteAndRename) { + ICEBERG_UNWRAP_OR_FAIL(auto update, partitioned_table_->NewUpdatePartitionSpec()); + update->RemoveField(Expressions::Bucket("id", 16)); + update->RenameField("shard", "id_bucket"); + ExpectError(update, ErrorKind::kValidationFailed, + "Cannot delete and rename partition field: shard"); +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveAndAddMultiTimes) { + // Add first time + ICEBERG_UNWRAP_OR_FAIL(auto update1, unpartitioned_table_->NewUpdatePartitionSpec()); + update1->AddField(Expressions::Day("ts"), "ts_date"); + auto add_first_time_spec = ApplyUpdateAndGetSpec(update1); + + // Remove first time + auto table1 = CreateTableWithSpec(add_first_time_spec, "test_table"); + auto update2 = CreateUpdateFromTable(table1); + update2->RemoveField(Expressions::Day("ts")); + auto remove_first_time_spec = ApplyUpdateAndGetSpec(update2); + + // Add second time + auto table2 = CreateTableWithSpec(remove_first_time_spec, "test_table2"); + auto update3 = CreateUpdateFromTable(table2); + update3->AddField(Expressions::Day("ts"), "ts_date"); + auto add_second_time_spec = ApplyUpdateAndGetSpec(update3); + + // Remove second time + auto table3 = CreateTableWithSpec(add_second_time_spec, "test_table3"); + auto update4 = CreateUpdateFromTable(table3); + update4->RemoveField(Expressions::Day("ts")); + auto remove_second_time_spec = ApplyUpdateAndGetSpec(update4); + + // Add third time with month + auto table4 = CreateTableWithSpec(remove_second_time_spec, "test_table4"); + auto update5 = CreateUpdateFromTable(table4); + update5->AddField(Expressions::Month("ts")); + auto add_third_time_spec = ApplyUpdateAndGetSpec(update5); + + // Rename ts_month to ts_date + auto table5 = CreateTableWithSpec(add_third_time_spec, "test_table5"); + auto update6 = CreateUpdateFromTable(table5); + update6->RenameField("ts_month", "ts_date"); + auto updated_spec = ApplyUpdateAndGetSpec(update6); + + if (format_version_ == 1) { + ASSERT_EQ(updated_spec->fields().size(), 3); + // In V1, we expect void transforms for deleted fields + EXPECT_TRUE(updated_spec->fields()[0].name().find("ts_date") == 0); + EXPECT_TRUE(updated_spec->fields()[1].name().find("ts_date") == 0); + EXPECT_EQ(updated_spec->fields()[2].name(), "ts_date"); + EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Void()); + EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Void()); + EXPECT_EQ(*updated_spec->fields()[2].transform(), *Transform::Month()); + } else { + ICEBERG_UNWRAP_OR_FAIL( + auto expected_spec, + PartitionSpec::Make(PartitionSpec::kInitialSpecId, + std::vector{ + PartitionField(2, 1000, "ts_date", Transform::Month())}, + 1000)); + auto expected = std::shared_ptr(expected_spec.release()); + AssertPartitionSpecEquals(*expected, *updated_spec); + } +} + +TEST_P(UpdatePartitionSpecTest, TestRemoveAndUpdateWithDifferentTransformation) { + auto initial_spec = MakeExpectedSpec( + {PartitionField(2, 1000, "ts_transformed", Transform::Month())}, 1000); + auto table = CreateTableWithSpec(initial_spec, "test_table"); + auto update = CreateUpdateFromTable(table); + update->RemoveField("ts_transformed"); + update->AddField(Expressions::Day("ts"), "ts_transformed"); + auto updated_spec = ApplyUpdateAndGetSpec(update); + + if (format_version_ == 1) { + ASSERT_EQ(updated_spec->fields().size(), 2); + EXPECT_TRUE(updated_spec->fields()[0].name().find("ts_transformed") == 0); + EXPECT_EQ(updated_spec->fields()[1].name(), "ts_transformed"); + EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Void()); + EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Day()); + } else { + ASSERT_EQ(updated_spec->fields().size(), 1); + EXPECT_EQ(updated_spec->fields()[0].name(), "ts_transformed"); + EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Day()); + } +} + +TEST_P(UpdatePartitionSpecTest, CommitSuccess) { + // Test empty commit + ICEBERG_UNWRAP_OR_FAIL(auto empty_update, partitioned_table_->NewUpdatePartitionSpec()); + EXPECT_THAT(empty_update->Commit(), IsOk()); + + // Reload table after first commit + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(partitioned_table_ident_)); + + // Test commit with partition spec changes + ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdatePartitionSpec()); + update->AddField(Expressions::Truncate("data", 4), "prefix"); + + EXPECT_THAT(update->Commit(), IsOk()); + + // Verify the partition spec was committed + ICEBERG_UNWRAP_OR_FAIL(auto final_table, catalog_->LoadTable(partitioned_table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto spec, final_table->spec()); + + ASSERT_EQ(spec->fields().size(), 4); + EXPECT_EQ(spec->fields()[0].name(), "category"); + EXPECT_EQ(spec->fields()[1].name(), "ts_day"); + EXPECT_EQ(spec->fields()[2].name(), "shard"); + EXPECT_EQ(spec->fields()[3].name(), "prefix"); + EXPECT_EQ(*spec->fields()[0].transform(), *Transform::Identity()); + EXPECT_EQ(*spec->fields()[1].transform(), *Transform::Day()); + EXPECT_EQ(*spec->fields()[2].transform(), *Transform::Bucket(16)); + EXPECT_EQ(*spec->fields()[3].transform(), *Transform::Truncate(4)); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 49416cdc0..5521d9547 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -28,6 +28,7 @@ #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_sort_order.h" #include "iceberg/util/checked_cast.h" @@ -85,6 +86,17 @@ Status Transaction::Apply(PendingUpdate& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update_sort_order.Apply()); metadata_builder_->SetDefaultSortOrder(result.sort_order); } break; + case PendingUpdate::Kind::kUpdatePartitionSpec: { + auto& update_partition_spec = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto result, update_partition_spec.Apply()); + + metadata_builder_->SetDefaultPartitionSpec(result.spec); + if (result.set_as_default) { + metadata_builder_->SetDefaultPartitionSpec(result.spec); + } else { + metadata_builder_->AddPartitionSpec(result.spec); + } + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -137,6 +149,13 @@ Result> Transaction::Commit() { return table_; } +Result> Transaction::NewUpdatePartitionSpec() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_spec, + UpdatePartitionSpec::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_spec)); + return update_spec; +} + Result> Transaction::NewUpdateProperties() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_properties, UpdateProperties::Make(shared_from_this())); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 05fdea325..18143b8ae 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -56,6 +56,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> Commit(); + /// \brief Create a new UpdatePartitionSpec to update the partition spec of this table + /// and commit the changes. + Result> NewUpdatePartitionSpec(); + /// \brief Create a new UpdateProperties to update table properties and commit the /// changes. Result> NewUpdateProperties(); diff --git a/src/iceberg/transform.cc b/src/iceberg/transform.cc index 614489710..fbc92da22 100644 --- a/src/iceberg/transform.cc +++ b/src/iceberg/transform.cc @@ -388,6 +388,29 @@ std::string Transform::ToString() const { std::unreachable(); } +Result Transform::GeneratePartitionName(std::string_view source_name) const { + switch (transform_type_) { + case TransformType::kIdentity: + return std::string(source_name); + case TransformType::kBucket: + // Format: sourceName_bucket_N (matching Java: sourceName + "_bucket_" + numBuckets) + return std::format("{}_bucket_{}", source_name, std::get(param_)); + case TransformType::kTruncate: + // Format: sourceName_trunc_N (matching Java: sourceName + "_trunc_" + width) + return std::format("{}_trunc_{}", source_name, std::get(param_)); + case TransformType::kYear: + case TransformType::kMonth: + case TransformType::kDay: + case TransformType::kHour: + return std::format("{}_{}", source_name, TransformTypeToString(transform_type_)); + case TransformType::kVoid: + return std::format("{}_null", source_name); + case TransformType::kUnknown: + return Invalid("Cannot generate partition name for unknown transform"); + } + std::unreachable(); +} + TransformFunction::TransformFunction(TransformType transform_type, std::shared_ptr source_type) : transform_type_(transform_type), source_type_(std::move(source_type)) {} diff --git a/src/iceberg/transform.h b/src/iceberg/transform.h index 53993b4e3..47cf3ee91 100644 --- a/src/iceberg/transform.h +++ b/src/iceberg/transform.h @@ -197,6 +197,11 @@ class ICEBERG_EXPORT Transform : public util::Formattable { /// \brief Returns a string representation of this transform (e.g., "bucket[16]"). std::string ToString() const override; + /// \brief Generates a partition name for the transform. + /// \param source_name The name of the source column. + /// \return A string representation of the partition name. + Result GeneratePartitionName(std::string_view source_name) const; + /// \brief Equality comparison. friend bool operator==(const Transform& lhs, const Transform& rhs) { return lhs.Equals(rhs); diff --git a/src/iceberg/transform_function.h b/src/iceberg/transform_function.h index b3cfa5a22..c8670824c 100644 --- a/src/iceberg/transform_function.h +++ b/src/iceberg/transform_function.h @@ -59,6 +59,9 @@ class ICEBERG_EXPORT BucketTransform : public TransformFunction { /// \brief Returns INT32 as the output type. std::shared_ptr ResultType() const override; + /// \brief Returns the number of buckets. + int32_t num_buckets() const { return num_buckets_; } + /// \brief Create a BucketTransform. /// \param source_type Type of the input data. /// \param num_buckets Number of buckets to hash into. diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 8ca444ccd..f1fb7ffbf 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -126,6 +126,12 @@ class BoundPredicate; class Expression; class Literal; class UnboundPredicate; +class BoundReference; +class BoundTransform; +template +class UnboundTerm; +class NamedReference; +class UnboundTransform; /// \brief Scan. class DataTableScan; @@ -177,6 +183,7 @@ class Transaction; /// \brief Update family. class PendingUpdate; +class UpdatePartitionSpec; class UpdateProperties; class UpdateSortOrder; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index b0ec67066..3fdfda98b 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -16,6 +16,11 @@ # under the License. install_headers( - ['pending_update.h', 'update_sort_order.h', 'update_properties.h'], + [ + 'pending_update.h', + 'update_partition_spec.h', + 'update_sort_order.h', + 'update_properties.h', + ], subdir: 'iceberg/update', ) diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index c298ba800..95580f40c 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -42,6 +42,7 @@ namespace iceberg { class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { public: enum class Kind : uint8_t { + kUpdatePartitionSpec, kUpdateProperties, kUpdateSortOrder, }; diff --git a/src/iceberg/update/update_partition_spec.cc b/src/iceberg/update/update_partition_spec.cc new file mode 100644 index 000000000..759ba0d59 --- /dev/null +++ b/src/iceberg/update/update_partition_spec.cc @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_partition_spec.h" + +#include +#include +#include + +#include "iceberg/expression/term.h" +#include "iceberg/partition_field.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_update.h" +#include "iceberg/transaction.h" +#include "iceberg/transform.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> UpdatePartitionSpec::Make( + std::shared_ptr transaction) { + if (!transaction) [[unlikely]] { + return InvalidArgument("Cannot create UpdatePartitionSpec without a transaction"); + } + return std::shared_ptr( + new UpdatePartitionSpec(std::move(transaction))); +} + +UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) { + const TableMetadata& base_metadata = transaction_->current(); + format_version_ = base_metadata.format_version; + + // Get the current/default partition spec + auto spec_result = base_metadata.PartitionSpec(); + if (!spec_result.has_value()) { + AddError(spec_result.error()); + return; + } + spec_ = std::move(spec_result.value()); + + // Get the current schema + auto schema_result = base_metadata.Schema(); + if (!schema_result.has_value()) { + AddError(schema_result.error()); + return; + } + schema_ = std::move(schema_result.value()); + + last_assigned_partition_id_ = spec_->last_assigned_field_id(); + name_to_field_ = IndexSpecByName(*spec_); + transform_to_field_ = IndexSpecByTransform(*spec_); + + // Check for unknown transforms + for (const auto& field : spec_->fields()) { + if (field.transform()->transform_type() == TransformType::kUnknown) { + AddError(ErrorKind::kInvalidArgument, + "Cannot update partition spec with unknown transform: {}", + field.ToString()); + return; + } + } + + // Build index of historical partition fields for efficient recycling (V2+) + if (format_version_ >= 2) { + BuildHistoricalFieldsIndex(); + } +} + +UpdatePartitionSpec::~UpdatePartitionSpec() = default; + +UpdatePartitionSpec& UpdatePartitionSpec::CaseSensitive(bool is_case_sensitive) { + case_sensitive_ = is_case_sensitive; + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddNonDefaultSpec() { + set_as_default_ = false; + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField(const std::string& source_name) { + // Find the source field in the schema + ICEBERG_BUILDER_ASSIGN_OR_RETURN( + auto field_opt, schema_->FindFieldByName(source_name, case_sensitive_)); + + ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot find source field: {}", + source_name); + int32_t source_id = field_opt->get().field_id(); + return AddFieldInternal(std::string(), source_id, Transform::Identity()); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField(const std::shared_ptr& term, + const std::string& part_name) { + ICEBERG_BUILDER_CHECK(term->is_unbound(), "Cannot add bound term to partition spec"); + // Bind the term to get the source id + if (term->kind() == Term::Kind::kReference) { + const auto& ref = std::dynamic_pointer_cast(term); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, + ref->Bind(*schema_, case_sensitive_)); + int32_t source_id = bound_ref->field().field_id(); + return AddFieldInternal(part_name, source_id, Transform::Identity()); + } else if (term->kind() == Term::Kind::kTransform) { + const auto& unbound_transform = std::dynamic_pointer_cast(term); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform, + unbound_transform->Bind(*schema_, case_sensitive_)); + int32_t source_id = bound_transform->reference()->field().field_id(); + return AddFieldInternal(part_name, source_id, bound_transform->transform()); + } + + ICEBERG_BUILDER_CHECK(false, "Cannot add {} term to partition spec", term->ToString()); + std::unreachable(); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal( + const std::string& name, int32_t source_id, + const std::shared_ptr& transform) { + // Check for duplicate name in added fields + if (!name.empty()) { + ICEBERG_BUILDER_CHECK(!added_field_names_.contains(name), + "Cannot add duplicate partition field: {}", name); + } + + // Cache transform string to avoid repeated ToString() calls + const std::string transform_str = transform->ToString(); + TransformKey validation_key{source_id, transform_str}; + + // Check if this field already exists in the current spec + auto existing_it = transform_to_field_.find(validation_key); + if (existing_it != transform_to_field_.end()) { + const auto& existing = existing_it->second; + const bool is_deleted = deletes_.contains(existing->field_id()); + if (is_deleted && *existing->transform() == *transform) { + // If the field was deleted and we're re-adding the same one, just undo the delete + return RewriteDeleteAndAddField(*existing, name); + } + + ICEBERG_BUILDER_CHECK( + is_deleted, + "Cannot add duplicate partition field '{}' for source {} with transform {}, " + "conflicts with {}", + name, source_id, transform_str, existing->ToString()); + } + + // Check if already being added + auto added_it = transform_to_added_field_.find(validation_key); + ICEBERG_BUILDER_CHECK( + added_it == transform_to_added_field_.end(), + "Cannot add duplicate partition field '{}' for source {} with transform {}, " + "already added: {}", + name, source_id, transform_str, added_it->second); + + // Create or recycle the partition field + PartitionField new_field = RecycleOrCreatePartitionField(source_id, transform, name); + + // Generate name if not provided + std::string field_name; + if (!name.empty()) { + field_name = name; + } else { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(field_name, + GeneratePartitionName(source_id, transform)); + } + + // Create the final field with the name + new_field = PartitionField(new_field.source_id(), new_field.field_id(), field_name, + new_field.transform()); + + // Check for redundant time-based partitions + CheckForRedundantAddedPartitions(new_field); + transform_to_added_field_.emplace(validation_key, field_name); + + // Handle name conflicts with existing fields + auto existing_name_it = name_to_field_.find(field_name); + if (existing_name_it != name_to_field_.end()) { + const auto& existing_field = existing_name_it->second; + const bool existing_is_deleted = deletes_.contains(existing_field->field_id()); + if (!existing_is_deleted) { + if (IsVoidTransform(*existing_field)) { + // Rename the old deleted field + std::string renamed = + std::format("{}_{}", existing_field->name(), existing_field->field_id()); + RenameField(std::string(existing_field->name()), renamed); + } else { + ICEBERG_BUILDER_CHECK(false, "Cannot add duplicate partition field name: {}", + field_name); + } + } else { + // Field is being deleted, rename it to avoid conflict + std::string renamed = + std::format("{}_{}", existing_field->name(), existing_field->field_id()); + renames_[std::string(existing_field->name())] = renamed; + } + } + + adds_.push_back(std::move(new_field)); + added_field_names_.emplace(field_name); + + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::RewriteDeleteAndAddField( + const PartitionField& existing, const std::string& name) { + deletes_.erase(existing.field_id()); + if (name.empty() || std::string(existing.name()) == name) { + return *this; + } + return RenameField(std::string(existing.name()), name); +} + +UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(const std::string& name) { + // Cannot delete newly added fields + ICEBERG_BUILDER_CHECK(!added_field_names_.contains(name), + "Cannot delete newly added field: {}", name); + + // Cannot rename and delete + ICEBERG_BUILDER_CHECK(!renames_.contains(name), + "Cannot rename and delete partition field: {}", name); + + auto field_it = name_to_field_.find(name); + ICEBERG_BUILDER_CHECK(field_it != name_to_field_.end(), + "Cannot find partition field to remove: {}", name); + + deletes_.insert(field_it->second->field_id()); + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(const std::shared_ptr& term) { + ICEBERG_BUILDER_CHECK(term->is_unbound(), + "Cannot remove bound term from partition spec"); + // Bind the term to get the source id + if (term->kind() == Term::Kind::kReference) { + const auto& ref = std::dynamic_pointer_cast(term); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, + ref->Bind(*schema_, case_sensitive_)); + int32_t source_id = bound_ref->field().field_id(); + // Reference terms use identity transform + TransformKey key{source_id, Transform::Identity()->ToString()}; + return RemoveFieldByTransform(key, term->ToString()); + } else if (term->kind() == Term::Kind::kTransform) { + const auto& unbound_transform = std::dynamic_pointer_cast(term); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform, + unbound_transform->Bind(*schema_, case_sensitive_)); + int32_t source_id = bound_transform->reference()->field().field_id(); + auto transform = bound_transform->transform(); + + TransformKey key{source_id, transform->ToString()}; + return RemoveFieldByTransform(key, term->ToString()); + } + + ICEBERG_BUILDER_CHECK(false, "Cannot remove {} term from partition spec", + term->ToString()); + std::unreachable(); +} + +UpdatePartitionSpec& UpdatePartitionSpec::RemoveFieldByTransform( + const TransformKey& key, const std::string& term_str) { + // Cannot delete newly added fields + ICEBERG_BUILDER_CHECK(!transform_to_added_field_.contains(key), + "Cannot delete newly added field: {}", term_str); + + auto field_it = transform_to_field_.find(key); + ICEBERG_BUILDER_CHECK(field_it != transform_to_field_.end(), + "Cannot find partition field to remove: {}", term_str); + + const auto& field = field_it->second; + // Cannot rename and delete + ICEBERG_BUILDER_CHECK(!renames_.contains(std::string(field->name())), + "Cannot rename and delete partition field: {}", field->name()); + + deletes_.insert(field->field_id()); + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::RenameField(const std::string& name, + const std::string& new_name) { + // Handle existing void field with the new name + auto existing_it = name_to_field_.find(new_name); + if (existing_it != name_to_field_.end() && IsVoidTransform(*existing_it->second)) { + std::string renamed = std::format("{}_{}", existing_it->second->name(), + existing_it->second->field_id()); + RenameField(std::string(existing_it->second->name()), renamed); + } + + // Cannot rename newly added fields + ICEBERG_BUILDER_CHECK(!added_field_names_.contains(name), + "Cannot rename newly added partition field: {}", name); + + auto field_it = name_to_field_.find(name); + ICEBERG_BUILDER_CHECK(field_it != name_to_field_.end(), + "Cannot find partition field to rename: {}", name); + + // Cannot delete and rename + ICEBERG_BUILDER_CHECK(!deletes_.contains(field_it->second->field_id()), + "Cannot delete and rename partition field: {}", name); + + renames_[name] = new_name; + return *this; +} + +Result UpdatePartitionSpec::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + // Reserve capacity to avoid reallocations + const size_t existing_fields_count = spec_->fields().size(); + const size_t adds_count = adds_.size(); + std::vector new_fields; + new_fields.reserve(existing_fields_count + adds_count); + + // Process existing fields + for (const auto& field : spec_->fields()) { + if (!deletes_.contains(field.field_id())) { + // Field is kept, check for rename + auto rename_it = renames_.find(std::string(field.name())); + if (rename_it != renames_.end()) { + new_fields.emplace_back(field.source_id(), field.field_id(), rename_it->second, + field.transform()); + } else { + new_fields.push_back(field); + } + } else if (format_version_ < 2) { + // field IDs were not required for v1 and were assigned sequentially in each + // partition spec starting at 1,000. + // To maintain consistent field ids across partition specs in v1 tables, any + // partition field that is removed must be replaced with a null transform. null + // values are always allowed in partition data. + auto rename_it = renames_.find(std::string(field.name())); + std::string field_name = + rename_it != renames_.end() ? rename_it->second : std::string(field.name()); + new_fields.emplace_back(field.source_id(), field.field_id(), std::move(field_name), + Transform::Void()); + } + // In V2, deleted fields are simply removed + } + + // Add new fields + new_fields.insert(new_fields.end(), adds_.begin(), adds_.end()); + + // In V2, if all fields are removed, reset last_assigned_partition_id to allow + // field IDs to restart from 1000 when fields are added again + int32_t last_assigned_id = last_assigned_partition_id_; + if (format_version_ >= 2 && new_fields.empty()) { + last_assigned_id = PartitionSpec::kLegacyPartitionDataIdStart - 1; + } + + // Use -1 as a placeholder for the spec id, the actual spec id will be assigned by + // TableMetadataBuilder when the AddPartitionSpec update is applied. + ICEBERG_ASSIGN_OR_RAISE( + auto new_spec, PartitionSpec::Make(/*spec_id=*/-1, std::move(new_fields), + /*last_assigned_field_id=*/last_assigned_id)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, transaction_->current().Schema()); + ICEBERG_RETURN_UNEXPECTED(new_spec->Validate(*schema, /*allow_missing_fields=*/false)); + + return ApplyResult{.spec = std::move(new_spec), .set_as_default = set_as_default_}; +} + +int32_t UpdatePartitionSpec::AssignFieldId() { return ++last_assigned_partition_id_; } + +PartitionField UpdatePartitionSpec::RecycleOrCreatePartitionField( + int32_t source_id, std::shared_ptr transform, const std::string& name) { + // In V2+, use pre-built index for O(1) lookup instead of O(n*m) iteration + if (format_version_ >= 2 && !historical_fields_.empty()) { + const std::string transform_str = transform->ToString(); + TransformKey key{source_id, transform_str}; + auto it = historical_fields_.find(key); + if (it != historical_fields_.end()) { + const auto& field = it->second; + // If target name is specified then consider it too, otherwise not + if (name.empty() || std::string(field.name()) == name) { + return field; + } + } + } + // No matching field found, create a new one + return {source_id, AssignFieldId(), name, transform}; +} + +Result UpdatePartitionSpec::GeneratePartitionName( + int32_t source_id, const std::shared_ptr& transform) const { + // Find the source field name + ICEBERG_ASSIGN_OR_RAISE(auto field_opt, schema_->FindFieldById(source_id)); + if (!field_opt.has_value()) { + return Invalid("Cannot find source field for partition field: {}", source_id); + } + return transform->GeneratePartitionName(field_opt.value().get().name()); +} + +bool UpdatePartitionSpec::IsTimeTransform(const std::shared_ptr& transform) { + switch (transform->transform_type()) { + case TransformType::kYear: + case TransformType::kMonth: + case TransformType::kDay: + case TransformType::kHour: + return true; + default: + return false; + } +} + +bool UpdatePartitionSpec::IsVoidTransform(const PartitionField& field) { + return field.transform()->transform_type() == TransformType::kVoid; +} + +void UpdatePartitionSpec::CheckForRedundantAddedPartitions(const PartitionField& field) { + if (IsTimeTransform(field.transform())) { + if (added_time_fields_.contains(field.source_id())) { + AddError(ErrorKind::kInvalidArgument, + "Cannot add redundant partition field: {} conflicts with {}", + field.ToString(), added_time_fields_.at(field.source_id())); + return; + } + added_time_fields_.emplace(field.source_id(), field.ToString()); + } +} + +std::unordered_map +UpdatePartitionSpec::IndexSpecByName(const PartitionSpec& spec) { + std::unordered_map index; + for (const auto& field : spec.fields()) { + index.emplace(std::string(field.name()), &field); + } + return index; +} + +std::unordered_map +UpdatePartitionSpec::IndexSpecByTransform(const PartitionSpec& spec) { + std::unordered_map index; + index.reserve(spec.fields().size()); + for (const auto& field : spec.fields()) { + TransformKey key{field.source_id(), field.transform()->ToString()}; + index.emplace(key, &field); + } + return index; +} + +void UpdatePartitionSpec::BuildHistoricalFieldsIndex() { + const TableMetadata& base_metadata = transaction_->current(); + + // Count total fields across all specs to reserve capacity + size_t total_fields = 0; + for (const auto& partition_spec : base_metadata.partition_specs) { + total_fields += partition_spec->fields().size(); + } + historical_fields_.reserve(total_fields); + + // Index all fields from all historical partition specs + // Later specs override earlier ones for the same (source_id, transform) key + for (const auto& partition_spec : base_metadata.partition_specs) { + for (const auto& field : partition_spec->fields()) { + TransformKey key{field.source_id(), field.transform()->ToString()}; + historical_fields_.emplace(key, field); + } + } +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_partition_spec.h b/src/iceberg/update/update_partition_spec.h new file mode 100644 index 000000000..ef0a146a3 --- /dev/null +++ b/src/iceberg/update/update_partition_spec.h @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/update_partition_spec.h +/// API for partition spec evolution. + +#include +#include +#include +#include +#include + +#include "iceberg/expression/term.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +namespace iceberg { + +/// \brief API for partition spec evolution. +/// +/// When committing, these changes will be applied to the current table metadata. +/// Commit conflicts will not be resolved and will result in a CommitFailed error. +class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~UpdatePartitionSpec() override; + + struct ApplyResult { + std::shared_ptr spec; + bool set_as_default; + }; + + /// \brief Set whether column resolution in the source schema should be case sensitive. + UpdatePartitionSpec& CaseSensitive(bool is_case_sensitive); + + /// \brief Add a new partition field from a source column. + /// + /// The partition field will be created as an identity partition field for the given + /// source column, with the same name as the source column. + /// + /// \param source_name Source column name in the table schema. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& AddField(const std::string& source_name); + + /// \brief Add a new partition field with a custom name. + /// + /// \param term The term representing the source column, should be unbound. + /// \param part_name Name for the partition field. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& AddField(const std::shared_ptr& term, + const std::string& part_name = ""); + + /// \brief Remove a partition field by name. + /// + /// \param name Name of the partition field to remove. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& RemoveField(const std::string& name); + + /// \brief Remove a partition field by its source term. + /// + /// The partition field with the same transform and source reference will be removed. + /// If the term is a reference and does not have a transform, the identity transform + /// is used. + /// + /// \param term The term representing the source column, should be unbound. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& RemoveField(const std::shared_ptr& term); + + /// \brief Rename a field in the partition spec. + /// + /// \param name Name of the partition field to rename. + /// \param new_name Replacement name for the partition field. + /// \return Reference to this for method chaining. + UpdatePartitionSpec& RenameField(const std::string& name, const std::string& new_name); + + /// \brief Sets that the new partition spec will NOT be set as the default. + /// + /// The default behavior is to set the new spec as the default partition spec. + /// + /// \return Reference to this for method chaining. + UpdatePartitionSpec& AddNonDefaultSpec(); + + Kind kind() const final { return Kind::kUpdatePartitionSpec; } + + Result Apply(); + + private: + explicit UpdatePartitionSpec(std::shared_ptr transaction); + + /// \brief Pair of source ID and transform string for indexing. + using TransformKey = std::pair; + + /// \brief Hash function for TransformKey. + struct TransformKeyHash { + size_t operator()(const TransformKey& key) const { + return 31 * std::hash{}(key.first) + std::hash{}(key.second); + } + }; + + /// \brief Assign a new partition field ID. + int32_t AssignFieldId(); + + /// + /// In V2, searches for a similar partition field in historical specs. + /// If not found or in V1, creates a new PartitionField. + + /// \brief Recycle or create a partition field. + /// + /// In V2 it searches for a similar partition field in historical partition specs. Tries + /// to match on source field ID, transform type and target name (optional). If not found + /// or in V1 cases it creates a new PartitionField. + /// + /// \param source_id The source field ID. + /// \param transform The transform function. + /// \param name The target partition field name, if specified. + /// \return The recycled or newly created partition field. + PartitionField RecycleOrCreatePartitionField(int32_t source_id, + std::shared_ptr transform, + const std::string& name); + + /// \brief Internal implementation of AddField with resolved source ID and transform. + UpdatePartitionSpec& AddFieldInternal(const std::string& name, int32_t source_id, + const std::shared_ptr& transform); + + /// \brief Generate a partition field name from the source and transform. + Result GeneratePartitionName( + int32_t source_id, const std::shared_ptr& transform) const; + + /// \brief Check if a transform is a time-based transform. + static bool IsTimeTransform(const std::shared_ptr& transform); + + /// \brief Check if a partition field uses void transform. + static bool IsVoidTransform(const PartitionField& field); + + /// \brief Check for redundant time-based partition fields. + void CheckForRedundantAddedPartitions(const PartitionField& field); + + /// \brief Handle rewriting a delete-and-add operation for the same field. + UpdatePartitionSpec& RewriteDeleteAndAddField(const PartitionField& existing, + const std::string& name); + + /// \brief Internal helper to remove a field by transform key. + UpdatePartitionSpec& RemoveFieldByTransform(const TransformKey& key, + const std::string& term_str); + + /// \brief Index the spec fields by name. + static std::unordered_map IndexSpecByName( + const PartitionSpec& spec); + + /// \brief Index the spec fields by (source_id, transform) pair. + static std::unordered_map + IndexSpecByTransform(const PartitionSpec& spec); + + /// \brief Build index of historical partition fields for efficient recycling (V2+). + void BuildHistoricalFieldsIndex(); + + // Configuration + int32_t format_version_; + std::shared_ptr spec_; + std::shared_ptr schema_; + bool case_sensitive_{true}; + bool set_as_default_{true}; + int32_t last_assigned_partition_id_; + + // Indexes for existing fields + std::unordered_map name_to_field_; + std::unordered_map + transform_to_field_; + + // Index for historical partition fields (V2+ only) for efficient recycling + // Maps (source_id, transform_string) -> PartitionField from all historical specs + std::unordered_map historical_fields_; + + // Pending changes + std::vector adds_; + std::unordered_set added_field_names_; + std::unordered_map added_time_fields_; + std::unordered_map + transform_to_added_field_; + std::unordered_set deletes_; + std::unordered_map renames_; +}; + +} // namespace iceberg From 974cd40ab8e27ba7d57c2f063f98397cd748a4c5 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Wed, 24 Dec 2025 23:33:04 +0800 Subject: [PATCH 2/2] fix: miss one review comment --- src/iceberg/update/update_partition_spec.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/iceberg/update/update_partition_spec.cc b/src/iceberg/update/update_partition_spec.cc index 759ba0d59..4b4e8ad81 100644 --- a/src/iceberg/update/update_partition_spec.cc +++ b/src/iceberg/update/update_partition_spec.cc @@ -19,6 +19,7 @@ #include "iceberg/update/update_partition_spec.h" +#include #include #include #include @@ -29,7 +30,6 @@ #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/table_metadata.h" -#include "iceberg/table_update.h" #include "iceberg/transaction.h" #include "iceberg/transform.h" #include "iceberg/util/macros.h" @@ -66,7 +66,8 @@ UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr transactio } schema_ = std::move(schema_result.value()); - last_assigned_partition_id_ = spec_->last_assigned_field_id(); + last_assigned_partition_id_ = std::max(base_metadata.last_partition_id, + PartitionSpec::kLegacyPartitionDataIdStart - 1); name_to_field_ = IndexSpecByName(*spec_); transform_to_field_ = IndexSpecByTransform(*spec_);