diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs
index 6d8f011d0b..5522762988 100644
--- a/native/core/src/parquet/objectstore/s3.rs
+++ b/native/core/src/parquet/objectstore/s3.rs
@@ -17,6 +17,7 @@
 use log::{debug, error};
 use std::collections::HashMap;
+use std::sync::OnceLock;
 use url::Url;
 
 use crate::execution::jni_api::get_runtime;
@@ -111,13 +112,48 @@ pub fn create_store(
     Ok((Box::new(object_store), path))
 }
 
+/// Process-wide cache of resolved S3 bucket regions, keyed by bucket name.
+///
+/// ## Why static / process lifetime?
+///
+/// See the equivalent rationale on `object_store_cache` in `parquet_support.rs`: the JNI
+/// call site creates a new `RuntimeEnv` per file, leaving the executor process as the only
+/// available scope for cross-call state. In the standard Spark-on-Kubernetes deployment
+/// model each executor is dedicated to a single application, so process and application
+/// lifetimes are equivalent.
+///
+/// ## Unbounded size
+///
+/// A Spark job accesses a bounded, typically small set of S3 buckets, so the number of
+/// entries stays proportional to the number of distinct buckets. Entries are just
+/// `(String, String)` pairs and the set does not grow beyond what the job actually touches.
+///
+/// ## Invalidation
+///
+/// An S3 bucket's region is permanently fixed at creation time and cannot change; no
+/// invalidation is therefore needed. This is what makes a static, never-evicting cache
+/// safe here and on the equivalent region-resolution path inside the `object_store` crate.
+fn region_cache() -> &'static RwLock<HashMap<String, String>> {
+    static CACHE: OnceLock<RwLock<HashMap<String, String>>> = OnceLock::new();
+    CACHE.get_or_init(|| RwLock::new(HashMap::new()))
+}
+
 /// Get the bucket region using the [HeadBucket API]. This will fail if the bucket does not exist.
+/// Results are cached per bucket to avoid redundant network calls.
 ///
 /// [HeadBucket API]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
 ///
 /// TODO this is copied from the object store crate and has been adapted as a workaround
 /// for https://github.com/apache/arrow-rs-object-store/issues/479
 pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn std::error::Error>> {
+    // Check cache first
+    if let Ok(cache) = region_cache().read() {
+        if let Some(region) = cache.get(bucket) {
+            debug!("Using cached region '{region}' for bucket '{bucket}'");
+            return Ok(region.clone());
+        }
+    }
+
     let endpoint = format!("https://{bucket}.s3.amazonaws.com");
 
     let client = reqwest::Client::new();
@@ -142,6 +178,12 @@ pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn std::error::Error>> {
[NOTE(review): a multi-line span of the patch was destroyed by markup-stripping at this
point — the body of this hunk (presumably the write-back of the resolved region into
`region_cache()` before returning), the remainder of the s3.rs diff, the
`diff --git`/`index`/`---`/`+++` header for native/core/src/parquet/parquet_support.rs,
and its import hunks (e.g. `DefaultHasher`, `Hash`/`Hasher`, `OnceLock`) are missing and
must be recovered from the original patch. Only the tail of the next added line survived;
its full form, derivable from usages below, is:]
+type ObjectStoreCache = RwLock<HashMap<(String, u64), Arc<dyn ObjectStore>>>;
+
+/// Process-wide cache of object stores, keyed by `(scheme://host:port, config_hash)`.
+///
+/// ## Why static / process lifetime?
+///
+/// Comet's JNI architecture calls `initRecordBatchReader` once per Parquet file, and each
+/// call constructs a fresh `RuntimeEnv`. There is therefore no executor-scoped Rust object
+/// with a lifetime longer than a single file read that could own this cache. The executor
+/// process itself is the natural scope for HTTP connection-pool reuse, so process lifetime
+/// (i.e. `static`) is the appropriate choice here. In the standard Spark-on-Kubernetes
+/// deployment model each executor process is dedicated to a single Spark application, so
+/// process lifetime and application lifetime are equivalent; the cache is reclaimed when
+/// the executor pod terminates.
+///
+/// ## Unbounded size
+///
+/// Cache entries are indexed by `(scheme://host:port, hash-of-configs)`. A typical Spark
+/// job accesses a small, fixed set of buckets with a stable configuration, so the number of
+/// distinct keys is O(buckets × credential-configs) and remains small throughout the job.
+/// Entries are cheap relative to the cost of creating a new object store (new HTTP
+/// connection pool + DNS resolution), and there is no meaningful benefit from eviction, so
+/// no eviction policy is applied.
+///
+/// ## Credential invalidation
+///
+/// Object stores that use dynamic credentials (IMDS, WebIdentity, ECS role, STS assume-role)
+/// delegate credential refresh to a `CometCredentialProvider` that fetches fresh credentials
+/// on every request, so credential rotation is transparent and requires no cache
+/// invalidation. Object stores whose credentials are embedded in the Hadoop configuration
+/// (e.g. `fs.s3a.access.key` / `fs.s3a.secret.key`) produce a different `config_hash` when
+/// those values change, which causes a new store to be created and inserted under the new
+/// key; the old entry is harmlessly superseded.
+fn object_store_cache() -> &'static ObjectStoreCache {
+    static CACHE: OnceLock<ObjectStoreCache> = OnceLock::new();
+    CACHE.get_or_init(|| RwLock::new(HashMap::new()))
+}
+
+/// Compute a hash of the object store configuration for cache keying.
+fn hash_object_store_configs(configs: &HashMap<String, String>) -> u64 {
+    let mut hasher = DefaultHasher::new();
+    let mut keys: Vec<&String> = configs.keys().collect();
+    keys.sort();
+    for key in keys {
+        key.hash(&mut hasher);
+        configs[key].hash(&mut hasher);
+    }
+    hasher.finish()
+}
+
 /// Parses the url, registers the object store with configurations, and returns a tuple of the object store url
 /// and object store path
 pub(crate) fn prepare_object_store_with_configs(
@@ -467,17 +520,45 @@ pub(crate) fn prepare_object_store_with_configs(
         &url[url::Position::BeforeHost..url::Position::AfterPort],
     );
 
-    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
-        create_hdfs_object_store(&url)
-    } else if scheme == "s3" {
-        objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
-    } else {
-        parse_url(&url)
-    }
-    .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+    let config_hash = hash_object_store_configs(object_store_configs);
+    let cache_key = (url_key.clone(), config_hash);
+
+    // Check the cache first to reuse existing object store instances.
+    // This enables HTTP connection pooling and avoids redundant DNS lookups.
+    let cached = {
+        let cache = object_store_cache()
+            .read()
+            .map_err(|e| ExecutionError::GeneralError(format!("Object store cache error: {e}")))?;
+        cache.get(&cache_key).cloned()
+    };
+
+    let (object_store, object_store_path): (Arc<dyn ObjectStore>, Path) =
+        if let Some(store) = cached {
+            debug!("Reusing cached object store for {url_key}");
+            let path = Path::from_url_path(url.path())
+                .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+            (store, path)
+        } else {
+            debug!("Creating new object store for {url_key}");
+            let (store, path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
+                create_hdfs_object_store(&url)
+            } else if scheme == "s3" {
+                objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
+            } else {
+                parse_url(&url)
+            }
+            .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+
+            let store: Arc<dyn ObjectStore> = Arc::from(store);
+            // Insert into cache
+            if let Ok(mut cache) = object_store_cache().write() {
+                cache.insert(cache_key, Arc::clone(&store));
+            }
+            (store, path)
+        };
 
     let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
-    runtime_env.register_object_store(&url, Arc::from(object_store));
+    runtime_env.register_object_store(&url, object_store);
 
     Ok((object_store_url, object_store_path))
 }