diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index e096470451..3046e25626 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -551,16 +551,29 @@ def commit_table(
 
                 if hive_table and current_table:
                     # Table exists, update it.
-                    hive_table.parameters = _construct_parameters(
+                    new_parameters = _construct_parameters(
                         metadata_location=updated_staged_table.metadata_location,
                         previous_metadata_location=current_table.metadata_location,
                         metadata_properties=updated_staged_table.properties,
                     )
+
+                    # Detect properties that were removed from Iceberg metadata
+                    removed_keys = current_table.properties.keys() - updated_staged_table.properties.keys()
+
+                    # Sync HMS parameters: Iceberg metadata is the source of truth, HMS parameters are
+                    # a projection of Iceberg state plus any HMS-only properties.
+                    # Start with existing HMS params, remove deleted Iceberg properties, then apply Iceberg values.
+                    merged_params = dict(hive_table.parameters or {})
+                    for key in removed_keys:
+                        merged_params.pop(key, None)
+                    merged_params.update(new_parameters)
+                    hive_table.parameters = merged_params
+
                     # Update hive's schema and properties
                     hive_table.sd = _construct_hive_storage_descriptor(
                         updated_staged_table.schema(),
                         updated_staged_table.location(),
-                        property_as_bool(updated_staged_table.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
+                        property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
                     )
                     open_client.alter_table_with_environment_context(
                         dbname=database_name,
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 0d52365d04..6e3b4c4458 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -135,6 +135,157 @@ def test_hive_properties(catalog: Catalog) -> None:
     assert hive_table.parameters.get("abc") is None
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
+def test_hive_preserves_hms_specific_properties(catalog: Catalog) -> None:
+    """Test that HMS-specific table properties are preserved during table commits.
+
+    This verifies that HMS-specific properties that are not managed by Iceberg
+    are preserved during commits, rather than being lost.
+
+    Regression test for: https://github.com/apache/iceberg-python/issues/2926
+    """
+    table = create_table(catalog)
+    hive_client: _HiveClient = _HiveClient(catalog.properties["uri"])
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        # Add HMS-specific properties that aren't managed by Iceberg
+        hive_table.parameters["table_category"] = "production"
+        hive_table.parameters["data_owner"] = "data_team"
+        open_client.alter_table(TABLE_NAME[0], TABLE_NAME[1], hive_table)
+
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        assert hive_table.parameters.get("table_category") == "production"
+        assert hive_table.parameters.get("data_owner") == "data_team"
+
+    table.transaction().set_properties({"iceberg_property": "new_value"}).commit_transaction()
+
+    # Verify that HMS-specific properties are STILL present after commit
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        # HMS-specific properties should be preserved
+        assert hive_table.parameters.get("table_category") == "production", (
+            "HMS property 'table_category' was lost during commit!"
+        )
+        assert hive_table.parameters.get("data_owner") == "data_team", "HMS property 'data_owner' was lost during commit!"
+        # Iceberg properties should also be present
+        assert hive_table.parameters.get("iceberg_property") == "new_value"
+
+
+@pytest.mark.integration
+def test_iceberg_property_deletion_not_restored_from_old_hms_state(session_catalog_hive: Catalog) -> None:
+    """Test that deleted Iceberg properties are truly removed and not restored from old HMS state.
+
+    When a property is removed through Iceberg, it should be deleted from HMS and not
+    come back from the old HMS state during merge operations.
+    """
+    table = create_table(session_catalog_hive)
+    hive_client: _HiveClient = _HiveClient(session_catalog_hive.properties["uri"])
+
+    # Set multiple Iceberg properties
+    table.transaction().set_properties({"prop_to_keep": "keep_value", "prop_to_delete": "delete_me"}).commit_transaction()
+
+    # Verify both properties exist
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        assert hive_table.parameters.get("prop_to_keep") == "keep_value"
+        assert hive_table.parameters.get("prop_to_delete") == "delete_me"
+
+    # Delete one property through Iceberg
+    table.transaction().remove_properties("prop_to_delete").commit_transaction()
+
+    # Verify property is deleted from HMS
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        assert hive_table.parameters.get("prop_to_keep") == "keep_value"
+        assert hive_table.parameters.get("prop_to_delete") is None, "Deleted property should not exist in HMS!"
+
+    # Perform another Iceberg commit
+    table.transaction().set_properties({"new_prop": "new_value"}).commit_transaction()
+
+    # Ensure deleted property doesn't come back from old state
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        assert hive_table.parameters.get("prop_to_keep") == "keep_value"
+        assert hive_table.parameters.get("new_prop") == "new_value"
+        assert hive_table.parameters.get("prop_to_delete") is None, "Deleted property should NOT be restored from old HMS state!"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
+def test_iceberg_metadata_is_source_of_truth(catalog: Catalog) -> None:
+    """Test that Iceberg metadata is the source of truth for all Iceberg-managed properties.
+
+    If an external tool sets an HMS property with the same name as an Iceberg-managed
+    property, Iceberg's value should win during commits.
+    """
+    table = create_table(catalog)
+    hive_client: _HiveClient = _HiveClient(catalog.properties["uri"])
+
+    # Set an Iceberg property
+    table.transaction().set_properties({"my_prop": "iceberg_value"}).commit_transaction()
+
+    # External tool modifies the same property in HMS
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        hive_table.parameters["my_prop"] = "hms_value"  # Conflicting value
+        open_client.alter_table(TABLE_NAME[0], TABLE_NAME[1], hive_table)
+
+    # Verify HMS has the external value
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        assert hive_table.parameters.get("my_prop") == "hms_value"
+
+    # Perform another Iceberg commit
+    table.transaction().set_properties({"another_prop": "test"}).commit_transaction()
+
+    # Iceberg's value should take precedence
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        assert hive_table.parameters.get("my_prop") == "iceberg_value", (
+            "Iceberg property value should take precedence over conflicting HMS value!"
+        )
+        assert hive_table.parameters.get("another_prop") == "test"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
+def test_hive_critical_properties_always_from_iceberg(catalog: Catalog) -> None:
+    """Test that critical properties (EXTERNAL, table_type, metadata_location) always come from Iceberg.
+
+    These properties should never be carried over from old HMS state.
+    """
+    table = create_table(catalog)
+    hive_client: _HiveClient = _HiveClient(catalog.properties["uri"])
+
+    # Get original metadata_location
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        original_metadata_location = hive_table.parameters.get("metadata_location")
+        assert original_metadata_location is not None
+        assert hive_table.parameters.get("EXTERNAL") == "TRUE"
+        assert hive_table.parameters.get("table_type") == "ICEBERG"
+
+    # Try to tamper with critical properties via HMS
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        hive_table.parameters["EXTERNAL"] = "FALSE"  # Try to change
+        open_client.alter_table(TABLE_NAME[0], TABLE_NAME[1], hive_table)
+
+    # Perform Iceberg commit
+    table.transaction().set_properties({"test_prop": "value"}).commit_transaction()
+
+    # Critical properties should be restored by Iceberg
+    with hive_client as open_client:
+        hive_table = open_client.get_table(*TABLE_NAME)
+        assert hive_table.parameters.get("EXTERNAL") == "TRUE", "EXTERNAL should always be TRUE from Iceberg!"
+        assert hive_table.parameters.get("table_type") == "ICEBERG", "table_type should always be ICEBERG!"
+        # metadata_location should be updated (new metadata file)
+        new_metadata_location = hive_table.parameters.get("metadata_location")
+        assert new_metadata_location != original_metadata_location, "metadata_location should be updated!"
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_table_properties_dict(catalog: Catalog) -> None: