GH-48636: [C++][Parquet] Improve parquet reading using multi threads#50158
GH-48636: [C++][Parquet] Improve parquet reading using multi threads#50158OmBiradar wants to merge 1 commit into
Conversation
|
Thanks for opening a pull request! If this is not a minor PR. Could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename the pull request title in the following format? or See also: |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Updates Parquet testing submodule pins and introduces optional parallelism when loading/building nested struct children readers to improve throughput when use_threads() is enabled.
Changes:
- Bump
testingandcpp/submodules/parquet-testingsubmodule commits. - Use
::arrow::internal::OptionalParallelForto parallelizeStructReader::LoadBatch. - Parallelize
StructReader::BuildArraychild array construction before converting chunks to single arrays.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| testing | Updates submodule commit SHA. |
| cpp/submodules/parquet-testing | Updates submodule commit SHA for parquet-testing. |
| cpp/src/parquet/arrow/reader.cc | Adds optional parallel execution for struct child LoadBatch and BuildArray. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return ::arrow::internal::OptionalParallelFor( | ||
| ctx_->reader_properties->use_threads(), static_cast<int>(children_.size()), | ||
| [&](int i) { return children_[i]->LoadBatch(records_to_read); }); |
There was a problem hiding this comment.
template <class FUNCTION>
Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool()) {
if (use_threads) {
return ParallelFor(num_tasks, std::forward<FUNCTION>(func), executor);
} else {
for (int i = 0; i < num_tasks; ++i) {
RETURN_NOT_OK(func(i));
}
return Status::OK();
}
}At the end, it's always cast to an int so truncating is unavoidable.
| std::shared_ptr<ChunkedArray> field; | ||
| RETURN_NOT_OK(child->BuildArray(validity_io.values_read, &field)); | ||
| ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> array_data, ChunksToSingle(*field)); | ||
| const int num_children = static_cast<int>(children_.size()); |
There was a problem hiding this comment.
template <class FUNCTION>
Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool()) {
if (use_threads) {
return ParallelFor(num_tasks, std::forward<FUNCTION>(func), executor);
} else {
for (int i = 0; i < num_tasks; ++i) {
RETURN_NOT_OK(func(i));
}
return Status::OK();
}
}At the end, it's always cast to an int so truncating is unavoidable.
Signed-off-by: OmBiradar <ombiradar04@gmail.com>
fdade64 to
3512d99
Compare
Rationale for this change
Currently the parquet file structs are read sequentially, even when the
use_threadsoption is used by the user. This PR aims to bridge this gap by making the struct reading truly parallel.What changes are included in this PR?
Changes to the
LoadBatchandBuildArrayfunctions in theStructReaderin the Parquet reader to enable multi threaded reads of structs in parquet files.Are these changes tested?
Are there any user-facing changes?
This is purely a performance related PR. No changes to the user or API is needed.