From b8fe07250a3a42440e9db1b3c501f32bd0d22b56 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 26 Mar 2026 14:39:00 -0600 Subject: [PATCH 1/5] fix: cache object stores and bucket regions to reduce DNS query volume When reading Parquet files from S3, each call to initRecordBatchReader creates a new SessionContext, RuntimeEnv, and S3 ObjectStore client. Each ObjectStore instance creates a new reqwest HTTP client with its own connection pool, requiring fresh DNS resolution for every file opened. In TPC-DS-scale workloads with thousands of files, this generates excessive DNS queries that can overwhelm resolvers (e.g., Route53 Resolver limits in EKS environments), causing UnknownHostException errors and intermittent S3 connectivity failures. This commit introduces two caches: 1. A global ObjectStore cache keyed by (URL prefix, config hash) in prepare_object_store_with_configs(). Subsequent reads from the same bucket with the same configuration reuse the existing ObjectStore, enabling HTTP connection pooling and eliminating redundant DNS lookups. 2. A region resolution cache in resolve_bucket_region(). When no region is explicitly configured, each file read triggers a HeadBucket API call (with its own reqwest client) to determine the bucket region. The cache ensures this call happens only once per bucket. 
--- native/core/src/parquet/objectstore/s3.rs | 26 ++++++++ native/core/src/parquet/parquet_support.rs | 74 +++++++++++++++++++--- 2 files changed, 92 insertions(+), 8 deletions(-) diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs index 6d8f011d0b..e8a0778f83 100644 --- a/native/core/src/parquet/objectstore/s3.rs +++ b/native/core/src/parquet/objectstore/s3.rs @@ -17,6 +17,7 @@ use log::{debug, error}; use std::collections::HashMap; +use std::sync::OnceLock; use url::Url; use crate::execution::jni_api::get_runtime; @@ -111,13 +112,32 @@ pub fn create_store( Ok((Box::new(object_store), path)) } +/// Cache for resolved bucket regions to avoid redundant HeadBucket API calls. +/// +/// Without this cache, every Parquet file read triggers a HeadBucket request to determine +/// the bucket's region (when region is not explicitly configured), each requiring its own +/// DNS lookup. This is a significant contributor to DNS query volume in large workloads. +fn region_cache() -> &'static RwLock<HashMap<String, String>> { + static CACHE: OnceLock<RwLock<HashMap<String, String>>> = OnceLock::new(); + CACHE.get_or_init(|| RwLock::new(HashMap::new())) +} + /// Get the bucket region using the [HeadBucket API]. This will fail if the bucket does not exist. +/// Results are cached per bucket to avoid redundant network calls.
/// /// [HeadBucket API]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html /// /// TODO this is copied from the object store crate and has been adapted as a workaround /// for https://github.com/apache/arrow-rs-object-store/issues/479 pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn Error>> { + // Check cache first + if let Ok(cache) = region_cache().read() { + if let Some(region) = cache.get(bucket) { + debug!("Using cached region '{region}' for bucket '{bucket}'"); + return Ok(region.clone()); + } + } + let endpoint = format!("https://{bucket}.s3.amazonaws.com"); let client = reqwest::Client::new(); @@ -142,6 +162,12 @@ pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn Error>> { + // Cache the resolved region so subsequent reads of the same bucket skip the HeadBucket call + if let Ok(mut cache) = region_cache().write() { + cache.insert(bucket.to_string(), region.clone()); + } + Ok(region) } diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs +type ObjectStoreCache = RwLock<HashMap<(String, u64), Arc<dyn ObjectStore>>>; + +/// Global cache of object stores keyed by (url_key, config_hash). +/// +/// This avoids creating a new S3 client (and thus a new HTTP connection pool and DNS +/// resolution) for every Parquet file read. Without this cache, each call to +/// `initRecordBatchReader` from the JVM creates a fresh `reqwest::Client`, leading to +/// excessive DNS queries that can overwhelm DNS resolvers (e.g., Route53 Resolver limits +/// in EKS environments). +fn object_store_cache() -> &'static ObjectStoreCache { + static CACHE: OnceLock<ObjectStoreCache> = OnceLock::new(); + CACHE.get_or_init(|| RwLock::new(HashMap::new())) +} + +/// Compute a hash of the object store configuration for cache keying.
+fn hash_object_store_configs(configs: &HashMap<String, String>) -> u64 { + let mut hasher = DefaultHasher::new(); + let mut keys: Vec<&String> = configs.keys().collect(); + keys.sort(); + for key in keys { + key.hash(&mut hasher); + configs[key].hash(&mut hasher); + } + hasher.finish() +} + /// Parses the url, registers the object store with configurations, and returns a tuple of the object store url /// and object store path pub(crate) fn prepare_object_store_with_configs( @@ -467,17 +496,46 @@ pub(crate) fn prepare_object_store_with_configs( &url[url::Position::BeforeHost..url::Position::AfterPort], ); - let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme { - create_hdfs_object_store(&url) - } else if scheme == "s3" { - objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300)) + let config_hash = hash_object_store_configs(object_store_configs); + let cache_key = (url_key.clone(), config_hash); + + // Check the cache first to reuse existing object store instances. + // This enables HTTP connection pooling and avoids redundant DNS lookups.
+ let cached = { + let cache = object_store_cache() + .read() + .map_err(|e| ExecutionError::GeneralError(format!("Object store cache error: {e}")))?; + cache.get(&cache_key).cloned() + }; + + let (object_store, object_store_path): (Arc<dyn ObjectStore>, Path) = if let Some(store) = + cached + { + debug!("Reusing cached object store for {url_key}"); + let path = Path::parse(url.path()) + .map_err(|e| ExecutionError::GeneralError(e.to_string()))?; + (store, path) } else { - parse_url(&url) - } - .map_err(|e| ExecutionError::GeneralError(e.to_string()))?; + debug!("Creating new object store for {url_key}"); + let (store, path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme { + create_hdfs_object_store(&url) + } else if scheme == "s3" { + objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300)) + } else { + parse_url(&url) + } + .map_err(|e| ExecutionError::GeneralError(e.to_string()))?; + + let store: Arc<dyn ObjectStore> = Arc::from(store); + // Insert into cache + if let Ok(mut cache) = object_store_cache().write() { + cache.insert(cache_key, Arc::clone(&store)); + } + (store, path) + }; let object_store_url = ObjectStoreUrl::parse(url_key.clone())?; - runtime_env.register_object_store(&url, Arc::from(object_store)); + runtime_env.register_object_store(&url, object_store); Ok((object_store_url, object_store_path)) } From 3fe3c01424b51240d651eb8f8922d9497984b73c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 26 Mar 2026 15:41:42 -0600 Subject: [PATCH 2/5] fmt --- native/core/src/parquet/parquet_support.rs | 47 +++++++++++----------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index c2c684820e..83dd18dac6 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -41,8 +41,8 @@ use object_store::{parse_url, ObjectStore}; use std::collections::HashMap; use std::sync::OnceLock; use std::time::Duration; -use std::{fmt::Debug,
hash::Hash, sync::Arc}; use std::{collections::hash_map::DefaultHasher, hash::Hasher, sync::RwLock}; +use std::{fmt::Debug, hash::Hash, sync::Arc}; use url::Url; use super::objectstore; @@ -508,31 +508,30 @@ pub(crate) fn prepare_object_store_with_configs( cache.get(&cache_key).cloned() }; - let (object_store, object_store_path): (Arc<dyn ObjectStore>, Path) = if let Some(store) = - cached - { - debug!("Reusing cached object store for {url_key}"); - let path = Path::parse(url.path()) - .map_err(|e| ExecutionError::GeneralError(e.to_string()))?; - (store, path) - } else { - debug!("Creating new object store for {url_key}"); - let (store, path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme { - create_hdfs_object_store(&url) - } else if scheme == "s3" { - objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300)) + let (object_store, object_store_path): (Arc<dyn ObjectStore>, Path) = + if let Some(store) = cached { + debug!("Reusing cached object store for {url_key}"); + let path = + Path::parse(url.path()).map_err(|e| ExecutionError::GeneralError(e.to_string()))?; + (store, path) } else { - parse_url(&url) - } - .map_err(|e| ExecutionError::GeneralError(e.to_string()))?; + debug!("Creating new object store for {url_key}"); + let (store, path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme { + create_hdfs_object_store(&url) + } else if scheme == "s3" { + objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300)) + } else { + parse_url(&url) + } + .map_err(|e| ExecutionError::GeneralError(e.to_string()))?; - let store: Arc<dyn ObjectStore> = Arc::from(store); - // Insert into cache - if let Ok(mut cache) = object_store_cache().write() { - cache.insert(cache_key, Arc::clone(&store)); - } - (store, path) - }; + let store: Arc<dyn ObjectStore> = Arc::from(store); + // Insert into cache + if let Ok(mut cache) = object_store_cache().write() { + cache.insert(cache_key, Arc::clone(&store)); + } + (store, path) + }; let object_store_url = ObjectStoreUrl::parse(url_key.clone())?; runtime_env.register_object_store(&url,
object_store); From 37c3c81af5301ac9ab51a06f2990594937716b47 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 26 Mar 2026 15:46:55 -0600 Subject: [PATCH 3/5] fix: use from_url_path for cached object store path extraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Path::parse re-encodes percent-encoded characters, causing double-encoding (e.g. %27 → %2527) when reusing cached object stores. Use Path::from_url_path which correctly handles already-encoded URL paths, consistent with other call sites in the codebase. --- native/core/src/parquet/parquet_support.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 83dd18dac6..64bde9bcf0 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -511,8 +511,8 @@ pub(crate) fn prepare_object_store_with_configs( let (object_store, object_store_path): (Arc<dyn ObjectStore>, Path) = if let Some(store) = cached { debug!("Reusing cached object store for {url_key}"); - let path = - Path::parse(url.path()).map_err(|e| ExecutionError::GeneralError(e.to_string()))?; + let path = Path::from_url_path(url.path()) + .map_err(|e| ExecutionError::GeneralError(e.to_string()))?; (store, path) } else { debug!("Creating new object store for {url_key}"); From 4f1cc4e8e0963d5c7d6e72f2e99f0c03e3153247 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 27 Mar 2026 08:16:13 -0600 Subject: [PATCH 4/5] docs: document why static lifetime is appropriate for object store caches Address review comment asking for justification of global singleton caches.
Add comments explaining: - Why process lifetime is the right scope (JNI creates a new RuntimeEnv per file, leaving no executor-scoped Rust object to own the cache) - Why unbounded size is acceptable (bounded by distinct bucket+config combinations, which is small in practice) - Credential invalidation behaviour (dynamic providers refresh transparently; static-credential config changes produce a new hash and a new cache entry) - Why region cache needs no invalidation (S3 bucket regions are immutable after bucket creation) --- native/core/src/parquet/objectstore/s3.rs | 22 ++++++++++++--- native/core/src/parquet/parquet_support.rs | 33 ++++++++++++++++++---- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs index e8a0778f83..2c58ed1a00 100644 --- a/native/core/src/parquet/objectstore/s3.rs +++ b/native/core/src/parquet/objectstore/s3.rs @@ -112,11 +112,25 @@ pub fn create_store( Ok((Box::new(object_store), path)) } -/// Cache for resolved bucket regions to avoid redundant HeadBucket API calls. +/// Process-wide cache of resolved S3 bucket regions, keyed by bucket name. /// -/// Without this cache, every Parquet file read triggers a HeadBucket request to determine -/// the bucket's region (when region is not explicitly configured), each requiring its own -/// DNS lookup. This is a significant contributor to DNS query volume in large workloads. +/// ## Why static / process lifetime? +/// +/// See the equivalent rationale on `object_store_cache` in `parquet_support.rs`: the JNI +/// call site creates a new `RuntimeEnv` per file, leaving the executor process as the only +/// available scope for cross-call state. +/// +/// ## Unbounded size +/// +/// A Spark job accesses a bounded, typically small set of S3 buckets, so the number of +/// entries stays proportional to the number of distinct buckets. 
Entries are just +/// `(String, String)` pairs and the set does not grow beyond what the job actually touches. +/// +/// ## Invalidation +/// +/// An S3 bucket's region is permanently fixed at creation time and cannot change; no +/// invalidation is therefore needed. This is what makes a static, never-evicting cache +/// safe here and on the equivalent region-resolution path inside the `object_store` crate. fn region_cache() -> &'static RwLock<HashMap<String, String>> { static CACHE: OnceLock<RwLock<HashMap<String, String>>> = OnceLock::new(); CACHE.get_or_init(|| RwLock::new(HashMap::new())) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 64bde9bcf0..71167ca6d1 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -449,13 +449,34 @@ fn create_hdfs_object_store( type ObjectStoreCache = RwLock<HashMap<(String, u64), Arc<dyn ObjectStore>>>; -/// Global cache of object stores keyed by (url_key, config_hash). +/// Process-wide cache of object stores, keyed by `(scheme://host:port, config_hash)`. /// -/// This avoids creating a new S3 client (and thus a new HTTP connection pool and DNS -/// resolution) for every Parquet file read. Without this cache, each call to -/// `initRecordBatchReader` from the JVM creates a fresh `reqwest::Client`, leading to -/// excessive DNS queries that can overwhelm DNS resolvers (e.g., Route53 Resolver limits -/// in EKS environments). +/// ## Why static / process lifetime? +/// +/// Comet's JNI architecture calls `initRecordBatchReader` once per Parquet file, and each +/// call constructs a fresh `RuntimeEnv`. There is therefore no executor-scoped Rust object +/// with a lifetime longer than a single file read that could own this cache. The executor +/// process itself is the natural scope for HTTP connection-pool reuse, so process lifetime +/// (i.e. `static`) is the appropriate choice here. +/// +/// ## Unbounded size +/// +/// Cache entries are indexed by `(scheme://host:port, hash-of-configs)`.
A typical Spark +/// job accesses a small, fixed set of buckets with a stable configuration, so the number of +/// distinct keys is O(buckets × credential-configs) and remains small throughout the job. +/// Entries are cheap relative to the cost of creating a new object store (new HTTP +/// connection pool + DNS resolution), and there is no meaningful benefit from eviction, so +/// no eviction policy is applied. +/// +/// ## Credential invalidation +/// +/// Object stores that use dynamic credentials (IMDS, WebIdentity, ECS role, STS assume-role) +/// delegate credential refresh to a `CometCredentialProvider` that fetches fresh credentials +/// on every request, so credential rotation is transparent and requires no cache +/// invalidation. Object stores whose credentials are embedded in the Hadoop configuration +/// (e.g. `fs.s3a.access.key` / `fs.s3a.secret.key`) produce a different `config_hash` when +/// those values change, which causes a new store to be created and inserted under the new +/// key; the old entry is harmlessly superseded. fn object_store_cache() -> &'static ObjectStoreCache { static CACHE: OnceLock<ObjectStoreCache> = OnceLock::new(); CACHE.get_or_init(|| RwLock::new(HashMap::new())) From 121bd5d62b684349facdfda449cf56380e3162ca Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 27 Mar 2026 08:35:00 -0600 Subject: [PATCH 5/5] docs: clarify that process lifetime == application lifetime in K8s In the standard Spark-on-Kubernetes deployment model each executor pod is dedicated to a single Spark application, so the static cache lifetime is effectively bounded by the application lifetime.
--- native/core/src/parquet/objectstore/s3.rs | 4 +++- native/core/src/parquet/parquet_support.rs | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs index 2c58ed1a00..5522762988 100644 --- a/native/core/src/parquet/objectstore/s3.rs +++ b/native/core/src/parquet/objectstore/s3.rs @@ -118,7 +118,9 @@ pub fn create_store( /// /// See the equivalent rationale on `object_store_cache` in `parquet_support.rs`: the JNI /// call site creates a new `RuntimeEnv` per file, leaving the executor process as the only -/// available scope for cross-call state. +/// available scope for cross-call state. In the standard Spark-on-Kubernetes deployment +/// model each executor is dedicated to a single application, so process and application +/// lifetimes are equivalent. /// /// ## Unbounded size /// diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 71167ca6d1..3418a17c43 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -457,7 +457,10 @@ type ObjectStoreCache = RwLock<HashMap<(String, u64), Arc<dyn ObjectStore>>>; /// call constructs a fresh `RuntimeEnv`. There is therefore no executor-scoped Rust object /// with a lifetime longer than a single file read that could own this cache. The executor /// process itself is the natural scope for HTTP connection-pool reuse, so process lifetime -/// (i.e. `static`) is the appropriate choice here. +/// (i.e. `static`) is the appropriate choice here. In the standard Spark-on-Kubernetes +/// deployment model each executor process is dedicated to a single Spark application, so +/// process lifetime and application lifetime are equivalent; the cache is reclaimed when +/// the executor pod terminates. /// /// ## Unbounded size ///