From e25a87c2841ed41cc7dd1bf56ea1b27d21850743 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 28 Mar 2026 11:26:29 -0600 Subject: [PATCH] fix: wait for spawned tokio task before releasing native plan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a tokio task is spawned for async execution (no JVM data sources), the stream and its MemoryReservations are owned by the tokio task. If releasePlan drops the ExecutionContext without waiting for the task to complete, the reservations are released asynchronously on the tokio thread — potentially after Spark has already called cleanUpAllAllocatedMemory(), causing "release called on X bytes but task only has 0 bytes" warnings. Store the JoinHandle from the spawned task and block on it in releasePlan after dropping the batch receiver (to signal the task to exit). This ensures all memory is released back to Spark before releasePlan returns to the JVM. Also fixes Source 2 of #2470 — GlobalRefs held by the stream are now dropped while the JVM thread is still the caller, avoiding detached thread warnings. 
Closes #2453 --- native/core/src/execution/jni_api.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index e0a395ebbf..5d9bd0a2ce 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -159,6 +159,8 @@ struct ExecutionContext { pub stream: Option, /// Receives batches from a spawned tokio task (async I/O path) pub batch_receiver: Option>>, + /// Handle to the spawned tokio task so we can wait for it during cleanup + pub task_handle: Option>, /// Native metrics pub metrics: Arc, // The interval in milliseconds to update metrics @@ -317,6 +319,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( input_sources, stream: None, batch_receiver: None, + task_handle: None, metrics, metrics_update_interval, metrics_last_update_time: Instant::now(), @@ -579,7 +582,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( // decreasing to 1 would serialize production and consumption. let (tx, rx) = mpsc::channel(2); let mut stream = stream; - get_runtime().spawn(async move { + let handle = get_runtime().spawn(async move { let result = std::panic::AssertUnwindSafe(async { while let Some(batch) = stream.next().await { if tx.send(batch).await.is_err() { @@ -606,6 +609,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( } }); exec_context.batch_receiver = Some(rx); + exec_context.task_handle = Some(handle); } else { exec_context.stream = Some(stream); } @@ -701,6 +705,17 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan( execution_context.task_attempt_id, ); + // If a tokio task was spawned for async execution, wait for it to + // complete before dropping the context. This ensures all + // MemoryReservations held by the stream are released back to Spark + // before Spark calls cleanUpAllAllocatedMemory(). 
+ if let Some(handle) = execution_context.task_handle.take() { + // Drop the receiver first so the task sees a closed channel + // and exits its loop. + execution_context.batch_receiver.take(); + let _ = get_runtime().block_on(handle); + } + let _: Box<ExecutionContext> = Box::from_raw(execution_context); Ok(()) })