diff --git a/native/core/src/execution/memory_pools/fair_pool.rs b/native/core/src/execution/memory_pools/fair_pool.rs index 2c25fe9443..0e3937c0c0 100644 --- a/native/core/src/execution/memory_pools/fair_pool.rs +++ b/native/core/src/execution/memory_pools/fair_pool.rs @@ -22,7 +22,10 @@ use std::{ use jni::objects::GlobalRef; -use crate::{errors::CometResult, jvm_bridge::JVMClasses}; +use crate::{ + errors::{CometError, CometResult}, + jvm_bridge::JVMClasses, +}; use datafusion::common::resources_err; use datafusion::execution::memory_pool::MemoryConsumer; use datafusion::{ @@ -74,6 +77,7 @@ impl CometFairMemoryPool { jni_call!(&mut env, comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64) } + .map_err(CometError::drop_throwable) } fn release(&self, size: usize) -> CometResult<()> { @@ -82,6 +86,7 @@ impl CometFairMemoryPool { unsafe { jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ()) } + .map_err(CometError::drop_throwable) } } diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs index 3233dd6d40..e8d5147b17 100644 --- a/native/core/src/execution/memory_pools/unified_pool.rs +++ b/native/core/src/execution/memory_pools/unified_pool.rs @@ -23,7 +23,10 @@ use std::{ }, }; -use crate::{errors::CometResult, jvm_bridge::JVMClasses}; +use crate::{ + errors::{CometError, CometResult}, + jvm_bridge::JVMClasses, +}; use datafusion::{ common::{resources_datafusion_err, DataFusionError}, execution::memory_pool::{MemoryPool, MemoryReservation}, @@ -68,6 +71,7 @@ impl CometUnifiedMemoryPool { jni_call!(&mut env, comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64) } + .map_err(CometError::drop_throwable) } /// Release memory to Spark's off-heap memory pool via JNI @@ -77,6 +81,7 @@ impl CometUnifiedMemoryPool { unsafe { jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ()) } + .map_err(CometError::drop_throwable) } } diff --git a/native/jni-bridge/src/errors.rs b/native/jni-bridge/src/errors.rs index 640201f6f0..e367256958 100644 --- a/native/jni-bridge/src/errors.rs +++ b/native/jni-bridge/src/errors.rs @@ -171,6 +171,25 @@ pub enum CometError { }, } +impl CometError { + /// Convert a `JavaException` into an `Internal` error, dropping the JNI + /// `GlobalRef` while the current thread is still attached to the JVM. + /// + /// Call this on errors returned from JNI helper methods that may execute on + /// threads not permanently attached to the JVM (e.g. tokio worker threads + /// used by the memory pool). Without this, the `GlobalRef` can outlive the + /// `AttachGuard` and be dropped on a detached thread, which triggers a + /// warning and an expensive temporary attach/detach cycle. + pub fn drop_throwable(self) -> Self { + match self { + CometError::JavaException { class, msg, .. } => { + CometError::Internal(format!("{class}: {msg}")) + } + other => other, + } + } +} + pub fn init() { std::panic::set_hook(Box::new(|panic_info| { // Log the panic message and location to stderr so it is visible in CI logs