Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,16 +551,29 @@ def commit_table(

if hive_table and current_table:
# Table exists, update it.
hive_table.parameters = _construct_parameters(
new_parameters = _construct_parameters(
metadata_location=updated_staged_table.metadata_location,
previous_metadata_location=current_table.metadata_location,
metadata_properties=updated_staged_table.properties,
)

# Detect properties that were removed from Iceberg metadata
removed_keys = current_table.properties.keys() - updated_staged_table.properties.keys()

# Sync HMS parameters: Iceberg metadata is the source of truth, HMS parameters are
# a projection of Iceberg state plus any HMS-only properties.
# Start with existing HMS params, remove deleted Iceberg properties, then apply Iceberg values.
merged_params = dict(hive_table.parameters or {})
for key in removed_keys:
merged_params.pop(key, None)
merged_params.update(new_parameters)
hive_table.parameters = merged_params

# Update hive's schema and properties
hive_table.sd = _construct_hive_storage_descriptor(
updated_staged_table.schema(),
updated_staged_table.location(),
property_as_bool(updated_staged_table.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
)
open_client.alter_table_with_environment_context(
dbname=database_name,
Expand Down
152 changes: 152 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,158 @@ def test_hive_properties(catalog: Catalog) -> None:
assert hive_table.parameters.get("abc") is None


@pytest.mark.integration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth adding a test for property deletion to ensure it's not picked up from the old state.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_hive_preserves_hms_specific_properties(catalog: Catalog) -> None:
"""Test that HMS-specific table properties are preserved during table commits.
This verifies that HMS-specific properties that are not managed by Iceberg
are preserved during commits, rather than being lost.
Regression test for: https://github.com/apache/iceberg-python/issues/2926
"""
table = create_table(catalog)
hive_client: _HiveClient = _HiveClient(catalog.properties["uri"])
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
# Add HMS-specific properties that aren't managed by Iceberg
hive_table.parameters["table_category"] = "production"
hive_table.parameters["data_owner"] = "data_team"
open_client.alter_table(TABLE_NAME[0], TABLE_NAME[1], hive_table)

with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
assert hive_table.parameters.get("table_category") == "production"
assert hive_table.parameters.get("data_owner") == "data_team"

table.transaction().set_properties({"iceberg_property": "new_value"}).commit_transaction()

# Verify that HMS-specific properties are STILL present after commit
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
# HMS-specific properties should be preserved
assert hive_table.parameters.get("table_category") == "production", (
"HMS property 'table_category' was lost during commit!"
)
assert hive_table.parameters.get("data_owner") == "data_team", "HMS property 'data_owner' was lost during commit!"
# Iceberg properties should also be present
assert hive_table.parameters.get("iceberg_property") == "new_value"


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_iceberg_property_deletion_not_restored_from_old_hms_state(catalog: Catalog) -> None:
Comment on lines +177 to +178
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: parameterizing these tests do not require parameterization since there is only one argument.

Suggested change
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_iceberg_property_deletion_not_restored_from_old_hms_state(catalog: Catalog) -> None:
def test_iceberg_property_deletion_not_restored_from_old_hms_state(session_catalog_hive: Catalog) -> None:

"""Test that deleted Iceberg properties are truly removed and not restored from old HMS state.
When a property is removed through Iceberg, it should be deleted from HMS and not
come back from the old HMS state during merge operations.
"""
table = create_table(catalog)
hive_client: _HiveClient = _HiveClient(catalog.properties["uri"])

# Set multiple Iceberg properties
table.transaction().set_properties({"prop_to_keep": "keep_value", "prop_to_delete": "delete_me"}).commit_transaction()

# Verify both properties exist
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
assert hive_table.parameters.get("prop_to_keep") == "keep_value"
assert hive_table.parameters.get("prop_to_delete") == "delete_me"

# Delete one property through Iceberg
table.transaction().remove_properties("prop_to_delete").commit_transaction()

# Verify property is deleted from HMS
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
assert hive_table.parameters.get("prop_to_keep") == "keep_value"
assert hive_table.parameters.get("prop_to_delete") is None, "Deleted property should not exist in HMS!"

# Perform another Iceberg commit
table.transaction().set_properties({"new_prop": "new_value"}).commit_transaction()

# Ensure deleted property doesn't come back from old state
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
assert hive_table.parameters.get("prop_to_keep") == "keep_value"
assert hive_table.parameters.get("new_prop") == "new_value"
assert hive_table.parameters.get("prop_to_delete") is None, "Deleted property should NOT be restored from old HMS state!"


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_iceberg_metadata_is_source_of_truth(catalog: Catalog) -> None:
"""Test that Iceberg metadata is the source of truth for all Iceberg-managed properties.
If an external tool sets an HMS property with the same name as an Iceberg-managed
property, Iceberg's value should win during commits.
"""
table = create_table(catalog)
hive_client: _HiveClient = _HiveClient(catalog.properties["uri"])

# Set an Iceberg property
table.transaction().set_properties({"my_prop": "iceberg_value"}).commit_transaction()

# External tool modifies the same property in HMS
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
hive_table.parameters["my_prop"] = "hms_value" # Conflicting value
open_client.alter_table(TABLE_NAME[0], TABLE_NAME[1], hive_table)

# Verify HMS has the external value
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
assert hive_table.parameters.get("my_prop") == "hms_value"

# Perform another Iceberg commit
table.transaction().set_properties({"another_prop": "test"}).commit_transaction()

# Iceberg's value should take precedence
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
assert hive_table.parameters.get("my_prop") == "iceberg_value", (
"Iceberg property value should take precedence over conflicting HMS value!"
)
assert hive_table.parameters.get("another_prop") == "test"


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_hive_critical_properties_always_from_iceberg(catalog: Catalog) -> None:
"""Test that critical properties (EXTERNAL, table_type, metadata_location) always come from Iceberg.
These properties should never be carried over from old HMS state.
"""
table = create_table(catalog)
hive_client: _HiveClient = _HiveClient(catalog.properties["uri"])

# Get original metadata_location
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
original_metadata_location = hive_table.parameters.get("metadata_location")
assert original_metadata_location is not None
assert hive_table.parameters.get("EXTERNAL") == "TRUE"
assert hive_table.parameters.get("table_type") == "ICEBERG"

# Try to tamper with critical properties via HMS
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
hive_table.parameters["EXTERNAL"] = "FALSE" # Try to change
open_client.alter_table(TABLE_NAME[0], TABLE_NAME[1], hive_table)

# Perform Iceberg commit
table.transaction().set_properties({"test_prop": "value"}).commit_transaction()

# Critical properties should be restored by Iceberg
with hive_client as open_client:
hive_table = open_client.get_table(*TABLE_NAME)
assert hive_table.parameters.get("EXTERNAL") == "TRUE", "EXTERNAL should always be TRUE from Iceberg!"
assert hive_table.parameters.get("table_type") == "ICEBERG", "table_type should always be ICEBERG!"
# metadata_location should be updated (new metadata file)
new_metadata_location = hive_table.parameters.get("metadata_location")
assert new_metadata_location != original_metadata_location, "metadata_location should be updated!"


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_table_properties_dict(catalog: Catalog) -> None:
Expand Down