From 40e5a51a9fe5f12424ef7f65e48dd0d715b0c831 Mon Sep 17 00:00:00 2001 From: moryanwang <946951911@qq.com> Date: Tue, 9 Jun 2026 16:43:19 +0800 Subject: [PATCH] feat(tencent): support cosn scheme and generalize dynamic object store credentials - Add cosn:// URI scheme support (commonly used in Hadoop/Spark ecosystem) - Generalize dynamic credential provider for COS to support automatic credential refresh - Refactor TencentStoreProvider with base_cos_options/normalize_cos_config/build_cos_store helpers - Support DynamicOpenDalStore for COS when external provider is available - Add comprehensive unit tests for config normalization and dynamic credentials Closes #7173 --- rust/lance-io/src/object_store.rs | 2 +- rust/lance-io/src/object_store/providers.rs | 2 + .../src/object_store/providers/tencent.rs | 328 +++++++++++++++--- 3 files changed, 291 insertions(+), 41 deletions(-) diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 0c44095f117..8bca6d832df 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -36,7 +36,7 @@ use super::local::LocalObjectReader; use crate::uring::{UringCurrentThreadReader, UringReader}; #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))] pub(crate) mod dynamic_credentials; -#[cfg(any(feature = "oss", feature = "huggingface", feature = "tos"))] +#[cfg(any(feature = "oss", feature = "huggingface", feature = "tos", feature = "tencent"))] pub(crate) mod dynamic_opendal; mod list_retry; pub mod providers; diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 45ac30a757a..859ff9cd994 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -335,6 +335,8 @@ impl Default for ObjectStoreRegistry { providers.insert("oss".into(), Arc::new(oss::OssStoreProvider)); #[cfg(feature = "tencent")] providers.insert("cos".into(), Arc::new(tencent::TencentStoreProvider)); + #[cfg(feature = "tencent")] + providers.insert("cosn".into(), Arc::new(tencent::TencentStoreProvider)); #[cfg(feature = "huggingface")] providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider)); #[cfg(feature = "tos")] diff --git a/rust/lance-io/src/object_store/providers/tencent.rs b/rust/lance-io/src/object_store/providers/tencent.rs index 5fa885ea5a9..3305742ec3b 100644 --- a/rust/lance-io/src/object_store/providers/tencent.rs +++ b/rust/lance-io/src/object_store/providers/tencent.rs @@ -4,10 +4,12 @@ use std::collections::HashMap; use std::sync::Arc; +use object_store::ObjectStore as OSObjectStore; use object_store_opendal::OpendalStore; use opendal::{Operator, services::Cos}; use url::Url; +use crate::object_store::dynamic_opendal::DynamicOpenDalStore; use crate::object_store::{ DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, @@ -17,73 +19,128 @@ use lance_core::error::{Error, Result}; #[derive(Default, Debug)] pub struct TencentStoreProvider; -#[async_trait::async_trait] -impl ObjectStoreProvider for TencentStoreProvider { - async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result { - let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE); - let storage_options = StorageOptions(params.storage_options().cloned().unwrap_or_default()); - +impl TencentStoreProvider { + fn base_cos_options( + base_path: &Url, + storage_options: &StorageOptions, + ) -> Result> { let bucket = base_path .host_str() - .ok_or_else(|| Error::invalid_input("Tencent Cos URL must contain bucket name"))? + .ok_or_else(|| Error::invalid_input("COS URL must contain bucket name"))? .to_string(); let prefix = base_path.path().trim_start_matches('/').to_string(); - // Start with environment variables as base configuration + // Load COS/TENCENTCLOUD related config from environment variables as base let mut config_map: HashMap = std::env::vars() - .filter(|(k, _)| k.starts_with("COS_") || k.starts_with("TENCENTCLOUD_")) - .map(|(k, v)| { - // Convert env var names to opendal config keys - let key = k + .filter(|(key, _)| key.starts_with("COS_") || key.starts_with("TENCENTCLOUD_")) + .map(|(key, value)| { + let normalized_key = key .to_lowercase() .replace("cos_", "") .replace("tencentcloud_", ""); - (key, v) + (normalized_key, value) }) .collect(); - config_map.insert("bucket".to_string(), bucket); + // Merge storage_options (user-provided options take priority over env vars) + config_map.extend(storage_options.0.clone()); - if !prefix.is_empty() { + config_map.insert("bucket".to_string(), bucket); + if prefix.is_empty() { + config_map.remove("root"); + } else { config_map.insert("root".to_string(), "/".to_string()); } - // Override with storage options if provided - if let Some(endpoint) = storage_options.0.get("cos_endpoint") { - config_map.insert("endpoint".to_string(), endpoint.clone()); - } + Ok(config_map) + } - if let Some(secret_id) = storage_options.0.get("cos_secret_id") { - config_map.insert("secret_id".to_string(), secret_id.clone()); - } + /// Normalize COS config options by mapping alias keys to the standard keys + /// required by the OpenDAL COS service, with support for temporary credentials (security_token). + fn normalize_cos_config(options: &HashMap) -> Result> { + let mut config_map = options.clone(); - if let Some(secret_key) = storage_options.0.get("cos_secret_key") { - config_map.insert("secret_key".to_string(), secret_key.clone()); - } + // Alias mapping: map user-friendly keys to OpenDAL COS standard keys + let alias_groups: &[(&str, &[&str])] = &[ + ("endpoint", &["cos_endpoint"]), + ("secret_id", &["cos_secret_id"]), + ("secret_key", &["cos_secret_key"]), + ("security_token", &["cos_security_token"]), + ("bucket", &["cos_bucket"]), + ("region", &["cos_region"]), + ]; - if let Some(enable_versioning) = storage_options.0.get("cos_enable_versioning") { - config_map.insert("enable_versioning".to_string(), enable_versioning.clone()); + for (canonical, aliases) in alias_groups { + for alias in *aliases { + if let Some(value) = config_map.remove(*alias) { + config_map.insert(canonical.to_string(), value); + break; + } + } } - - // Currently, the configuration options for CosConfig in OpenDAL are very limited. - // Most configurations need to be entered via environment variables, such as TENCENTCLOUD_SECURITY_TOKEN, TENCENTCLOUD_REGION, etc. - // (more env config details: https://github.com/apache/opendal-reqsign/blob/v0.16.5/src/tencent/config.rs) - // Therefore, we need to keep `disable_config_load` always false to allow configurations to be loaded from environment variables. - // TODO: improve CosConfig in opendal and add more storage_option here - config_map.insert("disable_config_load".to_string(), "false".to_string()); + + config_map + .entry("disable_config_load".to_string()) + .or_insert_with(|| "false".to_string()); if !config_map.contains_key("endpoint") { - return Err(Error::invalid_input( - "COS endpoint is required. Please provide 'cos_endpoint' in storage options or set COS_ENDPOINT environment variable", - )); + if let Some(region) = config_map + .get("region") + .filter(|v| !v.is_empty()) + .cloned() + { + let endpoint = format!("https://cos.{}.myqcloud.com", region); + config_map.insert("endpoint".to_string(), endpoint); + } else { + return Err(Error::invalid_input( + "COS endpoint or region is required. Please provide 'cos_endpoint' or 'cos_region' in storage options, or set COS_ENDPOINT / TENCENTCLOUD_REGION environment variable", + )); + } } + Ok(config_map) + } + + /// Build a static OpendalStore from the normalized config + fn build_cos_store(config_map: HashMap) -> Result { let operator = Operator::from_iter::(config_map) .map_err(|e| Error::invalid_input(format!("Failed to create COS operator: {:?}", e)))? .finish(); - let opendal_store = Arc::new(OpendalStore::new(operator)); + Ok(OpendalStore::new(operator)) + } +} + +#[async_trait::async_trait] +impl ObjectStoreProvider for TencentStoreProvider { + async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result { + let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE); + let storage_options = StorageOptions(params.storage_options().cloned().unwrap_or_default()); + + let base_options = Self::base_cos_options(&base_path, &storage_options)?; + let accessor = params.get_accessor(); + + // Two priority levels for store creation: + // (1) First priority: check for external provider + // (2) Second priority: user explicitly provided credentials (including temporary credentials), create a static store + let inner: Arc = + if let Some(accessor) = accessor.filter(|a| a.has_provider()) { + Arc::new( + DynamicOpenDalStore::new( + format!("cos:{}", base_path), + base_options, + accessor, + Self::normalize_cos_config, + Self::build_cos_store, + ) + .with_protected_keys(["bucket", "root"]), + ) + } else { + Arc::new(Self::build_cos_store(Self::normalize_cos_config( + &base_options, + )?)?) + }; let mut url = base_path; if !url.path().ends_with('/') { @@ -92,7 +149,7 @@ impl ObjectStoreProvider for TencentStoreProvider { Ok(ObjectStore { scheme: "cos".to_string(), - inner: opendal_store, + inner, block_size, max_iop_size: *DEFAULT_MAX_IOP_SIZE, use_constant_size_upload_parts: params.use_constant_size_upload_parts, @@ -107,8 +164,13 @@ impl ObjectStoreProvider for TencentStoreProvider { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use super::TencentStoreProvider; - use crate::object_store::ObjectStoreProvider; + use crate::object_store::dynamic_opendal::DynamicOpenDalStore; + use crate::object_store::test_utils::StaticMockStorageOptionsProvider; + use crate::object_store::{ObjectStoreProvider, StorageOptionsAccessor}; use url::Url; #[test] @@ -120,4 +182,190 @@ mod tests { let expected_path = object_store::path::Path::from("path/to/file"); assert_eq!(path, expected_path); } + + #[test] + fn test_cosn_store_path() { + let provider = TencentStoreProvider; + + let url = Url::parse("cosn://bucket/path/to/file").unwrap(); + let path = provider.extract_path(&url).unwrap(); + let expected_path = object_store::path::Path::from("path/to/file"); + assert_eq!(path, expected_path); + } + + #[test] + fn test_cos_alias_options_override_canonical_env_options() { + let config = TencentStoreProvider::normalize_cos_config(&HashMap::from([ + ( + "endpoint".to_string(), + "https://cos.ap-guangzhou.myqcloud.com".to_string(), + ), + ( + "cos_endpoint".to_string(), + "https://cos.ap-shanghai.myqcloud.com".to_string(), + ), + ("secret_id".to_string(), "env-secret-id".to_string()), + ("cos_secret_id".to_string(), "user-secret-id".to_string()), + ("secret_key".to_string(), "env-secret-key".to_string()), + ("cos_secret_key".to_string(), "user-secret-key".to_string()), + ("security_token".to_string(), "env-token".to_string()), + ("cos_security_token".to_string(), "user-token".to_string()), + ("bucket".to_string(), "bucket".to_string()), + ])) + .unwrap(); + + assert_eq!( + config.get("endpoint").unwrap(), + "https://cos.ap-shanghai.myqcloud.com" + ); + assert_eq!(config.get("secret_id").unwrap(), "user-secret-id"); + assert_eq!(config.get("secret_key").unwrap(), "user-secret-key"); + assert_eq!(config.get("security_token").unwrap(), "user-token"); + assert!(!config.contains_key("cos_endpoint")); + assert!(!config.contains_key("cos_secret_id")); + assert!(!config.contains_key("cos_secret_key")); + assert!(!config.contains_key("cos_security_token")); + } + + #[test] + fn test_cos_url_bucket_and_root_are_authoritative() { + let storage_options = crate::object_store::StorageOptions(HashMap::from([ + ( + "cos_endpoint".to_string(), + "https://cos.ap-guangzhou.myqcloud.com".to_string(), + ), + ("bucket".to_string(), "storage-options-bucket".to_string()), + ("root".to_string(), "/storage-options-root".to_string()), + ])); + let base_options = TencentStoreProvider::base_cos_options( + &Url::parse("cos://url-bucket/path").unwrap(), + &storage_options, + ) + .unwrap(); + let config = TencentStoreProvider::normalize_cos_config(&base_options).unwrap(); + + // Bucket from URL has the highest priority + assert_eq!(config.get("bucket").unwrap(), "url-bucket"); + assert_eq!(config.get("root").unwrap(), "/"); + } + + #[test] + fn test_cos_empty_url_path_removes_storage_option_root() { + let storage_options = crate::object_store::StorageOptions(HashMap::from([ + ( + "cos_endpoint".to_string(), + "https://cos.ap-guangzhou.myqcloud.com".to_string(), + ), + ("root".to_string(), "/storage-options-root".to_string()), + ])); + let base_options = TencentStoreProvider::base_cos_options( + &Url::parse("cos://url-bucket").unwrap(), + &storage_options, + ) + .unwrap(); + let config = TencentStoreProvider::normalize_cos_config(&base_options).unwrap(); + + assert_eq!(config.get("bucket").unwrap(), "url-bucket"); + assert!(!config.contains_key("root")); + } + + #[test] + fn test_cos_region_generates_endpoint_automatically() { + // When only region is provided (no endpoint), endpoint should be auto-generated + let config = TencentStoreProvider::normalize_cos_config(&HashMap::from([ + ("region".to_string(), "ap-guangzhou".to_string()), + ("bucket".to_string(), "test-bucket".to_string()), + ])) + .unwrap(); + + assert_eq!( + config.get("endpoint").unwrap(), + "https://cos.ap-guangzhou.myqcloud.com" + ); + } + + #[test] + fn test_cos_region_alias_generates_endpoint() { + // cos_region alias should also work + let config = TencentStoreProvider::normalize_cos_config(&HashMap::from([ + ("cos_region".to_string(), "ap-shanghai".to_string()), + ("bucket".to_string(), "test-bucket".to_string()), + ])) + .unwrap(); + + assert_eq!( + config.get("endpoint").unwrap(), + "https://cos.ap-shanghai.myqcloud.com" + ); + } + + #[test] + fn test_cos_endpoint_takes_priority_over_region() { + // When both endpoint and region are provided, endpoint should take priority + let config = TencentStoreProvider::normalize_cos_config(&HashMap::from([ + ( + "endpoint".to_string(), + "https://custom-endpoint.example.com".to_string(), + ), + ("region".to_string(), "ap-guangzhou".to_string()), + ("bucket".to_string(), "test-bucket".to_string()), + ])) + .unwrap(); + + assert_eq!( + config.get("endpoint").unwrap(), + "https://custom-endpoint.example.com" + ); + } + + #[test] + fn test_cos_no_endpoint_no_region_returns_error() { + // When neither endpoint nor region is provided, should return error + let result = TencentStoreProvider::normalize_cos_config(&HashMap::from([( + "bucket".to_string(), + "test-bucket".to_string(), + )])); + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("endpoint or region is required")); + } + + #[tokio::test] + async fn test_dynamic_opendal_cos_store_uses_provider_credentials() { + let accessor = Arc::new(StorageOptionsAccessor::with_provider(Arc::new( + StaticMockStorageOptionsProvider { + options: HashMap::from([ + ( + "cos_endpoint".to_string(), + "https://cos.ap-guangzhou.myqcloud.com".to_string(), + ), + ("cos_secret_id".to_string(), "akid".to_string()), + ("cos_secret_key".to_string(), "secret".to_string()), + ("cos_security_token".to_string(), "token".to_string()), + ]), + }, + ))); + + let base_options = TencentStoreProvider::base_cos_options( + &Url::parse("cos://bucket/path").unwrap(), + &crate::object_store::StorageOptions(HashMap::new()), + ) + .unwrap(); + + let store = DynamicOpenDalStore::new( + "cos", + base_options, + accessor, + TencentStoreProvider::normalize_cos_config, + TencentStoreProvider::build_cos_store, + ); + + let current_store = store + .current_store() + .await + .expect("dynamic OpenDAL COS store should build"); + + assert!(current_store.to_string().contains("Opendal")); + } }