Skip to content

fix: wait for spawned tokio task before releasing native plan#3833

Draft
andygrove wants to merge 1 commit intoapache:mainfrom
andygrove:fix-async-cleanup-race
Draft

fix: wait for spawned tokio task before releasing native plan#3833
andygrove wants to merge 1 commit intoapache:mainfrom
andygrove:fix-async-cleanup-race

Conversation

@andygrove
Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #2453.

Rationale for this change

When a tokio task is spawned for async execution (the path taken when there are no JVM data sources), the execution stream and its MemoryReservations are owned by the tokio task. If releasePlan drops the ExecutionContext without waiting for the task to complete, the reservations are released asynchronously on the tokio thread — potentially after Spark has already called cleanUpAllAllocatedMemory(), causing:

WARN ExecutionMemoryPool: Internal error: release called on 917504 bytes but task only has 0 bytes of memory from the off-heap execution pool

The sequence that triggers this:

  1. TaskCompletionListener calls CometExecIterator.close()releasePlan() via JNI
  2. releasePlan drops ExecutionContext, which drops batch_receiver — signaling the tokio task to stop
  3. releasePlan returns to JVM before the tokio task finishes cleanup
  4. Spark calls cleanUpAllAllocatedMemory() — zeroes out the task's allocation
  5. The tokio task finally drops the stream and its MemoryReservations → release_to_spark() → Spark sees "0 bytes"

This also fixes Source 2 of #2470GlobalRefs held by the stream are now dropped while the JVM thread is still the caller, avoiding the "Dropping a GlobalRef in a detached thread" warning.

What changes are included in this PR?

  • Add a task_handle field to ExecutionContext to store the JoinHandle from the spawned tokio task
  • In releasePlan, drop the batch_receiver first (to signal the task to exit its loop), then block_on the handle to wait for the tokio task to complete before dropping the context

This guarantees all memory releases and GlobalRef drops happen before releasePlan returns to the JVM.

How are these changes tested?

This race condition requires a full Spark executor environment where the task completion sequence (TaskCompletionListenercleanUpAllAllocatedMemory) races with async tokio task cleanup. It is not reproducible in unit tests. The fix is verified by code inspection: block_on(handle) ensures the tokio task completes (dropping the stream and all reservations) before releasePlan returns. Clippy passes cleanly.

When a tokio task is spawned for async execution (no JVM data sources),
the stream and its MemoryReservations are owned by the tokio task. If
releasePlan drops the ExecutionContext without waiting for the task to
complete, the reservations are released asynchronously on the tokio
thread — potentially after Spark has already called
cleanUpAllAllocatedMemory(), causing "release called on X bytes but
task only has 0 bytes" warnings.

Store the JoinHandle from the spawned task and block on it in
releasePlan after dropping the batch receiver (to signal the task to
exit). This ensures all memory is released back to Spark before
releasePlan returns to the JVM.

Also fixes Source 2 of apache#2470 — GlobalRefs held by the stream are now
dropped while the JVM thread is still the caller, avoiding detached
thread warnings.

Closes apache#2453
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ExecutionMemoryPool errors releasing more memory than allocated

1 participant