From c258b621c939ec47728882cf14f7c84082283900 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Sun, 19 Oct 2025 16:12:25 +0800 Subject: [PATCH 1/4] Share the block_queue for partition table scan Signed-off-by: JaySon-Huang --- .../Coprocessor/DAGStorageInterpreter.cpp | 108 +++++++--- .../Flash/Coprocessor/DAGStorageInterpreter.h | 6 +- dbms/src/Interpreters/Settings.h | 1 + dbms/src/Operators/ConcatSourceOp.h | 185 +++++++++++++++- dbms/src/Operators/UnorderedSourceOp.cpp | 14 ++ dbms/src/Operators/UnorderedSourceOp.h | 14 +- .../Operators/tests/gtest_concat_source.cpp | 153 ++++++++++++++ .../Storages/DeltaMerge/DeltaMergeStore.cpp | 12 ++ .../src/Storages/DeltaMerge/DeltaMergeStore.h | 5 + .../Storages/DeltaMerge/File/DMFileReader.cpp | 4 + .../DeltaMerge/ReadThread/SharedBlockQueue.h | 197 ++++++++++++++++++ .../tests/gtest_segment_read_task_pool.cpp | 3 + .../DeltaMerge/SegmentReadTaskPool.cpp | 64 ++---- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 75 ++----- .../DeltaMerge/SegmentReadTaskPool_fwd.h | 28 +++ .../DeltaMerge/workload/DTWorkload.cpp | 2 + dbms/src/Storages/SelectQueryInfo.cpp | 2 + dbms/src/Storages/SelectQueryInfo.h | 7 + dbms/src/Storages/StorageDeltaMerge.cpp | 11 +- dbms/src/Storages/StorageDisaggregated.h | 4 +- .../Storages/StorageDisaggregatedRemote.cpp | 27 ++- 21 files changed, 772 insertions(+), 150 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/ReadThread/SharedBlockQueue.h create mode 100644 dbms/src/Storages/DeltaMerge/SegmentReadTaskPool_fwd.h diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index da3b0245256..5e32f502174 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -917,6 +918,8 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() std::unordered_map DAGStorageInterpreter::generateSelectQueryInfos() { std::unordered_map ret; + bool use_unordered_concat = context.getSettingsRef().dt_enable_unordered_concat; + auto shared_read_queue = std::make_shared(log); auto create_query_info = [&](Int64 table_id) -> SelectQueryInfo { SelectQueryInfo query_info; /// to avoid null point exception @@ -934,6 +937,14 @@ std::unordered_map DAGStorageInterpreter::generateSele query_info.req_id = fmt::format("{} table_id={}", log->identifier(), table_id); query_info.keep_order = table_scan.keepOrder(); query_info.is_fast_scan = table_scan.isFastScan(); + if (use_unordered_concat) + { + query_info.read_queue = shared_read_queue; + } + else + { + query_info.read_queue = std::make_shared(Logger::get(query_info.req_id)); + } return query_info; }; RUNTIME_CHECK_MSG(mvcc_query_info->scan_context != nullptr, "Unexpected null scan_context"); @@ -1062,16 +1073,16 @@ Int32 getMaxAllowRetryForLocalRead(const SelectQueryInfo & query_info) } } // namespace -DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalStreamsForPhysicalTable( +void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable( const TableID & table_id, const SelectQueryInfo & query_info, DAGPipeline & pipeline, + DM::Remote::DisaggReadSnapshotPtr & disagg_snap, size_t max_block_size) { - DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap; size_t region_num = query_info.mvcc_query_info->regions_query_info.size(); if (region_num == 0) - return table_snap; + return; assert(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end()); auto & storage = storages_with_structure_lock[table_id].storage; @@ -1089,6 +1100,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal { try { + DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap; if (!dag_context.is_disaggregated_task) { // build local inputstreams @@ -1116,6 +1128,12 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams` // may contain different data other than expected. validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log); + + // Only after all streams are built successfully, we add the task to mvcc_query_info + if (table_snap) + { + disagg_snap->addTask(table_id, std::move(table_snap)); + } break; } catch (RegionException & e) @@ -1126,6 +1144,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal { // clean all streams from local because we are not sure the correctness of those streams pipeline.streams.clear(); + // clean table task from read_queue + query_info.read_queue->resetTableTask(table_id); if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry))) continue; // next retry to read from local storage else @@ -1145,20 +1165,19 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal throw; } } - return table_snap; } -DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalExecForPhysicalTable( +void DAGStorageInterpreter::buildLocalExecForPhysicalTable( PipelineExecutorContext & exec_context, PipelineExecGroupBuilder & group_builder, const TableID & table_id, const SelectQueryInfo & query_info, + DM::Remote::DisaggReadSnapshotPtr & disagg_snap, size_t max_block_size) { - DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap; size_t region_num = query_info.mvcc_query_info->regions_query_info.size(); if (region_num == 0) - return table_snap; + return; RUNTIME_CHECK(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end()); auto & storage = storages_with_structure_lock[table_id].storage; @@ -1169,6 +1188,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal { try { + DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap; if (!dag_context.is_disaggregated_task) { storage->read( @@ -1199,6 +1219,13 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `sourceOps` // may contain different data other than expected. validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log); + + // Only after all sourceOps are built and verified, we add the snapshot to group_builder + if (table_snap != nullptr) + { + RUNTIME_CHECK(disagg_snap != nullptr); + disagg_snap->addTask(table_id, std::move(table_snap)); + } break; } catch (RegionException & e) @@ -1209,6 +1236,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal { // clean all operator from local because we are not sure the correctness of those operators group_builder.reset(); + // clean table task from read_queue + query_info.read_queue->resetTableTask(table_id); if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry))) continue; else @@ -1228,7 +1257,6 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal throw; } } - return table_snap; } void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size) @@ -1239,6 +1267,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max return; mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num); const auto table_query_infos = generateSelectQueryInfos(); + RUNTIME_CHECK_MSG(!table_query_infos.empty(), "No table query info generated for local read"); bool has_multiple_partitions = table_query_infos.size() > 1; // MultiPartitionStreamPool will be disabled in no partition mode or single-partition case std::shared_ptr stream_pool @@ -1250,12 +1279,17 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max DAGPipeline current_pipeline; const TableID physical_table_id = table_query_info.first; const SelectQueryInfo & query_info = table_query_info.second; - auto table_snap - = buildLocalStreamsForPhysicalTable(physical_table_id, query_info, current_pipeline, max_block_size); - if (table_snap) - { - disaggregated_snap->addTask(physical_table_id, std::move(table_snap)); - } + RUNTIME_CHECK_MSG( + query_info.read_queue != nullptr, + "read_queue should not be null, table_id={}", + physical_table_id); + buildLocalStreamsForPhysicalTable( + physical_table_id, + query_info, + current_pipeline, + disaggregated_snap, + max_block_size); + if (has_multiple_partitions) stream_pool->addPartitionStreams(current_pipeline.streams); else @@ -1265,6 +1299,10 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max current_pipeline.streams.end()); } + assert(!table_query_infos.empty()); // check at start of this function + auto read_queue = table_query_infos.begin()->second.read_queue; + read_queue->finishQueueIfEmpty(); + LOG_DEBUG( log, "local streams built, is_disaggregated_task={} snap_id={}", @@ -1306,21 +1344,29 @@ void DAGStorageInterpreter::buildLocalExec( return; mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num); const auto table_query_infos = generateSelectQueryInfos(); - bool has_multiple_partitions = table_query_infos.size() > 1; - ConcatBuilderPool builder_pool{max_streams}; + RUNTIME_CHECK_MSG(!table_query_infos.empty(), "No table query info generated for local read"); + const bool has_multiple_partitions = table_query_infos.size() > 1; + + const bool use_unordered_concat = context.getSettingsRef().dt_enable_unordered_concat; + ConcatBuilderPool builder_pool{max_streams, context.getSettingsRef().dt_enable_unordered_concat}; auto disaggregated_snap = std::make_shared(); for (const auto & table_query_info : table_query_infos) { - PipelineExecGroupBuilder builder; const TableID physical_table_id = table_query_info.first; const SelectQueryInfo & query_info = table_query_info.second; - auto table_snap - = buildLocalExecForPhysicalTable(exec_context, builder, physical_table_id, query_info, max_block_size); - if (table_snap) - { - disaggregated_snap->addTask(physical_table_id, std::move(table_snap)); - } + RUNTIME_CHECK_MSG( + query_info.read_queue != nullptr, + "read_queue should not be null, table_id={}", + physical_table_id); + PipelineExecGroupBuilder builder; + buildLocalExecForPhysicalTable( + exec_context, + builder, + physical_table_id, + query_info, + disaggregated_snap, + max_block_size); if (has_multiple_partitions) builder_pool.add(builder); @@ -1328,8 +1374,22 @@ void DAGStorageInterpreter::buildLocalExec( group_builder.merge(std::move(builder)); } - LOG_DEBUG(log, "local sourceOps built, is_disaggregated_task={}", dag_context.is_disaggregated_task); + if (use_unordered_concat) + { + assert(!table_query_infos.empty()); // check at start of this function + auto read_queue = table_query_infos.begin()->second.read_queue; + read_queue->finishQueueIfEmpty(); + } + else + { + for (const auto & table_query_info : table_query_infos) + { + const SelectQueryInfo & query_info = table_query_info.second; + query_info.read_queue->finishQueueIfEmpty(); + } + } + LOG_DEBUG(log, "local sourceOps built, is_disaggregated_task={}", dag_context.is_disaggregated_task); if (dag_context.is_disaggregated_task) { // register the snapshot to manager diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index 5d88f2d83e6..46783ed5d61 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -73,17 +73,19 @@ class DAGStorageInterpreter const RegionException & e, int num_allow_retry); - DM::Remote::DisaggPhysicalTableReadSnapshotPtr buildLocalStreamsForPhysicalTable( + void buildLocalStreamsForPhysicalTable( const TableID & table_id, const SelectQueryInfo & query_info, DAGPipeline & pipeline, + DM::Remote::DisaggReadSnapshotPtr & disagg_snap, size_t max_block_size); - DM::Remote::DisaggPhysicalTableReadSnapshotPtr buildLocalExecForPhysicalTable( + void buildLocalExecForPhysicalTable( PipelineExecutorContext & exec_context, PipelineExecGroupBuilder & group_builder, const TableID & table_id, const SelectQueryInfo & query_info, + DM::Remote::DisaggReadSnapshotPtr & disagg_snap, size_t max_block_size); void buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index c8408cc85c2..1f7e8683f67 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -184,6 +184,7 @@ struct Settings M(SettingUInt64, dt_merged_file_max_size, 16 * 1024 * 1024, "Small files are merged into one or more files not larger than dt_merged_file_max_size") \ M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \ M(SettingDouble, dt_page_gc_threshold_raft_data, 0.05, "Max valid rate of deciding to do a GC for BlobFile storing PageData in PageStorage") \ + M(SettingBool, dt_enable_unordered_concat, false, "") \ M(SettingInt64, enable_version_chain, 1, "Enable version chain or not: 0 - disable, 1 - enabled. " \ "More details are in the comments of `enum class VersionChainMode`." \ "Modifying this configuration requires a restart to reset the in-memory state.") \ diff --git a/dbms/src/Operators/ConcatSourceOp.h b/dbms/src/Operators/ConcatSourceOp.h index a816abda0c6..6b36c326d0e 100644 --- a/dbms/src/Operators/ConcatSourceOp.h +++ b/dbms/src/Operators/ConcatSourceOp.h @@ -178,10 +178,173 @@ class ConcatSourceOp : public SourceOp bool done = false; }; +class UnorderedConcatSourceOp : public SourceOp +{ +private: + struct PipelineExecWithStatus + { + PipelineExecPtr exec; + bool is_finished = false; + }; + +public: + UnorderedConcatSourceOp( + PipelineExecutorContext & exec_context_, + const String & req_id, + std::vector & exec_builder_pool) + : SourceOp(exec_context_, req_id) + , cur_exec_index(0) + { + RUNTIME_CHECK(!exec_builder_pool.empty()); + setHeader(exec_builder_pool.back().getCurrentHeader()); + for (auto & exec_builder : exec_builder_pool) + { + exec_builder.setSinkOp(std::make_unique(exec_context_, req_id, res)); + exec_pool.push_back(PipelineExecWithStatus{.exec = exec_builder.build(false), .is_finished = false}); + } + } + + String getName() const override { return "UnorderedConcatSourceOp"; } + + // UnorderedConcatSourceOp is used to merge multiple partitioned tables of storage layer, so override `getIOProfileInfo` is needed here. + IOProfileInfoPtr getIOProfileInfo() const override { return IOProfileInfo::createForLocal(profile_info_ptr); } + +protected: + void operatePrefixImpl() override + { + if (exec_pool.empty()) + { + done = true; + return; + } + + for (auto & exec_with_status : exec_pool) + { + exec_with_status.exec->executePrefix(); + } + } + + void operateSuffixImpl() override + { + for (auto & exec_with_status : exec_pool) + { + if (!exec_with_status.is_finished) + { + exec_with_status.exec->executeSuffix(); + exec_with_status.exec.reset(); + exec_with_status.is_finished = true; + } + } + exec_pool.clear(); + } + + OperatorStatus readImpl(Block & block) override + { + if unlikely (done) + return OperatorStatus::HAS_OUTPUT; + + if unlikely (res) + { + std::swap(block, res); + return OperatorStatus::HAS_OUTPUT; + } + + std::optional beg_idx = std::nullopt; + while (true) + { + assert(exec_pool[cur_exec_index].exec); + auto status = exec_pool[cur_exec_index].exec->execute(); + switch (status) + { + case OperatorStatus::WAIT_FOR_NOTIFY: + if (beg_idx.has_value() && beg_idx.value() == cur_exec_index) + { + // all exec are waiting + return OperatorStatus::WAIT_FOR_NOTIFY; + } + if (beg_idx == std::nullopt) + beg_idx = cur_exec_index; + // try to read from next exec + tryNextExec(); + break; + case OperatorStatus::NEED_INPUT: + assert(res); + std::swap(block, res); + return OperatorStatus::HAS_OUTPUT; + case OperatorStatus::FINISHED: + exec_pool[cur_exec_index].exec->executeSuffix(); + exec_pool[cur_exec_index].exec.reset(); + exec_pool[cur_exec_index].is_finished = true; + if (!tryNextExec()) + { + done = true; + return OperatorStatus::HAS_OUTPUT; + } + break; + default: + return status; + } + } + } + + bool tryNextExec() + { + size_t beg_idx = cur_exec_index; + while (true) + { + cur_exec_index = (cur_exec_index + 1) % exec_pool.size(); + if (!exec_pool[cur_exec_index].is_finished) + return true; + if (cur_exec_index == beg_idx) + { + // all exec finished + done = true; + return false; + } + } + } + + OperatorStatus executeIOImpl() override + { + if unlikely (done || res) + return OperatorStatus::HAS_OUTPUT; + + assert(exec_pool[cur_exec_index].exec); + auto status = exec_pool[cur_exec_index].exec->executeIO(); + assert(status != OperatorStatus::FINISHED); + return status; + } + + OperatorStatus awaitImpl() override + { + if unlikely (done || res) + return OperatorStatus::HAS_OUTPUT; + + assert(exec_pool[cur_exec_index].exec); + auto status = exec_pool[cur_exec_index].exec->await(); + assert(status != OperatorStatus::FINISHED); + return status; + } + + void notifyImpl() override + { + assert(exec_pool[cur_exec_index].exec); + exec_pool[cur_exec_index].exec->notify(); + } + +private: + std::vector exec_pool; + size_t cur_exec_index; + + Block res; + bool done = false; +}; + class ConcatBuilderPool { public: - explicit ConcatBuilderPool(size_t expect_size) + explicit ConcatBuilderPool(size_t expect_size, bool use_unordered_concat_ = false) + : use_unordered_concat(use_unordered_concat_) { RUNTIME_CHECK(expect_size > 0); pool.resize(expect_size); @@ -204,6 +367,15 @@ class ConcatBuilderPool const String & req_id) { RUNTIME_CHECK(result_builder.empty()); + if (use_unordered_concat) + { + LOG_INFO(Logger::get(req_id), "Using UnorderedConcatSourceOp to concat source ops"); + } + else + { + LOG_INFO(Logger::get(req_id), "Using ConcatSourceOp to concat source ops"); + } + for (auto & builders : pool) { if (builders.empty()) @@ -216,7 +388,15 @@ class ConcatBuilderPool } else { - result_builder.addConcurrency(std::make_unique(exec_context, req_id, builders)); + if (use_unordered_concat) + { + result_builder.addConcurrency( + std::make_unique(exec_context, req_id, builders)); + } + else + { + result_builder.addConcurrency(std::make_unique(exec_context, req_id, builders)); + } } } } @@ -224,5 +404,6 @@ class ConcatBuilderPool private: std::vector> pool; size_t pre_index = 0; + bool use_unordered_concat; }; } // namespace DB diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index 5090da3dbbf..dad2d324517 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace DB { @@ -49,6 +50,18 @@ UnorderedSourceOp::UnorderedSourceOp( } } +UnorderedSourceOp::~UnorderedSourceOp() +{ + if (const auto rc_before_decr = task_pool->decreaseUnorderedInputStreamRefCount(); rc_before_decr == 1) + { + LOG_INFO( + log, + "All unordered input streams are finished, pool_id={} last_stream_ref_no={}", + task_pool->pool_id, + ref_no); + } +} + OperatorStatus UnorderedSourceOp::readImpl(Block & block) { if unlikely (done) @@ -74,6 +87,7 @@ OperatorStatus UnorderedSourceOp::readImpl(Block & block) else { done = true; + // return HAS_OUTPUT with empty block to indicate end of stream return OperatorStatus::HAS_OUTPUT; } } diff --git a/dbms/src/Operators/UnorderedSourceOp.h b/dbms/src/Operators/UnorderedSourceOp.h index 385ff0bf8d6..70b479bf060 100644 --- a/dbms/src/Operators/UnorderedSourceOp.h +++ b/dbms/src/Operators/UnorderedSourceOp.h @@ -18,7 +18,7 @@ #include #include #include -#include +#include namespace DB { @@ -37,17 +37,7 @@ class UnorderedSourceOp : public SourceOp int max_wait_time_ms_ = 0, bool is_disagg_ = false); - ~UnorderedSourceOp() override - { - if (const auto rc_before_decr = task_pool->decreaseUnorderedInputStreamRefCount(); rc_before_decr == 1) - { - LOG_INFO( - log, - "All unordered input streams are finished, pool_id={} last_stream_ref_no={}", - task_pool->pool_id, - ref_no); - } - } + ~UnorderedSourceOp() override; String getName() const override { return "UnorderedSourceOp"; } diff --git a/dbms/src/Operators/tests/gtest_concat_source.cpp b/dbms/src/Operators/tests/gtest_concat_source.cpp index dd741870dd6..230266e312f 100644 --- a/dbms/src/Operators/tests/gtest_concat_source.cpp +++ b/dbms/src/Operators/tests/gtest_concat_source.cpp @@ -179,6 +179,84 @@ class SimpleGetResultSinkOp : public SinkOp ResultHandler result_handler; }; +class MockReadTaskPool; +using MockReadTaskPoolPtr = std::shared_ptr; + +class MockReadTaskPool : public NotifyFuture +{ +public: + explicit MockReadTaskPool(size_t idx_, size_t slot_limit) + : idx(idx_) + , q(slot_limit) + {} + + void pushBlock(Block && block) { q.push(std::move(block), nullptr); } + void finish() { q.finish(); } + + bool tryPopBlock(Block & block) { return q.tryPop(block); } + + void registerTask(TaskPtr && task) override + { + q.registerPipeTask(std::move(task), NotifyType::WAIT_ON_TABLE_SCAN_READ); + } + + const size_t idx; + +private: + DB::DM::WorkQueue q; +}; + +class MockSourceOnReadTaskPoolOp : public SourceOp +{ +public: + MockSourceOnReadTaskPoolOp(PipelineExecutorContext & exec_context_, Block header_, const MockReadTaskPoolPtr & pool) + : SourceOp(exec_context_, "mock") + , done(false) + , task_pool(pool) + { + setHeader(header_); + } + + String getName() const override { return "MockSourceOp"; } + +protected: + OperatorStatus readImpl(Block & block) override + { + if (done) + return OperatorStatus::HAS_OUTPUT; + + while (true) + { + if (!task_pool->tryPopBlock(block)) + { + setNotifyFuture(task_pool.get()); + return OperatorStatus::WAIT_FOR_NOTIFY; + } + + if (block) + { + if (block.rows() == 0) + { + block.clear(); + continue; + } + LOG_INFO(Logger::get(), "A block is popped from pool_idx={}", task_pool->idx); + return OperatorStatus::HAS_OUTPUT; + } + else + { + done = true; + LOG_INFO(Logger::get(), "No more blocks in pool_idx={}", task_pool->idx); + // return HAS_OUTPUT with empty block to indicate end of stream + return OperatorStatus::HAS_OUTPUT; + } + } + } + +private: + bool done; + MockReadTaskPoolPtr task_pool; +}; } // namespace TEST_F(TestConcatSource, ConcatBuilderPoolWithDifferentConcurrency) @@ -255,4 +333,79 @@ try } CATCH +TEST_F(TestConcatSource, unorderedConcatSink) +try +{ + LoggerPtr log = Logger::get(); + + Blocks blks{ + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})}, + }; + + Block header = blks[0].cloneEmpty(); + + std::vector pools; + + PipelineExecutorContext exec_context; + size_t num_concurrency = 1; + ConcatBuilderPool builder_pool{num_concurrency}; + // Mock that for each partition (physical table), there is a read task pool and multiple source ops reading from it. + size_t num_partitions = 3; + for (size_t idx_part = 0; idx_part < num_partitions; ++idx_part) + { + PipelineExecGroupBuilder group_builder; + // MockReadTaskPoolPtr pool = std::make_shared(std::ceil(num_concurrency * 1.5)); + MockReadTaskPoolPtr pool = std::make_shared(idx_part, 0); + pools.push_back(pool); + for (size_t i = 0; i < num_concurrency; ++i) + { + group_builder.addConcurrency(std::make_unique(exec_context, header, pool)); + } + builder_pool.add(group_builder); + } + + auto thread_func = [](const MockReadTaskPoolPtr & pool, const Blocks & blks, const LoggerPtr & log) { + for (size_t i = 0; i < blks.size(); ++i) + { + Block blk = blks[i]; + pool->pushBlock(std::move(blk)); + LOG_INFO(log, "A block is pushed to pool, pool_idx={} n_pushed={}", pool->idx, i + 1); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + pool->finish(); + LOG_INFO(log, "All blocks are pushed to pool_idx={} n_pushed={}", pool->idx, blks.size()); + }; + + auto thread_mgr = DB::newThreadManager(); + for (const auto & pool : pools) + { + thread_mgr->schedule(false, "", [pool, &blks, &log, &thread_func]() { thread_func(pool, blks, log); }); + } + + std::atomic received_blocks = 0; + received_blocks.fetch_add(1, std::memory_order_relaxed); + + PipelineExecGroupBuilder result_builder; + builder_pool.generate(result_builder, exec_context, "test"); + result_builder.transform( + [&](auto & builder) { builder.setSinkOp(std::make_unique(exec_context, "test", h)); }); + auto op_pipeline_grp = result_builder.build(false); + op_pipeline_grp[0]->executePrefix(); + while (true) + { + auto s = op_pipeline_grp[0]->execute(); + if (s == OperatorStatus::FINISHED) + { + LOG_INFO(log, "ConcatPipelineExec is finished"); + break; + } + } + op_pipeline_grp[0]->executeSuffix(); + LOG_INFO(log, "ConcatPipelineExec is built and executed"); +} +CATCH } // namespace DB::tests diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index a695f82b607..e5da912fd70 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1024,6 +1024,7 @@ BlockInputStreams DeltaMergeStore::readRaw( const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, + const SharedBlockQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments, @@ -1073,6 +1074,7 @@ BlockInputStreams DeltaMergeStore::readRaw( if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask()) req_info = db_context.getDAGContext()->getMPPTaskId().toString(); auto read_task_pool = std::make_shared( + read_queue, extra_table_id_index, columns_to_read, EMPTY_FILTER, @@ -1085,6 +1087,7 @@ BlockInputStreams DeltaMergeStore::readRaw( enable_read_thread, final_num_stream, dm_context->scan_context->keyspace_id, + physical_table_id, dm_context->scan_context->resource_group_name); BlockInputStreams res; @@ -1128,6 +1131,7 @@ void DeltaMergeStore::readRaw( const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, + const SharedBlockQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments, @@ -1178,6 +1182,7 @@ void DeltaMergeStore::readRaw( if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask()) req_info = db_context.getDAGContext()->getMPPTaskId().toString(); auto read_task_pool = std::make_shared( + read_queue, extra_table_id_index, columns_to_read, EMPTY_FILTER, @@ -1190,6 +1195,7 @@ void DeltaMergeStore::readRaw( enable_read_thread, final_num_stream, dm_context->scan_context->keyspace_id, + physical_table_id, dm_context->scan_context->resource_group_name); if (enable_read_thread) @@ -1270,6 +1276,7 @@ BlockInputStreams DeltaMergeStore::read( const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, + const SharedBlockQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, @@ -1310,6 +1317,7 @@ BlockInputStreams DeltaMergeStore::read( const auto & final_columns_to_read = executor && executor->extra_cast ? *executor->columns_after_cast : columns_to_read; auto read_task_pool = std::make_shared( + read_queue, extra_table_id_index, final_columns_to_read, executor, @@ -1322,6 +1330,7 @@ BlockInputStreams DeltaMergeStore::read( enable_read_thread, final_num_stream, dm_context->scan_context->keyspace_id, + physical_table_id, dm_context->scan_context->resource_group_name); dm_context->scan_context->read_mode = read_mode; @@ -1381,6 +1390,7 @@ void DeltaMergeStore::read( const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, + const SharedBlockQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, @@ -1437,6 +1447,7 @@ void DeltaMergeStore::read( const auto & final_columns_to_read = executor && executor->extra_cast ? *executor->columns_after_cast : columns_to_read; auto read_task_pool = std::make_shared( + read_queue, extra_table_id_index, final_columns_to_read, executor, @@ -1449,6 +1460,7 @@ void DeltaMergeStore::read( enable_read_thread, final_num_stream, dm_context->scan_context->keyspace_id, + physical_table_id, dm_context->scan_context->resource_group_name); dm_context->scan_context->read_mode = read_mode; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 89dba189c90..e0c8db897c3 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -355,6 +356,7 @@ class DeltaMergeStore const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, + const SharedBlockQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments = {}, @@ -367,6 +369,7 @@ class DeltaMergeStore const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, + const SharedBlockQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments = {}, @@ -381,6 +384,7 @@ class DeltaMergeStore const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, + const SharedBlockQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, @@ -405,6 +409,7 @@ class DeltaMergeStore const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, + const SharedBlockQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 91f18c8836a..381f30f0b95 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -565,12 +565,15 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache( size_t read_rows) { bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this); + + // If reaching `max_sharing_column_bytes`, do not fill data_sharing_col_data_cache anymore. bool reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr && std::cmp_greater_equal(shared_column_data_mem_tracker->get(), max_sharing_column_bytes); if (reach_sharing_column_memory_limit) { GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment(); } + if (has_concurrent_reader && !reach_sharing_column_memory_limit) { auto column = getColumnFromCache( @@ -599,6 +602,7 @@ ColumnPtr DMFileReader::readFromDiskOrSharingCache( return column; } + // Directly read from disk ignoring the data_sharing_col_data_cache. return readFromDisk(cd, type_on_disk, start_pack_id, read_rows); } diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SharedBlockQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/SharedBlockQueue.h new file mode 100644 index 00000000000..6bd56113544 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SharedBlockQueue.h @@ -0,0 +1,197 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::DM +{ + +// Statistics of blocks pushed. +// All methods are thread-safe. +class BlockStat +{ +public: + BlockStat() + : pending_count(0) + , pending_bytes(0) + , total_count(0) + , total_bytes(0) + {} + + void push(const Block & blk) + { + pending_count.fetch_add(1, std::memory_order_relaxed); + total_count.fetch_add(1, std::memory_order_relaxed); + + auto b = blk.bytes(); + pending_bytes.fetch_add(b, std::memory_order_relaxed); + total_bytes.fetch_add(b, std::memory_order_relaxed); + + total_rows.fetch_add(blk.rows(), std::memory_order_relaxed); + } + + void pop(const Block & blk) + { + if (likely(blk)) + { + pending_count.fetch_sub(1, std::memory_order_relaxed); + pending_bytes.fetch_sub(blk.bytes(), std::memory_order_relaxed); + } + } + + Int64 pendingCount() const { return pending_count.load(std::memory_order_relaxed); } + + Int64 pendingBytes() const { return pending_bytes.load(std::memory_order_relaxed); } + + Int64 totalCount() const { return total_count.load(std::memory_order_relaxed); } + + Int64 totalBytes() const { return total_bytes.load(std::memory_order_relaxed); } + + Int64 totalRows() const { return total_rows.load(std::memory_order_relaxed); } + +private: + std::atomic pending_count; + std::atomic pending_bytes; + std::atomic total_count; + std::atomic total_bytes; + std::atomic total_rows; +}; + +class SharedBlockQueue +{ +public: + explicit SharedBlockQueue(LoggerPtr log_) + : log(std::move(log_)) + {} + + ~SharedBlockQueue() + { + auto [pop_times, pop_empty_times, max_queue_size] = q.getStat(); + auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; + auto total_count = blk_stat.totalCount(); + auto total_bytes = blk_stat.totalBytes(); + auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0; + auto approx_max_pending_block_bytes = blk_avg_bytes * max_queue_size; + auto total_rows = blk_stat.totalRows(); + LOG_INFO( + log, + "SharedBlockQueue finished. pop={} pop_empty={} pop_empty_ratio={:.3f} " + "max_queue_size={} blk_avg_bytes={} approx_max_pending_block_bytes={:.2f}MB " + "total_count={} total_bytes={:.2f}MB total_rows={} avg_block_rows={} avg_rows_bytes={}B", + pop_times, + pop_empty_times, + pop_empty_ratio, + max_queue_size, + blk_avg_bytes, + approx_max_pending_block_bytes / 1024.0 / 1024.0, + total_count, + total_bytes / 1024.0 / 1024.0, + total_rows, + total_count > 0 ? total_rows / total_count : 0, + total_rows > 0 ? total_bytes / total_rows : 0); + } + + void addTableTask(TableID table_id) + { + std::unique_lock lock(mu); + tables.emplace(table_id); + } + + void resetTableTask(TableID table_id) + { + std::unique_lock lock(mu); + tables.erase(table_id); + } + void removeTableTask(TableID table_id) + { + String table_id_remain; + size_t remain_count = 0; + { + std::unique_lock lock(mu); + tables.erase(table_id); + remain_count = tables.size(); + table_id_remain = remain_count > 0 ? fmt::format("{}", tables) : "none"; + } + + if (remain_count == 0) + { + q.finish(); + } + + LOG_INFO( + log, + "removeTableTask table_id={} remain_count={} remain_table_ids={}", + table_id, + remain_count, + table_id_remain); + } + + void pushBlock(Block && block) + { + blk_stat.push(block); + q.push(std::move(block), nullptr); + } + void popBlock(Block & block) + { + q.pop(block); + blk_stat.pop(block); + } + bool tryPopBlock(Block & block) + { + if (q.tryPop(block)) + { + blk_stat.pop(block); + return true; + } + else + { + return false; + } + } + void registerTask(TaskPtr && task) { q.registerPipeTask(std::move(task), NotifyType::WAIT_ON_TABLE_SCAN_READ); } + + void finishQueueIfEmpty() + { + String table_id_remain; + { + std::unique_lock lock(mu); + if (tables.empty()) + { + q.finish(); + } + table_id_remain = tables.empty() ? "none" : fmt::format("{}", tables); + } + LOG_INFO(log, "finishQueueIfEmpty called, remain_table_ids={}", table_id_remain); + } + + void finishQueue() { q.finish(); } + + const BlockStat & getBlockStat() const { return blk_stat; } + +private: + WorkQueue q; + + // Statistics of blocks pushed into the queue. + BlockStat blk_stat; + + LoggerPtr log; + + std::mutex mu; + std::unordered_set tables; +}; +using SharedBlockQueuePtr = std::shared_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp index 81329592f13..a1b51f5e8f5 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp @@ -92,8 +92,10 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic SegmentReadTaskPoolPtr createSegmentReadTaskPool(const std::vector & seg_ids) { + auto read_queue = std::make_shared(); auto dm_context = createDMContext(); return std::make_shared( + read_queue, /*extra_table_id_index_*/ dm_context->physical_table_id, /*columns_to_read_*/ ColumnDefines{}, /*filter_*/ nullptr, @@ -106,6 +108,7 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic /*enable_read_thread_*/ true, /*num_streams_*/ 1, /*keyspace_id_*/ NullspaceID, + dm_context->physical_table_id, /*res_group_name_*/ String{}); } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 481c4f647df..485c36fc4ef 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -111,6 +111,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t } SegmentReadTaskPool::SegmentReadTaskPool( + const SharedBlockQueuePtr & shared_q_, int extra_table_id_index_, const ColumnDefines & columns_to_read_, const PushDownExecutorPtr & executor_, @@ -122,7 +123,8 @@ SegmentReadTaskPool::SegmentReadTaskPool( const String & tracing_id, bool enable_read_thread_, Int64 num_streams_, - const KeyspaceID & keyspace_id_, + KeyspaceID keyspace_id_, + TableID table_id_, const String & res_group_name_) : pool_id(nextPoolId()) , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this()) @@ -135,7 +137,7 @@ SegmentReadTaskPool::SegmentReadTaskPool( , total_read_tasks(tasks_.size()) , tasks_wrapper(enable_read_thread_, std::move(tasks_)) , after_segment_read(after_segment_read_) - , peak_active_segments(0) + , shared_q(shared_q_) , log(Logger::get(tracing_id)) , unordered_input_stream_ref_count(0) , exception_happened(false) @@ -148,44 +150,23 @@ SegmentReadTaskPool::SegmentReadTaskPool( // situations where the computation may be faster and the storage layer may not be able to keep up. , active_segment_limit(std::max(num_streams_, 2)) , keyspace_id(keyspace_id_) + , table_id(table_id_) , res_group_name(res_group_name_) { - if (tasks_wrapper.empty()) + RUNTIME_CHECK_MSG( + shared_q != nullptr, + "SegmentReadTaskPool requires a valid SharedBlockQueuePtr, tracing_id={}", + tracing_id); + + if (!tasks_wrapper.empty()) { - q.finish(); + shared_q->addTableTask(table_id); } } SegmentReadTaskPool::~SegmentReadTaskPool() { - auto [pop_times, pop_empty_times, peak_blocks_in_queue] = q.getStat(); - auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; - auto total_count = blk_stat.totalCount(); - auto total_bytes = blk_stat.totalBytes(); - auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0; - auto approx_max_pending_block_bytes = blk_avg_bytes * peak_blocks_in_queue; - auto total_rows = blk_stat.totalRows(); - LOG_INFO( - log, - "Done. pool_id={} pop={} pop_empty={} pop_empty_ratio={:.3f} " - "active_segment_limit={} peak_active_segments={} " - "block_slot_limit={} peak_blocks_in_queue={} blk_avg_bytes={} approx_max_pending_block_bytes={:.2f}MB " - "total_count={} total_bytes={:.2f}MB total_rows={} avg_block_rows={} avg_rows_bytes={}B", - pool_id, - pop_times, - pop_empty_times, - pop_empty_ratio, - active_segment_limit, - peak_active_segments, - block_slot_limit, - peak_blocks_in_queue, - blk_avg_bytes, - approx_max_pending_block_bytes / 1024.0 / 1024.0, - total_count, - total_bytes / 1024.0 / 1024.0, - total_rows, - total_count > 0 ? total_rows / total_count : 0, - total_rows > 0 ? total_bytes / total_rows : 0); + LOG_DEBUG(log, "SegmentReadTaskPool finished. table_id={} pool_id={}", table_id, pool_id); } void SegmentReadTaskPool::finishSegment(const SegmentReadTaskPtr & seg) @@ -200,8 +181,9 @@ void SegmentReadTaskPool::finishSegment(const SegmentReadTaskPtr & seg) LOG_DEBUG(log, "finishSegment pool_id={} segment={} pool_finished={}", pool_id, seg, pool_finished); if (pool_finished) { - q.finish(); - LOG_INFO(log, "pool_id={} finished", pool_id); + // notify the queue this table_id is finished only when all segments are finished. + shared_q->removeTableTask(table_id); + LOG_INFO(log, "SegmentReadTaskPool all segment are finished, table_id={} pool_id={}", table_id, pool_id); } } @@ -217,7 +199,6 @@ SegmentReadTaskPtr SegmentReadTaskPool::getTask(const GlobalSegmentID & seg_id) auto t = tasks_wrapper.getTask(seg_id); RUNTIME_CHECK(t != nullptr, pool_id, seg_id); active_segment_ids.insert(seg_id); - peak_active_segments = std::max(peak_active_segments, active_segment_ids.size()); return t; } @@ -309,8 +290,7 @@ bool SegmentReadTaskPool::readOneBlock(BlockInputStreamPtr & stream, const Segme void SegmentReadTaskPool::popBlock(Block & block) { - q.pop(block); - blk_stat.pop(block); + shared_q->popBlock(block); global_blk_stat.pop(block); if (exceptionHappened()) { @@ -320,9 +300,8 @@ void SegmentReadTaskPool::popBlock(Block & block) bool SegmentReadTaskPool::tryPopBlock(Block & block) { - if (q.tryPop(block)) + if (shared_q->tryPopBlock(block)) { - blk_stat.pop(block); global_blk_stat.pop(block); if (exceptionHappened()) throw exception; @@ -336,12 +315,11 @@ bool SegmentReadTaskPool::tryPopBlock(Block & block) void SegmentReadTaskPool::pushBlock(Block && block) { - blk_stat.push(block); global_blk_stat.push(block); auto bytes = block.bytes(); read_bytes_after_last_check += bytes; GET_METRIC(tiflash_storage_read_thread_counter, type_push_block_bytes).Increment(bytes); - q.push(std::move(block), nullptr); + shared_q->pushBlock(std::move(block)); } Int64 SegmentReadTaskPool::increaseUnorderedInputStreamRefCount() @@ -355,7 +333,7 @@ Int64 SegmentReadTaskPool::decreaseUnorderedInputStreamRefCount() Int64 SegmentReadTaskPool::getFreeBlockSlots() const { - return block_slot_limit - blk_stat.pendingCount(); + return block_slot_limit - shared_q->getBlockStat().pendingCount(); } Int64 SegmentReadTaskPool::getFreeActiveSegments() const @@ -391,7 +369,7 @@ void SegmentReadTaskPool::setException(const DB::Exception & e) { exception = e; exception_happened.store(true, std::memory_order_relaxed); - q.finish(); + shared_q->finishQueue(); } } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 36e8d5cab20..593c9172044 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -21,8 +21,10 @@ #include #include #include +#include #include #include +#include namespace DB::DM { @@ -30,56 +32,6 @@ namespace tests { class SegmentReadTasksPoolTest; } -using AfterSegmentRead = std::function; - -class BlockStat -{ -public: - BlockStat() - : pending_count(0) - , pending_bytes(0) - , total_count(0) - , total_bytes(0) - {} - - void push(const Block & blk) - { - pending_count.fetch_add(1, std::memory_order_relaxed); - total_count.fetch_add(1, std::memory_order_relaxed); - - auto b = blk.bytes(); - pending_bytes.fetch_add(b, std::memory_order_relaxed); - total_bytes.fetch_add(b, std::memory_order_relaxed); - - total_rows.fetch_add(blk.rows(), std::memory_order_relaxed); - } - - void pop(const Block & blk) - { - if (likely(blk)) - { - pending_count.fetch_sub(1, std::memory_order_relaxed); - pending_bytes.fetch_sub(blk.bytes(), std::memory_order_relaxed); - } - } - - Int64 pendingCount() const { return pending_count.load(std::memory_order_relaxed); } - - Int64 pendingBytes() const { return pending_bytes.load(std::memory_order_relaxed); } - - Int64 totalCount() const { return total_count.load(std::memory_order_relaxed); } - - Int64 totalBytes() const { return total_bytes.load(std::memory_order_relaxed); } - - Int64 totalRows() const { return total_rows.load(std::memory_order_relaxed); } - -private: - std::atomic pending_count; - std::atomic pending_bytes; - std::atomic total_count; - std::atomic total_bytes; - std::atomic total_rows; -}; // If `enable_read_thread_` is true, `SegmentReadTasksWrapper` use `std::unordered_map` to index `SegmentReadTask` by segment id, // else it is the same as `SegmentReadTasks`, a `std::list` of `SegmentReadTask`. @@ -104,12 +56,14 @@ class SegmentReadTasksWrapper std::unordered_map unordered_tasks; }; +// The SegmentReadTaskPool manages the read tasks for a query on a physical table. class SegmentReadTaskPool : public NotifyFuture , private boost::noncopyable { public: SegmentReadTaskPool( + const SharedBlockQueuePtr & shared_q_, int extra_table_id_index_, const ColumnDefines & columns_to_read_, const PushDownExecutorPtr & executor_, @@ -121,7 +75,8 @@ class SegmentReadTaskPool const String & tracing_id, bool enable_read_thread_, Int64 num_streams_, - const KeyspaceID & keyspace_id_, + KeyspaceID keyspace_id_, + TableID table_id_, const String & res_group_name_); ~SegmentReadTaskPool() override; @@ -151,10 +106,7 @@ class SegmentReadTaskPool std::once_flag & addToSchedulerFlag() { return add_to_scheduler; } - void registerTask(TaskPtr && task) override - { - q.registerPipeTask(std::move(task), NotifyType::WAIT_ON_TABLE_SCAN_READ); - } + void registerTask(TaskPtr && task) override { shared_q->registerTask(std::move(task)); } std::once_flag & getRemoteConnectionInfoFlag() { return get_remote_connection_flag; } std::optional> getRemoteConnectionInfo() const; @@ -213,10 +165,10 @@ class SegmentReadTaskPool SegmentReadTasksWrapper tasks_wrapper; AfterSegmentRead after_segment_read; mutable std::mutex mutex; - size_t peak_active_segments; std::unordered_set active_segment_ids; - WorkQueue q; - BlockStat blk_stat; + // WorkQueue q; + // BlockStat blk_stat; + SharedBlockQueuePtr shared_q; LoggerPtr log; std::atomic unordered_input_stream_ref_count; @@ -224,9 +176,9 @@ class SegmentReadTaskPool std::atomic exception_happened; DB::Exception exception; - // SegmentReadTaskPool will be held by several UnorderedBlockInputStreams. - // It will be added to SegmentReadTaskScheduler when one of the UnorderedBlockInputStreams being read. - // Since several UnorderedBlockInputStreams can be read by several threads concurrently, we use + // SegmentReadTaskPool will be held by several `UnorderedInputStream`s/`UnorderedSourceOp`s. + // It will be added to SegmentReadTaskScheduler when one of the UnorderedInputStream/UnorderedSourceOp being read. + // Since several UnorderedInputStream/UnorderedSourceOp can be read by several threads concurrently, we use // std::once_flag and std::call_once to prevent duplicated add. std::once_flag add_to_scheduler; @@ -234,6 +186,7 @@ class SegmentReadTaskPool const Int64 active_segment_limit; const KeyspaceID keyspace_id; + const TableID table_id; const String res_group_name; std::mutex ru_mu; std::atomic last_time_check_ru = 0; diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool_fwd.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool_fwd.h new file mode 100644 index 00000000000..13c4b3d994f --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool_fwd.h @@ -0,0 +1,28 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include + +namespace DB::DM +{ +class SegmentReadTaskPool; +using SegmentReadTaskPoolPtr = std::shared_ptr; + +using AfterSegmentRead = std::function; +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp index 33c396029ba..140ee6a3dd2 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp @@ -189,11 +189,13 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func) auto filter = EMPTY_FILTER; int excepted_block_size = 1024; uint64_t read_ts = ts_gen->get(); + SharedBlockQueuePtr read_queue = std::make_shared(Logger::get()); auto streams = store->read( *context, context->getSettingsRef(), columns, ranges, + read_queue, stream_count, read_ts, filter, diff --git a/dbms/src/Storages/SelectQueryInfo.cpp b/dbms/src/Storages/SelectQueryInfo.cpp index 196bd6ebdcf..a2c309b4909 100644 --- a/dbms/src/Storages/SelectQueryInfo.cpp +++ b/dbms/src/Storages/SelectQueryInfo.cpp @@ -32,6 +32,7 @@ SelectQueryInfo::SelectQueryInfo(const SelectQueryInfo & rhs) , keep_order(rhs.keep_order) , is_fast_scan(rhs.is_fast_scan) , has_multiple_partitions(rhs.has_multiple_partitions) + , read_queue(rhs.read_queue) {} SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept @@ -43,6 +44,7 @@ SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept , keep_order(rhs.keep_order) , is_fast_scan(rhs.is_fast_scan) , has_multiple_partitions(rhs.has_multiple_partitions) + , read_queue(std::move(rhs.read_queue)) {} } // namespace DB diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h index b869389f803..7e1ac3b1614 100644 --- a/dbms/src/Storages/SelectQueryInfo.h +++ b/dbms/src/Storages/SelectQueryInfo.h @@ -35,6 +35,11 @@ using PreparedSets = std::unordered_map; struct MvccQueryInfo; struct DAGQueryInfo; +namespace DM +{ +class SharedBlockQueue; +using SharedBlockQueuePtr = std::shared_ptr; +} // namespace DM /** Query along with some additional data, * that can be used during query processing @@ -57,6 +62,8 @@ struct SelectQueryInfo bool is_fast_scan = false; bool has_multiple_partitions = false; + DM::SharedBlockQueuePtr read_queue; + SelectQueryInfo(); ~SelectQueryInfo(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 9f555eaf771..03614e7c72d 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -847,6 +847,7 @@ BlockInputStreams StorageDeltaMerge::read( context, context.getSettingsRef(), columns_to_read, + query_info.read_queue, num_streams, query_info.keep_order, parseSegmentSet(select_query.segment_expression_list), @@ -889,6 +890,7 @@ BlockInputStreams StorageDeltaMerge::read( context.getSettingsRef(), columns_to_read, ranges, + query_info.read_queue, num_streams, /*start_ts=*/mvcc_query_info.start_ts, pushdown_executor, @@ -940,6 +942,7 @@ void StorageDeltaMerge::read( context, context.getSettingsRef(), columns_to_read, + query_info.read_queue, num_streams, query_info.keep_order, parseSegmentSet(select_query.segment_expression_list), @@ -985,6 +988,7 @@ void StorageDeltaMerge::read( context.getSettingsRef(), columns_to_read, ranges, + query_info.read_queue, num_streams, /*start_ts=*/mvcc_query_info.start_ts, pushdown_executor, @@ -1143,7 +1147,7 @@ UInt64 StorageDeltaMerge::onSyncGc(Int64 limit, const GCOptions & gc_options) // just for testing size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM::RowKeyRange & range) { - size_t rows = 0; + SharedBlockQueuePtr read_queue = std::make_shared(Logger::get("getRows")); ColumnDefines to_read{getExtraHandleColumnDefine(store->isCommonHandle())}; auto stream = store->read( @@ -1151,6 +1155,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM context.getSettingsRef(), to_read, {range}, + read_queue, 1, std::numeric_limits::max(), EMPTY_FILTER, @@ -1160,6 +1165,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM DMReadOptions{})[0]; stream->readPrefix(); Block block; + size_t rows = 0; while ((block = stream->read())) rows += block.rows(); stream->readSuffix(); @@ -1170,6 +1176,8 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM // just for testing DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context, size_t total_rows, size_t delete_rows) { + SharedBlockQueuePtr read_queue = std::make_shared(Logger::get("getRange")); + auto start_index = rand() % (total_rows - delete_rows + 1); // NOLINT(cert-msc50-cpp) DM::RowKeyRange range = DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()); { @@ -1179,6 +1187,7 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context context.getSettingsRef(), to_read, {DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, 1, std::numeric_limits::max(), EMPTY_FILTER, diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h index 33c81d94db8..5ed5d02672b 100644 --- a/dbms/src/Storages/StorageDisaggregated.h +++ b/dbms/src/Storages/StorageDisaggregated.h @@ -22,10 +22,11 @@ #include #include #include +#include #include #include #include -#include +#include #include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -111,6 +112,7 @@ class StorageDisaggregated : public IStorage const Context & db_context, DM::SegmentReadTasks && read_tasks, const DM::ColumnDefinesPtr & column_defines, + DM::SharedBlockQueuePtr & read_queue, size_t num_streams, int extra_table_id_index); void buildRemoteSegmentInputStreams( diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index e7101dc6036..34d07a2cf56 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -568,6 +569,7 @@ std::variant StorageDisagg const Context & db_context, DM::SegmentReadTasks && read_tasks, const DM::ColumnDefinesPtr & column_defines, + DM::SharedBlockQueuePtr & read_queue, size_t num_streams, int extra_table_id_index) { @@ -618,7 +620,9 @@ std::variant StorageDisagg if (enable_read_thread) { + TableID table_id = table_scan.getLogicalTableID(); // FIXME: return std::make_shared( + read_queue, extra_table_id_index, *column_defines, push_down_executor, @@ -631,6 +635,7 @@ std::variant StorageDisagg /*enable_read_thread*/ true, num_streams, context.getDAGContext()->getKeyspaceID(), + table_id, context.getDAGContext()->getResourceGroupName()); } else @@ -685,10 +690,17 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams( size_t num_streams, DAGPipeline & pipeline) { + // Share the read queue among all inputstreams // TODO: share for partition tables under disagg + auto read_queue = std::make_shared(log); // Build the input streams to read blocks from remote segments auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan); - auto packed_read_tasks - = packSegmentReadTasks(db_context, std::move(read_tasks), column_defines, num_streams, extra_table_id_index); + auto packed_read_tasks = packSegmentReadTasks( + db_context, + std::move(read_tasks), + column_defines, + read_queue, + num_streams, + extra_table_id_index); RUNTIME_CHECK(num_streams > 0, num_streams); pipeline.streams.reserve(num_streams); @@ -751,10 +763,17 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps( DM::SegmentReadTasks && read_tasks, size_t num_streams) { + // Share the read queue among all source ops // TODO: share for partition tables under disagg + auto read_queue = std::make_shared(log); // Build the input streams to read blocks from remote segments auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan); - auto packed_read_tasks - = packSegmentReadTasks(db_context, std::move(read_tasks), column_defines, num_streams, extra_table_id_index); + auto packed_read_tasks = packSegmentReadTasks( + db_context, + std::move(read_tasks), + column_defines, + read_queue, + num_streams, + extra_table_id_index); RUNTIME_CHECK(num_streams > 0, num_streams); SrouceOpBuilder builder{ From 0e7b9da266c6028234e15a0fa3f0db6fecd8c1f5 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 21 Oct 2025 01:26:45 +0800 Subject: [PATCH 2/4] Rename SharedBlockQueue -> ActiveSegmentReadTaskQueue Add active segment limit Signed-off-by: JaySon-Huang --- .../Coprocessor/DAGStorageInterpreter.cpp | 9 +- .../Executor/PipelineExecutorContext.cpp | 1 + .../Storages/DeltaMerge/DeltaMergeStore.cpp | 8 +- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 8 +- .../ReadThread/ActiveSegmentReadTaskQueue.h | 310 ++++++++++++++++++ .../ReadThread/SegmentReadTaskScheduler.cpp | 18 +- .../ReadThread/SegmentReadTaskScheduler.h | 2 +- .../DeltaMerge/ReadThread/SharedBlockQueue.h | 197 ----------- .../tests/gtest_segment_read_task_pool.cpp | 5 +- .../DeltaMerge/SegmentReadTaskPool.cpp | 84 +++-- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 22 +- .../DeltaMerge/workload/DTWorkload.cpp | 4 +- dbms/src/Storages/SelectQueryInfo.h | 6 +- dbms/src/Storages/StorageDeltaMerge.cpp | 8 +- dbms/src/Storages/StorageDisaggregated.h | 4 +- .../Storages/StorageDisaggregatedRemote.cpp | 12 +- 16 files changed, 421 insertions(+), 277 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h delete mode 100644 dbms/src/Storages/DeltaMerge/ReadThread/SharedBlockQueue.h diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 5e32f502174..8316f615a1c 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -43,7 +43,7 @@ #include #include #include -#include +#include #include #include #include @@ -919,7 +919,8 @@ std::unordered_map DAGStorageInterpreter::generateSele { std::unordered_map ret; bool use_unordered_concat = context.getSettingsRef().dt_enable_unordered_concat; - auto shared_read_queue = std::make_shared(log); + // Shared read queue for all physical tables + auto shared_read_queue = std::make_shared(max_streams, log); auto create_query_info = [&](Int64 table_id) -> SelectQueryInfo { SelectQueryInfo query_info; /// to avoid null point exception @@ -943,7 +944,9 @@ std::unordered_map DAGStorageInterpreter::generateSele } else { - query_info.read_queue = std::make_shared(Logger::get(query_info.req_id)); + // Different read queue for different physical table + query_info.read_queue + = std::make_shared(max_streams, Logger::get(query_info.req_id)); } return query_info; }; diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index a119a040620..f16cc04221a 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -178,6 +178,7 @@ void PipelineExecutorContext::cancel() bool origin_value = false; if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) { + // TODO: Add ActiveSegmentReadTaskQueue here to cancel storage layer read tasks cancelSharedQueues(); cancelOneTimeFutures(); if (likely(dag_context)) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index e5da912fd70..c6716c0b643 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1024,7 +1024,7 @@ BlockInputStreams DeltaMergeStore::readRaw( const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, - const SharedBlockQueuePtr & read_queue, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments, @@ -1131,7 +1131,7 @@ void DeltaMergeStore::readRaw( const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, - const SharedBlockQueuePtr & read_queue, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments, @@ -1276,7 +1276,7 @@ BlockInputStreams DeltaMergeStore::read( const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, - const SharedBlockQueuePtr & read_queue, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, @@ -1390,7 +1390,7 @@ void DeltaMergeStore::read( const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, - const SharedBlockQueuePtr & read_queue, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index e0c8db897c3..b8007a568ca 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -356,7 +356,7 @@ class DeltaMergeStore const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, - const SharedBlockQueuePtr & read_queue, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments = {}, @@ -369,7 +369,7 @@ class DeltaMergeStore const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, - const SharedBlockQueuePtr & read_queue, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, bool keep_order, const SegmentIdSet & read_segments = {}, @@ -384,7 +384,7 @@ class DeltaMergeStore const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, - const SharedBlockQueuePtr & read_queue, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, @@ -409,7 +409,7 @@ class DeltaMergeStore const DB::Settings & db_settings, const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, - const SharedBlockQueuePtr & read_queue, + const ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, UInt64 start_ts, const PushDownExecutorPtr & executor, diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h new file mode 100644 index 00000000000..b6366458487 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ActiveSegmentReadTaskQueue.h @@ -0,0 +1,310 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB::DM +{ + +struct TableTaskStat +{ + size_t num_read_tasks = 0; + size_t num_active_segs = 0; + size_t num_finished_segs = 0; +}; +} // namespace DB::DM + +template <> +struct fmt::formatter +{ + static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } + + template + auto format(const DB::DM::TableTaskStat & s, FormatContext & ctx) const + { + return fmt::format_to( + ctx.out(), + "{{tot:{} act:{} fin:{}}}", + s.num_read_tasks, + s.num_active_segs, + s.num_finished_segs); + } +}; + +namespace DB::DM +{ + +// Statistics of blocks pushed. +// All methods are thread-safe. +class BlockStat +{ +public: + BlockStat() + : pending_count(0) + , pending_bytes(0) + , total_count(0) + , total_bytes(0) + {} + + void push(const Block & blk) + { + pending_count.fetch_add(1, std::memory_order_relaxed); + total_count.fetch_add(1, std::memory_order_relaxed); + + auto b = blk.bytes(); + pending_bytes.fetch_add(b, std::memory_order_relaxed); + total_bytes.fetch_add(b, std::memory_order_relaxed); + + total_rows.fetch_add(blk.rows(), std::memory_order_relaxed); + } + + void pop(const Block & blk) + { + if (likely(blk)) + { + pending_count.fetch_sub(1, std::memory_order_relaxed); + pending_bytes.fetch_sub(blk.bytes(), std::memory_order_relaxed); + } + } + + Int64 pendingCount() const { return pending_count.load(std::memory_order_relaxed); } + + Int64 pendingBytes() const { return pending_bytes.load(std::memory_order_relaxed); } + + Int64 totalCount() const { return total_count.load(std::memory_order_relaxed); } + + Int64 totalBytes() const { return total_bytes.load(std::memory_order_relaxed); } + + Int64 totalRows() const { return total_rows.load(std::memory_order_relaxed); } + +private: + std::atomic pending_count; + std::atomic pending_bytes; + std::atomic total_count; + std::atomic total_bytes; + std::atomic total_rows; +}; + +// ActiveSegmentReadTaskQueue manages the blocks read from active segments. +// If the instance is shared among multiple SegmentReadTaskPools, the instance +// limits the total number of active segments and the total number of pending blocks +// among all SegmentReadTaskPools. a.k.a among all physical tables. +class ActiveSegmentReadTaskQueue +{ +public: + explicit ActiveSegmentReadTaskQueue(size_t max_streams, LoggerPtr log_) + : // If the queue is too short, only 1 in the extreme case, it may cause the computation thread + // to encounter empty queues frequently, resulting in too much waiting and thread context switching. + // We limit the length of block queue to be 1.5 times of `num_streams_`, and in the extreme case, + // when `num_streams_` is 1, `block_slot_limit` is at least 2. + block_slot_limit(std::ceil(std::max(1, max_streams) * 1.5)) + // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible, + // situations where the computation may be faster and the storage layer may not be able to keep up. + , active_segment_limit(std::max(2, max_streams)) + , log(std::move(log_)) + {} + + ~ActiveSegmentReadTaskQueue() + { + auto [pop_times, pop_empty_times, peak_blocks_in_queue] = q.getStat(); + auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; + auto total_count = blk_stat.totalCount(); + auto total_bytes = blk_stat.totalBytes(); + auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0; + auto approx_max_pending_block_bytes = blk_avg_bytes * peak_blocks_in_queue; + auto total_rows = blk_stat.totalRows(); + LOG_INFO( + log, + "ActiveSegmentReadTaskQueue finished. pop={} pop_empty={} pop_empty_ratio={:.3f} " + "active_segment_limit={} peak_active_segments={} " + "block_slot_limit={} peak_blocks_in_queue={} blk_avg_bytes={} approx_max_pending_block_bytes={:.2f}MB " + "total_count={} total_bytes={:.2f}MB total_rows={} avg_block_rows={} avg_rows_bytes={}B", + pop_times, + pop_empty_times, + pop_empty_ratio, + active_segment_limit, + peak_active_segments, + block_slot_limit, + peak_blocks_in_queue, + blk_avg_bytes, + approx_max_pending_block_bytes / 1024.0 / 1024.0, + total_count, + total_bytes / 1024.0 / 1024.0, + total_rows, + total_count > 0 ? total_rows / total_count : 0, + total_rows > 0 ? total_bytes / total_rows : 0); + } + + // === table task management === + + // Add a table task. + void addTableTask(TableID table_id, size_t num_read_tasks) + { + std::unique_lock lock(mu); + tables[table_id] = TableTaskStat{ + .num_read_tasks = num_read_tasks, + }; + } + + // Remove a table task without finishing the queue. + void resetTableTask(TableID table_id) + { + std::unique_lock lock(mu); + tables.erase(table_id); + } + + // Remove a table task. If there is no any table task, finish the queue. + void removeTableTask(TableID table_id, UInt64 pool_id) + { + String table_id_remain; + size_t remain_count = 0; + { + std::unique_lock lock(mu); + tables.erase(table_id); + remain_count = tables.size(); + table_id_remain = remain_count > 0 ? fmt::format("{}", tables) : "none"; + } + + if (remain_count == 0) + { + q.finish(); + } + + LOG_INFO( + log, + "all segments are finished, table_id={} pool_id={} remain_count={} remain_table_ids={}", + table_id, + pool_id, + remain_count, + table_id_remain); + } + + // Finish the queue if there is no any table task. + // Must be called after all table tasks are added. + void finishQueueIfEmpty() + { + String table_id_remain; + { + std::unique_lock lock(mu); + if (tables.empty()) + { + q.finish(); + } + table_id_remain = tables.empty() ? "none" : fmt::format("{}", tables); + } + LOG_INFO(log, "finishQueueIfEmpty called, remain_table_ids={}", table_id_remain); + } + + // Finish the queue unconditionally. Used when aborting when exception happens. + void finishQueue() { q.finish(); } + + // === scheduling limit helpers === + + Int64 getFreeBlockSlots() const { return block_slot_limit - blk_stat.pendingCount(); } + Int64 getFreeActiveSegments() const + { + std::unique_lock lock(mu); + return active_segment_limit - static_cast(active_segment_ids.size()); + } + + void addActiveSegment(TableID table_id, const GlobalSegmentID & seg_id) + { + std::unique_lock lock(mu); + active_segment_ids.emplace(seg_id); + peak_active_segments = std::max(peak_active_segments, active_segment_ids.size()); + // increase the counter of active segments for the table_id + auto iter = tables.find(table_id); + RUNTIME_CHECK_MSG( + iter != tables.end(), + "table_id not found from ActiveSegmentReadTaskQueue, table_id={} tables={}", + table_id, + tables); + iter->second.num_active_segs++; + } + + size_t finishActiveSegment(TableID table_id, const GlobalSegmentID & seg_id) + { + std::unique_lock lock(mu); + active_segment_ids.erase(seg_id); + auto iter = tables.find(table_id); + RUNTIME_CHECK_MSG( + iter != tables.end(), + "table_id not found from ActiveSegmentReadTaskQueue, table_id={} tables={}", + table_id, + tables); + RUNTIME_CHECK_MSG( + iter->second.num_active_segs >= 1, + "table_id has invalid active segments number, table_id={} table_stat={} tables={}", + table_id, + iter->second, + tables); + // decrease the counter of active segments for the table_id + --iter->second.num_active_segs; + ++iter->second.num_finished_segs; + return iter->second.num_active_segs; + } + + // === queue operations === + + void pushBlock(Block && block) + { + blk_stat.push(block); + q.push(std::move(block), nullptr); + } + + // Blocking pop + void popBlock(Block & block) + { + q.pop(block); + blk_stat.pop(block); + } + + // Non-blocking pop + bool tryPopBlock(Block & block) + { + if (q.tryPop(block)) + { + blk_stat.pop(block); + return true; + } + else + { + return false; + } + } + + void registerTask(TaskPtr && task) { q.registerPipeTask(std::move(task), NotifyType::WAIT_ON_TABLE_SCAN_READ); } + +private: + WorkQueue q; + + const Int64 block_slot_limit; + const Int64 active_segment_limit; + // Statistics of blocks pushed into the queue. + BlockStat blk_stat; + + LoggerPtr log; + + mutable std::mutex mu; + + std::unordered_map tables; + size_t peak_active_segments = 0; + std::unordered_set active_segment_ids; +}; +using ActiveSegmentReadTaskQueuePtr = std::shared_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp index e482a541024..9fe2d592db1 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp @@ -62,7 +62,12 @@ void SegmentReadTaskScheduler::submitPendingPool(SegmentReadTaskPoolPtr pool) assert(pool != nullptr); if (pool->getPendingSegmentCount() <= 0) { - LOG_INFO(pool->getLogger(), "Ignored for no segment to read, pool_id={}", pool->pool_id); + LOG_INFO( + pool->getLogger(), + "Ignored for no segment to read, keyspace={} table_id={} pool_id={}", + pool->keyspace_id, + pool->table_id, + pool->pool_id); return; } Stopwatch sw; @@ -70,7 +75,9 @@ void SegmentReadTaskScheduler::submitPendingPool(SegmentReadTaskPoolPtr pool) pending_pools.push_back(pool); LOG_INFO( pool->getLogger(), - "Submitted, pool_id={} segment_count={} pending_pools={} cost={}ns", + "Submitted, keyspace={} table_id={} pool_id={} segment_count={} pending_pools={} cost={}ns", + pool->keyspace_id, + pool->table_id, pool->pool_id, pool->getPendingSegmentCount(), pending_pools.size(), @@ -244,7 +251,12 @@ std::tuple SegmentReadTaskScheduler::scheduleOneRound() // TODO: `weak_ptr` may be more suitable. if (pool.use_count() == 1) { - LOG_INFO(pool->getLogger(), "Erase pool_id={}", pool->pool_id); + LOG_INFO( + pool->getLogger(), + "Erase keyspace={} table_id={} pool_id={}", + pool->keyspace_id, + pool->table_id, + pool->pool_id); ++erased_pool_count; itr = read_pools.erase(itr); continue; diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h index f51615698a7..361230d4680 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h @@ -33,7 +33,7 @@ class SegmentReadTasksPoolTest; // // - `sched_thread` will scheduling read tasks. // - Call path: schedLoop -> schedule -> reapPendingPools -> scheduleOneRound -// - reapPeningPools will swap the `pending_pools` and add these pools to `read_pools` and `merging_segments`. +// - reapPendingPools will swap the `pending_pools` and add these pools to `read_pools` and `merging_segments`. // - scheduleOneRound will scan `read_pools` and choose segments to read. class SegmentReadTaskScheduler { diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SharedBlockQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/SharedBlockQueue.h deleted file mode 100644 index 6bd56113544..00000000000 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SharedBlockQueue.h +++ /dev/null @@ -1,197 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -namespace DB::DM -{ - -// Statistics of blocks pushed. -// All methods are thread-safe. -class BlockStat -{ -public: - BlockStat() - : pending_count(0) - , pending_bytes(0) - , total_count(0) - , total_bytes(0) - {} - - void push(const Block & blk) - { - pending_count.fetch_add(1, std::memory_order_relaxed); - total_count.fetch_add(1, std::memory_order_relaxed); - - auto b = blk.bytes(); - pending_bytes.fetch_add(b, std::memory_order_relaxed); - total_bytes.fetch_add(b, std::memory_order_relaxed); - - total_rows.fetch_add(blk.rows(), std::memory_order_relaxed); - } - - void pop(const Block & blk) - { - if (likely(blk)) - { - pending_count.fetch_sub(1, std::memory_order_relaxed); - pending_bytes.fetch_sub(blk.bytes(), std::memory_order_relaxed); - } - } - - Int64 pendingCount() const { return pending_count.load(std::memory_order_relaxed); } - - Int64 pendingBytes() const { return pending_bytes.load(std::memory_order_relaxed); } - - Int64 totalCount() const { return total_count.load(std::memory_order_relaxed); } - - Int64 totalBytes() const { return total_bytes.load(std::memory_order_relaxed); } - - Int64 totalRows() const { return total_rows.load(std::memory_order_relaxed); } - -private: - std::atomic pending_count; - std::atomic pending_bytes; - std::atomic total_count; - std::atomic total_bytes; - std::atomic total_rows; -}; - -class SharedBlockQueue -{ -public: - explicit SharedBlockQueue(LoggerPtr log_) - : log(std::move(log_)) - {} - - ~SharedBlockQueue() - { - auto [pop_times, pop_empty_times, max_queue_size] = q.getStat(); - auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; - auto total_count = blk_stat.totalCount(); - auto total_bytes = blk_stat.totalBytes(); - auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0; - auto approx_max_pending_block_bytes = blk_avg_bytes * max_queue_size; - auto total_rows = blk_stat.totalRows(); - LOG_INFO( - log, - "SharedBlockQueue finished. pop={} pop_empty={} pop_empty_ratio={:.3f} " - "max_queue_size={} blk_avg_bytes={} approx_max_pending_block_bytes={:.2f}MB " - "total_count={} total_bytes={:.2f}MB total_rows={} avg_block_rows={} avg_rows_bytes={}B", - pop_times, - pop_empty_times, - pop_empty_ratio, - max_queue_size, - blk_avg_bytes, - approx_max_pending_block_bytes / 1024.0 / 1024.0, - total_count, - total_bytes / 1024.0 / 1024.0, - total_rows, - total_count > 0 ? total_rows / total_count : 0, - total_rows > 0 ? total_bytes / total_rows : 0); - } - - void addTableTask(TableID table_id) - { - std::unique_lock lock(mu); - tables.emplace(table_id); - } - - void resetTableTask(TableID table_id) - { - std::unique_lock lock(mu); - tables.erase(table_id); - } - void removeTableTask(TableID table_id) - { - String table_id_remain; - size_t remain_count = 0; - { - std::unique_lock lock(mu); - tables.erase(table_id); - remain_count = tables.size(); - table_id_remain = remain_count > 0 ? fmt::format("{}", tables) : "none"; - } - - if (remain_count == 0) - { - q.finish(); - } - - LOG_INFO( - log, - "removeTableTask table_id={} remain_count={} remain_table_ids={}", - table_id, - remain_count, - table_id_remain); - } - - void pushBlock(Block && block) - { - blk_stat.push(block); - q.push(std::move(block), nullptr); - } - void popBlock(Block & block) - { - q.pop(block); - blk_stat.pop(block); - } - bool tryPopBlock(Block & block) - { - if (q.tryPop(block)) - { - blk_stat.pop(block); - return true; - } - else - { - return false; - } - } - void registerTask(TaskPtr && task) { q.registerPipeTask(std::move(task), NotifyType::WAIT_ON_TABLE_SCAN_READ); } - - void finishQueueIfEmpty() - { - String table_id_remain; - { - std::unique_lock lock(mu); - if (tables.empty()) - { - q.finish(); - } - table_id_remain = tables.empty() ? "none" : fmt::format("{}", tables); - } - LOG_INFO(log, "finishQueueIfEmpty called, remain_table_ids={}", table_id_remain); - } - - void finishQueue() { q.finish(); } - - const BlockStat & getBlockStat() const { return blk_stat; } - -private: - WorkQueue q; - - // Statistics of blocks pushed into the queue. - BlockStat blk_stat; - - LoggerPtr log; - - std::mutex mu; - std::unordered_set tables; -}; -using SharedBlockQueuePtr = std::shared_ptr; - -} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp index a1b51f5e8f5..190f6c9571e 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp @@ -92,7 +92,8 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic SegmentReadTaskPoolPtr createSegmentReadTaskPool(const std::vector & seg_ids) { - auto read_queue = std::make_shared(); + size_t num_streams = 1; + auto read_queue = std::make_shared(num_streams); auto dm_context = createDMContext(); return std::make_shared( read_queue, @@ -106,7 +107,7 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic /*after_segment_read_*/ [&](const DMContextPtr &, const SegmentPtr &) { /*do nothing*/ }, /*tracing_id_*/ String{}, /*enable_read_thread_*/ true, - /*num_streams_*/ 1, + /*num_streams_*/ num_streams, /*keyspace_id_*/ NullspaceID, dm_context->physical_table_id, /*res_group_name_*/ String{}); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 485c36fc4ef..3bc97af51ef 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -83,6 +83,11 @@ bool SegmentReadTasksWrapper::empty() const return ordered_tasks.empty() && unordered_tasks.empty(); } +size_t SegmentReadTasksWrapper::size() const +{ + return ordered_tasks.size() + unordered_tasks.size(); +} + BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t) { MemoryTrackerSetter setter(true, mem_tracker.get()); @@ -103,15 +108,17 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t t->dm_context->physical_table_id); LOG_DEBUG( log, - "buildInputStream: read_mode={}, pool_id={} segment={}", + "buildInputStream: read_mode={} keyspace={} table_id={} pool_id={} segment={}", magic_enum::enum_name(read_mode), + keyspace_id, + table_id, pool_id, t); return stream; } SegmentReadTaskPool::SegmentReadTaskPool( - const SharedBlockQueuePtr & shared_q_, + const ActiveSegmentReadTaskQueuePtr & shared_q_, int extra_table_id_index_, const ColumnDefines & columns_to_read_, const PushDownExecutorPtr & executor_, @@ -122,11 +129,13 @@ SegmentReadTaskPool::SegmentReadTaskPool( AfterSegmentRead after_segment_read_, const String & tracing_id, bool enable_read_thread_, - Int64 num_streams_, + Int64 /*num_streams_*/, KeyspaceID keyspace_id_, TableID table_id_, const String & res_group_name_) - : pool_id(nextPoolId()) + : keyspace_id(keyspace_id_) + , table_id(table_id_) + , pool_id(nextPoolId()) , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this()) , extra_table_id_index(extra_table_id_index_) , columns_to_read(columns_to_read_) @@ -141,49 +150,54 @@ SegmentReadTaskPool::SegmentReadTaskPool( , log(Logger::get(tracing_id)) , unordered_input_stream_ref_count(0) , exception_happened(false) - // If the queue is too short, only 1 in the extreme case, it may cause the computation thread - // to encounter empty queues frequently, resulting in too much waiting and thread context switching. - // We limit the length of block queue to be 1.5 times of `num_streams_`, and in the extreme case, - // when `num_streams_` is 1, `block_slot_limit` is at least 2. - , block_slot_limit(std::ceil(num_streams_ * 1.5)) - // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible, - // situations where the computation may be faster and the storage layer may not be able to keep up. - , active_segment_limit(std::max(num_streams_, 2)) - , keyspace_id(keyspace_id_) - , table_id(table_id_) , res_group_name(res_group_name_) { RUNTIME_CHECK_MSG( shared_q != nullptr, - "SegmentReadTaskPool requires a valid SharedBlockQueuePtr, tracing_id={}", + "SegmentReadTaskPool requires a valid ActiveSegmentReadTaskQueuePtr, tracing_id={}", tracing_id); if (!tasks_wrapper.empty()) { - shared_q->addTableTask(table_id); + shared_q->addTableTask(table_id, total_read_tasks); } } SegmentReadTaskPool::~SegmentReadTaskPool() { - LOG_DEBUG(log, "SegmentReadTaskPool finished. table_id={} pool_id={}", table_id, pool_id); + LOG_DEBUG(log, "SegmentReadTaskPool finished. keyspace={} table_id={} pool_id={}", keyspace_id, table_id, pool_id); } void SegmentReadTaskPool::finishSegment(const SegmentReadTaskPtr & seg) { after_segment_read(seg->dm_context, seg->segment); bool pool_finished = false; + size_t num_act_segs = shared_q->finishActiveSegment(table_id, seg->getGlobalSegmentID()); + size_t num_task_left = 0; { std::lock_guard lock(mutex); - active_segment_ids.erase(seg->getGlobalSegmentID()); - pool_finished = active_segment_ids.empty() && tasks_wrapper.empty(); + num_task_left = tasks_wrapper.size(); + pool_finished = num_act_segs == 0 && tasks_wrapper.empty(); } - LOG_DEBUG(log, "finishSegment pool_id={} segment={} pool_finished={}", pool_id, seg, pool_finished); + LOG_DEBUG( + log, + "finishSegment pool_id={} segment={} num_act_segs={} num_task_left={} pool_finished={}", + pool_id, + seg, + num_act_segs, + num_task_left, + pool_finished); + if (pool_finished) { // notify the queue this table_id is finished only when all segments are finished. - shared_q->removeTableTask(table_id); - LOG_INFO(log, "SegmentReadTaskPool all segment are finished, table_id={} pool_id={}", table_id, pool_id); + shared_q->removeTableTask(table_id, pool_id); + LOG_DEBUG( + log, + "SegmentReadTaskPool all segments are finished, keyspace={} table_id={} pool_id={}", + keyspace_id, + table_id, + pool_id); } } @@ -195,10 +209,13 @@ SegmentReadTaskPtr SegmentReadTaskPool::nextTask() SegmentReadTaskPtr SegmentReadTaskPool::getTask(const GlobalSegmentID & seg_id) { - std::lock_guard lock(mutex); - auto t = tasks_wrapper.getTask(seg_id); - RUNTIME_CHECK(t != nullptr, pool_id, seg_id); - active_segment_ids.insert(seg_id); + SegmentReadTaskPtr t; + { + std::lock_guard lock(mutex); + t = tasks_wrapper.getTask(seg_id); + RUNTIME_CHECK(t != nullptr, pool_id, seg_id); + } + shared_q->addActiveSegment(table_id, seg_id); return t; } @@ -241,11 +258,12 @@ MergingSegments::iterator SegmentReadTaskPool::scheduleSegment( bool enable_data_sharing) { auto target = segments.end(); - std::lock_guard lock(mutex); - if (getFreeActiveSegmentsUnlock() <= 0) + if (getFreeActiveSegments() <= 0) { return target; } + + std::lock_guard lock(mutex); static constexpr int max_iter_count = 32; int iter_count = 0; const auto & tasks = tasks_wrapper.getTasks(); @@ -333,18 +351,12 @@ Int64 SegmentReadTaskPool::decreaseUnorderedInputStreamRefCount() Int64 SegmentReadTaskPool::getFreeBlockSlots() const { - return block_slot_limit - shared_q->getBlockStat().pendingCount(); + return shared_q->getFreeBlockSlots(); } Int64 SegmentReadTaskPool::getFreeActiveSegments() const { - std::lock_guard lock(mutex); - return getFreeActiveSegmentsUnlock(); -} - -Int64 SegmentReadTaskPool::getFreeActiveSegmentsUnlock() const -{ - return active_segment_limit - static_cast(active_segment_ids.size()); + return shared_q->getFreeActiveSegments(); } Int64 SegmentReadTaskPool::getPendingSegmentCount() const diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 593c9172044..9945931011f 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include #include @@ -50,6 +50,8 @@ class SegmentReadTasksWrapper bool empty() const; + size_t size() const; + private: bool enable_read_thread; SegmentReadTasks ordered_tasks; @@ -63,7 +65,7 @@ class SegmentReadTaskPool { public: SegmentReadTaskPool( - const SharedBlockQueuePtr & shared_q_, + const ActiveSegmentReadTaskQueuePtr & shared_q_, int extra_table_id_index_, const ColumnDefines & columns_to_read_, const PushDownExecutorPtr & executor_, @@ -121,7 +123,9 @@ class SegmentReadTaskPool size_t getTotalReadTasks() const { return total_read_tasks; } public: - const uint64_t pool_id; + const KeyspaceID keyspace_id; + const TableID table_id; + const UInt64 pool_id; // The memory tracker of MPPTask. const MemoryTrackerPtr mem_tracker; @@ -148,7 +152,6 @@ class SegmentReadTaskPool const LoggerPtr & getLogger() const { return log; } private: - Int64 getFreeActiveSegmentsUnlock() const; bool exceptionHappened() const; void finishSegment(const SegmentReadTaskPtr & seg); void pushBlock(Block && block); @@ -165,10 +168,8 @@ class SegmentReadTaskPool SegmentReadTasksWrapper tasks_wrapper; AfterSegmentRead after_segment_read; mutable std::mutex mutex; - std::unordered_set active_segment_ids; - // WorkQueue q; - // BlockStat blk_stat; - SharedBlockQueuePtr shared_q; + // std::unordered_set active_segment_ids; + ActiveSegmentReadTaskQueuePtr shared_q; LoggerPtr log; std::atomic unordered_input_stream_ref_count; @@ -182,11 +183,6 @@ class SegmentReadTaskPool // std::once_flag and std::call_once to prevent duplicated add. std::once_flag add_to_scheduler; - const Int64 block_slot_limit; - const Int64 active_segment_limit; - - const KeyspaceID keyspace_id; - const TableID table_id; const String res_group_name; std::mutex ru_mu; std::atomic last_time_check_ru = 0; diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp index 140ee6a3dd2..198fe411ab4 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp @@ -189,7 +189,7 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func) auto filter = EMPTY_FILTER; int excepted_block_size = 1024; uint64_t read_ts = ts_gen->get(); - SharedBlockQueuePtr read_queue = std::make_shared(Logger::get()); + auto read_queue = std::make_shared(stream_count, Logger::get()); auto streams = store->read( *context, context->getSettingsRef(), @@ -207,6 +207,8 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func) .is_fast_scan = opts->is_fast_scan, }, excepted_block_size); + + read_queue->finishQueueIfEmpty(); std::vector threads; threads.reserve(streams.size()); for (auto & stream : streams) diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h index 7e1ac3b1614..6d7f22f796f 100644 --- a/dbms/src/Storages/SelectQueryInfo.h +++ b/dbms/src/Storages/SelectQueryInfo.h @@ -37,8 +37,8 @@ struct DAGQueryInfo; namespace DM { -class SharedBlockQueue; -using SharedBlockQueuePtr = std::shared_ptr; +class ActiveSegmentReadTaskQueue; +using ActiveSegmentReadTaskQueuePtr = std::shared_ptr; } // namespace DM /** Query along with some additional data, @@ -62,7 +62,7 @@ struct SelectQueryInfo bool is_fast_scan = false; bool has_multiple_partitions = false; - DM::SharedBlockQueuePtr read_queue; + DM::ActiveSegmentReadTaskQueuePtr read_queue; SelectQueryInfo(); ~SelectQueryInfo(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 03614e7c72d..55e14927e41 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1147,7 +1147,7 @@ UInt64 StorageDeltaMerge::onSyncGc(Int64 limit, const GCOptions & gc_options) // just for testing size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM::RowKeyRange & range) { - SharedBlockQueuePtr read_queue = std::make_shared(Logger::get("getRows")); + auto read_queue = std::make_shared(2, Logger::get("getRows")); ColumnDefines to_read{getExtraHandleColumnDefine(store->isCommonHandle())}; auto stream = store->read( @@ -1163,6 +1163,8 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM 0, /*tracing_id*/ "getRows", DMReadOptions{})[0]; + + read_queue->finishQueueIfEmpty(); stream->readPrefix(); Block block; size_t rows = 0; @@ -1176,7 +1178,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM // just for testing DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context, size_t total_rows, size_t delete_rows) { - SharedBlockQueuePtr read_queue = std::make_shared(Logger::get("getRange")); + auto read_queue = std::make_shared(2, Logger::get("getRange")); auto start_index = rand() % (total_rows - delete_rows + 1); // NOLINT(cert-msc50-cpp) DM::RowKeyRange range = DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()); @@ -1195,6 +1197,8 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context 0, /*tracing_id*/ "getRange", DMReadOptions{})[0]; + + read_queue->finishQueueIfEmpty(); stream->readPrefix(); Block block; size_t index = 0; diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h index 5ed5d02672b..b22e018cbc4 100644 --- a/dbms/src/Storages/StorageDisaggregated.h +++ b/dbms/src/Storages/StorageDisaggregated.h @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include #include @@ -112,7 +112,7 @@ class StorageDisaggregated : public IStorage const Context & db_context, DM::SegmentReadTasks && read_tasks, const DM::ColumnDefinesPtr & column_defines, - DM::SharedBlockQueuePtr & read_queue, + DM::ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, int extra_table_id_index); void buildRemoteSegmentInputStreams( diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 34d07a2cf56..92e4458f7c7 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -38,7 +38,7 @@ #include #include #include -#include +#include #include #include #include @@ -569,7 +569,7 @@ std::variant StorageDisagg const Context & db_context, DM::SegmentReadTasks && read_tasks, const DM::ColumnDefinesPtr & column_defines, - DM::SharedBlockQueuePtr & read_queue, + DM::ActiveSegmentReadTaskQueuePtr & read_queue, size_t num_streams, int extra_table_id_index) { @@ -691,7 +691,7 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams( DAGPipeline & pipeline) { // Share the read queue among all inputstreams // TODO: share for partition tables under disagg - auto read_queue = std::make_shared(log); + auto read_queue = std::make_shared(num_streams, log); // Build the input streams to read blocks from remote segments auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan); auto packed_read_tasks = packSegmentReadTasks( @@ -724,7 +724,7 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams( }); } -struct SrouceOpBuilder +struct SourceOpBuilder { const String & tracing_id; const DM::ColumnDefinesPtr & column_defines; @@ -764,7 +764,7 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps( size_t num_streams) { // Share the read queue among all source ops // TODO: share for partition tables under disagg - auto read_queue = std::make_shared(log); + auto read_queue = std::make_shared(num_streams, log); // Build the input streams to read blocks from remote segments auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan); auto packed_read_tasks = packSegmentReadTasks( @@ -776,7 +776,7 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps( extra_table_id_index); RUNTIME_CHECK(num_streams > 0, num_streams); - SrouceOpBuilder builder{ + SourceOpBuilder builder{ .tracing_id = log->identifier(), .column_defines = column_defines, .extra_table_id_index = extra_table_id_index, From 3df6b91012e9c2bfec0036a1ceb523aec11cf210 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 21 Oct 2025 03:43:17 +0800 Subject: [PATCH 3/4] Support cancel on ActiveSegmentReadTaskQueue Signed-off-by: JaySon-Huang --- .../Coprocessor/DAGStorageInterpreter.cpp | 2 ++ .../Executor/PipelineExecutorContext.cpp | 22 ++++++++++++++++++- .../Flash/Executor/PipelineExecutorContext.h | 13 +++++++++++ .../Storages/StorageDisaggregatedRemote.cpp | 11 +++++++--- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 8316f615a1c..a3ecf6d2579 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -1382,6 +1382,7 @@ void DAGStorageInterpreter::buildLocalExec( assert(!table_query_infos.empty()); // check at start of this function auto read_queue = table_query_infos.begin()->second.read_queue; read_queue->finishQueueIfEmpty(); + exec_context.addStorageTaskQueue(read_queue); } else { @@ -1389,6 +1390,7 @@ void DAGStorageInterpreter::buildLocalExec( { const SelectQueryInfo & query_info = table_query_info.second; query_info.read_queue->finishQueueIfEmpty(); + exec_context.addStorageTaskQueue(query_info.read_queue); } } diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index f16cc04221a..1a964bf8d17 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -178,7 +179,7 @@ void PipelineExecutorContext::cancel() bool origin_value = false; if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) { - // TODO: Add ActiveSegmentReadTaskQueue here to cancel storage layer read tasks + cancelStorageTaskQueues(); cancelSharedQueues(); cancelOneTimeFutures(); if (likely(dag_context)) @@ -235,6 +236,25 @@ void PipelineExecutorContext::cancelSharedQueues() shared_queue->cancel(); } +void PipelineExecutorContext::addStorageTaskQueue(const DM::ActiveSegmentReadTaskQueuePtr & storage_task_queue) +{ + std::lock_guard lock(mu); + RUNTIME_CHECK_MSG(!isCancelled(), "query has been cancelled."); + assert(storage_task_queue); + storage_task_queues.push_back(storage_task_queue); +} + +void PipelineExecutorContext::cancelStorageTaskQueues() +{ + std::vector tmp; + { + std::lock_guard lock(mu); + std::swap(tmp, storage_task_queues); + } + for (const auto & storage_task_queue : tmp) + storage_task_queue->finishQueue(); +} + void PipelineExecutorContext::addOneTimeFuture(const OneTimeNotifyFuturePtr & future) { std::lock_guard lock(mu); diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index 97fba40e889..ad1723694c2 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -40,6 +40,13 @@ class OneTimeNotifyFuture; using OneTimeNotifyFuturePtr = std::shared_ptr; class DAGContext; +namespace DM +{ +class ActiveSegmentReadTaskQueue; +using ActiveSegmentReadTaskQueuePtr = std::shared_ptr; +} // namespace DM + + class PipelineExecutorContext : private boost::noncopyable { public: @@ -146,6 +153,8 @@ class PipelineExecutorContext : private boost::noncopyable void addOneTimeFuture(const OneTimeNotifyFuturePtr & future); + void addStorageTaskQueue(const DM::ActiveSegmentReadTaskQueuePtr & storage_task_queue); + private: bool setExceptionPtr(const std::exception_ptr & exception_ptr_); @@ -160,6 +169,8 @@ class PipelineExecutorContext : private boost::noncopyable void cancelOneTimeFutures(); + void cancelStorageTaskQueues(); + void cancelResultQueueIfNeed(); private: @@ -196,5 +207,7 @@ class PipelineExecutorContext : private boost::noncopyable std::vector shared_queues; std::vector one_time_futures; + + std::vector storage_task_queues; }; } // namespace DB diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 92e4458f7c7..b3499ae1643 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -620,7 +620,9 @@ std::variant StorageDisagg if (enable_read_thread) { - TableID table_id = table_scan.getLogicalTableID(); // FIXME: + // Now for StorageDisaggregated, we create only one SegmentReadTaskPool for segments from all partitions. + // Use the logical table id as a workaround. + TableID table_id = table_scan.getLogicalTableID(); return std::make_shared( read_queue, extra_table_id_index, @@ -690,7 +692,8 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams( size_t num_streams, DAGPipeline & pipeline) { - // Share the read queue among all inputstreams // TODO: share for partition tables under disagg + // Share the read queue among all inputstreams. Note that for StorageDisaggregated, + // now we create only one SegmentReadTaskPool for segment from all partitions. auto read_queue = std::make_shared(num_streams, log); // Build the input streams to read blocks from remote segments auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan); @@ -763,8 +766,10 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps( DM::SegmentReadTasks && read_tasks, size_t num_streams) { - // Share the read queue among all source ops // TODO: share for partition tables under disagg + // Share the read queue among all source ops. Note that for StorageDisaggregated, + // now we create only one SegmentReadTaskPool for segment from all partitions. auto read_queue = std::make_shared(num_streams, log); + exec_context.addStorageTaskQueue(read_queue); // Build the input streams to read blocks from remote segments auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan); auto packed_read_tasks = packSegmentReadTasks( From 60df8c86d306abdf522e110136b8c204faa1df52 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 23 Oct 2025 17:27:56 +0800 Subject: [PATCH 4/4] fix ut Signed-off-by: JaySon-Huang --- .../Operators/tests/gtest_concat_source.cpp | 2 +- .../tests/gtest_dm_vector_index_utils.h | 3 ++ .../Index/tests/gtest_dm_minmax_index.cpp | 3 ++ .../tests/gtest_segment_read_task_pool.cpp | 11 ++-- ...est_dm_delta_merge_store_fast_add_peer.cpp | 3 ++ ...est_dm_delta_merge_store_for_fast_scan.cpp | 51 +++++++++++++++++++ .../DeltaMerge/tests/gtest_dm_ingest.cpp | 3 ++ .../tests/gtest_kvstore_fast_add_peer.cpp | 3 ++ 8 files changed, 74 insertions(+), 5 deletions(-) diff --git a/dbms/src/Operators/tests/gtest_concat_source.cpp b/dbms/src/Operators/tests/gtest_concat_source.cpp index 230266e312f..7f0d577b34c 100644 --- a/dbms/src/Operators/tests/gtest_concat_source.cpp +++ b/dbms/src/Operators/tests/gtest_concat_source.cpp @@ -387,7 +387,7 @@ try } std::atomic received_blocks = 0; - received_blocks.fetch_add(1, std::memory_order_relaxed); + ResultHandler h([&](const Block & /*block*/) { received_blocks.fetch_add(1, std::memory_order_relaxed); }); PipelineExecGroupBuilder result_builder; builder_pool.generate(result_builder, exec_context, "test"); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h index 472eca8f02b..ee411df41d2 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index_utils.h @@ -200,11 +200,13 @@ class DeltaMergeStoreVectorBase : public VectorIndexTestUtils void read(const RowKeyRange & range, const PushDownExecutorPtr & executor, const ColumnWithTypeAndName & out) { + auto read_queue = std::make_shared(2, Logger::get("read")); auto in = store->read( *db_context, db_context->getSettingsRef(), {cdVec()}, {range}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), executor, @@ -212,6 +214,7 @@ class DeltaMergeStoreVectorBase : public VectorIndexTestUtils 0, TRACING_NAME, DMReadOptions{})[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({vec_column_name}), diff --git a/dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp index 4aec61395ce..aceeb8ba004 100644 --- a/dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/tests/gtest_dm_minmax_index.cpp @@ -153,11 +153,13 @@ bool checkMatch( store->mergeDeltaAll(context); const ColumnDefine & col_to_read = check_pk ? getExtraHandleColumnDefine(is_common_handle) : cd; + auto read_queue = std::make_shared(2, Logger::get("read")); auto streams = store->read( context, context.getSettingsRef(), {col_to_read}, {all_range}, + read_queue, 1, std::numeric_limits::max(), std::make_shared(filter), @@ -165,6 +167,7 @@ bool checkMatch( 0, name, DMReadOptions{}); + read_queue->finishQueueIfEmpty(); auto rows = getInputStreamNRows(streams[0]); store->drop(); diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp index 190f6c9571e..46d720a2bc9 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/tests/gtest_segment_read_task_pool.cpp @@ -126,7 +126,7 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic // Submit to pending_pools scheduler.add(pool); { - std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnnings + std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnings ASSERT_EQ(scheduler.pending_pools.size(), 1); } ASSERT_EQ(scheduler.read_pools.size(), 0); @@ -134,7 +134,7 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic // Reap the pending_pools scheduler.reapPendingPools(); { - std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnnings + std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnings ASSERT_EQ(scheduler.pending_pools.size(), 0); } ASSERT_EQ(scheduler.read_pools.size(), 1); @@ -199,9 +199,12 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic pool->finishSegment(merged_task->units.front().task); } - ASSERT_EQ(pool->q.size(), 0); Block blk; - ASSERT_FALSE(pool->q.pop(blk)); + // popBlock can only return empty block + pool->shared_q->popBlock(blk); + ASSERT_FALSE(blk); + pool->shared_q->tryPopBlock(blk); + ASSERT_FALSE(blk); pool->decreaseUnorderedInputStreamRefCount(); ASSERT_FALSE(pool->valid()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp index 3736c8dfdc0..defeb5466bb 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp @@ -254,11 +254,13 @@ class DeltaMergeStoreTestFastAddPeer void verifyRows(const RowKeyRange & range, size_t rows) { const auto & columns = store->getTableColumns(); + auto read_queue = std::make_shared(2, Logger::get("verifyRows")); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {range}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -267,6 +269,7 @@ class DeltaMergeStoreTestFastAddPeer TRACING_NAME, DMReadOptions{}, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_NROWS(in, rows); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp index de96a94dc20..7ef93f5915c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp @@ -73,12 +73,14 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithoutRangeFilter) { // read all columns from store with all range in fast mode + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -89,6 +91,7 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithoutRangeFilter) .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name, col_str_define.name, col_i8_define.name}), @@ -161,11 +164,13 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithRangeFilter) RowKeyValue(false, std::make_shared(end_key_ss.releaseStr()), /*int_val_*/ read_nums_limit), false, store->getRowKeyColumnSize())}; + auto read_queue = std::make_shared(2, Logger::get()); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, key_ranges, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -176,6 +181,7 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithRangeFilter) .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name, col_str_define.name, col_i8_define.name}), @@ -241,12 +247,14 @@ try } { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -257,6 +265,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); switch (mode) { case TestMode::PageStorageV2_MemoryOnly: @@ -376,12 +385,14 @@ try } { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -392,6 +403,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); switch (mode) { case TestMode::PageStorageV2_MemoryOnly: @@ -490,12 +502,14 @@ try store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -506,6 +520,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); switch (mode) { case TestMode::PageStorageV2_MemoryOnly: @@ -607,12 +622,14 @@ try store->mergeDeltaAll(*db_context); { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -623,6 +640,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -692,12 +710,14 @@ try store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -708,6 +728,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); switch (mode) { @@ -864,12 +885,14 @@ try // Read after deletion { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -880,6 +903,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); // filter del mark = 1, thus just read the insert data before delete ASSERT_INPUTSTREAM_COLS_UR( in, @@ -894,12 +918,14 @@ try store->mergeDeltaAll(*db_context); { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -910,6 +936,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -943,12 +970,14 @@ try } // Test Reading first { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -959,6 +988,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -972,12 +1002,14 @@ try } // Read after deletion { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -988,6 +1020,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); // filter del mark = 1, thus just read the insert data before delete ASSERT_INPUTSTREAM_COLS_UR( in, @@ -1035,12 +1068,14 @@ try // Read after merge delta { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -1051,6 +1086,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); auto pk_coldata = createNumbers(num_deleted_rows, num_rows_write); ASSERT_EQ(pk_coldata.size(), num_rows_write - num_deleted_rows); ASSERT_INPUTSTREAM_COLS_UR(in, Strings({DMTestEnv::pk_name}), createColumns({createColumn(pk_coldata)})); @@ -1123,12 +1159,14 @@ try // Read in fast mode { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -1139,6 +1177,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); switch (mode) { @@ -1218,12 +1257,14 @@ try // Read with version in normal case { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ static_cast(1), EMPTY_FILTER, @@ -1235,6 +1276,7 @@ try .is_fast_scan = false, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); // Data is not guaranteed to be returned in order. ASSERT_UNORDERED_INPUTSTREAM_COLS_UR( in, @@ -1276,12 +1318,14 @@ try // could do clean read with no optimization { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -1292,6 +1336,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -1325,11 +1370,13 @@ try } } + auto read_queue = std::make_shared(2, Logger::get()); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), real_columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -1340,6 +1387,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write - num_deleted_rows); } } @@ -1406,12 +1454,14 @@ try store->mergeDeltaAll(*db_context); auto fastscan_rows = [&]() { + auto read_queue = std::make_shared(2, Logger::get()); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( *db_context, db_context->getSettingsRef(), columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -1422,6 +1472,7 @@ try .is_fast_scan = true, }, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); size_t rows = 0; in->readPrefix(); while (true) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp index 6fbb41477ff..dd306c438f9 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp @@ -246,11 +246,13 @@ try for (int i = 0; i < upper_bound; ++i) if (filled_bitmap[i]) expected_pk_column.emplace_back(i); + auto read_queue = std::make_shared(2, Logger::get("read")); auto stream = store->read( *db_context, db_context->getSettingsRef(), store->getTableColumns(), {RowKeyRange::newAll(is_common_handle, 1)}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, @@ -260,6 +262,7 @@ try DMReadOptions{ .keep_order = true, })[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name}), diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index d33d0fbaff5..25da4417fcf 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -340,12 +340,14 @@ CATCH void verifyRows(Context & ctx, DM::DeltaMergeStorePtr store, const DM::RowKeyRange & range, size_t rows) { + auto read_queue = std::make_shared(2, Logger::get("verifyRows")); const auto & columns = store->getTableColumns(); BlockInputStreamPtr in = store->read( ctx, ctx.getSettingsRef(), columns, {range}, + read_queue, /* num_streams= */ 1, /* start_ts= */ std::numeric_limits::max(), DM::EMPTY_FILTER, @@ -354,6 +356,7 @@ void verifyRows(Context & ctx, DM::DeltaMergeStorePtr store, const DM::RowKeyRan "KVStoreFastAddPeer", DM::DMReadOptions{}, /* expected_block_size= */ 1024)[0]; + read_queue->finishQueueIfEmpty(); ASSERT_INPUTSTREAM_NROWS(in, rows); }