From 8a35befaef19a83bf8d36c84ba3eb5eaf1c8ba83 Mon Sep 17 00:00:00 2001 From: Zhuo Wang Date: Wed, 24 Dec 2025 14:22:47 +0800 Subject: [PATCH] refactor: move the TableMetadataBuilder's update into TableMetadataBuilder::Impl --- src/iceberg/table_metadata.cc | 509 +++++++++++++++++++--------------- src/iceberg/table_metadata.h | 11 - 2 files changed, 285 insertions(+), 235 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 2136494f3..40dcb03c0 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -363,64 +363,295 @@ Result TableMetadataUtil::NewTableMetadataFilePath(const TableMetad // TableMetadataBuilder implementation -struct TableMetadataBuilder::Impl { - // Base metadata (nullptr for new tables) - const TableMetadata* base; - - // Working metadata copy - TableMetadata metadata; - - // Change tracking - std::vector> changes; - std::optional last_added_schema_id; - std::optional last_added_order_id; - std::optional last_added_spec_id; - - // Metadata location tracking - std::string metadata_location; - std::string previous_metadata_location; - - // indexes for convenience - std::unordered_map> schemas_by_id; - std::unordered_map> specs_by_id; - std::unordered_map> sort_orders_by_id; - +class TableMetadataBuilder::Impl { + public: // Constructor for new table - explicit Impl(int8_t format_version) : base(nullptr), metadata{} { - metadata.format_version = format_version; - metadata.last_sequence_number = TableMetadata::kInitialSequenceNumber; - metadata.last_updated_ms = kInvalidLastUpdatedMs; - metadata.last_column_id = Schema::kInvalidColumnId; - metadata.default_spec_id = PartitionSpec::kInitialSpecId; - metadata.last_partition_id = PartitionSpec::kInvalidPartitionFieldId; - metadata.current_snapshot_id = Snapshot::kInvalidSnapshotId; - metadata.default_sort_order_id = SortOrder::kInitialSortOrderId; - metadata.next_row_id = TableMetadata::kInitialRowId; + explicit Impl(int8_t format_version) : base_(nullptr), metadata_{} { + metadata_.format_version = format_version; + metadata_.last_sequence_number = TableMetadata::kInitialSequenceNumber; + metadata_.last_updated_ms = kInvalidLastUpdatedMs; + metadata_.last_column_id = Schema::kInvalidColumnId; + metadata_.default_spec_id = PartitionSpec::kInitialSpecId; + metadata_.last_partition_id = PartitionSpec::kInvalidPartitionFieldId; + metadata_.current_snapshot_id = Snapshot::kInvalidSnapshotId; + metadata_.default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata_.next_row_id = TableMetadata::kInitialRowId; } // Constructor from existing metadata explicit Impl(const TableMetadata* base_metadata, std::string base_metadata_location = "") - : base(base_metadata), metadata(*base_metadata) { + : base_(base_metadata), metadata_(*base_metadata) { // Initialize index maps from base metadata - for (const auto& schema : metadata.schemas) { + for (const auto& schema : metadata_.schemas) { if (schema->schema_id().has_value()) { - schemas_by_id.emplace(schema->schema_id().value(), schema); + schemas_by_id_.emplace(schema->schema_id().value(), schema); } } - for (const auto& spec : metadata.partition_specs) { - specs_by_id.emplace(spec->spec_id(), spec); + for (const auto& spec : metadata_.partition_specs) { + specs_by_id_.emplace(spec->spec_id(), spec); + } + + for (const auto& order : metadata_.sort_orders) { + sort_orders_by_id_.emplace(order->order_id(), order); } - for (const auto& order : metadata.sort_orders) { - sort_orders_by_id.emplace(order->order_id(), order); + metadata_.last_updated_ms = kInvalidLastUpdatedMs; + } + + bool UUIDSet() const { return !metadata_.table_uuid.empty(); } + const std::vector>& changes() const { return changes_; } + const TableMetadata* base() const { return base_; } + const TableMetadata& metadata() const { return metadata_; } + + void SetMetadataLocation(std::string_view metadata_location) { + metadata_location_ = std::string(metadata_location); + if (base_ != nullptr) { + // Carry over lastUpdatedMillis from base and set previousFileLocation to null to + // avoid writing a new metadata log entry. + // This is safe since setting metadata location doesn't cause any changes and no + // other changes can be added when metadata location is configured + previous_metadata_location_ = std::string(); + metadata_.last_updated_ms = base_->last_updated_ms; } + } - metadata.last_updated_ms = kInvalidLastUpdatedMs; + void SetPreviousMetadataLocation(std::string_view previous_metadata_location) { + previous_metadata_location_ = std::string(previous_metadata_location); } + + Status AssignUUID(std::string_view uuid); + Status UpgradeFormatVersion(int8_t new_format_version); + Status SetDefaultSortOrder(int32_t order_id); + Result AddSortOrder(const SortOrder& order); + Status SetProperties(const std::unordered_map& updated); + Status RemoveProperties(const std::vector& removed); + + std::unique_ptr Build(); + + private: + /// \brief Internal method to check for existing sort order and reuse its ID or create a + /// new one + /// \param new_order The sort order to check + /// \return The ID to use for this sort order (reused if exists, new otherwise) + int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order); + + private: + // Base metadata (nullptr for new tables) + const TableMetadata* base_; + + // Working metadata copy + TableMetadata metadata_; + + // Change tracking + std::vector> changes_; + std::optional last_added_schema_id_; + std::optional last_added_order_id_; + std::optional last_added_spec_id_; + + // Metadata location tracking + std::string metadata_location_; + std::string previous_metadata_location_; + + // indexes for convenience + std::unordered_map> schemas_by_id_; + std::unordered_map> specs_by_id_; + std::unordered_map> sort_orders_by_id_; }; +Status TableMetadataBuilder::Impl::AssignUUID(std::string_view uuid) { + if (uuid.empty()) { + return InvalidArgument("Cannot assign empty UUID"); + } + + std::string uuid_str = std::string(uuid); + // Check if UUID is already set to the same value (no-op) + if (StringUtils::EqualsIgnoreCase(metadata_.table_uuid, uuid_str)) { + return {}; + } + + // Update the metadata + metadata_.table_uuid = uuid_str; + + // Record the change + changes_.push_back(std::make_unique(std::move(uuid_str))); + return {}; +} + +Status TableMetadataBuilder::Impl::UpgradeFormatVersion(int8_t new_format_version) { + // Check that the new format version is supported + if (new_format_version > TableMetadata::kSupportedTableFormatVersion) { + return InvalidArgument( + "Cannot upgrade table to unsupported format version: v{} (supported: v{})", + new_format_version, TableMetadata::kSupportedTableFormatVersion); + } + + // Check that we're not downgrading + if (new_format_version < metadata_.format_version) { + return InvalidArgument("Cannot downgrade v{} table to v{}", metadata_.format_version, + new_format_version); + } + + // No-op if the version is the same + if (new_format_version == metadata_.format_version) { + return {}; + } + + // Update the format version + metadata_.format_version = new_format_version; + + // Record the change + changes_.push_back(std::make_unique(new_format_version)); + + return {}; +} + +Status TableMetadataBuilder::Impl::SetDefaultSortOrder(int32_t order_id) { + if (order_id == -1) { + if (!last_added_order_id_.has_value()) { + return InvalidArgument( + "Cannot set last added sort order: no sort order has been added"); + } + return SetDefaultSortOrder(last_added_order_id_.value()); + } + + if (order_id == metadata_.default_sort_order_id) { + return {}; + } + + metadata_.default_sort_order_id = order_id; + + if (last_added_order_id_ == std::make_optional(order_id)) { + changes_.push_back(std::make_unique(kLastAdded)); + } else { + changes_.push_back(std::make_unique(order_id)); + } + return {}; +} + +Result TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order) { + int32_t new_order_id = ReuseOrCreateNewSortOrderId(order); + + if (sort_orders_by_id_.find(new_order_id) != sort_orders_by_id_.end()) { + // update last_added_order_id if the order was added in this set of changes (since it + // is now the last) + bool is_new_order = + last_added_order_id_.has_value() && + std::ranges::find_if(changes_, [new_order_id](const auto& change) { + auto* add_sort_order = dynamic_cast(change.get()); + return add_sort_order && + add_sort_order->sort_order()->order_id() == new_order_id; + }) != changes_.cend(); + last_added_order_id_ = is_new_order ? std::make_optional(new_order_id) : std::nullopt; + return new_order_id; + } + + // Get current schema and validate the sort order against it + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_.Schema()); + ICEBERG_RETURN_UNEXPECTED(order.Validate(*schema)); + + std::shared_ptr new_order; + if (order.is_unsorted()) { + new_order = SortOrder::Unsorted(); + } else { + // Unlike freshSortOrder from Java impl, we don't use field name from old bound + // schema to rebuild the sort order. + ICEBERG_ASSIGN_OR_RAISE( + new_order, + SortOrder::Make(new_order_id, std::vector(order.fields().begin(), + order.fields().end()))); + } + + metadata_.sort_orders.push_back(new_order); + sort_orders_by_id_.emplace(new_order_id, new_order); + + changes_.push_back(std::make_unique(new_order)); + last_added_order_id_ = new_order_id; + return new_order_id; +} + +Status TableMetadataBuilder::Impl::SetProperties( + const std::unordered_map& updated) { + // If updated is empty, return early (no-op) + if (updated.empty()) { + return {}; + } + + // Add all updated properties to the metadata properties + for (const auto& [key, value] : updated) { + metadata_.properties.mutable_configs()[key] = value; + } + + // Record the change + changes_.push_back(std::make_unique(updated)); + + return {}; +} + +Status TableMetadataBuilder::Impl::RemoveProperties( + const std::vector& removed) { + // If removed is empty, return early (no-op) + if (removed.empty()) { + return {}; + } + + // Remove each property from the metadata properties + for (const auto& key : removed) { + metadata_.properties.mutable_configs().erase(key); + } + + // Record the change + changes_.push_back(std::make_unique(removed)); + + return {}; +} + +std::unique_ptr TableMetadataBuilder::Impl::Build() { + // 1. Validate metadata consistency through TableMetadata#Validate + + // 2. Update last_updated_ms if there are changes + if (metadata_.last_updated_ms == kInvalidLastUpdatedMs) { + metadata_.last_updated_ms = + TimePointMs{std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch())}; + } + + // 3. Buildup metadata_log from base metadata + int32_t max_metadata_log_size = + metadata_.properties.Get(TableProperties::kMetadataPreviousVersionsMax); + if (base_ != nullptr && !previous_metadata_location_.empty()) { + metadata_.metadata_log.emplace_back(base_->last_updated_ms, + previous_metadata_location_); + } + if (metadata_.metadata_log.size() > max_metadata_log_size) { + metadata_.metadata_log.erase(metadata_.metadata_log.begin(), + metadata_.metadata_log.end() - max_metadata_log_size); + } + + // TODO(anyone): 4. update snapshot_log + + // 5. Create and return the TableMetadata + return std::make_unique(std::move(metadata_)); +} + +int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId( + const SortOrder& new_order) { + if (new_order.is_unsorted()) { + return SortOrder::kUnsortedOrderId; + } + // determine the next order id + int32_t new_order_id = SortOrder::kInitialSortOrderId; + for (const auto& order : metadata_.sort_orders) { + if (order->SameOrder(new_order)) { + return order->order_id(); + } else if (new_order_id <= order->order_id()) { + new_order_id = order->order_id() + 1; + } + } + return new_order_id; +} + TableMetadataBuilder::TableMetadataBuilder(int8_t format_version) : impl_(std::make_unique(format_version)) {} @@ -447,77 +678,32 @@ std::unique_ptr TableMetadataBuilder::BuildFrom( TableMetadataBuilder& TableMetadataBuilder::SetMetadataLocation( std::string_view metadata_location) { - impl_->metadata_location = std::string(metadata_location); - if (impl_->base != nullptr) { - // Carry over lastUpdatedMillis from base and set previousFileLocation to null to - // avoid writing a new metadata log entry. - // This is safe since setting metadata location doesn't cause any changes and no other - // changes can be added when metadata location is configured - impl_->previous_metadata_location = std::string(); - impl_->metadata.last_updated_ms = impl_->base->last_updated_ms; - } + impl_->SetMetadataLocation(metadata_location); return *this; } TableMetadataBuilder& TableMetadataBuilder::SetPreviousMetadataLocation( std::string_view previous_metadata_location) { - impl_->previous_metadata_location = std::string(previous_metadata_location); + impl_->SetPreviousMetadataLocation(previous_metadata_location); return *this; } TableMetadataBuilder& TableMetadataBuilder::AssignUUID() { - if (impl_->metadata.table_uuid.empty()) { + if (!impl_->UUIDSet()) { // Generate a random UUID return AssignUUID(Uuid::GenerateV4().ToString()); } - return *this; } TableMetadataBuilder& TableMetadataBuilder::AssignUUID(std::string_view uuid) { - std::string uuid_str(uuid); - - ICEBERG_BUILDER_CHECK(!uuid_str.empty(), "Cannot assign empty UUID"); - - // Check if UUID is already set to the same value (no-op) - if (StringUtils::EqualsIgnoreCase(impl_->metadata.table_uuid, uuid_str)) { - return *this; - } - - // Update the metadata - impl_->metadata.table_uuid = uuid_str; - - // Record the change - impl_->changes.push_back(std::make_unique(std::move(uuid_str))); - + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AssignUUID(uuid)); return *this; } TableMetadataBuilder& TableMetadataBuilder::UpgradeFormatVersion( int8_t new_format_version) { - // Check that the new format version is supported - ICEBERG_BUILDER_CHECK( - new_format_version <= TableMetadata::kSupportedTableFormatVersion, - "Cannot upgrade table to unsupported format version: v{} (supported: v{})", - new_format_version, TableMetadata::kSupportedTableFormatVersion); - - // Check that we're not downgrading - ICEBERG_BUILDER_CHECK(new_format_version >= impl_->metadata.format_version, - "Cannot downgrade v{} table to v{}", - impl_->metadata.format_version, new_format_version); - - // No-op if the version is the same - if (new_format_version == impl_->metadata.format_version) { - return *this; - } - - // Update the format version - impl_->metadata.format_version = new_format_version; - - // Record the change - impl_->changes.push_back( - std::make_unique(new_format_version)); - + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->UpgradeFormatVersion(new_format_version)); return *this; } @@ -560,96 +746,21 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas( TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder( std::shared_ptr order) { - ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto order_id, AddSortOrderInternal(*order)); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto order_id, impl_->AddSortOrder(*order)); return SetDefaultSortOrder(order_id); } TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(int32_t order_id) { - if (order_id == -1) { - ICEBERG_BUILDER_CHECK( - impl_->last_added_order_id.has_value(), - "Cannot set last added sort order: no sort order has been added"); - return SetDefaultSortOrder(impl_->last_added_order_id.value()); - } - - if (order_id == impl_->metadata.default_sort_order_id) { - return *this; - } - - impl_->metadata.default_sort_order_id = order_id; - - if (impl_->last_added_order_id == std::make_optional(order_id)) { - impl_->changes.push_back(std::make_unique(kLastAdded)); - } else { - impl_->changes.push_back(std::make_unique(order_id)); - } + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetDefaultSortOrder(order_id)); return *this; } -Result TableMetadataBuilder::AddSortOrderInternal(const SortOrder& order) { - int32_t new_order_id = ReuseOrCreateNewSortOrderId(order); - - if (impl_->sort_orders_by_id.find(new_order_id) != impl_->sort_orders_by_id.end()) { - // update last_added_order_id if the order was added in this set of changes (since it - // is now the last) - bool is_new_order = - impl_->last_added_order_id.has_value() && - std::ranges::find_if(impl_->changes, [new_order_id](const auto& change) { - auto* add_sort_order = dynamic_cast(change.get()); - return add_sort_order && - add_sort_order->sort_order()->order_id() == new_order_id; - }) != impl_->changes.cend(); - impl_->last_added_order_id = - is_new_order ? std::make_optional(new_order_id) : std::nullopt; - return new_order_id; - } - - // Get current schema and validate the sort order against it - ICEBERG_ASSIGN_OR_RAISE(auto schema, impl_->metadata.Schema()); - ICEBERG_RETURN_UNEXPECTED(order.Validate(*schema)); - - std::shared_ptr new_order; - if (order.is_unsorted()) { - new_order = SortOrder::Unsorted(); - } else { - // Unlike freshSortOrder from Java impl, we don't use field name from old bound - // schema to rebuild the sort order. - ICEBERG_ASSIGN_OR_RAISE( - new_order, - SortOrder::Make(new_order_id, std::vector(order.fields().begin(), - order.fields().end()))); - } - - impl_->metadata.sort_orders.push_back(new_order); - impl_->sort_orders_by_id.emplace(new_order_id, new_order); - - impl_->changes.push_back(std::make_unique(new_order)); - impl_->last_added_order_id = new_order_id; - return new_order_id; -} - TableMetadataBuilder& TableMetadataBuilder::AddSortOrder( std::shared_ptr order) { - ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto order_id, AddSortOrderInternal(*order)); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto order_id, impl_->AddSortOrder(*order)); return *this; } -int32_t TableMetadataBuilder::ReuseOrCreateNewSortOrderId(const SortOrder& new_order) { - if (new_order.is_unsorted()) { - return SortOrder::kUnsortedOrderId; - } - // determine the next order id - int32_t new_order_id = SortOrder::kInitialSortOrderId; - for (const auto& order : impl_->metadata.sort_orders) { - if (order->SameOrder(new_order)) { - return order->order_id(); - } else if (new_order_id <= order->order_id()) { - new_order_id = order->order_id() + 1; - } - } - return new_order_id; -} - TableMetadataBuilder& TableMetadataBuilder::AddSnapshot( std::shared_ptr snapshot) { throw IcebergError(std::format("{} not implemented", __FUNCTION__)); @@ -704,37 +815,13 @@ TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics( TableMetadataBuilder& TableMetadataBuilder::SetProperties( const std::unordered_map& updated) { - // If updated is empty, return early (no-op) - if (updated.empty()) { - return *this; - } - - // Add all updated properties to the metadata properties - for (const auto& [key, value] : updated) { - impl_->metadata.properties.mutable_configs()[key] = value; - } - - // Record the change - impl_->changes.push_back(std::make_unique(updated)); - + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetProperties(updated)); return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveProperties( const std::vector& removed) { - // If removed is empty, return early (no-op) - if (removed.empty()) { - return *this; - } - - // Remove each property from the metadata properties - for (const auto& key : removed) { - impl_->metadata.properties.mutable_configs().erase(key); - } - - // Record the change - impl_->changes.push_back(std::make_unique(removed)); - + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveProperties(removed)); return *this; } @@ -754,41 +841,15 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveEncryptionKey(std::string_view Result> TableMetadataBuilder::Build() { // 1. Check for accumulated errors ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - - // 2. Validate metadata consistency through TableMetadata#Validate - - // 3. Update last_updated_ms if there are changes - if (impl_->metadata.last_updated_ms == kInvalidLastUpdatedMs) { - impl_->metadata.last_updated_ms = - TimePointMs{std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch())}; - } - - // 4. Buildup metadata_log from base metadata - int32_t max_metadata_log_size = - impl_->metadata.properties.Get(TableProperties::kMetadataPreviousVersionsMax); - if (impl_->base != nullptr && !impl_->previous_metadata_location.empty()) { - impl_->metadata.metadata_log.emplace_back(impl_->base->last_updated_ms, - impl_->previous_metadata_location); - } - if (impl_->metadata.metadata_log.size() > max_metadata_log_size) { - impl_->metadata.metadata_log.erase( - impl_->metadata.metadata_log.begin(), - impl_->metadata.metadata_log.end() - max_metadata_log_size); - } - - // TODO(anyone): 5. update snapshot_log - - // 6. Create and return the TableMetadata - return std::make_unique(std::move(impl_->metadata)); + return impl_->Build(); } const std::vector>& TableMetadataBuilder::changes() const { - return impl_->changes; + return impl_->changes(); } -const TableMetadata* TableMetadataBuilder::base() const { return impl_->base; } +const TableMetadata* TableMetadataBuilder::base() const { return impl_->base(); } -const TableMetadata& TableMetadataBuilder::current() const { return impl_->metadata; } +const TableMetadata& TableMetadataBuilder::current() const { return impl_->metadata(); } } // namespace iceberg diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 3f2f36101..ce7975c6e 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -446,17 +446,6 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \brief Private constructor for building from existing metadata explicit TableMetadataBuilder(const TableMetadata* base); - /// \brief Internal method to add a sort order and return its ID - /// \param order The sort order to add - /// \return The ID of the added or reused sort order - Result AddSortOrderInternal(const SortOrder& order); - - /// \brief Internal method to check for existing sort order and reuse its ID or create a - /// new one - /// \param new_order The sort order to check - /// \return The ID to use for this sort order (reused if exists, new otherwise) - int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order); - /// Internal state members struct Impl; std::unique_ptr impl_;