
@sperlingxx sperlingxx commented Dec 8, 2025

Closes #13969

Overview

This PR tightly couples the virtual memory budget with the lifecycle of the actual HostMemoryBuffer instances used in the runner, by making MemoryBoundedAsyncRunner serve as both the resource holder and the HostMemoryAllocator. This design eliminates the previous disconnect between budget accounting and actual memory usage, enabling more precise resource management and improved concurrency.


Key Features

  • Early Release: Over-claimed budget is returned to the global pool as soon as possible
  • Automatic Lifecycle: Runners auto-close when all resources are returned, simplifying management

Performance comparison between V1 and V2

| Memory Limit | Max Thread | Version | NDS-H | NDS |
| --- | --- | --- | --- | --- |
| baseline | 64 | / | 473s | 351s |
| enable 8g | 64 | V1 | 493s | N/A |
| enable 8g | 64 | V2 | 477s | 359s |
| enable 16g | 64 | V1 | 480s | 441s |
| enable 16g | 64 | V2 | 466s | 357s |

Design

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────────────────────────┐
│                            ResourceBoundedThreadExecutor                                │
│                                                                                         │
│  ┌─────────────────────────────────────────────────────────────────────────────────┐   │
│  │                         Global HostMemoryPool                                    │   │
│  │                                                                                  │   │
│  │   ┌────────────────────────────────────────────────────────────────────────┐    │   │
│  │   │  maxHostMemoryBytes                                                    │    │   │
│  │   │  ════════════════════════════════════════════════════════════════════  │    │   │
│  │   │                                                                        │    │   │
│  │   │  remaining (available for acquire/borrow)                              │    │   │
│  │   │  ◄─────────────────────────────────────────────────────────────────────│    │   │
│  │   │                                                                        │    │   │
│  │   └────────────────────────────────────────────────────────────────────────┘    │   │
│  │                                                                                  │   │
│  │   Synchronization:                                                               │   │
│  │   • acquireCondition - waiters for initial budget acquisition                   │   │
│  │   • borrowCondition  - waiters for dynamic memory borrowing (HIGHER PRIORITY)   │   │
│  │                                                                                  │   │
│  └─────────────────────────────────────────────────────────────────────────────────┘   │
│                                          │                                              │
│                    ┌─────────────────────┼─────────────────────┐                        │
│                    │ acquire()           │ acquire()           │ acquire()              │
│                    ▼                     ▼                     ▼                        │
│       ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐          │
│       │MemoryBoundedRunner A│ │MemoryBoundedRunner B│ │MemoryBoundedRunner C│          │
│       │                     │ │                     │ │                     │          │
│       │  ┌───────────────┐  │ │  ┌───────────────┐  │ │  ┌───────────────┐  │          │
│       │  │  LocalPool A  │  │ │  │  LocalPool B  │  │ │  │  LocalPool C  │  │          │
│       │  │ ┌─────┬─────┐ │  │ │  │ ┌─────┬─────┐ │  │ │  │ ┌─────┬─────┐ │  │          │
│       │  │ │USED │FREE │ │  │ │  │ │USED │FREE │ │  │ │  │ │USED │FREE │ │  │          │
│       │  │ └─────┴─────┘ │  │ │  │ └─────┴─────┘ │  │ │  │ └─────┴─────┘ │  │          │
│       │  └───────────────┘  │ │  └───────────────┘  │ │  └───────────────┘  │          │
│       └─────────────────────┘ └─────────────────────┘ └─────────────────────┘          │
│                    │                     │                     │                        │
│                    └─────────────────────┼─────────────────────┘                        │
│                                          │ tryFree() / finishUpRunner()                 │
│                                          ▼                                              │
│                              Return budget to Global Pool                               │
└─────────────────────────────────────────────────────────────────────────────────────────┘

1. LocalPool Initialization

When a MemoryBoundedAsyncRunner is scheduled for execution, it acquires a preliminary memory budget from the global HostMemoryPool (managed by ResourceBoundedExecutor). This budget becomes the runner's LocalPool—a private memory quota that the runner manages independently during its execution lifecycle.

The initial LocalPool size is typically derived from the PartitionedFile split length, representing an upper-bound estimate of the memory required to process the assigned data partition.
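The acquisition step can be sketched as follows. This is a minimal, single-file illustration of the budget accounting only; `HostMemoryPoolSketch` and its methods are hypothetical stand-ins, not the PR's actual `HostMemoryPool` API:

```scala
import java.util.concurrent.atomic.AtomicLong

// Minimal stand-in for the global pool: initial budget acquisition only.
// The real pool also supports blocking waits, timeouts, and deadlock prevention.
final class HostMemoryPoolSketch(maxHostMemoryBytes: Long) {
  private val remaining = new AtomicLong(maxHostMemoryBytes)

  // Carve an initial LocalPool out of the global budget.
  // Returns the granted budget, or -1 if the pool cannot cover the request.
  def tryAcquire(requiredBytes: Long): Long = {
    val prev = remaining.getAndUpdate { r =>
      if (r >= requiredBytes) r - requiredBytes else r
    }
    if (prev >= requiredBytes) requiredBytes else -1L
  }

  def remainingBytes: Long = remaining.get()
}
```

In the real flow, `requiredBytes` would come from the PartitionedFile split length, and a failed acquire would block on `acquireCondition` instead of returning -1.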

2. LocalPool Structure: Used vs Free

The LocalPool is logically divided into two portions:

| Portion | Description |
| --- | --- |
| Used | Memory currently backing live HostMemoryBuffer instances (tracked by `usedMem`) |
| Free | Remaining budget available for future allocations (`localPool - usedMem`) |

This partitioning allows the runner to track exactly how much of its budget is actively in use versus how much remains available—enabling early release of over-claimed budget.

3. Allocation Flow: Local-First with Dynamic Borrowing

When a buffer allocation request arrives, the runner follows a local-first strategy:

  1. Check LocalPool Free Portion: Attempt to satisfy the request using available free budget
  2. Borrow if Insufficient: If the free portion cannot cover the request, dynamically borrow the deficit from the global HostMemoryPool

Borrowing Semantics:

  • Borrow requests are blocking—the runner waits until sufficient budget becomes available
  • Borrowers have higher priority than runners waiting to acquire initial budget, ensuring that active work completes before new work is scheduled
  • Forceful borrowing: Under certain deadlock-prone conditions (e.g., all in-flight runners are blocked waiting to borrow), the borrow proceeds immediately regardless of available budget. This may leave the HostMemoryPool with a negative remaining balance, but guarantees forward progress

This dynamic borrowing mechanism handles cases where the initial budget estimate is insufficient—such as when file readers need to access metadata beyond the split boundaries (footers, adjacent row groups, etc.).
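The local-first accounting can be sketched in a few lines. This single-threaded sketch reuses the `localPool` and `usedMem` field names from the PR, but the surrounding class is a hypothetical stand-in and the in-place "borrow" is simplified (the real `borrowMemory` is a blocking call against the global pool):

```scala
import java.util.concurrent.atomic.AtomicLong

// Runner-side accounting sketch: local-first allocation with borrow fallback.
// Single-threaded illustration; the real code runs under the runner's state lock.
final class LocalPoolSketch(initialBudget: Long) {
  private var localPool: Long = initialBudget // virtual budget (capacity)
  private val usedMem = new AtomicLong(0L)    // bytes backing live buffers

  // Reserve `size` bytes: take from the FREE portion first, then report the
  // deficit that must be borrowed. Returns the borrowed amount (0 if it fit).
  def reserve(size: Long): Long = {
    var memToBorrow = 0L
    usedMem.updateAndGet { curUsed =>
      val total = curUsed + size
      memToBorrow = math.max(0L, total - localPool) // deficit beyond the budget
      math.min(total, localPool)                    // cap used at the budget
    }
    if (memToBorrow > 0) {
      // Stand-in for the blocking poolPtr.borrowMemory(memToBorrow) call:
      // on success, both the budget and the used count grow by the deficit.
      localPool += memToBorrow
      usedMem.addAndGet(memToBorrow)
    }
    memToBorrow
  }

  def used: Long = usedMem.get()
  def budget: Long = localPool
}
```

A request that fits in the FREE portion returns 0 (Case A in the diagram below); a larger request returns the borrowed deficit and grows both `localPool` and `usedMem` (Case B).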

            ┌─────────────────────────┐
            │  runner.allocate(size)  │
            │  (HostMemoryAllocator)  │
            └───────────┬─────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────────┐
│  STEP 1: Attempt Local Allocation                   │
│  ───────────────────────────────────────────────    │
│                                                     │
│  var memToBorrow = 0L                               │
│  val newUsed = usedMem.updateAndGet { curUsed =>    │
│    val total = curUsed + size                       │
│    memToBorrow = total - localPool // Deficit       │
│    total min localPool             // Cap it        │
│  }                                                  │
│                                                     │
│  ┌───────────────────────────────────────────────┐  │
│  │ Before: │████ USED ████│░░░░ FREE ░░░░│       │  │
│  │                                               │  │
│  │ Case A (fits in FREE):                        │  │
│  │ After:  │████ USED ████│██│░░ FREE ░░│        │  │
│  │                         └┬┘                   │  │
│  │                          └── new allocation   │  │
│  │                                               │  │
│  │ Case B (need to borrow):                      │  │
│  │ After:  │████ USED ████│█████│ +BORROW──►     │  │
│  └───────────────────────────────────────────────┘  │
│                                                     │
└─────────────────────────────────────────────────────┘
                        │
                        ▼
            ┌───────────────────────┐
            │  memToBorrow > 0?     │
            └───────────┬───────────┘
                        │
            ┌───────────┴───────────┐
            │No                     │Yes
            ▼                       ▼
┌───────────────────┐  ┌───────────────────────────────┐
│ Skip to STEP 3    │  │ STEP 2: Borrow from Global    │
│ (enough locally)  │  │ ───────────────────────────── │
└───────────────────┘  │                               │
                       │ poolPtr.borrowMemory(amt)     │
                       │ // BLOCKING call              │
                       │                               │
                       │ Borrowing Semantics:          │
                       │ • Wait on borrowCondition     │
                       │ • Borrowers have HIGHER       │
                       │   priority than acquirers     │
                       │ • Deadlock Prevention:        │
                       │   if all runners waiting,     │
                       │   proceed (over-commit OK)    │
                       │                               │
                       │ After borrow:                 │
                       │ localPool += memToBorrow      │
                       │ usedMem += memToBorrow        │
                       │                               │
                       └───────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────┐
│  STEP 3: Allocate Physical Buffer                   │
│  ───────────────────────────────────────────────    │
│                                                     │
│  val buf = withRetryNoSplit {                       │
│    baseMemoryAllocator.allocate(size)               │
│  }                                                  │
│                                                     │
│  // Attach close handler for budget return          │
│  HostAlloc.addEventHandler(buf,                     │
│    new OnCloseHandler(size, this))                  │
│                                                     │
│  return buf                                         │
│                                                     │
└─────────────────────────────────────────────────────┘

4. Deallocation Flow: Event-Driven Budget Return

Buffer release triggers an automatic cascade of budget management:

Step 1: Return to LocalPool
When a HostMemoryBuffer is closed (refCount reaches 0), the attached OnCloseHandler fires and returns the corresponding virtual budget back to the runner's LocalPool (decrements usedMem).

Step 2: Early Release via tryFree
If the runner has completed execution (no longer in Running state), the handler triggers tryFree to immediately return the free portion of LocalPool back to the global HostMemoryPool. This releases over-claimed budget as early as possible, improving pool utilization and allowing other runners to be scheduled sooner.

Step 3: Auto-Close on Full Drain
When LocalPool drops to zero—meaning all physical buffers have been closed and all budget has been returned—the runner can be safely closed automatically. This simplifies lifecycle management by eliminating explicit close coordination.
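The three steps above can be sketched as one event handler. This is a single-threaded illustration of the accounting only; the locking, `bufCloseCond` signaling, and `closeStarted` CAS from the real code are omitted, and all names besides `localPool`/`usedMem` are hypothetical:

```scala
import java.util.concurrent.atomic.AtomicLong

// Sketch of the event-driven deallocation cascade for one runner.
// `globalPool` models the global pool's `remaining` counter.
final class ReleaseSketch(initialBudget: Long, globalPool: AtomicLong) {
  private var localPool: Long = initialBudget
  private val usedMem = new AtomicLong(initialBudget) // assume fully used at start
  var closed = false

  // Fired when a HostMemoryBuffer's refCount reaches 0.
  def onBufferClosed(bufferSize: Long, runnerStillRunning: Boolean): Unit = {
    usedMem.addAndGet(-bufferSize)     // Step 1: return budget to LocalPool
    if (!runnerStillRunning) tryFree() // Step 2: early release when not Running
  }

  // Return the free portion (localPool - usedMem) to the global pool.
  private def tryFree(): Unit = {
    val free = localPool - usedMem.get()
    if (free > 0) {
      localPool -= free
      globalPool.addAndGet(free)       // real code also wakes borrow/acquire waiters
    }
    if (localPool == 0L) closed = true // Step 3: auto-close on full drain
  }
}
```

While the runner is still Running, closes only shrink `usedMem`; once it finishes, each close drains the free portion back to the global pool, and the final close auto-closes the runner.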

            ┌─────────────────────────┐
            │ HostMemoryBuffer.close()│
            │ (refCount reaches 0)    │
            └───────────┬─────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────────┐
│ STEP 1: Return Budget to LocalPool                  │
│ ────────────────────────────────────────────────    │
│                                                     │
│ runner.withStateLock() {                            │
│   usedMem.addAndGet(-bufferSize)                    │
│   bufCloseCond.signal()                             │
│                                                     │
│   ┌───────────────────────────────────────────────┐ │
│   │ Before: │████████ USED ████████│░░ FREE ░░│   │ │
│   │                                               │ │
│   │ After:  │████ USED ████│░░░░░░ FREE ░░░░░░│   │ │
│   │                         └───────┬──────────┘  │ │
│   │                                 └─ returned   │ │
│   └───────────────────────────────────────────────┘ │
│ }                                                   │
│                                                     │
└─────────────────────────────────────────────────────┘
                        │
                        ▼
            ┌───────────────────────┐
            │ getState != Running &&│
            │ !closeStarted.get()?  │
            └───────────┬───────────┘
                        │
            ┌───────────┴───────────┐
            │No                     │Yes
            ▼                       ▼
┌───────────────────┐  ┌───────────────────────────────┐
│ No early release  │  │ STEP 2: Early Release         │
│ (still Running or │  │ ───────────────────────────── │
│ closing)          │  │                               │
└───────────────────┘  │ poolPtr.release(runner,false) │
                       │                               │
                       │ // tryFree() calculates:      │
                       │ free = localPool - usedMem    │
                       │ remain = usedMem              │
                       │ localPool -= free             │
                       │                               │
                       │ // Return to global pool:     │
                       │ remaining += free             │
                       │                               │
                       │ // Wake waiters (priority):   │
                       │ if (borrowCondition waiters)  │
                       │   borrowCondition.signalAll() │
                       │ else if (acquireCond waiters) │
                       │   acquireCondition.signalAll()│
                       │                               │
                       │ ┌───────────────────────────┐ │
                       │ │ Before: │██USED██│░FREE░│ │ │
                       │ │                          │ │
                       │ │ After:  │██USED██│       │ │
                       │ │          └──┬───┘        │ │
                       │ │        localPool shrinks │ │
                       │ │        FREE → Global Pool│ │
                       │ └───────────────────────────┘ │
                       │                               │
                       └───────────────────────────────┘
                                    │
                                    ▼
                       ┌───────────────────────┐
                       │ remain == 0?          │
                       │ (LocalPool drained)   │
                       └───────────┬───────────┘
                                   │
                       ┌───────────┴───────────┐
                       │No                     │Yes
                       ▼                       ▼
          ┌───────────────────┐  ┌─────────────────────┐
          │ Wait for more     │  │ STEP 3: Auto-Close  │
          │ buffers to close  │  │ ─────────────────── │
          └───────────────────┘  │                     │
                                 │ closeRunner(runner) │
                                 │ 1. runner.onClose() │
                                 │ 2. setState(Closed) │
                                 │ 3. unregisterRunner │
                                 │                     │
                                 └─────────────────────┘

Code References

| Component | File | Lines | Description |
| --- | --- | --- | --- |
| Global Pool Acquire | ResourcePools.scala | L108-171 | Blocking acquire with timeout and deadlock prevention |
| Global Pool Release | ResourcePools.scala | L199-235 | tryFree + return to pool + wake waiters |
| Borrow Memory | ResourcePools.scala | L248-290 | Priority-based blocking borrow with deadlock prevention |
| LocalPool Variables | AsyncRunners.scala | L525-531 | localPool (capacity) and usedMem (AtomicLong) |
| Runner.allocate() | AsyncRunners.scala | L468-498 | Local-first allocation with borrow fallback |
| OnCloseHandler | AsyncRunners.scala | L505-522 | Event-driven return to LocalPool + early release trigger |
| onClose() Blocking | AsyncRunners.scala | L417-443 | Wait for all buffers to close before runner closes |
| tryFree() | AsyncRunners.scala | L380-395 | Calculate freeable portion of LocalPool |
| beforeExecute() | ResourceBoundedThreadExecutor.scala | L200-250 | State transition and acquire orchestration |
| afterExecute() | ResourceBoundedThreadExecutor.scala | L252-315 | Post-execution handling and release |

@sperlingxx sperlingxx changed the title [draft] Triple buffering: : Bind Virtual Resource Budget to Physical Memory Allocation [draft] Triple buffering: Bind Virtual Resource Budget to Physical Memory Allocation Dec 8, 2025
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx sperlingxx force-pushed the flow_ctrl_trip_buffer_ver2 branch from 3d40aa4 to b2278cc on December 8, 2025 12:37

greptile-apps bot commented Dec 8, 2025

Greptile Summary

This PR implements tight coupling between virtual memory budgets and physical HostMemoryBuffer lifecycles in the triple buffering system. The MemoryBoundedAsyncRunner now acts as both resource holder and HostMemoryAllocator, enabling event-driven budget release through OnCloseHandler callbacks.

Key Changes

  • LocalPool Architecture: Runners maintain a localPool (virtual budget) and usedMem (physical allocation) to track memory precisely. The free portion (localPool - usedMem) enables early budget return to the global pool.

  • Dynamic Borrowing: When initial budget estimates are insufficient, runners can dynamically borrowMemory from the global pool with priority semantics. Borrowers have higher priority than new acquisitions, and deadlock prevention allows over-commits when all in-flight runners are blocked.

  • Event-Driven Release: OnCloseHandler automatically returns budget when buffers are closed (refCount reaches 0). If the runner is not Running and not closing, it triggers release() to immediately free unused budget and potentially auto-close the runner when localPool reaches zero.

  • Removed DecayReleaseResult: The explicit decay callback mechanism is eliminated - resource management is now self-sustaining through event handlers.

Issues Found

Critical: The allocate() method requires holding the state lock (AsyncRunners.scala:471-472) but is called from user code paths that may not acquire it. This will cause require check failures at runtime.

Race Condition: Between calculating memToBrw and calling borrowMemory (AsyncRunners.scala:476-480), concurrent buffer closes could make the borrow amount incorrect.

Deadlock Detection: The numRunnerInFlight counter is decremented outside poolLock in a post-hook, creating a window where deadlock detection logic (ResourcePools.scala:266) operates on stale counts.

Confidence Score: 2/5

  • Contains critical logic issues that could cause runtime failures and race conditions under concurrent load
  • Score reflects a sophisticated memory management design with clear architectural improvements, but implementation has critical issues: (1) state lock requirement in allocate() is incompatible with caller paths, (2) race condition in borrow calculation, (3) stale counter reads in deadlock detection. These issues could manifest as require failures, incorrect memory accounting, or deadlock under high concurrency. The TODO comment also indicates incomplete implementation.
  • AsyncRunners.scala requires immediate attention for the state lock and race condition issues. ResourcePools.scala needs review of deadlock detection logic.

Important Files Changed

| Filename | Overview |
| --- | --- |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala | Major refactor: MemoryBoundedAsyncRunner now implements HostMemoryAllocator with LocalPool tracking and event-driven budget release. Critical race condition in allocate() method requiring state lock. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala | Adds borrowMemory() with priority semantics and deadlock prevention. Release logic now supports auto-close when LocalPool reaches zero. Priority signaling for borrow vs acquire waiters. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetScan.scala | Integrates custom allocator into readPartFile. Removes DecayReleaseResult pattern in favor of event-driven release. Passes allocator through decompression path. |

Sequence Diagram

sequenceDiagram
    participant Executor as ResourceBoundedThreadExecutor
    participant Pool as HostMemoryPool
    participant Runner as MemoryBoundedAsyncRunner
    participant BaseAlloc as Base HostMemoryAllocator
    participant Handler as OnCloseHandler
    
    Note over Executor,Pool: Acquisition Phase
    Executor->>Pool: acquire(runner, timeout)
    Pool->>Pool: Check remaining >= requiredMemory
    alt Sufficient memory
        Pool->>Pool: remaining -= requiredMemory
        Pool->>Runner: onStart(pool)
        Runner->>Runner: localPool = requiredMemory<br/>usedMem = 0
        Pool-->>Executor: AcquireSuccessful
    else Insufficient but numInFlight==0
        Pool->>Pool: Over-commit (remaining < 0)
        Pool->>Runner: onStart(pool)
        Pool-->>Executor: AcquireSuccessful (deadlock prevention)
    end
    
    Note over Runner,Handler: Execution Phase - Allocation
    Runner->>Runner: allocate(size)
    Runner->>Runner: usedMem.updateAndGet(curUsed + size)
    alt Local pool sufficient
        Runner->>BaseAlloc: allocate(size)
        BaseAlloc-->>Runner: HostMemoryBuffer
        Runner->>Handler: Register OnCloseHandler
    else Local pool insufficient
        Runner->>Pool: borrowMemory(deficit)
        Pool->>Pool: Block until available or deadlock prevention
        Pool->>Pool: remaining -= deficit
        Runner->>Runner: localPool += deficit
        Runner->>BaseAlloc: allocate(size)
        BaseAlloc-->>Runner: HostMemoryBuffer
        Runner->>Handler: Register OnCloseHandler
    end
    
    Note over Handler,Pool: Buffer Close - Event Driven
    Handler->>Handler: onClosed(refCount==0)
    Handler->>Runner: withStateLock()
    Handler->>Runner: usedMem -= bufferSize
    Handler->>Runner: bufCloseCond.signal()
    alt Runner not Running && !closeStarted
        Handler->>Pool: release(runner, forcefully=false)
        Pool->>Runner: tryFree(byForce=false)
        Runner->>Runner: freed = localPool - usedMem
        Runner->>Runner: localPool -= freed
        Pool->>Pool: remaining += freed
        Pool->>Pool: Wake borrowers/acquirers
        alt localPool == 0
            Pool->>Runner: closeRunner()
            Runner->>Runner: onClose() - wait for usedMem==0
            Runner->>Runner: setState(Closed)
        end
    end
    
    Note over Runner,Pool: Force Close on Completion
    Executor->>Runner: close()
    Runner->>Pool: finishUpRunner()
    Pool->>Pool: release(runner, forcefully=true)
    Pool->>Runner: tryFree(byForce=true)
    Runner->>Runner: freed = localPool
    Pool->>Pool: remaining += freed
    Pool->>Runner: closeRunner()
    Runner->>Runner: onClose() - wait for usedMem==0
    Runner->>Runner: setState(Closed)


@greptile-apps greptile-apps bot left a comment


Additional Comments (4)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 454-456 (link)

    logic: requires holding state lock but allocate is called from file reader threads that may not hold it

  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 507-511 (link)

    logic: potential deadlock: OnCloseHandler.onClosed tries to acquire state lock while already holding bufCloseLock, but onClose() holds state lock then acquires bufCloseLock (line 415) - classic lock ordering violation

  3. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 265-268 (link)

    logic: deadlock prevention logic uses >= but initializes numRunnerInFlight to 0 - when all runners are borrowing (numOfBorrowWaiters + 1 == numRunnerInFlight.get()), this would allow infinite over-commit

  4. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 458-463 (link)

    logic: race condition: memToBrw calculated inside updateAndGet closure but used outside - another thread could modify localPool between lines 463 and 470, making memToBrw stale

9 files reviewed, 4 comments


Signed-off-by: sperlingxx <lovedreamf@gmail.com>

@greptile-apps greptile-apps bot left a comment


Additional Comments (5)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 451-482 (link)

    logic: Potential deadlock: allocate requires holding stateLock and calls borrowMemory, which acquires poolLock. But OnCloseHandler.onClosed (line 496) calls withStateLock() while potentially being called from a thread already holding poolLock (during release at line 220-228). This creates a lock ordering issue: Thread A (poolLock → stateLock) vs Thread B (stateLock → poolLock).

    The issue: When release returns freed memory and signals waiting threads (lines 220-228 under poolLock), those awakened threads might immediately close buffers, triggering OnCloseHandler.onClosed callbacks which need stateLock. Meanwhile, a running task holding stateLock in allocate may try to acquire poolLock via borrowMemory.

    Suggested fix: Consider using tryLock with timeout in OnCloseHandler, or restructure locking so borrowMemory doesn't require holding stateLock.

  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 413-425 (link)

    logic: Blocking onClose() with unbounded wait could hang indefinitely if buffers are never closed (e.g., leaked references). While the 30s timeout logging helps debugging, the loop never gives up. Consider adding a maximum total wait time or a force-close mechanism after a reasonable timeout to prevent indefinite hangs.

  3. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 260-295 (link)

    logic: Edge case in deadlock prevention: if numOfBorrowWaiters + 1 >= numRunnerInFlight.get() (line 266), the borrower proceeds immediately even if remaining < 0. However, numRunnerInFlight is decremented in the post-hook (line 154-163) which runs AFTER the runner completes. If all runners are blocked in borrowMemory waiting for memory, but haven't decremented numRunnerInFlight yet, the condition may never trigger correctly. The timing window where runners finish execution but haven't yet run their post-hooks could cause false negatives for deadlock detection.

  4. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 456-471 (link)

    logic: Race condition in memory accounting: usedMem.updateAndGet (line 457) updates atomically, but the subsequent borrowMemory call (line 468) and second usedMem.addAndGet (line 470) are separate operations. If another thread closes a buffer between lines 462-468, the OnCloseHandler will decrement usedMem and potentially call tryFree/release, which could see inconsistent state where memory was borrowed but not yet added to usedMem. This could lead to over-release of memory back to the pool.

  5. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 234-237 (link)

    logic: Auto-close logic race: checking remain == 0L and closeStarted.compareAndSet(false, true) (line 235) outside runner's state lock creates a race window. If two threads call release concurrently (one from OnCloseHandler, one from explicit release), both might see remain == 0 and one wins the CAS, but the other continues with invalid assumptions. Should verify remain == 0 is checked under lock or document thread-safety assumptions more clearly.

9 files reviewed, 5 comments


@sperlingxx
Collaborator Author

build

1 similar comment
@sperlingxx
Collaborator Author

build

@sperlingxx
Collaborator Author

Additional Comments (5)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 451-482 (link)
    logic: Potential deadlock: allocate requires holding stateLock and calls borrowMemory, which acquires poolLock. But OnCloseHandler.onClosed (line 496) calls withStateLock() while potentially being called from a thread already holding poolLock (during release at line 220-228). This creates a lock ordering issue: Thread A (poolLockstateLock) vs Thread B (stateLockpoolLock).
    The issue: When release returns freed memory and signals waiting threads (lines 220-228 under poolLock), those awakened threads might immediately close buffers, triggering OnCloseHandler.onClosed callbacks which need stateLock. Meanwhile, a running task holding stateLock in allocate may try to acquire poolLock via borrowMemory.
    Suggested fix: Consider using tryLock with timeout in OnCloseHandler, or restructure locking so borrowMemory doesn't require holding stateLock.
  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 413-425 (link)
    logic: Blocking onClose() with unbounded wait could hang indefinitely if buffers are never closed (e.g., leaked references). While the 30s timeout logging helps debugging, the loop never gives up. Consider adding a maximum total wait time or a force-close mechanism after a reasonable timeout to prevent indefinite hangs.
  3. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 260-295 (link)
    logic: Edge case in deadlock prevention: if numOfBorrowWaiters + 1 >= numRunnerInFlight.get() (line 266), the borrower proceeds immediately even if remaining < 0. However, numRunnerInFlight is decremented in the post-hook (line 154-163) which runs AFTER the runner completes. If all runners are blocked in borrowMemory waiting for memory, but haven't decremented numRunnerInFlight yet, the condition may never trigger correctly. The timing window where runners finish execution but haven't yet run their post-hooks could cause false negatives for deadlock detection.
  4. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 456-471 (link)
    logic: Race condition in memory accounting: usedMem.updateAndGet (line 457) updates atomically, but the subsequent borrowMemory call (line 468) and second usedMem.addAndGet (line 470) are separate operations. If another thread closes a buffer between lines 462-468, the OnCloseHandler will decrement usedMem and potentially call tryFree/release, which could see inconsistent state where memory was borrowed but not yet added to usedMem. This could lead to over-release of memory back to the pool.
  5. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 234-237 (link)
    logic: Auto-close logic race: checking remain == 0L and closeStarted.compareAndSet(false, true) (line 235) outside runner's state lock creates a race window. If two threads call release concurrently (one from OnCloseHandler, one from explicit release), both might see remain == 0 and one wins the CAS, but the other continues with invalid assumptions. Should verify remain == 0 is checked under lock or document thread-safety assumptions more clearly.

9 files reviewed, 5 comments


@greptileai As for the first point, I don't think it will lead to a deadlock issue, because the lock ordering (stateLock → poolLock) only occurs when we need to borrowMemory, which happens only in the RUNNING state of AsyncRunner. And during the RUNNING phase, the stateLock of this AsyncRunner is held from the very beginning to the very end. What's more, the reverse lock ordering (poolLock → stateLock) seems to be impossible with the current implementation, if I am not wrong.

Contributor

@greptile-apps greptile-apps bot left a comment


You make a good point about the lock ordering analysis. Let me examine this more carefully.

You're correct that during the RUNNING state, the runner should hold its stateLock from beginning to end, which would prevent the poolLock → stateLock ordering scenario I described.

However, looking at the code more closely, I see a potential issue with this assumption. In ResourceBoundedThreadExecutor.afterExecute() (lines 265-278), the stateLock is released with releaseAnyway = true, but then mgr.release() is called which can trigger the OnCloseHandler callbacks. Here's the sequence:

  1. afterExecute() calls runner.withStateLock(releaseAnyway = true)
  2. This releases the stateLock at the end
  3. Then mgr.release(rr, forcefully = false) is called
  4. This can trigger buffer closures and OnCloseHandler.onClosed() callbacks
  5. Those callbacks try to acquire stateLock again via withStateLock()

So the runner might not actually hold stateLock continuously through the entire lifecycle as expected.

But you're right that the specific deadlock scenario I described (poolLock → stateLock vs stateLock → poolLock) seems less likely given the current control flow.

The more realistic concern might be:

  • Thread A: In OnCloseHandler.onClosed(), holds stateLock and calls poolPtr.release() which needs poolLock
  • Thread B: In HostMemoryPool.release(), holds poolLock and the release triggers other buffer closures that need stateLock

But even this seems unlikely given the current implementation structure.

You've convinced me that my initial deadlock concern was overstated. The lock ordering appears to be more controlled than I initially assessed. Thank you for the correction!

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Contributor

@greptile-apps greptile-apps bot left a comment


Additional Comments (2)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 482-489 (link)

    logic: Potential deadlock: allocate() holds stateLock and calls borrowMemory() which acquires poolLock, while OnCloseHandler.onClosed() may be triggered from threads holding poolLock and needs stateLock. This creates a lock ordering conflict that could cause deadlocks. Have you verified the lock ordering is consistent between the allocation path and the buffer close event path to prevent deadlocks?

  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 475-489 (link)

    logic: Race condition in memory accounting: gap between usedMem.updateAndGet(), borrowMemory(), and second usedMem.addAndGet() can lead to inconsistent state if concurrent buffer closes occur between these operations.

11 files reviewed, 2 comments


@sperlingxx
Collaborator Author

build

@sperlingxx sperlingxx changed the title [draft] Triple buffering: Bind Virtual Resource Budget to Physical Memory Allocation Triple buffering: Bind Virtual Resource Budget to Physical Memory Allocation [databricks] Dec 17, 2025
@sperlingxx
Collaborator Author

build

@sameerz sameerz added the performance A performance related task/issue label Jan 5, 2026
@sperlingxx
Collaborator Author

@greptileai

sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 482-489 (link)
logic: Potential deadlock: allocate() holds stateLock and calls borrowMemory() which acquires poolLock, while OnCloseHandler.onClosed() may be triggered from threads holding poolLock and needs stateLock. This creates a lock ordering conflict that could cause deadlocks. Have you verified the lock ordering is consistent between the allocation path and the buffer close event path to prevent deadlocks?

The lock ordering conflict you pointed out should not happen, because OnCloseHandler.onClosed will not reach poolPtr.release (and thus take the poolLock) if getState == Running, and borrowing memory only happens when getState == Running.

Contributor

@greptile-apps greptile-apps bot left a comment


You're correct that the specific scenario you described (borrow during Running + OnCloseHandler calling poolPtr.release) won't happen because of the state check. However, there's still a potential deadlock risk in the current design.

The issue is that OnCloseHandler.onClosed() always acquires the stateLock first (line 420 in r.withStateLock[Unit]() { _ =>), regardless of the runner state. Meanwhile, allocate() holds the stateLock and then calls borrowMemory() which acquires the poolLock.

Here's the potential deadlock scenario:

Thread A (allocation path):

  1. Holds stateLock (in allocate())
  2. Calls borrowMemory() → tries to acquire poolLock

Thread B (buffer close path):

  1. Holds poolLock (from some pool operation)
  2. Buffer gets closed → OnCloseHandler.onClosed() → tries to acquire stateLock

This creates a classic lock ordering deadlock: A holds stateLock→wants poolLock, B holds poolLock→wants stateLock.

Suggested fix:
Consider restructuring the lock acquisition order to be consistent. For example:

override def allocate(size: Long, preferPinned: Boolean): HostMemoryBuffer = {
  require(getState == Running, s"Memory allocation is only allowed in Running state: $this")
  require(isHoldingStateLock, s"The caller must hold the state lock: $this")

  // Check and update the used memory atomically
  var memToBrw = 0L
  var newUsed = usedMem.updateAndGet { curUsed: Long =>
    val newUsed = curUsed + size
    memToBrw = newUsed - localPool
    newUsed min localPool
  }
  // If the local pool is insufficient, try to borrow from the global pool
  // Release stateLock before acquiring poolLock to maintain consistent lock ordering
  var borrowedAmount = 0L
  if (memToBrw > 0) {
    logWarning(
      s"[runnerID=$runnerId] LocalMemPool ${bToStr(localPool)}(used ${bToStr(newUsed)}) " +
          s"is NOT enough for the ALLOC(${bToStr(size)}): try to borrow ${bToStr(memToBrw)}")
    // Temporarily release state lock to avoid deadlock
    stateLock.unlock()
    try {
      poolPtr.asInstanceOf[HostMemoryPool].borrowMemory(memToBrw)
      borrowedAmount = memToBrw
    } finally {
      stateLock.lock()
      // Re-verify state after reacquiring lock
      require(getState == Running, s"Runner state changed during borrow: $this")
    }
    localPool += borrowedAmount
    newUsed = usedMem.addAndGet(borrowedAmount)
  }
  if (newUsed > peakUsedMem) {
    peakUsedMem = newUsed
  }
  // Call the base allocator to allocate the actual buffer
  val buf = withRetryNoSplit[HostMemoryBuffer] {
    baseMemoryAllocator.allocate(size, preferPinned)
  }
  // Register a close handler to return the memory back either to the local or global pool
  HostAlloc.addEventHandler(buf, new OnCloseHandler(size, this))
  buf
}

This ensures consistent lock ordering: always acquire poolLock before stateLock when both are needed.
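The rule stated at the end of this comment, always taking the two locks in the same order, is the standard cure for this class of deadlock. A minimal standalone sketch (in Java, with illustrative names only; this is not the PR's code):

```java
import java.util.concurrent.locks.ReentrantLock;

// Both paths agree to take poolLock before stateLock, so the circular-wait
// condition required for an ABBA deadlock can never arise between them.
class OrderedLocks {
    private final ReentrantLock poolLock = new ReentrantLock();
    private final ReentrantLock stateLock = new ReentrantLock();
    int balance = 0;

    void allocationPath() {
        poolLock.lock();
        try {
            stateLock.lock();
            try { balance += 1; } finally { stateLock.unlock(); }
        } finally { poolLock.unlock(); }
    }

    void closePath() { // same order, never reversed
        poolLock.lock();
        try {
            stateLock.lock();
            try { balance -= 1; } finally { stateLock.unlock(); }
        } finally { poolLock.unlock(); }
    }
}
```

With a single agreed ordering, a thread holding poolLock can never be blocked on a thread that holds stateLock and wants poolLock, because that second thread would have had to take poolLock first.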

@sperlingxx
Collaborator Author

You're correct that the specific scenario you described (borrow during Running + OnCloseHandler calling poolPtr.release) won't happen because of the state check. However, there's still a potential deadlock risk in the current design.

The issue is that OnCloseHandler.onClosed() always acquires the stateLock first (line 420 in r.withStateLock[Unit]() { _ =>), regardless of the runner state. Meanwhile, allocate() holds the stateLock and then calls borrowMemory() which acquires the poolLock.

Here's the potential deadlock scenario:

Thread A (allocation path):

  1. Holds stateLock (in allocate())
  2. Calls borrowMemory() → tries to acquire poolLock

Thread B (buffer close path):

  1. Holds poolLock (from some pool operation)
  2. Buffer gets closed → OnCloseHandler.onClosed() → tries to acquire stateLock

This creates a classic lock ordering deadlock: A holds stateLock→wants poolLock, B holds poolLock→wants stateLock.

Suggested fix: Consider restructuring the lock acquisition order to be consistent. For example:

override def allocate(size: Long, preferPinned: Boolean): HostMemoryBuffer = {
  require(getState == Running, s"Memory allocation is only allowed in Running state: $this")
  require(isHoldingStateLock, s"The caller must hold the state lock: $this")

  // Check and update the used memory atomically
  var memToBrw = 0L
  var newUsed = usedMem.updateAndGet { curUsed: Long =>
    val newUsed = curUsed + size
    memToBrw = newUsed - localPool
    newUsed min localPool
  }
  // If the local pool is insufficient, try to borrow from the global pool
  // Release stateLock before acquiring poolLock to maintain consistent lock ordering
  var borrowedAmount = 0L
  if (memToBrw > 0) {
    logWarning(
      s"[runnerID=$runnerId] LocalMemPool ${bToStr(localPool)}(used ${bToStr(newUsed)}) " +
          s"is NOT enough for the ALLOC(${bToStr(size)}): try to borrow ${bToStr(memToBrw)}")
    // Temporarily release state lock to avoid deadlock
    stateLock.unlock()
    try {
      poolPtr.asInstanceOf[HostMemoryPool].borrowMemory(memToBrw)
      borrowedAmount = memToBrw
    } finally {
      stateLock.lock()
      // Re-verify state after reacquiring lock
      require(getState == Running, s"Runner state changed during borrow: $this")
    }
    localPool += borrowedAmount
    newUsed = usedMem.addAndGet(borrowedAmount)
  }
  if (newUsed > peakUsedMem) {
    peakUsedMem = newUsed
  }
  // Call the base allocator to allocate the actual buffer
  val buf = withRetryNoSplit[HostMemoryBuffer] {
    baseMemoryAllocator.allocate(size, preferPinned)
  }
  // Register a close handler to return the memory back either to the local or global pool
  HostAlloc.addEventHandler(buf, new OnCloseHandler(size, this))
  buf
}

This ensures consistent lock ordering: always acquire poolLock before stateLock when both are needed.

@greptileai I do not think Thread B (buffer close path) will hold poolLock under any circumstance

Contributor

@greptile-apps greptile-apps bot left a comment


You're absolutely right! Looking at the code more carefully, I can see that Thread B (buffer close path) would not hold the poolLock when OnCloseHandler.onClosed() is called.

The OnCloseHandler.onClosed() method is triggered by the buffer's reference counting mechanism when refCount reaches 0, which happens when the buffer is closed/released. This is not called from within any pool operation that would already be holding the poolLock.

The potential deadlock scenario I described is not valid because:

  1. Thread B starts from buffer deallocation (when refCount hits 0)
  2. It only tries to acquire the stateLock in OnCloseHandler.onClosed()
  3. It does not start with the poolLock already held

The lock ordering issue doesn't exist here because there's no case where Thread B would be holding poolLock and then trying to acquire stateLock.

Thank you for the correction! The current implementation is actually safe from this particular deadlock scenario. The OnCloseHandler is triggered purely by buffer lifecycle events (refcount reaching 0), not by pool operations that would already be holding locks.

The design is sound - when buffers are closed, they simply try to acquire the runner's stateLock to update memory accounting, and there's no cross-dependencies with the pool lock in this path.

@sperlingxx
Collaborator Author

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 475-489 (link)
    logic: Race condition in memory accounting: gap between usedMem.updateAndGet(), borrowMemory(), and second usedMem.addAndGet() can lead to inconsistent state if concurrent buffer closes occur between these operations.

@greptileai Can you elaborate the point a little bit?

@greptile-apps
Contributor

greptile-apps bot commented Jan 7, 2026

Looking at lines 475-489 in the PR code, I can see the race condition issue in the memory allocation logic. Let me break down what's happening:

// Check and update the used memory atomically
var memToBrw = 0L
var newUsed = usedMem.updateAndGet { curUsed: Long =>
  val newUsed = curUsed + size
  memToBrw = newUsed - localPool
  newUsed min localPool
}
// If the local pool is insufficient, try to borrow from the global pool
if (memToBrw > 0) {
  // ... logging ...
  // Blocking call to borrow memory from the global pool
  poolPtr.asInstanceOf[HostMemoryPool].borrowMemory(memToBrw)
  localPool += memToBrw
  newUsed = usedMem.addAndGet(memToBrw)
}

The Race Condition:

The problem is that there are three separate atomic operations happening sequentially:

  1. usedMem.updateAndGet() - Updates usedMem but caps it at localPool
  2. borrowMemory() - Borrows memory from global pool (blocking call)
  3. usedMem.addAndGet() - Adds the borrowed amount to usedMem

Between these operations, other threads can execute buffer close handlers (OnCloseHandler.onClosed()), which call usedMem.addAndGet(-bufferSize) to decrement the used memory.

Inconsistent State Scenarios:

  1. After step 1, before step 3: If a buffer is closed here, usedMem gets decremented, but the allocation logic has already committed to borrowing memToBrw based on the old usedMem value.

  2. Memory double-counting: The borrowed memory gets added to both localPool and usedMem, but if buffers were closed between steps 1 and 3, the final usedMem value might not accurately reflect the actual memory usage.

  3. Over-borrowing: The algorithm might borrow more memory than actually needed because it calculated memToBrw based on a stale view of usedMem.

Example:

  • localPool = 100, usedMem = 90, trying to allocate size = 20
  • Step 1: usedMem.updateAndGet() sets usedMem = 100, memToBrw = 10
  • Buffer close happens: usedMem becomes 80
  • Step 2: Borrow 10 bytes, localPool = 110
  • Step 3: usedMem.addAndGet(10) makes usedMem = 90

Now usedMem = 90 but we only needed 80 + 20 = 100 total, so we over-borrowed by 10 bytes.

The fix would be to make the entire allocation decision atomic, or use proper synchronization to prevent buffer closes from interfering with the allocation logic.
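The worked example above can be replayed deterministically with plain atomics (a hedged Java sketch with the same hypothetical values; the concurrent close is injected as a callback to make the interleaving explicit):

```java
import java.util.concurrent.atomic.AtomicLong;

// Replays the interleaving described above: a buffer close lands between
// the budget snapshot and the borrow, so the borrow amount is stale.
class OverBorrowDemo {
    static long localPool = 100;
    static final AtomicLong usedMem = new AtomicLong(90);

    // Returns how much was borrowed for an allocation of `size`.
    static long allocate(long size, Runnable concurrentEvent) {
        final long[] toBorrow = {0L};
        usedMem.updateAndGet(cur -> {          // step 1: snapshot-based decision
            long next = cur + size;
            toBorrow[0] = next - localPool;
            return Math.min(next, localPool);
        });
        concurrentEvent.run();                 // another thread closes a buffer here
        if (toBorrow[0] > 0) {                 // steps 2-3: commit the stale borrow
            localPool += toBorrow[0];
            usedMem.addAndGet(toBorrow[0]);
        }
        return Math.max(toBorrow[0], 0);
    }
}
```

With a 20-byte close injected between the steps, the run ends with usedMem = 90 and localPool = 110, i.e. 10 bytes were borrowed that the original 100-byte pool could already have covered.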

@sperlingxx
Collaborator Author

Inconsistent State Scenarios:

  1. After step 1, before step 3: If a buffer is closed here, usedMem gets decremented, but the allocation logic has already committed to borrowing memToBrw based on the old usedMem value.
  2. Memory double-counting: The borrowed memory gets added to both localPool and usedMem, but if buffers were closed between steps 1 and 3, the final usedMem value might not accurately reflect the actual memory usage.
  3. Over-borrowing: The algorithm might borrow more memory than actually needed because it calculated memToBrw based on a stale view of usedMem.

@greptileai
As for the potential over-borrowing, you are right: the current implementation may over-borrow if some buffers are released while memory is being borrowed from the global pool. Since it is a really rare condition, I will add a TODO tag and set it aside.
As for the memory double-counting, I think you misunderstood the meaning of localPool. The localPool refers to the total memory allocated to the current runner.

@greptile-apps
Contributor

greptile-apps bot commented Jan 7, 2026

Thanks for the clarification! You're absolutely correct about localPool representing the total budget allocated to the runner. Now I understand the architecture better:

  • localPool: Total memory budget assigned to this runner
  • usedMem: Currently allocated memory (backing live buffers)
  • localPool - usedMem: Free portion available for new allocations
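That model boils down to a single invariant, sketched here in Java with hypothetical names (not the PR's accounting code): usedMem never exceeds localPool, and localPool - usedMem is what a new allocation may take without borrowing.

```java
// Illustrative budget model only: total budget vs. memory used from it.
class RunnerBudget {
    long localPool; // total budget assigned to this runner
    long usedMem;   // memory currently backing live buffers

    RunnerBudget(long localPool) { this.localPool = localPool; }

    long free() { return localPool - usedMem; }

    // true if the allocation fits without borrowing from the global pool
    boolean fitsLocally(long size) { return size <= free(); }
}
```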

Regarding the over-borrowing issue you mentioned with the TODO, you're right that it's a rare edge case. For completeness though, here are a few lightweight approaches you might consider if/when you revisit this:

  1. Snapshot approach: Capture usedMem once at the start of borrowing and use that snapshot throughout the borrowing calculation
  2. Retry-with-adjustment: If significant releases occur during borrowing, recalculate the needed amount before committing
  3. Pessimistic borrowing: Accept slight over-borrowing as a trade-off for simpler code, since the memory gets returned via the same event-driven mechanism

The current design is quite elegant - the event-driven release mechanism via OnCloseHandler will naturally correct any temporary over-borrowing, and the rare occurrence makes it a reasonable candidate for future optimization.

Your clarification about localPool semantics also resolves the double-counting concern I initially had. The model of "total budget allocated" vs "currently used from that budget" is much cleaner than what I was initially thinking.

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Contributor

@greptile-apps greptile-apps bot left a comment


Additional Comments (5)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 470-472 (link)

    logic: The allocate method requires holding the state lock but is called from user code that may not hold it. The readPartFile in GpuParquetScan.scala:2806 calls this allocator without acquiring the lock, which will fail the require check. Need to either remove the lock requirement or ensure all callers acquire it first.

    Check if callers like readPartFile properly acquire stateLock before calling allocations through this allocator. Does the Parquet reading code path acquire the runner's state lock before triggering allocations through the custom allocator?

  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 476-480 (link)

    logic: Race condition: usedMem.updateAndGet updates atomically but then memToBrw is calculated and used outside the atomic operation. Between computing memToBrw and calling borrowMemory, another thread could close a buffer (via OnCloseHandler) and reduce usedMem, making the borrow amount incorrect.

  3. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 266-267 (link)

    logic: Deadlock prevention logic checks numOfBorrowWaiters + 1 >= numRunnerInFlight.get() but numRunnerInFlight is decremented in a post-hook that runs outside the poolLock. This creates a window where the count could be stale, potentially allowing unnecessary over-commits or missing legitimate deadlock scenarios.

    Consider synchronizing the post-hook decrement with the pool lock or using a more robust deadlock detection mechanism.

  4. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 288-291 (link)

    style: The cascading signal logic from borrowCondition to acquireCondition is clever but fragile. If multiple borrowers are awakened concurrently, each might try to signal acquireCondition, and the numOfBorrowWaiters == 0 check may not accurately reflect pending borrowers still in the process of being awakened.

    This is a minor edge case but could lead to suboptimal scheduling under high contention.

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

  5. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 516 (link)

    style: TODO should be addressed - returned memory could satisfy pending borrow requests but doesn't trigger them, leading to suboptimal resource utilization

11 files reviewed, 5 comments


@sperlingxx
Collaborator Author

build

@sperlingxx
Collaborator Author

Additional Comments (5)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 470-472 (link)
    logic: The allocate method requires holding the state lock but is called from user code that may not hold it. The readPartFile in GpuParquetScan.scala:2806 calls this allocator without acquiring the lock, which will fail the require check. Need to either remove the lock requirement or ensure all callers acquire it first.
    Check if callers like readPartFile properly acquire stateLock before calling allocations through this allocator. Does the Parquet reading code path acquire the runner's state lock before triggering allocations through the custom allocator?
  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 476-480 (link)
    logic: Race condition: usedMem.updateAndGet updates atomically but then memToBrw is calculated and used outside the atomic operation. Between computing memToBrw and calling borrowMemory, another thread could close a buffer (via OnCloseHandler) and reduce usedMem, making the borrow amount incorrect.
  3. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 266-267 (link)
    logic: Deadlock prevention logic checks numOfBorrowWaiters + 1 >= numRunnerInFlight.get() but numRunnerInFlight is decremented in a post-hook that runs outside the poolLock. This creates a window where the count could be stale, potentially allowing unnecessary over-commits or missing legitimate deadlock scenarios.
    Consider synchronizing the post-hook decrement with the pool lock or using a more robust deadlock detection mechanism.
  4. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/ResourcePools.scala, line 288-291 (link)
    style: The cascading signal logic from borrowCondition to acquireCondition is clever but fragile. If multiple borrowers are awakened concurrently, each might try to signal acquireCondition, and the numOfBorrowWaiters == 0 check may not accurately reflect pending borrowers still in the process of being awakened.
    This is a minor edge case but could lead to suboptimal scheduling under high contention.
    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
  5. sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncRunners.scala, line 516 (link)
    style: TODO should be addressed - returned memory could satisfy pending borrow requests but doesn't trigger them, leading to suboptimal resource utilization

11 files reviewed, 5 comments


@greptileai

  1. All the AsyncRunners are wrapped as RapidsFutureTask if they are scheduled by ResourceBoundedThreadExecutor. Otherwise, the HostMemoryAllocator will be the default one.

  2. Although over-borrowing is possible, the accounting will not be incorrect, since the counting variables are atomic.

  3. You are right, but over-commit is expected behavior.

  4. Good point.

  5. Will do, though it is a trivial issue.

Contributor

@greptile-apps greptile-apps bot left a comment


Thanks for the clarifications! Let me address each point:

  1. State lock requirement: Got it - so the allocate method will only be called when the runner is wrapped as a RapidsFutureTask and scheduled by ResourceBoundedThreadExecutor, which ensures the state lock is properly held during execution. The direct calls from readPartFile would use the default allocator, not this custom one. That makes sense.

  2. Race condition concern: You're right that the atomic variables ensure correctness. Even if there's a race between calculating memToBrw and calling borrowMemory, the worst case is slightly over-borrowing, but the accounting remains consistent since usedMem and localPool updates are atomic.

  3. Deadlock prevention: Understood - the over-commit behavior is intentional for deadlock prevention, and some staleness in numRunnerInFlight is acceptable since it errs on the side of allowing progress rather than strict accounting.

  4. Cascading signal logic: This is indeed a subtle edge case. Under high contention with multiple borrowers, the numOfBorrowWaiters == 0 check might not perfectly capture the transitional state of borrowers being awakened. A potential improvement could be to track borrower state more precisely, but as you noted, this is a minor optimization issue.

  5. TODO comment: Since you mentioned this is trivial, a simple fix would be to check if there are pending borrow requests after returning memory and signal the borrowCondition accordingly. Something like:

        // TODO: check if the returned memory satisfies the ongoing borrow requests
        usedMem.addAndGet(-bufferSize)
        bufCloseCond.signal() // awaken onClose waiting thread if exists
        logDebug(s"[OnCloseHandler Closed] bufferSize=${bToStr(bufferSize)} for $r")
        // Try to notify borrowers if memory was returned to the pool
        if (getState != Running && !closeStarted.get()) {
          val (freed, _) = poolPtr.release(r, forcefully = false)
          if (freed > 0 && poolPtr.isInstanceOf[HostMemoryPool]) {
            // Potential optimization: signal borrowers about newly available memory
          }
        }

The design looks solid overall - the tight coupling between virtual budget and physical memory allocation through the MemoryBoundedAsyncRunner serving as both resource holder and allocator is elegant and should provide much better resource management than the previous approach.

@amahussein
Collaborator

Thanks @sperlingxx !

Followup on the LocalPool initialization:

The PR description mentions that the initial LocalPool size is derived from the PartitionedFile split length. Since Parquet/ORC files are typically compressed on disk, the uncompressed data in memory will almost always be larger than this split length.

This implies that most runners will inevitably hit the limit and trigger a borrow operation, potentially leading to frequent contention on the global pool lock and performance degradation.

It would be beneficial to make this initialization strategy configurable. For example, allowing us to apply an 'inflation factor' (e.g., split length * 1.5 or split length * 2) or a configurable overhead constant. This would allow us to tune the aggressiveness of the initial allocation and compare the performance tradeoffs between holding a larger initial budget vs. frequent borrowing.
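As a sketch of the suggested knob (the helper and factor here are hypothetical, in Java for illustration; no such config exists in the PR yet):

```java
// Hypothetical sizing helper: inflate the on-disk split length to estimate
// the uncompressed in-memory footprint, so the initial local pool is less
// likely to force an early borrow from the global pool.
class LocalPoolSizing {
    static long initialLocalPool(long splitLength, double inflationFactor) {
        return (long) Math.ceil(splitLength * inflationFactor);
    }
}
```

Tuning the factor would let one trade a larger up-front budget claim against the frequency of global-pool contention.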

Question about the global pool size

Do you keep a minimum threshold for the global pool in order to satisfy borrowers' requests? This can also impact performance: if the global pool is frequently empty, borrowers take longer to extend their local pool and make progress.

@amahussein amahussein self-requested a review January 7, 2026 16:23
@zpuller
Collaborator

zpuller commented Jan 7, 2026

Are there tests or would it be possible to add tests that force the deadlock conditions to occur and exercise those code paths?


Labels

performance A performance related task/issue


Development

Successfully merging this pull request may close these issues.

[FEA] Triple Buffering: Bind Async Resource Budget to Physical Memory Allocation

4 participants