diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index da3b0245256..a3ecf6d2579 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -917,6 +918,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSelectQueryInfos() { std::unordered_map<TableID, SelectQueryInfo> ret; + bool use_unordered_concat = context.getSettingsRef().dt_enable_unordered_concat; + // Shared read queue for all physical tables + auto shared_read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(max_streams, log); auto create_query_info = [&](Int64 table_id) -> SelectQueryInfo { SelectQueryInfo query_info; /// to avoid null pointer exception @@ -934,6 +938,16 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele query_info.req_id = fmt::format("{} table_id={}", log->identifier(), table_id); query_info.keep_order = table_scan.keepOrder(); query_info.is_fast_scan = table_scan.isFastScan(); + if (use_unordered_concat) + { + query_info.read_queue = shared_read_queue; + } + else + { + // A separate read queue for each physical table + query_info.read_queue + = std::make_shared<DM::ActiveSegmentReadTaskQueue>(max_streams, Logger::get(query_info.req_id)); + } return query_info; }; RUNTIME_CHECK_MSG(mvcc_query_info->scan_context != nullptr, "Unexpected null scan_context"); @@ -1062,16 +1076,16 @@ Int32 getMaxAllowRetryForLocalRead(const SelectQueryInfo & query_info) } } // namespace -DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalStreamsForPhysicalTable( +void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable( const TableID & table_id, const SelectQueryInfo & query_info, DAGPipeline & pipeline, + DM::Remote::DisaggReadSnapshotPtr & disagg_snap, size_t max_block_size) { - DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap; size_t region_num = query_info.mvcc_query_info->regions_query_info.size(); if (region_num == 0) - return table_snap; + return; assert(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end()); auto & storage = storages_with_structure_lock[table_id].storage; @@ -1089,6 +1103,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal { try { + DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap; if (!dag_context.is_disaggregated_task) { // build local inputstreams @@ -1116,6 +1131,12 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams` // may contain different data than expected.
validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log); + + // Only after all streams are built successfully do we add the table snapshot to disagg_snap + if (table_snap) + { + disagg_snap->addTask(table_id, std::move(table_snap)); + } break; } catch (RegionException & e) @@ -1126,6 +1147,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal { // clean all streams from local because we are not sure about the correctness of those streams pipeline.streams.clear(); + // remove this table's task from the read_queue + query_info.read_queue->resetTableTask(table_id); if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry))) continue; // next retry to read from local storage else @@ -1145,20 +1168,19 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal throw; } } - return table_snap; } -DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalExecForPhysicalTable( +void DAGStorageInterpreter::buildLocalExecForPhysicalTable( PipelineExecutorContext & exec_context, PipelineExecGroupBuilder & group_builder, const TableID & table_id, const SelectQueryInfo & query_info, + DM::Remote::DisaggReadSnapshotPtr & disagg_snap, size_t max_block_size) { - DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap; size_t region_num = query_info.mvcc_query_info->regions_query_info.size(); if (region_num == 0) - return table_snap; + return; RUNTIME_CHECK(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end()); auto & storage = storages_with_structure_lock[table_id].storage; @@ -1169,6 +1191,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal { try { + DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap; if (!dag_context.is_disaggregated_task) { storage->read( @@ -1199,6 +1222,13 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `sourceOps` // may contain different data than expected.
validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log); + + // Only after all sourceOps are built and verified do we add the table snapshot to disagg_snap + if (table_snap != nullptr) + { + RUNTIME_CHECK(disagg_snap != nullptr); + disagg_snap->addTask(table_id, std::move(table_snap)); + } break; } catch (RegionException & e) @@ -1209,6 +1239,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal { // clean all operators from local because we are not sure about the correctness of those operators group_builder.reset(); + // remove this table's task from the read_queue + query_info.read_queue->resetTableTask(table_id); if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry))) continue; else @@ -1228,7 +1260,6 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal throw; } } - return table_snap; } void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size) @@ -1239,6 +1270,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max return; mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num); const auto table_query_infos = generateSelectQueryInfos(); + RUNTIME_CHECK_MSG(!table_query_infos.empty(), "No table query info generated for local read"); bool has_multiple_partitions = table_query_infos.size() > 1; // MultiPartitionStreamPool will be disabled in no partition mode or single-partition case std::shared_ptr<MultiPartitionStreamPool> stream_pool @@ -1250,12 +1282,17 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max DAGPipeline current_pipeline; const TableID physical_table_id = table_query_info.first; const SelectQueryInfo & query_info = table_query_info.second; - auto table_snap - = buildLocalStreamsForPhysicalTable(physical_table_id, query_info, current_pipeline, max_block_size); - if (table_snap) - { - disaggregated_snap->addTask(physical_table_id, std::move(table_snap)); - } + RUNTIME_CHECK_MSG( + query_info.read_queue != nullptr, + "read_queue should not be null, table_id={}", + physical_table_id); + buildLocalStreamsForPhysicalTable( + physical_table_id, + query_info, + current_pipeline, + disaggregated_snap, + max_block_size); + if (has_multiple_partitions) stream_pool->addPartitionStreams(current_pipeline.streams); else @@ -1265,6 +1302,10 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max current_pipeline.streams.end()); } + assert(!table_query_infos.empty()); // checked at the start of this function + auto read_queue = table_query_infos.begin()->second.read_queue; + read_queue->finishQueueIfEmpty(); + LOG_DEBUG( log, "local streams built, is_disaggregated_task={} snap_id={}", @@ -1306,21 +1347,29 @@ void DAGStorageInterpreter::buildLocalExec( return; mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num); const auto table_query_infos = generateSelectQueryInfos(); - bool has_multiple_partitions = table_query_infos.size() > 1; - ConcatBuilderPool builder_pool{max_streams}; + RUNTIME_CHECK_MSG(!table_query_infos.empty(), "No table query info generated for local read"); + const bool has_multiple_partitions = table_query_infos.size() > 1; + + const bool use_unordered_concat = context.getSettingsRef().dt_enable_unordered_concat; + ConcatBuilderPool builder_pool{max_streams, use_unordered_concat}; auto disaggregated_snap = std::make_shared<DM::Remote::DisaggReadSnapshot>(); for (const auto & table_query_info : table_query_infos) { -
PipelineExecGroupBuilder builder; const TableID physical_table_id = table_query_info.first; const SelectQueryInfo & query_info = table_query_info.second; - auto table_snap - = buildLocalExecForPhysicalTable(exec_context, builder, physical_table_id, query_info, max_block_size); - if (table_snap) - { - disaggregated_snap->addTask(physical_table_id, std::move(table_snap)); - } + RUNTIME_CHECK_MSG( + query_info.read_queue != nullptr, + "read_queue should not be null, table_id={}", + physical_table_id); + PipelineExecGroupBuilder builder; + buildLocalExecForPhysicalTable( + exec_context, + builder, + physical_table_id, + query_info, + disaggregated_snap, + max_block_size); if (has_multiple_partitions) builder_pool.add(builder); @@ -1328,8 +1377,24 @@ void DAGStorageInterpreter::buildLocalExec( group_builder.merge(std::move(builder)); } - LOG_DEBUG(log, "local sourceOps built, is_disaggregated_task={}", dag_context.is_disaggregated_task); + if (use_unordered_concat) + { + assert(!table_query_infos.empty()); // check at start of this function + auto read_queue = table_query_infos.begin()->second.read_queue; + read_queue->finishQueueIfEmpty(); + exec_context.addStorageTaskQueue(read_queue); + } + else + { + for (const auto & table_query_info : table_query_infos) + { + const SelectQueryInfo & query_info = table_query_info.second; + query_info.read_queue->finishQueueIfEmpty(); + exec_context.addStorageTaskQueue(query_info.read_queue); + } + } + LOG_DEBUG(log, "local sourceOps built, is_disaggregated_task={}", dag_context.is_disaggregated_task); if (dag_context.is_disaggregated_task) { // register the snapshot to manager diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index 5d88f2d83e6..46783ed5d61 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -73,17 +73,19 @@ class DAGStorageInterpreter const RegionException & e, int num_allow_retry); - DM::Remote::DisaggPhysicalTableReadSnapshotPtr buildLocalStreamsForPhysicalTable( + void buildLocalStreamsForPhysicalTable( const TableID & table_id, const SelectQueryInfo & query_info, DAGPipeline & pipeline, + DM::Remote::DisaggReadSnapshotPtr & disagg_snap, size_t max_block_size); - DM::Remote::DisaggPhysicalTableReadSnapshotPtr buildLocalExecForPhysicalTable( + void buildLocalExecForPhysicalTable( PipelineExecutorContext & exec_context, PipelineExecGroupBuilder & group_builder, const TableID & table_id, const SelectQueryInfo & query_info, + DM::Remote::DisaggReadSnapshotPtr & disagg_snap, size_t max_block_size); void buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size); diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index a119a040620..1a964bf8d17 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -178,6 +179,7 @@ void PipelineExecutorContext::cancel() bool origin_value = false; if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) { + cancelStorageTaskQueues(); cancelSharedQueues(); cancelOneTimeFutures(); if (likely(dag_context)) @@ -234,6 +236,25 @@ void PipelineExecutorContext::cancelSharedQueues() shared_queue->cancel(); } +void PipelineExecutorContext::addStorageTaskQueue(const DM::ActiveSegmentReadTaskQueuePtr & storage_task_queue) +{ + std::lock_guard 
lock(mu); + RUNTIME_CHECK_MSG(!isCancelled(), "query has been cancelled."); + assert(storage_task_queue); + storage_task_queues.push_back(storage_task_queue); +} + +void PipelineExecutorContext::cancelStorageTaskQueues() +{ + std::vector<DM::ActiveSegmentReadTaskQueuePtr> tmp; + { + std::lock_guard lock(mu); + std::swap(tmp, storage_task_queues); + } + for (const auto & storage_task_queue : tmp) + storage_task_queue->finishQueue(); +} + void PipelineExecutorContext::addOneTimeFuture(const OneTimeNotifyFuturePtr & future) { std::lock_guard lock(mu); diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index 97fba40e889..ad1723694c2 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -40,6 +40,13 @@ class OneTimeNotifyFuture; using OneTimeNotifyFuturePtr = std::shared_ptr<OneTimeNotifyFuture>; class DAGContext; +namespace DM +{ +class ActiveSegmentReadTaskQueue; +using ActiveSegmentReadTaskQueuePtr = std::shared_ptr<ActiveSegmentReadTaskQueue>; +} // namespace DM + + class PipelineExecutorContext : private boost::noncopyable { public: @@ -146,6 +153,8 @@ class PipelineExecutorContext : private boost::noncopyable void addOneTimeFuture(const OneTimeNotifyFuturePtr & future); + void addStorageTaskQueue(const DM::ActiveSegmentReadTaskQueuePtr & storage_task_queue); + private: bool setExceptionPtr(const std::exception_ptr & exception_ptr_); @@ -160,6 +169,8 @@ class PipelineExecutorContext : private boost::noncopyable void cancelOneTimeFutures(); + void cancelStorageTaskQueues(); + void cancelResultQueueIfNeed(); private: @@ -196,5 +207,7 @@ class PipelineExecutorContext : private boost::noncopyable std::vector<SharedQueuePtr> shared_queues; std::vector<OneTimeNotifyFuturePtr> one_time_futures; + + std::vector<DM::ActiveSegmentReadTaskQueuePtr> storage_task_queues; }; } // namespace DB diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index c8408cc85c2..1f7e8683f67 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -184,6 +184,7 @@ struct Settings M(SettingUInt64, dt_merged_file_max_size, 16 * 1024 * 1024, "Small files are merged into one or more files not larger than dt_merged_file_max_size") \ M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \ M(SettingDouble, dt_page_gc_threshold_raft_data, 0.05, "Max valid rate of deciding to do a GC for BlobFile storing PageData in PageStorage") \ + M(SettingBool, dt_enable_unordered_concat, false, "Enable concatenating the local read streams of multiple physical tables (partitions) in arbitrary order through a shared read queue") \ M(SettingInt64, enable_version_chain, 1, "Enable version chain or not: 0 - disable, 1 - enabled. " \ "More details are in the comments of `enum class VersionChainMode`."
\ "Modifying this configuration requires a restart to reset the in-memory state.") \ diff --git a/dbms/src/Operators/ConcatSourceOp.h b/dbms/src/Operators/ConcatSourceOp.h index a816abda0c6..6b36c326d0e 100644 --- a/dbms/src/Operators/ConcatSourceOp.h +++ b/dbms/src/Operators/ConcatSourceOp.h @@ -178,10 +178,173 @@ class ConcatSourceOp : public SourceOp bool done = false; }; +class UnorderedConcatSourceOp : public SourceOp +{ +private: + struct PipelineExecWithStatus + { + PipelineExecPtr exec; + bool is_finished = false; + }; + +public: + UnorderedConcatSourceOp( + PipelineExecutorContext & exec_context_, + const String & req_id, + std::vector & exec_builder_pool) + : SourceOp(exec_context_, req_id) + , cur_exec_index(0) + { + RUNTIME_CHECK(!exec_builder_pool.empty()); + setHeader(exec_builder_pool.back().getCurrentHeader()); + for (auto & exec_builder : exec_builder_pool) + { + exec_builder.setSinkOp(std::make_unique(exec_context_, req_id, res)); + exec_pool.push_back(PipelineExecWithStatus{.exec = exec_builder.build(false), .is_finished = false}); + } + } + + String getName() const override { return "UnorderedConcatSourceOp"; } + + // UnorderedConcatSourceOp is used to merge multiple partitioned tables of storage layer, so override `getIOProfileInfo` is needed here. + IOProfileInfoPtr getIOProfileInfo() const override { return IOProfileInfo::createForLocal(profile_info_ptr); } + +protected: + void operatePrefixImpl() override + { + if (exec_pool.empty()) + { + done = true; + return; + } + + for (auto & exec_with_status : exec_pool) + { + exec_with_status.exec->executePrefix(); + } + } + + void operateSuffixImpl() override + { + for (auto & exec_with_status : exec_pool) + { + if (!exec_with_status.is_finished) + { + exec_with_status.exec->executeSuffix(); + exec_with_status.exec.reset(); + exec_with_status.is_finished = true; + } + } + exec_pool.clear(); + } + + OperatorStatus readImpl(Block & block) override + { + if unlikely (done) + return OperatorStatus::HAS_OUTPUT; + + if unlikely (res) + { + std::swap(block, res); + return OperatorStatus::HAS_OUTPUT; + } + + std::optional beg_idx = std::nullopt; + while (true) + { + assert(exec_pool[cur_exec_index].exec); + auto status = exec_pool[cur_exec_index].exec->execute(); + switch (status) + { + case OperatorStatus::WAIT_FOR_NOTIFY: + if (beg_idx.has_value() && beg_idx.value() == cur_exec_index) + { + // all exec are waiting + return OperatorStatus::WAIT_FOR_NOTIFY; + } + if (beg_idx == std::nullopt) + beg_idx = cur_exec_index; + // try to read from next exec + tryNextExec(); + break; + case OperatorStatus::NEED_INPUT: + assert(res); + std::swap(block, res); + return OperatorStatus::HAS_OUTPUT; + case OperatorStatus::FINISHED: + exec_pool[cur_exec_index].exec->executeSuffix(); + exec_pool[cur_exec_index].exec.reset(); + exec_pool[cur_exec_index].is_finished = true; + if (!tryNextExec()) + { + done = true; + return OperatorStatus::HAS_OUTPUT; + } + break; + default: + return status; + } + } + } + + bool tryNextExec() + { + size_t beg_idx = cur_exec_index; + while (true) + { + cur_exec_index = (cur_exec_index + 1) % exec_pool.size(); + if (!exec_pool[cur_exec_index].is_finished) + return true; + if (cur_exec_index == beg_idx) + { + // all exec finished + done = true; + return false; + } + } + } + + OperatorStatus executeIOImpl() override + { + if unlikely (done || res) + return OperatorStatus::HAS_OUTPUT; + + assert(exec_pool[cur_exec_index].exec); + auto status = exec_pool[cur_exec_index].exec->executeIO(); + assert(status != 
OperatorStatus::FINISHED); + return status; + } + + OperatorStatus awaitImpl() override + { + if unlikely (done || res) + return OperatorStatus::HAS_OUTPUT; + + assert(exec_pool[cur_exec_index].exec); + auto status = exec_pool[cur_exec_index].exec->await(); + assert(status != OperatorStatus::FINISHED); + return status; + } + + void notifyImpl() override + { + assert(exec_pool[cur_exec_index].exec); + exec_pool[cur_exec_index].exec->notify(); + } + +private: + std::vector exec_pool; + size_t cur_exec_index; + + Block res; + bool done = false; +}; + class ConcatBuilderPool { public: - explicit ConcatBuilderPool(size_t expect_size) + explicit ConcatBuilderPool(size_t expect_size, bool use_unordered_concat_ = false) + : use_unordered_concat(use_unordered_concat_) { RUNTIME_CHECK(expect_size > 0); pool.resize(expect_size); @@ -204,6 +367,15 @@ class ConcatBuilderPool const String & req_id) { RUNTIME_CHECK(result_builder.empty()); + if (use_unordered_concat) + { + LOG_INFO(Logger::get(req_id), "Using UnorderedConcatSourceOp to concat source ops"); + } + else + { + LOG_INFO(Logger::get(req_id), "Using ConcatSourceOp to concat source ops"); + } + for (auto & builders : pool) { if (builders.empty()) @@ -216,7 +388,15 @@ class ConcatBuilderPool } else { - result_builder.addConcurrency(std::make_unique(exec_context, req_id, builders)); + if (use_unordered_concat) + { + result_builder.addConcurrency( + std::make_unique(exec_context, req_id, builders)); + } + else + { + result_builder.addConcurrency(std::make_unique(exec_context, req_id, builders)); + } } } } @@ -224,5 +404,6 @@ class ConcatBuilderPool private: std::vector> pool; size_t pre_index = 0; + bool use_unordered_concat; }; } // namespace DB diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index 5090da3dbbf..dad2d324517 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace DB { @@ -49,6 +50,18 @@ UnorderedSourceOp::UnorderedSourceOp( } } +UnorderedSourceOp::~UnorderedSourceOp() +{ + if (const auto rc_before_decr = task_pool->decreaseUnorderedInputStreamRefCount(); rc_before_decr == 1) + { + LOG_INFO( + log, + "All unordered input streams are finished, pool_id={} last_stream_ref_no={}", + task_pool->pool_id, + ref_no); + } +} + OperatorStatus UnorderedSourceOp::readImpl(Block & block) { if unlikely (done) @@ -74,6 +87,7 @@ OperatorStatus UnorderedSourceOp::readImpl(Block & block) else { done = true; + // return HAS_OUTPUT with empty block to indicate end of stream return OperatorStatus::HAS_OUTPUT; } } diff --git a/dbms/src/Operators/UnorderedSourceOp.h b/dbms/src/Operators/UnorderedSourceOp.h index 385ff0bf8d6..70b479bf060 100644 --- a/dbms/src/Operators/UnorderedSourceOp.h +++ b/dbms/src/Operators/UnorderedSourceOp.h @@ -18,7 +18,7 @@ #include #include #include -#include +#include namespace DB { @@ -37,17 +37,7 @@ class UnorderedSourceOp : public SourceOp int max_wait_time_ms_ = 0, bool is_disagg_ = false); - ~UnorderedSourceOp() override - { - if (const auto rc_before_decr = task_pool->decreaseUnorderedInputStreamRefCount(); rc_before_decr == 1) - { - LOG_INFO( - log, - "All unordered input streams are finished, pool_id={} last_stream_ref_no={}", - task_pool->pool_id, - ref_no); - } - } + ~UnorderedSourceOp() override; String getName() const override { return "UnorderedSourceOp"; } diff --git a/dbms/src/Operators/tests/gtest_concat_source.cpp 
b/dbms/src/Operators/tests/gtest_concat_source.cpp index dd741870dd6..7f0d577b34c 100644 --- a/dbms/src/Operators/tests/gtest_concat_source.cpp +++ b/dbms/src/Operators/tests/gtest_concat_source.cpp @@ -179,6 +179,84 @@ class SimpleGetResultSinkOp : public SinkOp ResultHandler result_handler; }; +class MockReadTaskPool; +using MockReadTaskPoolPtr = std::shared_ptr; + +class MockReadTaskPool : public NotifyFuture +{ +public: + explicit MockReadTaskPool(size_t idx_, size_t slot_limit) + : idx(idx_) + , q(slot_limit) + {} + + void pushBlock(Block && block) { q.push(std::move(block), nullptr); } + void finish() { q.finish(); } + + bool tryPopBlock(Block & block) { return q.tryPop(block); } + + void registerTask(TaskPtr && task) override + { + q.registerPipeTask(std::move(task), NotifyType::WAIT_ON_TABLE_SCAN_READ); + } + + const size_t idx; + +private: + DB::DM::WorkQueue q; +}; + +class MockSourceOnReadTaskPoolOp : public SourceOp +{ +public: + MockSourceOnReadTaskPoolOp(PipelineExecutorContext & exec_context_, Block header_, const MockReadTaskPoolPtr & pool) + : SourceOp(exec_context_, "mock") + , done(false) + , task_pool(pool) + { + setHeader(header_); + } + + String getName() const override { return "MockSourceOp"; } + +protected: + OperatorStatus readImpl(Block & block) override + { + if (done) + return OperatorStatus::HAS_OUTPUT; + + while (true) + { + if (!task_pool->tryPopBlock(block)) + { + setNotifyFuture(task_pool.get()); + return OperatorStatus::WAIT_FOR_NOTIFY; + } + + if (block) + { + if (block.rows() == 0) + { + block.clear(); + continue; + } + LOG_INFO(Logger::get(), "A block is popped from pool_idx={}", task_pool->idx); + return OperatorStatus::HAS_OUTPUT; + } + else + { + done = true; + LOG_INFO(Logger::get(), "No more blocks in pool_idx={}", task_pool->idx); + // return HAS_OUTPUT with empty block to indicate end of stream + return OperatorStatus::HAS_OUTPUT; + } + } + } + +private: + bool done; + MockReadTaskPoolPtr task_pool; +}; } // namespace TEST_F(TestConcatSource, ConcatBuilderPoolWithDifferentConcurrency) @@ -255,4 +333,79 @@ try } CATCH +TEST_F(TestConcatSource, unorderedConcatSink) +try +{ + LoggerPtr log = Logger::get(); + + Blocks blks{ + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + }; + + Block header = blks[0].cloneEmpty(); + + std::vector pools; + + PipelineExecutorContext exec_context; + size_t num_concurrency = 1; + ConcatBuilderPool builder_pool{num_concurrency}; + // Mock that for each partition (physical table), there is a read task pool and multiple source ops reading from it. 
+ size_t num_partitions = 3; + for (size_t idx_part = 0; idx_part < num_partitions; ++idx_part) + { + PipelineExecGroupBuilder group_builder; + // MockReadTaskPoolPtr pool = std::make_shared(std::ceil(num_concurrency * 1.5)); + MockReadTaskPoolPtr pool = std::make_shared(idx_part, 0); + pools.push_back(pool); + for (size_t i = 0; i < num_concurrency; ++i) + { + group_builder.addConcurrency(std::make_unique(exec_context, header, pool)); + } + builder_pool.add(group_builder); + } + + auto thread_func = [](const MockReadTaskPoolPtr & pool, const Blocks & blks, const LoggerPtr & log) { + for (size_t i = 0; i < blks.size(); ++i) + { + Block blk = blks[i]; + pool->pushBlock(std::move(blk)); + LOG_INFO(log, "A block is pushed to pool, pool_idx={} n_pushed={}", pool->idx, i + 1); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + pool->finish(); + LOG_INFO(log, "All blocks are pushed to pool_idx={} n_pushed={}", pool->idx, blks.size()); + }; + + auto thread_mgr = DB::newThreadManager(); + for (const auto & pool : pools) + { + thread_mgr->schedule(false, "", [pool, &blks, &log, &thread_func]() { thread_func(pool, blks, log); }); + } + + std::atomic received_blocks = 0; + ResultHandler h([&](const Block & /*block*/) { received_blocks.fetch_add(1, std::memory_order_relaxed); }); + + PipelineExecGroupBuilder result_builder; + builder_pool.generate(result_builder, exec_context, "test"); + result_builder.transform( + [&](auto & builder) { builder.setSinkOp(std::make_unique(exec_context, "test", h)); }); + auto op_pipeline_grp = result_builder.build(false); + op_pipeline_grp[0]->executePrefix(); + while (true) + { + auto s = op_pipeline_grp[0]->execute(); + if (s == OperatorStatus::FINISHED) + { + LOG_INFO(log, "ConcatPipelineExec is finished"); + break; + } + } + op_pipeline_grp[0]->executeSuffix(); + LOG_INFO(log, "ConcatPipelineExec is built and executed"); +} +CATCH } // namespace DB::tests diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index a695f82b607..c6716c0b643 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1024,6 +1024,7 @@ BlockInputStreams DeltaMergeStore::readRaw( const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments, @@ -1073,6 +1074,7 @@ BlockInputStreams DeltaMergeStore::readRaw( if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask()) req_info = db_context.getDAGContext()->getMPPTaskId().toString(); auto read_task_pool = std::make_shared( + read_queue, extra_table_id_index, columns_to_read, EMPTY_FILTER, @@ -1085,6 +1087,7 @@ BlockInputStreams DeltaMergeStore::readRaw( enable_read_thread, final_num_stream, dm_context->scan_context->keyspace_id, + physical_table_id, dm_context->scan_context->resource_group_name); BlockInputStreams res; @@ -1128,6 +1131,7 @@ void DeltaMergeStore::readRaw( const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments, @@ -1178,6 +1182,7 @@ void DeltaMergeStore::readRaw( if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask()) req_info = db_context.getDAGContext()->getMPPTaskId().toString(); auto read_task_pool = 
std::make_shared( + read_queue, extra_table_id_index, columns_to_read, EMPTY_FILTER, @@ -1190,6 +1195,7 @@ void DeltaMergeStore::readRaw( enable_read_thread, final_num_stream, dm_context->scan_context->keyspace_id, + physical_table_id, dm_context->scan_context->resource_group_name); if (enable_read_thread) @@ -1270,6 +1276,7 @@ BlockInputStreams DeltaMergeStore::read( const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, @@ -1310,6 +1317,7 @@ BlockInputStreams DeltaMergeStore::read( const auto & final_columns_to_read = executor && executor->extra_cast ? *executor->columns_after_cast : columns_to_read; auto read_task_pool = std::make_shared( + read_queue, extra_table_id_index, final_columns_to_read, executor, @@ -1322,6 +1330,7 @@ BlockInputStreams DeltaMergeStore::read( enable_read_thread, final_num_stream, dm_context->scan_context->keyspace_id, + physical_table_id, dm_context->scan_context->resource_group_name); dm_context->scan_context->read_mode = read_mode; @@ -1381,6 +1390,7 @@ void DeltaMergeStore::read( const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, @@ -1437,6 +1447,7 @@ void DeltaMergeStore::read( const auto & final_columns_to_read = executor && executor->extra_cast ? *executor->columns_after_cast : columns_to_read; auto read_task_pool = std::make_shared( + read_queue, extra_table_id_index, final_columns_to_read, executor, @@ -1449,6 +1460,7 @@ void DeltaMergeStore::read( enable_read_thread, final_num_stream, dm_context->scan_context->keyspace_id, + physical_table_id, dm_context->scan_context->resource_group_name); dm_context->scan_context->read_mode = read_mode; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 89dba189c90..b8007a568ca 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -355,6 +356,7 @@ class DeltaMergeStore const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments = {}, @@ -367,6 +369,7 @@ class DeltaMergeStore const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments = {}, @@ -381,6 +384,7 @@ class DeltaMergeStore const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, @@ -405,6 +409,7 @@ class DeltaMergeStore const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 91f18c8836a..381f30f0b95 
100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -565,12 +565,15 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache( size_t read_rows) { bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this); + + // If reaching `max_sharing_column_bytes`, do not fill data_sharing_col_data_cache anymore. bool reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr && std::cmp_greater_equal(shared_column_data_mem_tracker->get(), max_sharing_column_bytes); if (reach_sharing_column_memory_limit) { GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment(); } + if (has_concurrent_reader && !reach_sharing_column_memory_limit) { auto column = getColumnFromCache( @@ -599,6 +602,7 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache( return column; } + // Directly read from disk ignoring the data_sharing_col_data_cache. return readFromDisk(cd, type_on_disk, start_pack_id, read_rows); } diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h index 472eca8f02b..ee411df41d2 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h @@ -200,11 +200,13 @@ class DeltaMergeStoreVectorBase : public VectorIndexTestUtils void read(const RowKeyRange & range, const PushDownExecutorPtr & executor, const ColumnWithTypeAndName & out) { + auto read_queue = std::make_shared(2, Logger::get("read")); auto in = store->read( *db_context, db_context->getSettingsRef(), {cdVec()}, {range}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), executor, @@ -212,6 +214,7 @@ class DeltaMergeStoreVectorBase : public VectorIndexTestUtils 0, TRACING_NAME, DMReadOptions{})[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({vec_column_name}), diff --git a/dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp index 4aec61395ce..aceeb8ba004 100644 --- a/dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp @@ -153,11 +153,13 @@ bool checkMatch( store->mergeDeltaAll(context); const ColumnDefine & col_to_read = check_pk ? getExtraHandleColumnDefine(is_common_handle) : cd; + auto read_queue = std::make_shared(2, Logger::get("read")); auto streams = store->read( context, context.getSettingsRef(), {col_to_read}, {all_range}, + read_queue, 1, std::numeric_limits::max(), std::make_shared(filter), @@ -165,6 +167,7 @@ bool checkMatch( 0, name, DMReadOptions{}); + read_queue->finishQueueIfEmpty(); auto rows = getInputStreamNRows(streams[0]); store->drop(); diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h new file mode 100644 index 00000000000..b6366458487 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h @@ -0,0 +1,310 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB::DM +{ + +struct TableTaskStat +{ + size_t num_read_tasks = 0; + size_t num_active_segs = 0; + size_t num_finished_segs = 0; +}; +} // namespace DB::DM + +template <> +struct fmt::formatter<DB::DM::TableTaskStat> +{ + static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } + + template <typename FormatContext> + auto format(const DB::DM::TableTaskStat & s, FormatContext & ctx) const + { + return fmt::format_to( + ctx.out(), + "{{tot:{} act:{} fin:{}}}", + s.num_read_tasks, + s.num_active_segs, + s.num_finished_segs); + } +}; + +namespace DB::DM +{ + +// Statistics of blocks pushed. +// All methods are thread-safe. +class BlockStat +{ +public: + BlockStat() + : pending_count(0) + , pending_bytes(0) + , total_count(0) + , total_bytes(0) + {} + + void push(const Block & blk) + { + pending_count.fetch_add(1, std::memory_order_relaxed); + total_count.fetch_add(1, std::memory_order_relaxed); + + auto b = blk.bytes(); + pending_bytes.fetch_add(b, std::memory_order_relaxed); + total_bytes.fetch_add(b, std::memory_order_relaxed); + + total_rows.fetch_add(blk.rows(), std::memory_order_relaxed); + } + + void pop(const Block & blk) + { + if (likely(blk)) + { + pending_count.fetch_sub(1, std::memory_order_relaxed); + pending_bytes.fetch_sub(blk.bytes(), std::memory_order_relaxed); + } + } + + Int64 pendingCount() const { return pending_count.load(std::memory_order_relaxed); } + + Int64 pendingBytes() const { return pending_bytes.load(std::memory_order_relaxed); } + + Int64 totalCount() const { return total_count.load(std::memory_order_relaxed); } + + Int64 totalBytes() const { return total_bytes.load(std::memory_order_relaxed); } + + Int64 totalRows() const { return total_rows.load(std::memory_order_relaxed); } + +private: + std::atomic<Int64> pending_count; + std::atomic<Int64> pending_bytes; + std::atomic<Int64> total_count; + std::atomic<Int64> total_bytes; + std::atomic<Int64> total_rows; +}; + +// ActiveSegmentReadTaskQueue manages the blocks read from active segments. +// If the instance is shared among multiple SegmentReadTaskPools, the instance +// limits the total number of active segments and the total number of pending blocks +// among all SegmentReadTaskPools, i.e., among all physical tables. +class ActiveSegmentReadTaskQueue +{ +public: + explicit ActiveSegmentReadTaskQueue(size_t max_streams, LoggerPtr log_) + : // If the queue is too short, only 1 in the extreme case, it may cause the computation thread + // to encounter empty queues frequently, resulting in too much waiting and thread context switching. + // We limit the length of the block queue to 1.5 times `max_streams`, and in the extreme case, + // when `max_streams` is 1, `block_slot_limit` is at least 2. + block_slot_limit(std::ceil(std::max<Int64>(1, max_streams) * 1.5)) + // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible, + // situations where the computation may be faster and the storage layer may not be able to keep up.
+ , active_segment_limit(std::max<Int64>(2, max_streams)) + , log(std::move(log_)) + {} + + ~ActiveSegmentReadTaskQueue() + { + auto [pop_times, pop_empty_times, peak_blocks_in_queue] = q.getStat(); + auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; + auto total_count = blk_stat.totalCount(); + auto total_bytes = blk_stat.totalBytes(); + auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0; + auto approx_max_pending_block_bytes = blk_avg_bytes * peak_blocks_in_queue; + auto total_rows = blk_stat.totalRows(); + LOG_INFO( + log, + "ActiveSegmentReadTaskQueue finished. pop={} pop_empty={} pop_empty_ratio={:.3f} " + "active_segment_limit={} peak_active_segments={} " + "block_slot_limit={} peak_blocks_in_queue={} blk_avg_bytes={} approx_max_pending_block_bytes={:.2f}MB " + "total_count={} total_bytes={:.2f}MB total_rows={} avg_block_rows={} avg_rows_bytes={}B", + pop_times, + pop_empty_times, + pop_empty_ratio, + active_segment_limit, + peak_active_segments, + block_slot_limit, + peak_blocks_in_queue, + blk_avg_bytes, + approx_max_pending_block_bytes / 1024.0 / 1024.0, + total_count, + total_bytes / 1024.0 / 1024.0, + total_rows, + total_count > 0 ? total_rows / total_count : 0, + total_rows > 0 ? total_bytes / total_rows : 0); + } + + // === table task management === + + // Add a table task. + void addTableTask(TableID table_id, size_t num_read_tasks) + { + std::unique_lock lock(mu); + tables[table_id] = TableTaskStat{ + .num_read_tasks = num_read_tasks, + }; + } + + // Remove a table task without finishing the queue. + void resetTableTask(TableID table_id) + { + std::unique_lock lock(mu); + tables.erase(table_id); + } + + // Remove a table task. If no table tasks remain, finish the queue. + void removeTableTask(TableID table_id, UInt64 pool_id) + { + String table_id_remain; + size_t remain_count = 0; + { + std::unique_lock lock(mu); + tables.erase(table_id); + remain_count = tables.size(); + table_id_remain = remain_count > 0 ? fmt::format("{}", tables) : "none"; + } + + if (remain_count == 0) + { + q.finish(); + } + + LOG_INFO( + log, + "all segments are finished, table_id={} pool_id={} remain_count={} remain_table_ids={}", + table_id, + pool_id, + remain_count, + table_id_remain); + } + + // Finish the queue if there are no table tasks. + // Must be called after all table tasks are added. + void finishQueueIfEmpty() + { + String table_id_remain; + { + std::unique_lock lock(mu); + if (tables.empty()) + { + q.finish(); + } + table_id_remain = tables.empty() ? "none" : fmt::format("{}", tables); + } + LOG_INFO(log, "finishQueueIfEmpty called, remain_table_ids={}", table_id_remain); + } + + // Finish the queue unconditionally. Used to abort when an exception happens.
+ void finishQueue() { q.finish(); } + + // === scheduling limit helpers === + + Int64 getFreeBlockSlots() const { return block_slot_limit - blk_stat.pendingCount(); } + Int64 getFreeActiveSegments() const + { + std::unique_lock lock(mu); + return active_segment_limit - static_cast(active_segment_ids.size()); + } + + void addActiveSegment(TableID table_id, const GlobalSegmentID & seg_id) + { + std::unique_lock lock(mu); + active_segment_ids.emplace(seg_id); + peak_active_segments = std::max(peak_active_segments, active_segment_ids.size()); + // increase the counter of active segments for the table_id + auto iter = tables.find(table_id); + RUNTIME_CHECK_MSG( + iter != tables.end(), + "table_id not found from ActiveSegmentReadTaskQueue, table_id={} tables={}", + table_id, + tables); + iter->second.num_active_segs++; + } + + size_t finishActiveSegment(TableID table_id, const GlobalSegmentID & seg_id) + { + std::unique_lock lock(mu); + active_segment_ids.erase(seg_id); + auto iter = tables.find(table_id); + RUNTIME_CHECK_MSG( + iter != tables.end(), + "table_id not found from ActiveSegmentReadTaskQueue, table_id={} tables={}", + table_id, + tables); + RUNTIME_CHECK_MSG( + iter->second.num_active_segs >= 1, + "table_id has invalid active segments number, table_id={} table_stat={} tables={}", + table_id, + iter->second, + tables); + // decrease the counter of active segments for the table_id + --iter->second.num_active_segs; + ++iter->second.num_finished_segs; + return iter->second.num_active_segs; + } + + // === queue operations === + + void pushBlock(Block && block) + { + blk_stat.push(block); + q.push(std::move(block), nullptr); + } + + // Blocking pop + void popBlock(Block & block) + { + q.pop(block); + blk_stat.pop(block); + } + + // Non-blocking pop + bool tryPopBlock(Block & block) + { + if (q.tryPop(block)) + { + blk_stat.pop(block); + return true; + } + else + { + return false; + } + } + + void registerTask(TaskPtr && task) { q.registerPipeTask(std::move(task), NotifyType::WAIT_ON_TABLE_SCAN_READ); } + +private: + WorkQueue q; + + const Int64 block_slot_limit; + const Int64 active_segment_limit; + // Statistics of blocks pushed into the queue. 
+ BlockStat blk_stat; + + LoggerPtr log; + + mutable std::mutex mu; + + std::unordered_map<TableID, TableTaskStat> tables; + size_t peak_active_segments = 0; + std::unordered_set<GlobalSegmentID> active_segment_ids; +}; +using ActiveSegmentReadTaskQueuePtr = std::shared_ptr<ActiveSegmentReadTaskQueue>; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp index e482a541024..9fe2d592db1 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp @@ -62,7 +62,12 @@ void SegmentReadTaskScheduler::submitPendingPool(SegmentReadTaskPoolPtr pool) assert(pool != nullptr); if (pool->getPendingSegmentCount() <= 0) { - LOG_INFO(pool->getLogger(), "Ignored for no segment to read, pool_id={}", pool->pool_id); + LOG_INFO( + pool->getLogger(), + "Ignored for no segment to read, keyspace={} table_id={} pool_id={}", + pool->keyspace_id, + pool->table_id, + pool->pool_id); return; } Stopwatch sw; @@ -70,7 +75,9 @@ void SegmentReadTaskScheduler::submitPendingPool(SegmentReadTaskPoolPtr pool) pending_pools.push_back(pool); LOG_INFO( pool->getLogger(), - "Submitted, pool_id={} segment_count={} pending_pools={} cost={}ns", + "Submitted, keyspace={} table_id={} pool_id={} segment_count={} pending_pools={} cost={}ns", + pool->keyspace_id, + pool->table_id, pool->pool_id, pool->getPendingSegmentCount(), pending_pools.size(), @@ -244,7 +251,12 @@ std::tuple SegmentReadTaskScheduler::scheduleOneRound() // TODO: `weak_ptr` may be more suitable. if (pool.use_count() == 1) { - LOG_INFO(pool->getLogger(), "Erase pool_id={}", pool->pool_id); + LOG_INFO( + pool->getLogger(), + "Erase keyspace={} table_id={} pool_id={}", + pool->keyspace_id, + pool->table_id, + pool->pool_id); ++erased_pool_count; itr = read_pools.erase(itr); continue; diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h index f51615698a7..361230d4680 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h @@ -33,7 +33,7 @@ class SegmentReadTasksPoolTest; // // - `sched_thread` schedules read tasks. // - Call path: schedLoop -> schedule -> reapPendingPools -> scheduleOneRound -// - reapPeningPools will swap the `pending_pools` and add these pools to `read_pools` and `merging_segments`. +// - reapPendingPools will swap the `pending_pools` and add these pools to `read_pools` and `merging_segments`. // - scheduleOneRound will scan `read_pools` and choose segments to read.
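
For orientation, the call flow the comment above describes can be condensed into a short sketch. This is not part of the patch: `pickSegment` is a hypothetical name, and the real logic lives in `scheduleOneRound`/`scheduleSegment`; it only illustrates how a round gates each pool on the budgets of its (possibly shared) ActiveSegmentReadTaskQueue.

// Hypothetical condensation of one scheduling round.
for (auto & pool : read_pools)
{
    // Backpressure: skip the pool while its queue has no free block slots or no
    // free active-segment slots; sibling pools sharing one queue share one budget.
    if (pool->getFreeBlockSlots() <= 0 || pool->getFreeActiveSegments() <= 0)
        continue;
    // pickSegment (hypothetical): choose a pending segment, mark it active via
    // getTask() -> addActiveSegment(), and hand it to a read thread.
    pickSegment(*pool);
}
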
class SegmentReadTaskScheduler { diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp index 81329592f13..46d720a2bc9 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp @@ -92,8 +92,11 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic SegmentReadTaskPoolPtr createSegmentReadTaskPool(const std::vector & seg_ids) { + size_t num_streams = 1; + auto read_queue = std::make_shared(num_streams); auto dm_context = createDMContext(); return std::make_shared( + read_queue, /*extra_table_id_index_*/ dm_context->physical_table_id, /*columns_to_read_*/ ColumnDefines{}, /*filter_*/ nullptr, @@ -104,8 +107,9 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic /*after_segment_read_*/ [&](const DMContextPtr &, const SegmentPtr &) { /*do nothing*/ }, /*tracing_id_*/ String{}, /*enable_read_thread_*/ true, - /*num_streams_*/ 1, + /*num_streams_*/ num_streams, /*keyspace_id_*/ NullspaceID, + dm_context->physical_table_id, /*res_group_name_*/ String{}); } @@ -122,7 +126,7 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic // Submit to pending_pools scheduler.add(pool); { - std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnnings + std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnings ASSERT_EQ(scheduler.pending_pools.size(), 1); } ASSERT_EQ(scheduler.read_pools.size(), 0); @@ -130,7 +134,7 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic // Reap the pending_pools scheduler.reapPendingPools(); { - std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnnings + std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnings ASSERT_EQ(scheduler.pending_pools.size(), 0); } ASSERT_EQ(scheduler.read_pools.size(), 1); @@ -195,9 +199,12 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic pool->finishSegment(merged_task->units.front().task); } - ASSERT_EQ(pool->q.size(), 0); Block blk; - ASSERT_FALSE(pool->q.pop(blk)); + // popBlock can only return empty block + pool->shared_q->popBlock(blk); + ASSERT_FALSE(blk); + pool->shared_q->tryPopBlock(blk); + ASSERT_FALSE(blk); pool->decreaseUnorderedInputStreamRefCount(); ASSERT_FALSE(pool->valid()); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 481c4f647df..3bc97af51ef 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -83,6 +83,11 @@ bool SegmentReadTasksWrapper::empty() const return ordered_tasks.empty() && unordered_tasks.empty(); } +size_t SegmentReadTasksWrapper::size() const +{ + return ordered_tasks.size() + unordered_tasks.size(); +} + BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t) { MemoryTrackerSetter setter(true, mem_tracker.get()); @@ -103,14 +108,17 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t t->dm_context->physical_table_id); LOG_DEBUG( log, - "buildInputStream: read_mode={}, pool_id={} segment={}", + "buildInputStream: read_mode={} keyspace={} table_id={} pool_id={} segment={}", magic_enum::enum_name(read_mode), + keyspace_id, + table_id, pool_id, t); return stream; } SegmentReadTaskPool::SegmentReadTaskPool( + const ActiveSegmentReadTaskQueuePtr & shared_q_, int extra_table_id_index_, const ColumnDefines & 
columns_to_read_, const PushDownExecutorPtr & executor_, @@ -121,10 +129,13 @@ SegmentReadTaskPool::SegmentReadTaskPool( AfterSegmentRead after_segment_read_, const String & tracing_id, bool enable_read_thread_, - Int64 num_streams_, - const KeyspaceID & keyspace_id_, + Int64 /*num_streams_*/, + KeyspaceID keyspace_id_, + TableID table_id_, const String & res_group_name_) - : pool_id(nextPoolId()) + : keyspace_id(keyspace_id_) + , table_id(table_id_) + , pool_id(nextPoolId()) , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this()) , extra_table_id_index(extra_table_id_index_) , columns_to_read(columns_to_read_) @@ -135,73 +146,58 @@ SegmentReadTaskPool::SegmentReadTaskPool( , total_read_tasks(tasks_.size()) , tasks_wrapper(enable_read_thread_, std::move(tasks_)) , after_segment_read(after_segment_read_) - , peak_active_segments(0) + , shared_q(shared_q_) , log(Logger::get(tracing_id)) , unordered_input_stream_ref_count(0) , exception_happened(false) - // If the queue is too short, only 1 in the extreme case, it may cause the computation thread - // to encounter empty queues frequently, resulting in too much waiting and thread context switching. - // We limit the length of block queue to be 1.5 times of `num_streams_`, and in the extreme case, - // when `num_streams_` is 1, `block_slot_limit` is at least 2. - , block_slot_limit(std::ceil(num_streams_ * 1.5)) - // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible, - // situations where the computation may be faster and the storage layer may not be able to keep up. - , active_segment_limit(std::max(num_streams_, 2)) - , keyspace_id(keyspace_id_) , res_group_name(res_group_name_) { - if (tasks_wrapper.empty()) + RUNTIME_CHECK_MSG( + shared_q != nullptr, + "SegmentReadTaskPool requires a valid ActiveSegmentReadTaskQueuePtr, tracing_id={}", + tracing_id); + + if (!tasks_wrapper.empty()) { - q.finish(); + shared_q->addTableTask(table_id, total_read_tasks); } } SegmentReadTaskPool::~SegmentReadTaskPool() { - auto [pop_times, pop_empty_times, peak_blocks_in_queue] = q.getStat(); - auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; - auto total_count = blk_stat.totalCount(); - auto total_bytes = blk_stat.totalBytes(); - auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0; - auto approx_max_pending_block_bytes = blk_avg_bytes * peak_blocks_in_queue; - auto total_rows = blk_stat.totalRows(); - LOG_INFO( - log, - "Done. pool_id={} pop={} pop_empty={} pop_empty_ratio={:.3f} " - "active_segment_limit={} peak_active_segments={} " - "block_slot_limit={} peak_blocks_in_queue={} blk_avg_bytes={} approx_max_pending_block_bytes={:.2f}MB " - "total_count={} total_bytes={:.2f}MB total_rows={} avg_block_rows={} avg_rows_bytes={}B", - pool_id, - pop_times, - pop_empty_times, - pop_empty_ratio, - active_segment_limit, - peak_active_segments, - block_slot_limit, - peak_blocks_in_queue, - blk_avg_bytes, - approx_max_pending_block_bytes / 1024.0 / 1024.0, - total_count, - total_bytes / 1024.0 / 1024.0, - total_rows, - total_count > 0 ? total_rows / total_count : 0, - total_rows > 0 ? total_bytes / total_rows : 0); + LOG_DEBUG(log, "SegmentReadTaskPool finished. 
keyspace={} table_id={} pool_id={}", keyspace_id, table_id, pool_id); } void SegmentReadTaskPool::finishSegment(const SegmentReadTaskPtr & seg) { after_segment_read(seg->dm_context, seg->segment); bool pool_finished = false; + size_t num_act_segs = shared_q->finishActiveSegment(table_id, seg->getGlobalSegmentID()); + size_t num_task_left = 0; { std::lock_guard lock(mutex); - active_segment_ids.erase(seg->getGlobalSegmentID()); - pool_finished = active_segment_ids.empty() && tasks_wrapper.empty(); + num_task_left = tasks_wrapper.size(); + pool_finished = num_act_segs == 0 && tasks_wrapper.empty(); } - LOG_DEBUG(log, "finishSegment pool_id={} segment={} pool_finished={}", pool_id, seg, pool_finished); + LOG_DEBUG( + log, + "finishSegment pool_id={} segment={} num_act_segs={} num_task_left={} pool_finished={}", + pool_id, + seg, + num_act_segs, + num_task_left, + pool_finished); + if (pool_finished) { - q.finish(); - LOG_INFO(log, "pool_id={} finished", pool_id); + // Notify the queue that this table is finished only after all of its segments are finished. + shared_q->removeTableTask(table_id, pool_id); + LOG_DEBUG( + log, + "SegmentReadTaskPool all segments are finished, keyspace={} table_id={} pool_id={}", + keyspace_id, + table_id, + pool_id); } } @@ -213,11 +209,13 @@ SegmentReadTaskPtr SegmentReadTaskPool::nextTask() SegmentReadTaskPtr SegmentReadTaskPool::getTask(const GlobalSegmentID & seg_id) { - std::lock_guard lock(mutex); - auto t = tasks_wrapper.getTask(seg_id); - RUNTIME_CHECK(t != nullptr, pool_id, seg_id); - active_segment_ids.insert(seg_id); - peak_active_segments = std::max(peak_active_segments, active_segment_ids.size()); + SegmentReadTaskPtr t; + { + std::lock_guard lock(mutex); + t = tasks_wrapper.getTask(seg_id); + RUNTIME_CHECK(t != nullptr, pool_id, seg_id); + } + shared_q->addActiveSegment(table_id, seg_id); return t; } @@ -260,11 +258,12 @@ MergingSegments::iterator SegmentReadTaskPool::scheduleSegment( bool enable_data_sharing) { auto target = segments.end(); - std::lock_guard lock(mutex); - if (getFreeActiveSegmentsUnlock() <= 0) + if (getFreeActiveSegments() <= 0) { return target; } + + std::lock_guard lock(mutex); static constexpr int max_iter_count = 32; int iter_count = 0; const auto & tasks = tasks_wrapper.getTasks(); @@ -309,8 +308,7 @@ bool SegmentReadTaskPool::readOneBlock(BlockInputStreamPtr & stream, const Segme void SegmentReadTaskPool::popBlock(Block & block) { - q.pop(block); - blk_stat.pop(block); + shared_q->popBlock(block); global_blk_stat.pop(block); if (exceptionHappened()) { @@ -320,9 +318,8 @@ void SegmentReadTaskPool::popBlock(Block & block) bool SegmentReadTaskPool::tryPopBlock(Block & block) { - if (q.tryPop(block)) + if (shared_q->tryPopBlock(block)) { - blk_stat.pop(block); global_blk_stat.pop(block); if (exceptionHappened()) throw exception; @@ -336,12 +333,11 @@ bool SegmentReadTaskPool::tryPopBlock(Block & block) void SegmentReadTaskPool::pushBlock(Block && block) { - blk_stat.push(block); global_blk_stat.push(block); auto bytes = block.bytes(); read_bytes_after_last_check += bytes; GET_METRIC(tiflash_storage_read_thread_counter, type_push_block_bytes).Increment(bytes); - q.push(std::move(block), nullptr); + shared_q->pushBlock(std::move(block)); } Int64 SegmentReadTaskPool::increaseUnorderedInputStreamRefCount() @@ -355,18 +351,12 @@ Int64 SegmentReadTaskPool::decreaseUnorderedInputStreamRefCount() Int64 SegmentReadTaskPool::getFreeBlockSlots() const { - return block_slot_limit - blk_stat.pendingCount(); + return shared_q->getFreeBlockSlots(); }
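
To make the delegation above concrete, here is a self-contained sketch of the two budgets the shared queue now enforces. It is a simplified mirror of `block_slot_limit`/`active_segment_limit` from ActiveSegmentReadTaskQueue.h (plain integers instead of atomics, no locking), not the actual implementation.

#include <algorithm>
#include <cmath>
#include <cstdint>

// Simplified mirror of the shared queue's flow-control budgets.
struct ReadBudget
{
    int64_t block_slot_limit;     // ceil(1.5 * max_streams); at least 2 when max_streams == 1
    int64_t active_segment_limit; // never below 2, so storage can stay ahead of compute
    int64_t pending_blocks = 0;   // blocks pushed but not yet popped
    int64_t active_segments = 0;  // segments currently being read

    explicit ReadBudget(int64_t max_streams)
        : block_slot_limit(static_cast<int64_t>(std::ceil(std::max<int64_t>(1, max_streams) * 1.5)))
        , active_segment_limit(std::max<int64_t>(2, max_streams))
    {}

    int64_t freeBlockSlots() const { return block_slot_limit - pending_blocks; }
    int64_t freeActiveSegments() const { return active_segment_limit - active_segments; }
};
// For example, max_streams = 1 gives block_slot_limit = 2 and active_segment_limit = 2.
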
Int64 SegmentReadTaskPool::getFreeActiveSegments() const { - std::lock_guard lock(mutex); - return getFreeActiveSegmentsUnlock(); -} - -Int64 SegmentReadTaskPool::getFreeActiveSegmentsUnlock() const -{ - return active_segment_limit - static_cast(active_segment_ids.size()); + return shared_q->getFreeActiveSegments(); } Int64 SegmentReadTaskPool::getPendingSegmentCount() const @@ -391,7 +381,7 @@ void SegmentReadTaskPool::setException(const DB::Exception & e) { exception = e; exception_happened.store(true, std::memory_order_relaxed); - q.finish(); + shared_q->finishQueue(); } } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 36e8d5cab20..9945931011f 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -21,8 +21,10 @@ #include #include #include +#include #include #include +#include namespace DB::DM { @@ -30,56 +32,6 @@ namespace tests { class SegmentReadTasksPoolTest; } -using AfterSegmentRead = std::function; - -class BlockStat -{ -public: - BlockStat() - : pending_count(0) - , pending_bytes(0) - , total_count(0) - , total_bytes(0) - {} - - void push(const Block & blk) - { - pending_count.fetch_add(1, std::memory_order_relaxed); - total_count.fetch_add(1, std::memory_order_relaxed); - - auto b = blk.bytes(); - pending_bytes.fetch_add(b, std::memory_order_relaxed); - total_bytes.fetch_add(b, std::memory_order_relaxed); - - total_rows.fetch_add(blk.rows(), std::memory_order_relaxed); - } - - void pop(const Block & blk) - { - if (likely(blk)) - { - pending_count.fetch_sub(1, std::memory_order_relaxed); - pending_bytes.fetch_sub(blk.bytes(), std::memory_order_relaxed); - } - } - - Int64 pendingCount() const { return pending_count.load(std::memory_order_relaxed); } - - Int64 pendingBytes() const { return pending_bytes.load(std::memory_order_relaxed); } - - Int64 totalCount() const { return total_count.load(std::memory_order_relaxed); } - - Int64 totalBytes() const { return total_bytes.load(std::memory_order_relaxed); } - - Int64 totalRows() const { return total_rows.load(std::memory_order_relaxed); } - -private: - std::atomic pending_count; - std::atomic pending_bytes; - std::atomic total_count; - std::atomic total_bytes; - std::atomic total_rows; -}; // If `enable_read_thread_` is true, `SegmentReadTasksWrapper` use `std::unordered_map` to index `SegmentReadTask` by segment id, // else it is the same as `SegmentReadTasks`, a `std::list` of `SegmentReadTask`. @@ -98,18 +50,22 @@ class SegmentReadTasksWrapper bool empty() const; + size_t size() const; + private: bool enable_read_thread; SegmentReadTasks ordered_tasks; std::unordered_map unordered_tasks; }; +// The SegmentReadTaskPool manages the read tasks for a query on a physical table. 
@@ -98,18 +50,22 @@ class SegmentReadTasksWrapper
     bool empty() const;
 
+    size_t size() const;
+
 private:
     bool enable_read_thread;
     SegmentReadTasks ordered_tasks;
     std::unordered_map<GlobalSegmentID, SegmentReadTaskPtr> unordered_tasks;
 };
 
+// The SegmentReadTaskPool manages the read tasks for a query on a physical table.
 class SegmentReadTaskPool
     : public NotifyFuture
     , private boost::noncopyable
 {
 public:
     SegmentReadTaskPool(
+        const ActiveSegmentReadTaskQueuePtr & shared_q_,
         int extra_table_id_index_,
         const ColumnDefines & columns_to_read_,
         const PushDownExecutorPtr & executor_,
@@ -121,7 +77,8 @@ class SegmentReadTaskPool
         const String & tracing_id,
         bool enable_read_thread_,
         Int64 num_streams_,
-        const KeyspaceID & keyspace_id_,
+        KeyspaceID keyspace_id_,
+        TableID table_id_,
         const String & res_group_name_);
 
     ~SegmentReadTaskPool() override;
@@ -151,10 +108,7 @@ class SegmentReadTaskPool
 
     std::once_flag & addToSchedulerFlag() { return add_to_scheduler; }
 
-    void registerTask(TaskPtr && task) override
-    {
-        q.registerPipeTask(std::move(task), NotifyType::WAIT_ON_TABLE_SCAN_READ);
-    }
+    void registerTask(TaskPtr && task) override { shared_q->registerTask(std::move(task)); }
 
     std::once_flag & getRemoteConnectionInfoFlag() { return get_remote_connection_flag; }
     std::optional> getRemoteConnectionInfo() const;
@@ -169,7 +123,9 @@ class SegmentReadTaskPool
     size_t getTotalReadTasks() const { return total_read_tasks; }
 
 public:
-    const uint64_t pool_id;
+    const KeyspaceID keyspace_id;
+    const TableID table_id;
+    const UInt64 pool_id;
 
     // The memory tracker of MPPTask.
     const MemoryTrackerPtr mem_tracker;
 
@@ -196,7 +152,6 @@ class SegmentReadTaskPool
     const LoggerPtr & getLogger() const { return log; }
 
 private:
-    Int64 getFreeActiveSegmentsUnlock() const;
     bool exceptionHappened() const;
     void finishSegment(const SegmentReadTaskPtr & seg);
     void pushBlock(Block && block);
@@ -213,10 +168,8 @@ class SegmentReadTaskPool
     SegmentReadTasksWrapper tasks_wrapper;
     AfterSegmentRead after_segment_read;
     mutable std::mutex mutex;
-    size_t peak_active_segments;
-    std::unordered_set<GlobalSegmentID> active_segment_ids;
-    WorkQueue<Block> q;
-    BlockStat blk_stat;
+    ActiveSegmentReadTaskQueuePtr shared_q;
     LoggerPtr log;
 
     std::atomic<Int64> unordered_input_stream_ref_count;
@@ -224,16 +177,12 @@ class SegmentReadTaskPool
     std::atomic<bool> exception_happened;
     DB::Exception exception;
 
-    // SegmentReadTaskPool will be held by several UnorderedBlockInputStreams.
-    // It will be added to SegmentReadTaskScheduler when one of the UnorderedBlockInputStreams being read.
-    // Since several UnorderedBlockInputStreams can be read by several threads concurrently, we use
+    // SegmentReadTaskPool will be held by several `UnorderedInputStream`s/`UnorderedSourceOp`s.
+    // It will be added to SegmentReadTaskScheduler when one of them is first read.
+    // Since several UnorderedInputStreams/UnorderedSourceOps can be read by several threads concurrently, we use
     // std::once_flag and std::call_once to prevent duplicated add.
     std::once_flag add_to_scheduler;
 
-    const Int64 block_slot_limit;
-    const Int64 active_segment_limit;
-
-    const KeyspaceID keyspace_id;
     const String res_group_name;
     std::mutex ru_mu;
     std::atomic last_time_check_ru = 0;
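On the queue side, `removeTableTask` (called from `finishSegment` above) and `finishQueueIfEmpty` (called by the updated tests below) imply per-table bookkeeping roughly like the sketch here. The real `ActiveSegmentReadTaskQueue` also owns the block queue and pipeline-task registration, and `removeTableTask` additionally takes the `pool_id`; both are omitted in this illustration:

    #include <cstdint>
    #include <mutex>
    #include <unordered_set>

    using TableID = int64_t;

    // Illustrative table-task bookkeeping; names are assumptions based on
    // the call sites visible in this diff, not the real implementation.
    class TableTaskTracker
    {
    public:
        void addTableTask(TableID table_id)
        {
            std::lock_guard lock(mu);
            pending_tables.insert(table_id);
        }

        // Called by a pool once all of its segments are finished. When the
        // last table is removed, the underlying block queue can be finished
        // so readers see end-of-stream.
        void removeTableTask(TableID table_id)
        {
            std::lock_guard lock(mu);
            pending_tables.erase(table_id);
            if (pending_tables.empty())
                finished = true; // stands in for finishing the block queue
        }

        // Used by callers that bypass the scheduler: if no table ever
        // registered a task, finish the queue immediately.
        void finishQueueIfEmpty()
        {
            std::lock_guard lock(mu);
            if (pending_tables.empty())
                finished = true;
        }

    private:
        std::mutex mu;
        std::unordered_set<TableID> pending_tables;
        bool finished = false;
    };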
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool_fwd.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool_fwd.h
new file mode 100644
index 00000000000..13c4b3d994f
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool_fwd.h
@@ -0,0 +1,28 @@
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+
+#include
+
+namespace DB::DM
+{
+class SegmentReadTaskPool;
+using SegmentReadTaskPoolPtr = std::shared_ptr<SegmentReadTaskPool>;
+
+using AfterSegmentRead = std::function<void(const DMContextPtr &, const SegmentPtr &)>;
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp
index 3736c8dfdc0..defeb5466bb 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp
@@ -254,11 +254,13 @@ class DeltaMergeStoreTestFastAddPeer
     void verifyRows(const RowKeyRange & range, size_t rows)
     {
         const auto & columns = store->getTableColumns();
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get("verifyRows"));
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {range},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -267,6 +269,7 @@ class DeltaMergeStoreTestFastAddPeer
             TRACING_NAME,
             DMReadOptions{},
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         ASSERT_INPUTSTREAM_NROWS(in, rows);
     }
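Every test touched by this diff follows the same pattern: create a `read_queue`, pass it to `store->read`, then call `finishQueueIfEmpty()` before draining the stream. The call matters because readers block on the shared queue until it is finished; for a read that registers no segment tasks, nothing else would ever finish it. A toy model of that pop-until-finished contract (the real queue finishes via the table-task bookkeeping above rather than raw emptiness, so this is a simplification):

    #include <condition_variable>
    #include <mutex>
    #include <optional>
    #include <queue>

    template <typename T>
    class FinishableQueue
    {
    public:
        void push(T v)
        {
            {
                std::lock_guard lock(mu);
                q.push(std::move(v));
            }
            cv.notify_one();
        }

        // Blocks until data arrives or the queue is finished;
        // returns std::nullopt once finished and drained.
        std::optional<T> pop()
        {
            std::unique_lock lock(mu);
            cv.wait(lock, [&] { return finished || !q.empty(); });
            if (q.empty())
                return std::nullopt;
            T v = std::move(q.front());
            q.pop();
            return v;
        }

        // Analogue of finishQueueIfEmpty(): only finishes when there is
        // nothing in flight, so a consumer of an empty read does not hang.
        void finishIfEmpty()
        {
            {
                std::lock_guard lock(mu);
                if (!q.empty())
                    return; // producers will finish the queue later
                finished = true;
            }
            cv.notify_all();
        }

    private:
        std::mutex mu;
        std::condition_variable cv;
        std::queue<T> q;
        bool finished = false;
    };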
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp
index de96a94dc20..7ef93f5915c 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp
@@ -73,12 +73,14 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithoutRangeFilter)
 {
     // read all columns from store with all range in fast mode
+    auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
     const auto & columns = store->getTableColumns();
     BlockInputStreamPtr in = store->read(
         *db_context,
         db_context->getSettingsRef(),
         columns,
         {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+        read_queue,
         /* num_streams= */ 1,
         /* start_ts= */ std::numeric_limits<UInt64>::max(),
         EMPTY_FILTER,
@@ -89,6 +91,7 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithoutRangeFilter)
             .is_fast_scan = true,
         },
         /* expected_block_size= */ 1024)[0];
+    read_queue->finishQueueIfEmpty();
     ASSERT_INPUTSTREAM_COLS_UR(
         in,
         Strings({DMTestEnv::pk_name, col_str_define.name, col_i8_define.name}),
@@ -161,11 +164,13 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithRangeFilter)
         RowKeyValue(false, std::make_shared<String>(end_key_ss.releaseStr()), /*int_val_*/ read_nums_limit),
         false,
         store->getRowKeyColumnSize())};
+    auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
     BlockInputStreamPtr in = store->read(
         *db_context,
         db_context->getSettingsRef(),
         columns,
         key_ranges,
+        read_queue,
         /* num_streams= */ 1,
         /* start_ts= */ std::numeric_limits<UInt64>::max(),
         EMPTY_FILTER,
@@ -176,6 +181,7 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithRangeFilter)
             .is_fast_scan = true,
         },
         /* expected_block_size= */ 1024)[0];
+    read_queue->finishQueueIfEmpty();
     ASSERT_INPUTSTREAM_COLS_UR(
         in,
         Strings({DMTestEnv::pk_name, col_str_define.name, col_i8_define.name}),
@@ -241,12 +247,14 @@ try
     }
 
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -257,6 +265,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         switch (mode)
         {
         case TestMode::PageStorageV2_MemoryOnly:
@@ -376,12 +385,14 @@ try
     }
 
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -392,6 +403,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         switch (mode)
         {
         case TestMode::PageStorageV2_MemoryOnly:
@@ -490,12 +502,14 @@ try
     store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
 
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -506,6 +520,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         switch (mode)
         {
         case TestMode::PageStorageV2_MemoryOnly:
@@ -607,12 +622,14 @@ try
     store->mergeDeltaAll(*db_context);
 
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -623,6 +640,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         ASSERT_INPUTSTREAM_COLS_UR(
             in,
             Strings({DMTestEnv::pk_name}),
@@ -692,12 +710,14 @@ try
     store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
 
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -708,6 +728,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         switch (mode)
         {
@@ -864,12 +885,14 @@ try
     // Read after deletion
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -880,6 +903,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         // filter del mark = 1, thus just read the insert data before delete
         ASSERT_INPUTSTREAM_COLS_UR(
             in,
@@ -894,12 +918,14 @@ try
     store->mergeDeltaAll(*db_context);
 
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -910,6 +936,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         ASSERT_INPUTSTREAM_COLS_UR(
             in,
             Strings({DMTestEnv::pk_name}),
@@ -943,12 +970,14 @@ try
     }
 
     // Test Reading first
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -959,6 +988,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         ASSERT_INPUTSTREAM_COLS_UR(
             in,
             Strings({DMTestEnv::pk_name}),
@@ -972,12 +1002,14 @@ try
     }
 
     // Read after deletion
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -988,6 +1020,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         // filter del mark = 1, thus just read the insert data before delete
         ASSERT_INPUTSTREAM_COLS_UR(
             in,
@@ -1035,12 +1068,14 @@ try
     // Read after merge delta
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -1051,6 +1086,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         auto pk_coldata = createNumbers<Int64>(num_deleted_rows, num_rows_write);
         ASSERT_EQ(pk_coldata.size(), num_rows_write - num_deleted_rows);
         ASSERT_INPUTSTREAM_COLS_UR(in, Strings({DMTestEnv::pk_name}), createColumns({createColumn<Int64>(pk_coldata)}));
@@ -1123,12 +1159,14 @@ try
     // Read in fast mode
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -1139,6 +1177,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         switch (mode)
         {
@@ -1218,12 +1257,14 @@ try
     // Read with version in normal case
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ static_cast<UInt64>(1),
             EMPTY_FILTER,
@@ -1235,6 +1276,7 @@ try
                 .is_fast_scan = false,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         // Data is not guaranteed to be returned in order.
         ASSERT_UNORDERED_INPUTSTREAM_COLS_UR(
             in,
@@ -1276,12 +1318,14 @@ try
     // could do clean read with no optimization
     {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -1292,6 +1336,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         ASSERT_INPUTSTREAM_COLS_UR(
             in,
             Strings({DMTestEnv::pk_name}),
@@ -1325,11 +1370,13 @@ try
         }
     }
 
+    auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
     BlockInputStreamPtr in = store->read(
         *db_context,
         db_context->getSettingsRef(),
         real_columns,
         {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+        read_queue,
         /* num_streams= */ 1,
         /* start_ts= */ std::numeric_limits<UInt64>::max(),
         EMPTY_FILTER,
@@ -1340,6 +1387,7 @@ try
             .is_fast_scan = true,
         },
         /* expected_block_size= */ 1024)[0];
+    read_queue->finishQueueIfEmpty();
     ASSERT_INPUTSTREAM_NROWS(in, num_rows_write - num_deleted_rows);
 }
 }
@@ -1406,12 +1454,14 @@ try
     store->mergeDeltaAll(*db_context);
 
     auto fastscan_rows = [&]() {
+        auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get());
         const auto & columns = store->getTableColumns();
         BlockInputStreamPtr in = store->read(
             *db_context,
             db_context->getSettingsRef(),
             columns,
             {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            read_queue,
             /* num_streams= */ 1,
             /* start_ts= */ std::numeric_limits<UInt64>::max(),
             EMPTY_FILTER,
@@ -1422,6 +1472,7 @@ try
                 .is_fast_scan = true,
             },
             /* expected_block_size= */ 1024)[0];
+        read_queue->finishQueueIfEmpty();
         size_t rows = 0;
         in->readPrefix();
         while (true)
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp
index 6fbb41477ff..dd306c438f9 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp
@@ -246,11 +246,13 @@ try
     for (int i = 0; i < upper_bound; ++i)
         if (filled_bitmap[i])
             expected_pk_column.emplace_back(i);
+    auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(2, Logger::get("read"));
     auto stream = store->read(
         *db_context,
         db_context->getSettingsRef(),
         store->getTableColumns(),
         {RowKeyRange::newAll(is_common_handle, 1)},
+        read_queue,
         /* num_streams= */ 1,
         /* start_ts= */ std::numeric_limits<UInt64>::max(),
         EMPTY_FILTER,
@@ -260,6 +262,7 @@ try
         DMReadOptions{
             .keep_order = true,
         })[0];
+    read_queue->finishQueueIfEmpty();
     ASSERT_INPUTSTREAM_COLS_UR(
         stream,
         Strings({DMTestEnv::pk_name}),
diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp
index 33c396029ba..198fe411ab4 100644
--- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp
+++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp
@@ -189,11 +189,13 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func)
     auto filter = EMPTY_FILTER;
     int excepted_block_size = 1024;
     uint64_t read_ts = ts_gen->get();
+    auto read_queue = std::make_shared<ActiveSegmentReadTaskQueue>(stream_count, Logger::get());
     auto streams = store->read(
         *context,
         context->getSettingsRef(),
         columns,
         ranges,
+        read_queue,
         stream_count,
         read_ts,
         filter,
@@ -205,6 +207,8 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func)
             .is_fast_scan = opts->is_fast_scan,
         },
         excepted_block_size);
+
+    read_queue->finishQueueIfEmpty();
     std::vector<std::thread> threads;
     threads.reserve(streams.size());
     for (auto & stream : streams)
diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
index d33d0fbaff5..25da4417fcf 100644
--- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
+++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
@@ -340,12 +340,14 @@ CATCH
 
 void verifyRows(Context & ctx, DM::DeltaMergeStorePtr store, const DM::RowKeyRange & range, size_t rows)
 {
+    auto read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(2, Logger::get("verifyRows"));
     const auto & columns = store->getTableColumns();
     BlockInputStreamPtr in = store->read(
         ctx,
         ctx.getSettingsRef(),
         columns,
         {range},
+        read_queue,
         /* num_streams= */ 1,
         /* start_ts= */ std::numeric_limits<UInt64>::max(),
         DM::EMPTY_FILTER,
@@ -354,6 +356,7 @@ void verifyRows(Context & ctx, DM::DeltaMergeStorePtr store, const DM::RowKeyRan
         "KVStoreFastAddPeer",
         DM::DMReadOptions{},
         /* expected_block_size= */ 1024)[0];
+    read_queue->finishQueueIfEmpty();
     ASSERT_INPUTSTREAM_NROWS(in, rows);
 }
diff --git a/dbms/src/Storages/SelectQueryInfo.cpp b/dbms/src/Storages/SelectQueryInfo.cpp
index 196bd6ebdcf..a2c309b4909 100644
--- a/dbms/src/Storages/SelectQueryInfo.cpp
+++ b/dbms/src/Storages/SelectQueryInfo.cpp
@@ -32,6 +32,7 @@ SelectQueryInfo::SelectQueryInfo(const SelectQueryInfo & rhs)
     , keep_order(rhs.keep_order)
     , is_fast_scan(rhs.is_fast_scan)
     , has_multiple_partitions(rhs.has_multiple_partitions)
+    , read_queue(rhs.read_queue)
 {}
 
 SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept
@@ -43,6 +44,7 @@ SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept
     , keep_order(rhs.keep_order)
     , is_fast_scan(rhs.is_fast_scan)
     , has_multiple_partitions(rhs.has_multiple_partitions)
+    , read_queue(std::move(rhs.read_queue))
 {}
 
 } // namespace DB
diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h
index b869389f803..6d7f22f796f 100644
--- a/dbms/src/Storages/SelectQueryInfo.h
+++ b/dbms/src/Storages/SelectQueryInfo.h
@@ -35,6 +35,11 @@ using PreparedSets = std::unordered_map;
 
 struct MvccQueryInfo;
 struct DAGQueryInfo;
 
+namespace DM
+{
+class ActiveSegmentReadTaskQueue;
+using ActiveSegmentReadTaskQueuePtr = std::shared_ptr<ActiveSegmentReadTaskQueue>;
+} // namespace DM
 
 /** Query along with some additional data,
   * that can be used during query processing
@@ -57,6 +62,8 @@ struct SelectQueryInfo
     bool is_fast_scan = false;
     bool has_multiple_partitions = false;
 
+    DM::ActiveSegmentReadTaskQueuePtr read_queue;
+
     SelectQueryInfo();
 
     ~SelectQueryInfo();
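SelectQueryInfo.h deliberately forward-declares `ActiveSegmentReadTaskQueue` instead of including the DeltaMerge header: the struct only stores, copies, and moves the `shared_ptr`. A compilable reduction of that pattern, with names trimmed to the essentials:

    #include <memory>

    namespace DB::DM
    {
    // Forward declaration only: this layer never dereferences the queue.
    class ActiveSegmentReadTaskQueue;
    using ActiveSegmentReadTaskQueuePtr = std::shared_ptr<ActiveSegmentReadTaskQueue>;
    } // namespace DB::DM

    struct SelectQueryInfoSketch
    {
        // Declaring, copying, moving, and destroying a shared_ptr to an
        // incomplete type is valid; only construction from a raw pointer or
        // operator-> needs the full definition, which lives in DeltaMerge code.
        DB::DM::ActiveSegmentReadTaskQueuePtr read_queue;
    };

    SelectQueryInfoSketch copyInfo(const SelectQueryInfoSketch & rhs)
    {
        // Shares ownership, just like the copy constructor added above.
        return SelectQueryInfoSketch{rhs.read_queue};
    }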
diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp
index 9f555eaf771..55e14927e41 100644
--- a/dbms/src/Storages/StorageDeltaMerge.cpp
+++ b/dbms/src/Storages/StorageDeltaMerge.cpp
@@ -847,6 +847,7 @@ BlockInputStreams StorageDeltaMerge::read(
         context,
         context.getSettingsRef(),
         columns_to_read,
+        query_info.read_queue,
         num_streams,
         query_info.keep_order,
         parseSegmentSet(select_query.segment_expression_list),
@@ -889,6 +890,7 @@ BlockInputStreams StorageDeltaMerge::read(
         context.getSettingsRef(),
         columns_to_read,
         ranges,
+        query_info.read_queue,
         num_streams,
         /*start_ts=*/mvcc_query_info.start_ts,
         pushdown_executor,
@@ -940,6 +942,7 @@ void StorageDeltaMerge::read(
         context,
         context.getSettingsRef(),
         columns_to_read,
+        query_info.read_queue,
         num_streams,
         query_info.keep_order,
         parseSegmentSet(select_query.segment_expression_list),
@@ -985,6 +988,7 @@ void StorageDeltaMerge::read(
         context.getSettingsRef(),
         columns_to_read,
         ranges,
+        query_info.read_queue,
         num_streams,
         /*start_ts=*/mvcc_query_info.start_ts,
         pushdown_executor,
@@ -1143,7 +1147,7 @@ UInt64 StorageDeltaMerge::onSyncGc(Int64 limit, const GCOptions & gc_options)
 
 // just for testing
 size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM::RowKeyRange & range)
 {
-    size_t rows = 0;
+    auto read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(2, Logger::get("getRows"));
 
     ColumnDefines to_read{getExtraHandleColumnDefine(store->isCommonHandle())};
     auto stream = store->read(
@@ -1151,6 +1155,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM
         context.getSettingsRef(),
         to_read,
         {range},
+        read_queue,
         1,
         std::numeric_limits<UInt64>::max(),
         EMPTY_FILTER,
@@ -1158,8 +1163,11 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM
         0,
         /*tracing_id*/ "getRows",
         DMReadOptions{})[0];
+
+    read_queue->finishQueueIfEmpty();
     stream->readPrefix();
     Block block;
+    size_t rows = 0;
     while ((block = stream->read()))
         rows += block.rows();
     stream->readSuffix();
@@ -1170,6 +1178,8 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM
 
 // just for testing
 DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context, size_t total_rows, size_t delete_rows)
 {
+    auto read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(2, Logger::get("getRange"));
+
     auto start_index = rand() % (total_rows - delete_rows + 1); // NOLINT(cert-msc50-cpp)
     DM::RowKeyRange range = DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize());
     {
@@ -1179,6 +1189,7 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context
         context.getSettingsRef(),
         to_read,
         {DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+        read_queue,
         1,
         std::numeric_limits<UInt64>::max(),
         EMPTY_FILTER,
@@ -1186,6 +1197,8 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context
         0,
         /*tracing_id*/ "getRange",
         DMReadOptions{})[0];
+
+    read_queue->finishQueueIfEmpty();
     stream->readPrefix();
     Block block;
     size_t index = 0;
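The `getRows`/`getRange` helpers above keep their drain loops unchanged apart from where the counter is declared; the shape is the usual `readPrefix`/`read`/`readSuffix` walk. A self-contained miniature of that counting pattern, with a toy stream standing in for `BlockInputStream` (assuming, as the toy does, that every block before end-of-stream is non-empty):

    #include <cstddef>
    #include <vector>

    struct ToyBlock
    {
        size_t row_count = 0;
        // Mirrors how a Block converts to false once the stream is exhausted.
        explicit operator bool() const { return row_count != 0; }
        size_t rows() const { return row_count; }
    };

    struct ToyStream
    {
        std::vector<ToyBlock> blocks;
        size_t next = 0;
        void readPrefix() {}
        ToyBlock read() { return next < blocks.size() ? blocks[next++] : ToyBlock{}; }
        void readSuffix() {}
    };

    size_t drainRowCount(ToyStream & stream)
    {
        size_t rows = 0;
        stream.readPrefix();
        // Same loop shape as getRows(): read until an empty block comes back.
        while (ToyBlock block = stream.read())
            rows += block.rows();
        stream.readSuffix();
        return rows;
    }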
diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h
index 33c81d94db8..b22e018cbc4 100644
--- a/dbms/src/Storages/StorageDisaggregated.h
+++ b/dbms/src/Storages/StorageDisaggregated.h
@@ -22,10 +22,11 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
-#include
+#include
 #include
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wunused-parameter"
@@ -111,6 +112,7 @@ class StorageDisaggregated : public IStorage
         const Context & db_context,
         DM::SegmentReadTasks && read_tasks,
         const DM::ColumnDefinesPtr & column_defines,
+        DM::ActiveSegmentReadTaskQueuePtr & read_queue,
         size_t num_streams,
         int extra_table_id_index);
     void buildRemoteSegmentInputStreams(
diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp
index e7101dc6036..b3499ae1643 100644
--- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp
+++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp
@@ -38,6 +38,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -568,6 +569,7 @@ std::variant StorageDisagg
     const Context & db_context,
     DM::SegmentReadTasks && read_tasks,
     const DM::ColumnDefinesPtr & column_defines,
+    DM::ActiveSegmentReadTaskQueuePtr & read_queue,
     size_t num_streams,
     int extra_table_id_index)
 {
@@ -618,7 +620,11 @@ std::variant StorageDisagg
     if (enable_read_thread)
     {
+        // For StorageDisaggregated, we now create only one SegmentReadTaskPool for segments from all partitions.
+        // Use the logical table id as a workaround.
+        TableID table_id = table_scan.getLogicalTableID();
         return std::make_shared<DM::SegmentReadTaskPool>(
+            read_queue,
             extra_table_id_index,
             *column_defines,
             push_down_executor,
@@ -631,6 +637,7 @@ std::variant StorageDisagg
             /*enable_read_thread*/ true,
             num_streams,
             context.getDAGContext()->getKeyspaceID(),
+            table_id,
             context.getDAGContext()->getResourceGroupName());
     }
     else
@@ -685,10 +692,18 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
     size_t num_streams,
     DAGPipeline & pipeline)
 {
+    // Share the read queue among all input streams. Note that for StorageDisaggregated,
+    // we now create only one SegmentReadTaskPool for segments from all partitions.
+    auto read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(num_streams, log);
     // Build the input streams to read blocks from remote segments
     auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
-    auto packed_read_tasks
-        = packSegmentReadTasks(db_context, std::move(read_tasks), column_defines, num_streams, extra_table_id_index);
+    auto packed_read_tasks = packSegmentReadTasks(
+        db_context,
+        std::move(read_tasks),
+        column_defines,
+        read_queue,
+        num_streams,
+        extra_table_id_index);
 
     RUNTIME_CHECK(num_streams > 0, num_streams);
     pipeline.streams.reserve(num_streams);
@@ -712,7 +727,7 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
     });
 }
 
-struct SrouceOpBuilder
+struct SourceOpBuilder
 {
     const String & tracing_id;
     const DM::ColumnDefinesPtr & column_defines;
@@ -751,13 +766,22 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(
     DM::SegmentReadTasks && read_tasks,
     size_t num_streams)
 {
+    // Share the read queue among all source ops. Note that for StorageDisaggregated,
+    // we now create only one SegmentReadTaskPool for segments from all partitions.
+    auto read_queue = std::make_shared<DM::ActiveSegmentReadTaskQueue>(num_streams, log);
+    exec_context.addStorageTaskQueue(read_queue);
     // Build the input streams to read blocks from remote segments
     auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
-    auto packed_read_tasks
-        = packSegmentReadTasks(db_context, std::move(read_tasks), column_defines, num_streams, extra_table_id_index);
+    auto packed_read_tasks = packSegmentReadTasks(
+        db_context,
+        std::move(read_tasks),
+        column_defines,
+        read_queue,
+        num_streams,
+        extra_table_id_index);
 
     RUNTIME_CHECK(num_streams > 0, num_streams);
-    SrouceOpBuilder builder{
+    SourceOpBuilder builder{
         .tracing_id = log->identifier(),
         .column_defines = column_defines,
         .extra_table_id_index = extra_table_id_index,