Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion native/core/src/execution/memory_pools/fair_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use std::{

use jni::objects::GlobalRef;

use crate::{errors::CometResult, jvm_bridge::JVMClasses};
use crate::{
errors::{CometError, CometResult},
jvm_bridge::JVMClasses,
};
use datafusion::common::resources_err;
use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::{
Expand Down Expand Up @@ -74,6 +77,7 @@ impl CometFairMemoryPool {
jni_call!(&mut env,
comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64)
}
.map_err(CometError::drop_throwable)
}

fn release(&self, size: usize) -> CometResult<()> {
Expand All @@ -82,6 +86,7 @@ impl CometFairMemoryPool {
unsafe {
jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ())
}
.map_err(CometError::drop_throwable)
}
}

Expand Down
7 changes: 6 additions & 1 deletion native/core/src/execution/memory_pools/unified_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use std::{
},
};

use crate::{errors::CometResult, jvm_bridge::JVMClasses};
use crate::{
errors::{CometError, CometResult},
jvm_bridge::JVMClasses,
};
use datafusion::{
common::{resources_datafusion_err, DataFusionError},
execution::memory_pool::{MemoryPool, MemoryReservation},
Expand Down Expand Up @@ -68,6 +71,7 @@ impl CometUnifiedMemoryPool {
jni_call!(&mut env,
comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64)
}
.map_err(CometError::drop_throwable)
}

/// Release memory to Spark's off-heap memory pool via JNI
Expand All @@ -77,6 +81,7 @@ impl CometUnifiedMemoryPool {
unsafe {
jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ())
}
.map_err(CometError::drop_throwable)
}
}

Expand Down
19 changes: 19 additions & 0 deletions native/jni-bridge/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,25 @@ pub enum CometError {
},
}

impl CometError {
/// Convert a `JavaException` into an `Internal` error, dropping the JNI
/// `GlobalRef` while the current thread is still attached to the JVM.
///
/// Call this on errors returned from JNI helper methods that may execute on
/// threads not permanently attached to the JVM (e.g. tokio worker threads
/// used by the memory pool). Without this, the `GlobalRef` can outlive the
/// `AttachGuard` and be dropped on a detached thread, which triggers a
/// warning and an expensive temporary attach/detach cycle.
pub fn drop_throwable(self) -> Self {
match self {
CometError::JavaException { class, msg, .. } => {
CometError::Internal(format!("{class}: {msg}"))
}
other => other,
}
}
}

pub fn init() {
std::panic::set_hook(Box::new(|panic_info| {
// Log the panic message and location to stderr so it is visible in CI logs
Expand Down
Loading