From bc562b70f4f9eaa092647c942924a7cf74587269 Mon Sep 17 00:00:00 2001
From: Vladislav Bogomolov
Date: Mon, 24 Nov 2025 01:03:09 +0000
Subject: [PATCH] HMS commit implementation

Implement `update_table` for the HMS catalog with two commit strategies:

- traditional commits guarded by an exclusive HMS table lock (RAII guard
  releases the lock on drop)
- optimistic commits (enabled via the `hive_locks_disabled` property)
  validated through an EnvironmentContext compare-and-swap on
  `metadata_location`

Also refactors the HMS lock flow and addresses formatting feedback.
---
 Cargo.lock                                   |   2 +
 crates/catalog/hms/Cargo.toml                |   1 +
 crates/catalog/hms/src/catalog.rs            | 198 ++++++++++++++++++-
 crates/catalog/hms/src/utils.rs              |  69 +++++++
 crates/catalog/hms/tests/hms_catalog_test.rs | 133 ++++++++++++-
 5 files changed, 396 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 97ee25d658..97a9287a91 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3450,6 +3450,7 @@ dependencies = [
  "tracing",
  "volo",
  "volo-thrift",
+ "whoami",
 ]
 
 [[package]]
@@ -7505,6 +7506,7 @@ checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d"
 dependencies = [
  "libredox",
  "wasite",
+ "web-sys",
 ]
 
 [[package]]
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index 549dbb9c02..8259472dbc 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -52,6 +52,7 @@ linkedbytes = { workspace = true }
 metainfo = { workspace = true }
 motore-macros = { workspace = true }
 volo = { workspace = true }
+whoami = "1.6.1"
 
 [dev-dependencies]
 ctor = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index b7d192210b..25268e5422 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -50,7 +50,14 @@ pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";
 /// HMS Catalog warehouse location
 pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
 
-/// Builder for [`RestCatalog`].
+/// HMS Hive Locks Disabled
+pub const HMS_HIVE_LOCKS_DISABLED: &str = "hive_locks_disabled";
+
+/// HMS Environment Context
+const HMS_EXPECTED_PARAMETER_KEY: &str = "expected_parameter_key";
+const HMS_EXPECTED_PARAMETER_VALUE: &str = "expected_parameter_value";
+
+/// Builder for [`HmsCatalog`].
 #[derive(Debug)]
 pub struct HmsCatalogBuilder(HmsCatalogConfig);
 
@@ -167,6 +174,43 @@ impl Debug for HmsCatalog {
     }
 }
 
+/// RAII guard for HMS table locks. Automatically releases the lock when dropped.
+struct HmsLockGuard {
+    client: ThriftHiveMetastoreClient,
+    lockid: i64,
+}
+
+impl HmsLockGuard {
+    /// Acquires an exclusive HMS table lock on `db_name.tbl_name`.
+    ///
+    /// # Errors
+    /// Returns an error when the Thrift call fails, or when HMS does not
+    /// grant the lock immediately (state `WAITING` / `NOT_ACQUIRED`). A lock
+    /// id obtained in a non-acquired state is released before returning so it
+    /// does not linger in the HMS lock queue.
+    async fn acquire(
+        client: &ThriftHiveMetastoreClient,
+        db_name: &str,
+        tbl_name: &str,
+    ) -> Result<Self> {
+        let lock = client
+            .lock(create_lock_request(db_name, tbl_name))
+            .await
+            .map(from_thrift_exception)
+            .map_err(from_thrift_error)??;
+
+        // HMS may answer WAITING (lock merely enqueued) instead of ACQUIRED;
+        // treating that as success would let two writers commit concurrently.
+        if lock.state != hive_metastore::LockState::ACQUIRED {
+            let _ = client
+                .unlock(hive_metastore::UnlockRequest {
+                    lockid: lock.lockid,
+                })
+                .await;
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("failed to acquire HMS table lock, state: {:?}", lock.state),
+            ));
+        }
+
+        Ok(Self {
+            client: client.clone(),
+            lockid: lock.lockid,
+        })
+    }
+}
+
+impl Drop for HmsLockGuard {
+    fn drop(&mut self) {
+        let client = self.client.clone();
+        let lockid = self.lockid;
+        // Drop cannot be async, so release the lock as a best-effort
+        // background task. `tokio::spawn` panics when no runtime is current
+        // (e.g. the guard is dropped after runtime shutdown), so probe for a
+        // runtime handle first instead of aborting during unwinding; the
+        // unacknowledged lock will then expire via the HMS lock timeout.
+        if let Ok(handle) = tokio::runtime::Handle::try_current() {
+            handle.spawn(async move {
+                let _ = client
+                    .unlock(hive_metastore::UnlockRequest { lockid })
+                    .await;
+            });
+        }
+    }
+}
+
 impl HmsCatalog {
     /// Create a new hms catalog.
     fn new(config: HmsCatalogConfig) -> Result<Self> {
@@ -208,6 +252,64 @@ impl HmsCatalog {
     pub fn file_io(&self) -> FileIO {
         self.file_io.clone()
     }
+
+    /// Applies a commit to a table and prepares the update for HMS.
+ /// # Returns + /// A tuple of (staged_table, new_hive_table) ready for HMS alter_table operation + async fn apply_and_prepare_update( + &self, + commit: TableCommit, + db_name: &str, + tbl_name: &str, + hive_table: &hive_metastore::Table, + ) -> Result<(Table, hive_metastore::Table)> { + let metadata_location = get_metadata_location(&hive_table.parameters)?; + + let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; + + let cur_table = Table::builder() + .file_io(self.file_io()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(TableIdent::new( + NamespaceIdent::new(db_name.to_string()), + tbl_name.to_string(), + )) + .build()?; + + let staged_table = commit.apply(cur_table)?; + staged_table + .metadata() + .write_to( + staged_table.file_io(), + staged_table.metadata_location_result()?, + ) + .await?; + + let new_hive_table = update_hive_table_from_table(hive_table, &staged_table)?; + + Ok((staged_table, new_hive_table)) + } + + /// Builds an EnvironmentContext for optimistic locking with HMS. + /// + /// The context includes the expected metadata_location, which HMS will use + /// to validate that the table hasn't been modified concurrently. 
+ fn build_environment_context(metadata_location: &str) -> hive_metastore::EnvironmentContext { + let mut env_context_properties = pilota::AHashMap::new(); + env_context_properties.insert( + HMS_EXPECTED_PARAMETER_KEY.into(), + "metadata_location".into(), + ); + env_context_properties.insert( + HMS_EXPECTED_PARAMETER_VALUE.into(), + pilota::FastStr::from_string(metadata_location.to_string()), + ); + + hive_metastore::EnvironmentContext { + properties: Some(env_context_properties), + } + } } #[async_trait] @@ -603,10 +705,94 @@ impl Catalog for HmsCatalog { )) } - async fn update_table(&self, _commit: TableCommit) -> Result { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Updating a table is not supported yet", - )) + /// Updates an existing table by applying a commit operation. + /// + /// This method supports two update strategies depending on the catalog configuration: + /// + /// **Optimistic Locking** (when `hive_locks_disabled` is set): + /// - Retrieves the current table state from HMS without acquiring locks + /// - Constructs an `EnvironmentContext` with the expected metadata location + /// - Uses `alter_table_with_environment_context` to perform an atomic + /// compare-and-swap operation. + /// - HMS will reject the update if the metadata location has changed, + /// indicating a concurrent modification + /// + /// **Traditional Locking** (default): + /// - Acquires an exclusive HMS lock on the table before making changes + /// - Retrieves the current table state + /// - Applies the commit and writes new metadata + /// - Updates the table in HMS using `alter_table` + /// - Releases the lock after the operation completes + /// + /// # Returns + /// A `Result` wrapping the updated `Table` object with new metadata. 
+    ///
+    /// # Errors
+    /// This function may return an error in several scenarios:
+    /// - Failure to validate the namespace or table identifier
+    /// - Inability to acquire a lock (traditional locking mode)
+    /// - Failure to retrieve the table from HMS
+    /// - Errors reading or writing table metadata
+    /// - HMS rejects the update due to concurrent modification (optimistic locking)
+    /// - Errors from the underlying Thrift communication with HMS
+    async fn update_table(&self, commit: TableCommit) -> Result<Table> 
{
+        let ident = commit.identifier().clone();
+        let db_name = validate_namespace(ident.namespace())?;
+        let tbl_name = ident.name.clone();
+
+        // Honour the configured *value*: mere presence of the key must not
+        // disable locking, otherwise `hive_locks_disabled=false` would still
+        // skip the HMS lock and allow unsynchronised concurrent commits.
+        let hive_locks_disabled = self
+            .config
+            .props
+            .get(HMS_HIVE_LOCKS_DISABLED)
+            .map(|v| v.eq_ignore_ascii_case("true"))
+            .unwrap_or(false);
+
+        if hive_locks_disabled {
+            // Optimistic locking path: read first, then validate with EnvironmentContext
+            let hive_table = self
+                .client
+                .0
+                .get_table(db_name.clone().into(), tbl_name.clone().into())
+                .await
+                .map(from_thrift_exception)
+                .map_err(from_thrift_error)??;
+
+            let metadata_location = get_metadata_location(&hive_table.parameters)?;
+            let env_context = Self::build_environment_context(&metadata_location);
+
+            let (staged_table, new_hive_table) = self
+                .apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table)
+                .await?;
+
+            self.client
+                .0
+                .alter_table_with_environment_context(
+                    db_name.into(),
+                    tbl_name.into(),
+                    new_hive_table,
+                    env_context,
+                )
+                .await
+                .map_err(from_thrift_error)?;
+
+            Ok(staged_table)
+        } else {
+            // Traditional locking path: acquire lock first, then read
+            let _guard = HmsLockGuard::acquire(&self.client.0, &db_name, &tbl_name).await?;
+
+            let hive_table = self
+                .client
+                .0
+                .get_table(db_name.clone().into(), tbl_name.clone().into())
+                .await
+                .map(from_thrift_exception)
+                .map_err(from_thrift_error)??;
+
+            let (staged_table, new_hive_table) = self
+                .apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table)
+                .await?;
+
+            self.client
+                .0
+                .alter_table(db_name.into(), tbl_name.into(), new_hive_table)
+                .await
+                .map_err(from_thrift_error)?;
+
+            Ok(staged_table)
+            // Lock automatically released here via Drop
+        }
+    }
 }
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 096e792f61..630421bccd 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
 use chrono::Utc;
 use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor};
 use iceberg::spec::Schema;
+use iceberg::table::Table;
 use iceberg::{Error, 
ErrorKind, Namespace, NamespaceIdent, Result};
 use pilota::{AHashMap, FastStr};
 
@@ -155,6 +156,54 @@ pub(crate) fn convert_to_database(
     Ok(db)
 }
 
+/// Projects an Iceberg table's committed state back onto an existing HMS
+/// table object: refreshes the Hive column list from the current schema and
+/// rewrites the table parameters (Iceberg properties plus the EXTERNAL /
+/// table_type / metadata_location markers).
+pub(crate) fn update_hive_table_from_table(
+    hive_tbl: &hive_metastore::Table,
+    tbl: &Table,
+) -> Result<hive_metastore::Table> {
+    let mut new_tbl = hive_tbl.clone();
+    let metadata = tbl.metadata();
+    let schema = metadata.current_schema();
+
+    let hive_schema = HiveSchemaBuilder::from_iceberg(schema)?.build();
+
+    match new_tbl.sd.as_mut() {
+        Some(sd) => {
+            sd.cols = Some(hive_schema);
+        }
+        None => {
+            // Highly unlikely for a real HMS table, but be defensive
+            new_tbl.sd = Some(StorageDescriptor {
+                cols: Some(hive_schema),
+                ..Default::default()
+            });
+        }
+    }
+
+    let metadata_location = tbl.metadata_location_result()?.to_string();
+
+    let mut params: AHashMap<FastStr, FastStr> = new_tbl.parameters.take().unwrap_or_default();
+    for (k, v) in metadata.properties().iter() {
+        // Reserved keys are written explicitly below; never let user
+        // properties overwrite them.
+        if k == METADATA_LOCATION || k == TABLE_TYPE || k == EXTERNAL {
+            continue;
+        }
+        params.insert(
+            FastStr::from_string(k.to_string()),
+            FastStr::from_string(v.to_string()),
+        );
+    }
+
+    params.insert(FastStr::from(EXTERNAL), FastStr::from("TRUE"));
+    params.insert(FastStr::from(TABLE_TYPE), FastStr::from("ICEBERG"));
+    params.insert(
+        FastStr::from(METADATA_LOCATION),
+        FastStr::from(metadata_location),
+    );
+
+    new_tbl.parameters = Some(params);
+
+    Ok(new_tbl)
+}
+
 pub(crate) fn convert_to_hive_table(
     db_name: String,
     schema: &Schema,
@@ -309,6 +358,26 @@ fn get_current_time() -> Result<u64> {
     })
 }
 
+/// Builds the HMS `LockRequest` used to take an exclusive table-level lock
+/// before a traditional (lock-based) commit.
+pub(crate) fn create_lock_request(db_name: &str, tbl_name: &str) -> hive_metastore::LockRequest {
+    let component = hive_metastore::LockComponent {
+        r#type: hive_metastore::LockType::EXCLUSIVE,
+        level: hive_metastore::LockLevel::TABLE,
+        dbname: FastStr::from_string(db_name.to_string()),
+        tablename: Some(FastStr::from_string(tbl_name.to_string())),
+        partitionname: None,
+        operation_type: None,
+        is_acid: Some(true),
+        is_dynamic_partition_write: None,
+    };
+
+    hive_metastore::LockRequest {
+        component: vec![component],
+        txnid: None,
+        user: FastStr::from(whoami::username()),
+        // `hostname()` is fallible (sandboxed / minimal environments); do not
+        // panic the whole commit path over it — fall back to a placeholder.
+        hostname: FastStr::from(
+            whoami::fallible::hostname().unwrap_or_else(|_| "unknown".to_string()),
+        ),
+        agent_info: None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use iceberg::spec::{NestedField, PrimitiveType, Type};
diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs
index 9793b7f738..31af0d4b58 100644
--- a/crates/catalog/hms/tests/hms_catalog_test.rs
+++ b/crates/catalog/hms/tests/hms_catalog_test.rs
@@ -24,6 +24,7 @@ use std::sync::RwLock;
 use ctor::{ctor, dtor};
 use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
 use iceberg_catalog_hms::{
     HMS_CATALOG_PROP_THRIFT_TRANSPORT, HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE,
@@ -58,6 +59,17 @@ fn after_all() {
 }
 
 async fn get_catalog() -> HmsCatalog {
+    get_catalog_with_props(HashMap::new()).await
+}
+
+async fn get_catalog_with_optimistic_locking() -> HmsCatalog {
+    use iceberg_catalog_hms::HMS_HIVE_LOCKS_DISABLED;
+    let mut extra_props = HashMap::new();
+    extra_props.insert(HMS_HIVE_LOCKS_DISABLED.to_string(), "true".to_string());
+    get_catalog_with_props(extra_props).await
+}
+
+async fn get_catalog_with_props(extra_props: HashMap<String, String>) -> HmsCatalog {
     set_up();
 
     let (hms_catalog_ip, minio_ip) = {
@@ -81,7 +93,7 @@ async fn get_catalog() -> HmsCatalog {
         sleep(std::time::Duration::from_millis(1000)).await;
     }
 
-    let props = HashMap::from([
+    let mut props = HashMap::from([
         (
             HMS_CATALOG_PROP_URI.to_string(),
             hms_socket_addr.to_string(),
@@ -103,6 +115,9 @@ async fn get_catalog() -> HmsCatalog {
         (S3_REGION.to_string(), "us-east-1".to_string()),
     ]);
 
+    // Merge in extra properties
+    props.extend(extra_props);
+
     // Wait for bucket to actually 
exist let file_io = iceberg::io::FileIO::from_path("s3a://") .unwrap() @@ -404,3 +419,119 @@ async fn test_drop_namespace() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_update_table() -> Result<()> { + let catalog = get_catalog().await; + let creation = set_table_creation(None, "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("test_update_table".into())); + set_test_namespace(&catalog, namespace.name()).await?; + + let expected = catalog.create_table(namespace.name(), creation).await?; + + let table = catalog + .load_table(&TableIdent::new( + namespace.name().clone(), + "my_table".to_string(), + )) + .await?; + + assert_eq!(table.identifier(), expected.identifier()); + assert_eq!(table.metadata_location(), expected.metadata_location()); + assert_eq!(table.metadata(), expected.metadata()); + let original_metadata_location = table.metadata_location(); + let tx = Transaction::new(&table); + let tx = tx + .update_table_properties() + .set("test_property".to_string(), "test_value".to_string()) + .apply(tx)?; + + let updated_table = tx.commit(&catalog).await?; + + assert_eq!( + updated_table.metadata().properties().get("test_property"), + Some(&"test_value".to_string()) + ); + assert_ne!( + updated_table.metadata_location(), + original_metadata_location, + "Metadata location should be updated after commit" + ); + + let reloaded_table = catalog.load_table(table.identifier()).await?; + + assert_eq!( + reloaded_table.metadata().properties().get("test_property"), + Some(&"test_value".to_string()) + ); + assert_eq!( + reloaded_table.metadata_location(), + updated_table.metadata_location(), + "Reloaded table should have the same metadata location as the updated table" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_update_table_with_optimistic_locking() -> Result<()> { + let catalog = get_catalog_with_optimistic_locking().await; + let creation = set_table_creation(None, "my_table")?; + let namespace = 
Namespace::new(NamespaceIdent::new("test_update_table_optimistic".into())); + set_test_namespace(&catalog, namespace.name()).await?; + + let expected = catalog.create_table(namespace.name(), creation).await?; + + let table = catalog + .load_table(&TableIdent::new( + namespace.name().clone(), + "my_table".to_string(), + )) + .await?; + + assert_eq!(table.identifier(), expected.identifier()); + assert_eq!(table.metadata_location(), expected.metadata_location()); + assert_eq!(table.metadata(), expected.metadata()); + let original_metadata_location = table.metadata_location(); + + let tx = Transaction::new(&table); + let tx = tx + .update_table_properties() + .set( + "test_property_optimistic".to_string(), + "test_value_optimistic".to_string(), + ) + .apply(tx)?; + + let updated_table = tx.commit(&catalog).await?; + + assert_eq!( + updated_table + .metadata() + .properties() + .get("test_property_optimistic"), + Some(&"test_value_optimistic".to_string()) + ); + + assert_ne!( + updated_table.metadata_location(), + original_metadata_location, + "Metadata location should be updated after commit with optimistic locking" + ); + + let reloaded_table = catalog.load_table(table.identifier()).await?; + assert_eq!( + reloaded_table + .metadata() + .properties() + .get("test_property_optimistic"), + Some(&"test_value_optimistic".to_string()) + ); + assert_eq!( + reloaded_table.metadata_location(), + updated_table.metadata_location(), + "Reloaded table should have the same metadata location as the updated table" + ); + + Ok(()) +}