⚡ Bolt: Defer `take` in RepartitionExec to parallelize execution #246

google-labs-jules[bot] wants to merge 1 commit into main
This patch improves the performance of `RepartitionExec` for hash-based repartitioning by deferring the materialization of `RecordBatch`es. Previously, the producer task performed the expensive `take` operation to build a new `RecordBatch` for every output partition, serializing a costly step on a single task.

The patch introduces a `PartitionedBatch::Indexed` variant that holds a shared `Arc<RecordBatch>` and a `PrimitiveArray<UInt32Type>` of row indices. The producer now sends this lightweight enum over the channels, and the `take` operation is performed by the consumer tasks instead, parallelizing it across all output partitions and improving throughput.

To fix a memory accounting issue that arose from this change, the estimated size of an `Indexed` batch is now passed along with the batch, ensuring that the memory pool's `grow` and `shrink` operations remain symmetric.
PR created automatically by Jules for task 14803450406354175091 started by @Dandandan