Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions native/core/src/parquet/objectstore/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use log::{debug, error};
use std::collections::HashMap;
use std::sync::OnceLock;
use url::Url;

use crate::execution::jni_api::get_runtime;
Expand Down Expand Up @@ -111,13 +112,48 @@ pub fn create_store(
Ok((Box::new(object_store), path))
}

/// Process-wide cache of resolved S3 bucket regions, keyed by bucket name.
///
/// ## Why static / process lifetime?
///
/// See the equivalent rationale on `object_store_cache` in `parquet_support.rs`: the JNI
/// call site creates a new `RuntimeEnv` per file, leaving the executor process as the only
/// available scope for cross-call state. In the standard Spark-on-Kubernetes deployment
/// model each executor is dedicated to a single application, so process and application
/// lifetimes are equivalent.
///
/// ## Unbounded size
///
/// A Spark job accesses a bounded, typically small set of S3 buckets, so the number of
/// entries stays proportional to the number of distinct buckets. Entries are just
/// `(String, String)` pairs and the set does not grow beyond what the job actually touches.
///
/// ## Invalidation
///
/// An S3 bucket's region is permanently fixed at creation time and cannot change; no
/// invalidation is therefore needed. This is what makes a static, never-evicting cache
/// safe here and on the equivalent region-resolution path inside the `object_store` crate.
fn region_cache() -> &'static RwLock<HashMap<String, String>> {
    // Lazily-initialized, process-wide bucket-name -> region map. Every call
    // returns a reference to the same static instance; `Default::default`
    // produces an empty `HashMap` behind a fresh `RwLock`.
    static CACHE: OnceLock<RwLock<HashMap<String, String>>> = OnceLock::new();
    CACHE.get_or_init(Default::default)
}

/// Get the bucket region using the [HeadBucket API]. This will fail if the bucket does not exist.
/// Results are cached per bucket to avoid redundant network calls.
///
/// [HeadBucket API]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
///
/// TODO this is copied from the object store crate and has been adapted as a workaround
/// for https://github.com/apache/arrow-rs-object-store/issues/479
pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn Error>> {
// Check cache first
if let Ok(cache) = region_cache().read() {
if let Some(region) = cache.get(bucket) {
debug!("Using cached region '{region}' for bucket '{bucket}'");
return Ok(region.clone());
}
}

let endpoint = format!("https://{bucket}.s3.amazonaws.com");
let client = reqwest::Client::new();

Expand All @@ -142,6 +178,12 @@ pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn Error
.to_str()?
.to_string();

// Cache the resolved region
if let Ok(mut cache) = region_cache().write() {
debug!("Caching region '{region}' for bucket '{bucket}'");
cache.insert(bucket.to_string(), region.clone());
}

Ok(region)
}

Expand Down
99 changes: 90 additions & 9 deletions native/core/src/parquet/parquet_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::ColumnarValue;
use datafusion_comet_spark_expr::EvalMode;
use log::debug;
use object_store::path::Path;
use object_store::{parse_url, ObjectStore};
use std::collections::HashMap;
use std::sync::OnceLock;
use std::time::Duration;
use std::{collections::hash_map::DefaultHasher, hash::Hasher, sync::RwLock};
use std::{fmt::Debug, hash::Hash, sync::Arc};
use url::Url;

Expand Down Expand Up @@ -444,6 +447,56 @@ fn create_hdfs_object_store(
})
}

type ObjectStoreCache = RwLock<HashMap<(String, u64), Arc<dyn ObjectStore>>>;

/// Process-wide cache of object stores, keyed by `(scheme://host:port, config_hash)`.
///
/// ## Why static / process lifetime?
///
/// Comet's JNI architecture calls `initRecordBatchReader` once per Parquet file, and each
/// call constructs a fresh `RuntimeEnv`. There is therefore no executor-scoped Rust object
/// with a lifetime longer than a single file read that could own this cache. The executor
/// process itself is the natural scope for HTTP connection-pool reuse, so process lifetime
/// (i.e. `static`) is the appropriate choice here. In the standard Spark-on-Kubernetes
/// deployment model each executor process is dedicated to a single Spark application, so
/// process lifetime and application lifetime are equivalent; the cache is reclaimed when
/// the executor pod terminates.
///
/// ## Unbounded size
///
/// Cache entries are indexed by `(scheme://host:port, hash-of-configs)`. A typical Spark
/// job accesses a small, fixed set of buckets with a stable configuration, so the number of
/// distinct keys is O(buckets × credential-configs) and remains small throughout the job.
/// Entries are cheap relative to the cost of creating a new object store (new HTTP
/// connection pool + DNS resolution), and there is no meaningful benefit from eviction, so
/// no eviction policy is applied.
///
/// ## Credential invalidation
///
/// Object stores that use dynamic credentials (IMDS, WebIdentity, ECS role, STS assume-role)
/// delegate credential refresh to a `CometCredentialProvider` that fetches fresh credentials
/// on every request, so credential rotation is transparent and requires no cache
/// invalidation. Object stores whose credentials are embedded in the Hadoop configuration
/// (e.g. `fs.s3a.access.key` / `fs.s3a.secret.key`) produce a different `config_hash` when
/// those values change, which causes a new store to be created and inserted under the new
/// key; the old entry is harmlessly superseded.
fn object_store_cache() -> &'static ObjectStoreCache {
    // Lazily-initialized process-wide singleton: the first caller creates the
    // empty cache (`Default::default` builds the `RwLock<HashMap<..>>`), and
    // every subsequent call returns a reference to that same static value.
    static CACHE: OnceLock<ObjectStoreCache> = OnceLock::new();
    CACHE.get_or_init(Default::default)
}

/// Compute a hash of the object store configuration for cache keying.
///
/// Entries are hashed in key-sorted order so the result is independent of the
/// `HashMap`'s internal iteration order: two maps with equal contents always
/// produce the same hash within a process.
fn hash_object_store_configs(configs: &HashMap<String, String>) -> u64 {
    // Collect (key, value) pairs and order them deterministically by key.
    let mut entries: Vec<(&String, &String)> = configs.iter().collect();
    entries.sort_unstable_by_key(|(key, _)| *key);

    let mut hasher = DefaultHasher::new();
    for (key, value) in entries {
        key.hash(&mut hasher);
        value.hash(&mut hasher);
    }
    hasher.finish()
}

/// Parses the url, registers the object store with configurations, and returns a tuple of the object store url
/// and object store path
pub(crate) fn prepare_object_store_with_configs(
Expand All @@ -467,17 +520,45 @@ pub(crate) fn prepare_object_store_with_configs(
&url[url::Position::BeforeHost..url::Position::AfterPort],
);

let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
create_hdfs_object_store(&url)
} else if scheme == "s3" {
objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
} else {
parse_url(&url)
}
.map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
let config_hash = hash_object_store_configs(object_store_configs);
let cache_key = (url_key.clone(), config_hash);

// Check the cache first to reuse existing object store instances.
// This enables HTTP connection pooling and avoids redundant DNS lookups.
let cached = {
let cache = object_store_cache()
.read()
.map_err(|e| ExecutionError::GeneralError(format!("Object store cache error: {e}")))?;
cache.get(&cache_key).cloned()
};

let (object_store, object_store_path): (Arc<dyn ObjectStore>, Path) =
if let Some(store) = cached {
debug!("Reusing cached object store for {url_key}");
let path = Path::from_url_path(url.path())
.map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
(store, path)
} else {
debug!("Creating new object store for {url_key}");
let (store, path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
create_hdfs_object_store(&url)
} else if scheme == "s3" {
objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
} else {
parse_url(&url)
}
.map_err(|e| ExecutionError::GeneralError(e.to_string()))?;

let store: Arc<dyn ObjectStore> = Arc::from(store);
// Insert into cache
if let Ok(mut cache) = object_store_cache().write() {
cache.insert(cache_key, Arc::clone(&store));
}
(store, path)
};

let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
runtime_env.register_object_store(&url, Arc::from(object_store));
runtime_env.register_object_store(&url, object_store);
Ok((object_store_url, object_store_path))
}

Expand Down
Loading