From 674240d41f340a716fa69688bcd3fb416cdebf20 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 14:35:16 +0800 Subject: [PATCH 01/22] Revert "Support graceful shutdown in TiFlash (#10267)" This reverts commit bec1390d8be06e97612404a61867f0b38b3627b2. --- dbms/src/Flash/Coprocessor/DAGDriver.cpp | 1 - dbms/src/Flash/Mpp/MPPTaskManager.cpp | 25 -------------- dbms/src/Flash/Mpp/MPPTaskManager.h | 2 -- dbms/src/Server/FlashGrpcServerHolder.cpp | 13 +++----- dbms/src/Server/Server.cpp | 23 +++---------- dbms/src/Storages/KVStore/ProxyStateMachine.h | 33 +++++++++++-------- 6 files changed, 29 insertions(+), 68 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index 844c31907d6..45f73e84b57 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -29,7 +29,6 @@ #include #include #include -#include #include namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 1c922893c32..3296da53c37 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -83,31 +83,6 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } -void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) -{ - auto start = std::chrono::steady_clock::now(); - // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down - static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; - auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, 60); - auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); - while (true) - { - // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - { - std::unique_lock lock(mu); - if (monitored_tasks.empty()) - break; - } - auto current_time = std::chrono::steady_clock::now(); - if (current_time >= max_wait_time) - { - LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}s", graceful_wait_before_shutdown); - break; - } - } -} - MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) : scheduler(std::move(scheduler_)) , aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index d0b7c97ed1a..f49d14600ab 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -194,8 +194,6 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } - void waitAllMPPTasksFinish(const std::unique_ptr & global_context); - std::mutex mu; std::condition_variable cv; bool is_shutdown = false; diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index ef5eb9bf180..95fd21ac9aa 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -224,20 +224,17 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() *is_shutdown = true; // Wait all existed MPPTunnels done to prevent crash. // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. - constexpr int wait_step = 200; - // Maximum wait for 1 minute - constexpr int max_wait_cnt = 60 * 1000 / wait_step; + const int max_wait_cnt = 300; int wait_cnt = 0; while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) - std::this_thread::sleep_for(std::chrono::milliseconds(wait_step)); + std::this_thread::sleep_for(std::chrono::seconds(1)); if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) LOG_WARNING( log, - "Wait {} milliseconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource " - "leak", - wait_cnt * wait_step); + "Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak", + wait_cnt); else - LOG_INFO(log, "Wait {} milliseconds for mpp tunnels shutdown, all finished", wait_cnt * wait_step); + LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt); for (auto & cq : cqs) cq->Shutdown(); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 691c296685a..3e665946980 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1166,6 +1166,8 @@ try } } + SCOPE_EXIT({ proxy_machine.stopProxy(tmt_context); }); + { // Report the unix timestamp, git hash, release version Poco::Timestamp ts; @@ -1223,8 +1225,7 @@ try } /// startup grpc server to serve raft and/or flash services. - auto flash_grpc_server_holder - = std::make_unique(this->context(), this->config(), raft_config, log); + FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log); SCOPE_EXIT({ // Stop LAC for AutoScaler managed CN before FlashGrpcServerHolder is destructed. @@ -1236,6 +1237,8 @@ try LocalAdmissionController::global_instance->safeStop(); }); + proxy_machine.runKVStore(tmt_context); + try { // Bind CPU affinity after all threads started. @@ -1246,25 +1249,9 @@ try LOG_ERROR(log, "CPUAffinityManager::bindThreadCPUAffinity throws exception."); } - // Ready to provide service - tmt_context.setStatusRunning(); - LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); - LOG_INFO(log, "Set store context status Stopping"); - tmt_context.setStatusStopping(); - - tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); - proxy_machine.waitAllReadIndexTasksFinish(tmt_context); - - LOG_INFO(log, "Set store context status Terminated"); - tmt_context.setStatusTerminated(); - - // Stop grpc server before proxy_machine because it depends on a DiagnosticsService which will call proxy - flash_grpc_server_holder.reset(); - proxy_machine.stopProxy(tmt_context); - { // Set limiters stopping and wakeup threads in waitting queue. global_context->getIORateLimiter().setStop(); diff --git a/dbms/src/Storages/KVStore/ProxyStateMachine.h b/dbms/src/Storages/KVStore/ProxyStateMachine.h index 3761abc3858..c186a1c4368 100644 --- a/dbms/src/Storages/KVStore/ProxyStateMachine.h +++ b/dbms/src/Storages/KVStore/ProxyStateMachine.h @@ -367,28 +367,33 @@ struct ProxyStateMachine } } - /// Wait for all read index tasks to finish. - void waitAllReadIndexTasksFinish(TMTContext & tmt_context) - { - if (!proxy_conf.isProxyRunnable()) - return; - if (tiflash_instance_wrap.status != EngineStoreServerStatus::Running) - { - LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen"); - exit(-1); - } - // Wait until there is no read-index task. - while (tmt_context.getKVStore()->getReadIndexEvent()) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } + // Set KVStore to running, so that it could handle read index requests. + void runKVStore(TMTContext & tmt_context) const { tmt_context.setStatusRunning(); } /// Stop all services in TMTContext and ReadIndexWorkers. /// Then, inform proxy to stop by setting `tiflash_instance_wrap.status`. void stopProxy(TMTContext & tmt_context) { if (!proxy_conf.isProxyRunnable()) + { + tmt_context.setStatusTerminated(); return; + } + if (proxy_conf.isProxyRunnable() && tiflash_instance_wrap.status != EngineStoreServerStatus::Running) + { + LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen"); + exit(-1); + } + LOG_INFO(log, "Set store context status Stopping"); + tmt_context.setStatusStopping(); + { + // Wait until there is no read-index task. + while (tmt_context.getKVStore()->getReadIndexEvent()) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + tmt_context.setStatusTerminated(); tmt_context.getKVStore()->stopReadIndexWorkers(); + LOG_INFO(log, "Set store context status Terminated"); { // update status and let proxy stop all services except encryption. tiflash_instance_wrap.status = EngineStoreServerStatus::Stopping; From 70bafa0256a6cc3949ab26acc4deb64d3d8b7d5f Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 14:49:17 +0800 Subject: [PATCH 02/22] graceful shutdown Signed-off-by: gengliqi --- .../src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 2 -- dbms/src/Flash/FlashService.cpp | 2 +- dbms/src/Flash/Mpp/MPPTaskManager.h | 7 +++++++ dbms/src/Server/FlashGrpcServerHolder.cpp | 13 ++++++++----- dbms/src/Server/Server.cpp | 4 ++++ 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index a7c42db63a6..6bd74396863 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -891,8 +891,6 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() if (context.getDAGContext()->is_disaggregated_task) throw; - if (tmt.checkShuttingDown()) - throw TiFlashException("TiFlash server is terminating", Errors::Coprocessor::Internal); // By now, RegionException will contain all region id of MvccQueryInfo, which is needed by CHSpark. // When meeting RegionException, we can let MakeRegionQueryInfos to check in next loop. force_retry.insert(e.unavailable_region.begin(), e.unavailable_region.end()); diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 07af1390649..275670f9c1a 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -564,7 +564,7 @@ grpc::Status FlashService::IsAlive( return check_result; auto & tmt_context = context->getTMTContext(); - response->set_available(tmt_context.checkRunning()); + response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable()); response->set_mpp_version(DB::GetMppVersion()); return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index f49d14600ab..35fb030f7e9 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -194,6 +194,8 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } + void waitAllMPPTasksFinish(const std::unique_ptr & global_context); + std::mutex mu; std::condition_variable cv; bool is_shutdown = false; @@ -221,6 +223,8 @@ class MPPTaskManager : private boost::noncopyable std::shared_ptr monitor; + std::atomic is_available{true}; + public: explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler); @@ -273,6 +277,9 @@ class MPPTaskManager : private boost::noncopyable bool isTaskExists(const MPPTaskId & id); + void setUnavailable() { is_available = false; } + bool isAvailable() { return is_available; } + private: MPPQueryPtr addMPPQuery( const MPPQueryId & query_id, diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 95fd21ac9aa..d37fb60c890 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -224,17 +224,20 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() *is_shutdown = true; // Wait all existed MPPTunnels done to prevent crash. // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. - const int max_wait_cnt = 300; + constexpr int wait_step = 200; + // Maximum wait for 3 minute + constexpr int max_wait_cnt = 180 * 1000 / wait_step; int wait_cnt = 0; while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(wait_step)); if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) LOG_WARNING( log, - "Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak", - wait_cnt); + "Wait {} milliseconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource " + "leak", + wait_cnt * wait_step); else - LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt); + LOG_INFO(log, "Wait {} milliseconds for mpp tunnels shutdown, all finished", wait_cnt * wait_step); for (auto & cq : cqs) cq->Shutdown(); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 3e665946980..c768fac64a8 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1252,6 +1252,10 @@ try LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); + LOG_INFO(log, "Set unavailble for MPPTask"); + tmt_context.getMPPTaskManager()->setUnavailable(); + tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); + { // Set limiters stopping and wakeup threads in waitting queue. global_context->getIORateLimiter().setStop(); From 4f882eb6e544c1e38e59f474f6d8f00dc2273a21 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 14:59:16 +0800 Subject: [PATCH 03/22] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 3296da53c37..1c922893c32 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -83,6 +83,31 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } +void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) +{ + auto start = std::chrono::steady_clock::now(); + // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down + static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; + auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, 60); + auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); + while (true) + { + // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + { + std::unique_lock lock(mu); + if (monitored_tasks.empty()) + break; + } + auto current_time = std::chrono::steady_clock::now(); + if (current_time >= max_wait_time) + { + LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}s", graceful_wait_before_shutdown); + break; + } + } +} + MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) : scheduler(std::move(scheduler_)) , aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE) From f5c00e7ec11f6cf07a157c14f01395dc0d8042ff Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 17:08:41 +0800 Subject: [PATCH 04/22] update config Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 6 +++++- dbms/src/Server/FlashGrpcServerHolder.cpp | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 1c922893c32..a72af83a580 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -88,7 +88,11 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob auto start = std::chrono::steady_clock::now(); // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; - auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, 60); + // The default value of flash.graceful_wait_before_shutdown + static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; + auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( + GRACEFUL_WIAT_BEFORE_SHUTDOWN, + DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); while (true) { diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index d37fb60c890..ef5eb9bf180 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -225,8 +225,8 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() // Wait all existed MPPTunnels done to prevent crash. // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. constexpr int wait_step = 200; - // Maximum wait for 3 minute - constexpr int max_wait_cnt = 180 * 1000 / wait_step; + // Maximum wait for 1 minute + constexpr int max_wait_cnt = 60 * 1000 / wait_step; int wait_cnt = 0; while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) std::this_thread::sleep_for(std::chrono::milliseconds(wait_step)); From 77d6aa5681ba36640a798ca8913cbc592bb5765f Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 20:42:14 +0800 Subject: [PATCH 05/22] u Signed-off-by: gengliqi --- dbms/src/Server/Server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index c768fac64a8..59b569dd99a 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1252,7 +1252,7 @@ try LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); - LOG_INFO(log, "Set unavailble for MPPTask"); + LOG_INFO(log, "Set unavailable for MPPTask"); tmt_context.getMPPTaskManager()->setUnavailable(); tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); From 1d1d0ba59816340cdcc03053b230c27bda1ff957 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 11 Jul 2025 23:42:46 +0800 Subject: [PATCH 06/22] address comments Signed-off-by: gengliqi --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 3 +++ dbms/src/Flash/Mpp/MPPTaskManager.cpp | 1 + 2 files changed, 4 insertions(+) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 6bd74396863..9795778a669 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -891,6 +891,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() if (context.getDAGContext()->is_disaggregated_task) throw; + if (tmt.checkShuttingDown()) + throw TiFlashException("TiFlash server is terminating", Errors::Coprocessor::Internal); + // By now, RegionException will contain all region id of MvccQueryInfo, which is needed by CHSpark. // When meeting RegionException, we can let MakeRegionQueryInfos to check in next loop. force_retry.insert(e.unavailable_region.begin(), e.unavailable_region.end()); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index a72af83a580..d0519bfdcc8 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -93,6 +93,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); + LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); while (true) { From 66b128fb89eefa9324b314751ddd40abe905f7f8 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 14 Jul 2025 17:02:43 +0800 Subject: [PATCH 07/22] u Signed-off-by: gengliqi --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index d51ba27fecd..00bb2288451 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -893,7 +893,6 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() if (tmt.checkShuttingDown()) throw TiFlashException("TiFlash server is terminating", Errors::Coprocessor::Internal); - // By now, RegionException will contain all region id of MvccQueryInfo, which is needed by CHSpark. // When meeting RegionException, we can let MakeRegionQueryInfos to check in next loop. force_retry.insert(e.unavailable_region.begin(), e.unavailable_region.end()); From 0393bc7a56101855850641249acc624a47cd6726 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 14 Jul 2025 17:32:35 +0800 Subject: [PATCH 08/22] address comments Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.h | 2 ++ dbms/src/Server/Server.cpp | 2 ++ 2 files changed, 4 insertions(+) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 267593f6e94..35fb030f7e9 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -194,6 +194,8 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } + void waitAllMPPTasksFinish(const std::unique_ptr & global_context); + std::mutex mu; std::condition_variable cv; bool is_shutdown = false; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 59b569dd99a..cb4d70f5d15 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1252,6 +1252,8 @@ try LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); + // Note: `waitAllMPPTasksFinish` must be called before stopping the proxy. + // Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully. LOG_INFO(log, "Set unavailable for MPPTask"); tmt_context.getMPPTaskManager()->setUnavailable(); tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); From fcf04e64a2cdea6e166a077eb2a1015836063e4f Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 15 Jul 2025 11:05:44 +0800 Subject: [PATCH 09/22] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index d0519bfdcc8..a3854094317 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -95,6 +96,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); + Stopwatch watch; while (true) { // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched @@ -102,12 +104,15 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob { std::unique_lock lock(mu); if (monitored_tasks.empty()) + { + LOG_INFO(log, "All MPPTasks have finished after {}ms", watch.elapsedMilliseconds()); break; + } } auto current_time = std::chrono::steady_clock::now(); if (current_time >= max_wait_time) { - LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}s", graceful_wait_before_shutdown); + LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", watch.elapsedMilliseconds()); break; } } From 80e035d485b207e998395f85c8f7ffba6c896cf6 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 15 Jul 2025 11:39:57 +0800 Subject: [PATCH 10/22] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index a3854094317..bcf9c35a366 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -86,7 +86,6 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) { - auto start = std::chrono::steady_clock::now(); // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; // The default value of flash.graceful_wait_before_shutdown @@ -95,26 +94,26 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); - auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); Stopwatch watch; + // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched + std::this_thread::sleep_for(std::chrono::seconds(1)); while (true) { - // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + auto elapsed_ms = watch.elapsedMilliseconds(); { std::unique_lock lock(mu); if (monitored_tasks.empty()) { - LOG_INFO(log, "All MPPTasks have finished after {}ms", watch.elapsedMilliseconds()); + LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); break; } } - auto current_time = std::chrono::steady_clock::now(); - if (current_time >= max_wait_time) + if (elapsed_ms >= graceful_wait_before_shutdown * 1000) { - LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", watch.elapsedMilliseconds()); + LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms); break; } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } From bf0e3a5cad6ca7090319692d765fed808e96d1d9 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 18 Jul 2025 11:58:45 +0800 Subject: [PATCH 11/22] delete Signed-off-by: gengliqi --- dbms/src/Server/FlashGrpcServerHolder.cpp | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 95fd21ac9aa..31c2f02b19c 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -222,19 +222,6 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() LOG_INFO(log, "Begin to shut down flash grpc server"); flash_grpc_server->Shutdown(); *is_shutdown = true; - // Wait all existed MPPTunnels done to prevent crash. - // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. - const int max_wait_cnt = 300; - int wait_cnt = 0; - while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) - std::this_thread::sleep_for(std::chrono::seconds(1)); - if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) - LOG_WARNING( - log, - "Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak", - wait_cnt); - else - LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt); for (auto & cq : cqs) cq->Shutdown(); From 405f04a4257330ef9afedfa422fba14cefa7fcef Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 21 Jul 2025 22:15:55 +0800 Subject: [PATCH 12/22] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 10 ++++++---- dbms/src/Flash/Mpp/MPPTaskManager.h | 2 +- dbms/src/Server/FlashGrpcServerHolder.cpp | 8 ++++++-- dbms/src/Server/FlashGrpcServerHolder.h | 7 +++++++ dbms/src/Server/Server.cpp | 4 +++- 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index bcf9c35a366..bb024c3a9d9 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -84,7 +84,7 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } -void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) +UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) { // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; @@ -94,6 +94,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); + UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000; Stopwatch watch; // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -105,13 +106,14 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob if (monitored_tasks.empty()) { LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); - break; + return graceful_wait_before_shutdown_ms > elapsed_ms ? graceful_wait_before_shutdown_ms - elapsed_ms + : 0; } } - if (elapsed_ms >= graceful_wait_before_shutdown * 1000) + if (elapsed_ms >= graceful_wait_before_shutdown_ms) { LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms); - break; + return 0; } std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 35fb030f7e9..dc67e8d5d56 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -194,7 +194,7 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } - void waitAllMPPTasksFinish(const std::unique_ptr & global_context); + UInt64 waitAllMPPTasksFinish(const std::unique_ptr & global_context); std::mutex mu; std::condition_variable cv; diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 31c2f02b19c..fc02b91cb11 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -18,6 +18,8 @@ #include #include +#include + // In order to include grpc::SecureServerCredentials which used in // sslServerCredentialsWithFetcher() // We implement sslServerCredentialsWithFetcher() to set config fetcher @@ -220,7 +222,9 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() { /// Shut down grpc server. LOG_INFO(log, "Begin to shut down flash grpc server"); - flash_grpc_server->Shutdown(); + Stopwatch watch; + auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(grpc_shutdown_max_wait_ms); + flash_grpc_server->Shutdown(deadline); *is_shutdown = true; for (auto & cq : cqs) @@ -239,7 +243,7 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() GRPCCompletionQueuePool::global_instance->markShutdown(); GRPCCompletionQueuePool::global_instance = nullptr; - LOG_INFO(log, "Shut down flash grpc server"); + LOG_INFO(log, "Shut down flash grpc server after {}ms", watch.elapsedMilliseconds()); /// Close flash service. LOG_INFO(log, "Begin to shut down flash service"); diff --git a/dbms/src/Server/FlashGrpcServerHolder.h b/dbms/src/Server/FlashGrpcServerHolder.h index d7cfeda3864..fc84b1aab44 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.h +++ b/dbms/src/Server/FlashGrpcServerHolder.h @@ -44,6 +44,12 @@ class FlashGrpcServerHolder std::unique_ptr & flashService(); + void setMaxWaitMsDuringGRPCShutdown(UInt64 max_wait_ms) + { + // At least 10 seconds + grpc_shutdown_max_wait_ms = std::max(max_wait_ms, 10 * 1000); + } + private: const LoggerPtr & log; std::shared_ptr> is_shutdown; @@ -56,6 +62,7 @@ class FlashGrpcServerHolder std::vector cq_workers; std::vector notify_cq_workers; CollectProcInfoBackgroundTask background_task; + UInt64 grpc_shutdown_max_wait_ms = 10 * 1000; }; } // namespace DB diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index cb4d70f5d15..efcb787035f 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1256,7 +1256,9 @@ try // Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully. LOG_INFO(log, "Set unavailable for MPPTask"); tmt_context.getMPPTaskManager()->setUnavailable(); - tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); + UInt64 remaining_wait_time + = tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); + flash_grpc_server_holder.setMaxWaitMsDuringGRPCShutdown(remaining_wait_time); { // Set limiters stopping and wakeup threads in waitting queue. From d0771dde37e9ccf5c6ba04691ab82676eccbd304 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 21 Jul 2025 23:37:59 +0800 Subject: [PATCH 13/22] update Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 2 +- dbms/src/Server/FlashGrpcServerHolder.cpp | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index bb024c3a9d9..639a3fc9935 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -112,7 +112,7 @@ UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & gl } if (elapsed_ms >= graceful_wait_before_shutdown_ms) { - LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms); + LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms); return 0; } std::this_thread::sleep_for(std::chrono::milliseconds(200)); diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index fc02b91cb11..f374156583f 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -25,6 +25,7 @@ // We implement sslServerCredentialsWithFetcher() to set config fetcher // to auto reload sslServerCredentials #include "../../contrib/grpc/src/cpp/server/secure_server_credentials.h" +#include "common/logger_useful.h" namespace DB { @@ -223,9 +224,24 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() /// Shut down grpc server. LOG_INFO(log, "Begin to shut down flash grpc server"); Stopwatch watch; - auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(grpc_shutdown_max_wait_ms); - flash_grpc_server->Shutdown(deadline); + while (true) + { + auto elapsed_ms = watch.elapsedMilliseconds(); + if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) + { + LOG_INFO(log, "All grpc connections have finished after {}ms", elapsed_ms); + break; + } + if (elapsed_ms >= grpc_shutdown_max_wait_ms) + { + LOG_WARNING(log, "Timed out waiting for grpc connections to finish after {}ms", elapsed_ms); + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + *is_shutdown = true; + flash_grpc_server->Shutdown(); for (auto & cq : cqs) cq->Shutdown(); From deebd0d52b5d95c381684ad327e8ef9d7bc6cd15 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 01:14:20 +0800 Subject: [PATCH 14/22] u Signed-off-by: gengliqi --- dbms/src/Server/FlashGrpcServerHolder.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index f374156583f..f2c5e237ac0 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -55,7 +55,7 @@ void handleRpcs(grpc::ServerCompletionQueue * curcq, const LoggerPtr & log) // tells us whether there is any kind of event or cq is shutting down. if (!curcq->Next(&tag, &ok)) { - LOG_INFO(log, "CQ is fully drained and shut down"); + LOG_DEBUG(log, "CQ is fully drained and shut down"); break; } GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Increment(); From 1f5c31df5d1112b3dfebd410e8d6591960228f5c Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 12:34:38 +0800 Subject: [PATCH 15/22] add comments Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 4 ++-- dbms/src/Flash/Mpp/MPPTaskManager.h | 4 +++- dbms/src/Server/FlashGrpcServerHolder.cpp | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 639a3fc9935..db61a257917 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -84,13 +84,13 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } -UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) +UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & context) { // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; // The default value of flash.graceful_wait_before_shutdown static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; - auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( + auto graceful_wait_before_shutdown = context->getUsersConfig()->getUInt64( GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index dc67e8d5d56..741b4fa9c07 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -194,7 +194,9 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } - UInt64 waitAllMPPTasksFinish(const std::unique_ptr & global_context); + /// Waits for all MPP tasks to finish, up to `flash.graceful_wait_before_shutdown` (default 600s). + /// Returns remaining wait time in milliseconds, or 0 if timed out. + UInt64 waitAllMPPTasksFinish(const std::unique_ptr & context); std::mutex mu; std::condition_variable cv; diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index f2c5e237ac0..e0e50afaf8b 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -25,7 +26,6 @@ // We implement sslServerCredentialsWithFetcher() to set config fetcher // to auto reload sslServerCredentials #include "../../contrib/grpc/src/cpp/server/secure_server_credentials.h" -#include "common/logger_useful.h" namespace DB { From e4c24e19fc1096f55e1e7dc6695d232e76e016e4 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 13:15:22 +0800 Subject: [PATCH 16/22] format Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index db61a257917..c0debe450c2 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -90,9 +90,8 @@ UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & co static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; // The default value of flash.graceful_wait_before_shutdown static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; - auto graceful_wait_before_shutdown = context->getUsersConfig()->getUInt64( - GRACEFUL_WIAT_BEFORE_SHUTDOWN, - DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); + auto graceful_wait_before_shutdown + = context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000; Stopwatch watch; From b03643739de9d8e7e7b24817d1b57e57e9af1ae8 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 13:57:02 +0800 Subject: [PATCH 17/22] update impl Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 28 +++++++++++++++++++---- dbms/src/Flash/Mpp/MPPTaskManager.h | 4 +--- dbms/src/Server/FlashGrpcServerHolder.cpp | 17 +------------- dbms/src/Server/FlashGrpcServerHolder.h | 7 ------ dbms/src/Server/Server.cpp | 4 +--- 5 files changed, 27 insertions(+), 33 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index c0debe450c2..4dd73ad175c 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -84,7 +84,7 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } -UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & context) +void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & context) { // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; @@ -97,6 +97,7 @@ UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & co Stopwatch watch; // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched std::this_thread::sleep_for(std::chrono::seconds(1)); + UInt64 remaining_wait_ms = 0; while (true) { auto elapsed_ms = watch.elapsedMilliseconds(); @@ -105,14 +106,33 @@ UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & co if (monitored_tasks.empty()) { LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); - return graceful_wait_before_shutdown_ms > elapsed_ms ? graceful_wait_before_shutdown_ms - elapsed_ms - : 0; + remaining_wait_ms + = graceful_wait_before_shutdown_ms > elapsed_ms ? graceful_wait_before_shutdown_ms - elapsed_ms : 0; + break; } } if (elapsed_ms >= graceful_wait_before_shutdown_ms) { LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms); - return 0; + remaining_wait_ms = 0; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + UInt64 connection_wait_ms = std::max(remaining_wait_ms, 10 * 1000); + watch.restart(); + while (true) + { + auto elapsed_ms = watch.elapsedMilliseconds(); + if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) + { + LOG_INFO(log, "All MPP grpc connections have finished after {}ms", elapsed_ms); + break; + } + if (elapsed_ms >= connection_wait_ms) + { + LOG_WARNING(log, "Timed out waiting for MPP grpc connections to finish after {}ms", elapsed_ms); + break; } std::this_thread::sleep_for(std::chrono::milliseconds(200)); } diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 741b4fa9c07..118632fc0c8 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -194,9 +194,7 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } - /// Waits for all MPP tasks to finish, up to `flash.graceful_wait_before_shutdown` (default 600s). - /// Returns remaining wait time in milliseconds, or 0 if timed out. - UInt64 waitAllMPPTasksFinish(const std::unique_ptr & context); + void waitAllMPPTasksFinish(const std::unique_ptr & context); std::mutex mu; std::condition_variable cv; diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index e0e50afaf8b..262488a5156 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -26,6 +26,7 @@ // We implement sslServerCredentialsWithFetcher() to set config fetcher // to auto reload sslServerCredentials #include "../../contrib/grpc/src/cpp/server/secure_server_credentials.h" +#include "Common/Stopwatch.h" namespace DB { @@ -224,22 +225,6 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() /// Shut down grpc server. LOG_INFO(log, "Begin to shut down flash grpc server"); Stopwatch watch; - while (true) - { - auto elapsed_ms = watch.elapsedMilliseconds(); - if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) - { - LOG_INFO(log, "All grpc connections have finished after {}ms", elapsed_ms); - break; - } - if (elapsed_ms >= grpc_shutdown_max_wait_ms) - { - LOG_WARNING(log, "Timed out waiting for grpc connections to finish after {}ms", elapsed_ms); - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } - *is_shutdown = true; flash_grpc_server->Shutdown(); diff --git a/dbms/src/Server/FlashGrpcServerHolder.h b/dbms/src/Server/FlashGrpcServerHolder.h index fc84b1aab44..d7cfeda3864 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.h +++ b/dbms/src/Server/FlashGrpcServerHolder.h @@ -44,12 +44,6 @@ class FlashGrpcServerHolder std::unique_ptr & flashService(); - void setMaxWaitMsDuringGRPCShutdown(UInt64 max_wait_ms) - { - // At least 10 seconds - grpc_shutdown_max_wait_ms = std::max(max_wait_ms, 10 * 1000); - } - private: const LoggerPtr & log; std::shared_ptr> is_shutdown; @@ -62,7 +56,6 @@ class FlashGrpcServerHolder std::vector cq_workers; std::vector notify_cq_workers; CollectProcInfoBackgroundTask background_task; - UInt64 grpc_shutdown_max_wait_ms = 10 * 1000; }; } // namespace DB diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index efcb787035f..cb4d70f5d15 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1256,9 +1256,7 @@ try // Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully. LOG_INFO(log, "Set unavailable for MPPTask"); tmt_context.getMPPTaskManager()->setUnavailable(); - UInt64 remaining_wait_time - = tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); - flash_grpc_server_holder.setMaxWaitMsDuringGRPCShutdown(remaining_wait_time); + tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); { // Set limiters stopping and wakeup threads in waitting queue. From 7b6786efcfc60308eba32f799e4e43d4c09e7138 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 13:59:47 +0800 Subject: [PATCH 18/22] u Signed-off-by: gengliqi --- dbms/src/Server/FlashGrpcServerHolder.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 262488a5156..b6b601f3fe1 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -19,14 +19,11 @@ #include #include -#include - // In order to include grpc::SecureServerCredentials which used in // sslServerCredentialsWithFetcher() // We implement sslServerCredentialsWithFetcher() to set config fetcher // to auto reload sslServerCredentials #include "../../contrib/grpc/src/cpp/server/secure_server_credentials.h" -#include "Common/Stopwatch.h" namespace DB { From d89e1f09492624622a1da1537fdefb0a92a9bcbc Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 14:04:22 +0800 Subject: [PATCH 19/22] add comments Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 4dd73ad175c..3e8eb3157f2 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -119,6 +119,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & cont } std::this_thread::sleep_for(std::chrono::milliseconds(200)); } + // Also waits for all MPP gRPC connections to finish, with a minimum wait of 10 seconds UInt64 connection_wait_ms = std::max(remaining_wait_ms, 10 * 1000); watch.restart(); while (true) @@ -126,12 +127,12 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & cont auto elapsed_ms = watch.elapsedMilliseconds(); if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) { - LOG_INFO(log, "All MPP grpc connections have finished after {}ms", elapsed_ms); + LOG_INFO(log, "All MPP gRPC connections have finished after {}ms", elapsed_ms); break; } if (elapsed_ms >= connection_wait_ms) { - LOG_WARNING(log, "Timed out waiting for MPP grpc connections to finish after {}ms", elapsed_ms); + LOG_WARNING(log, "Timed out waiting for MPP gRPC connections to finish after {}ms", elapsed_ms); break; } std::this_thread::sleep_for(std::chrono::milliseconds(200)); From 0add5d74bc7ae11236ed2ca8597dccf20742111a Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 14:05:58 +0800 Subject: [PATCH 20/22] u Signed-off-by: gengliqi --- dbms/src/Server/FlashGrpcServerHolder.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index b6b601f3fe1..f007418924f 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -17,7 +17,6 @@ #include #include #include -#include // In order to include grpc::SecureServerCredentials which used in // sslServerCredentialsWithFetcher() From e2df9aceae945a116a93be46556441da50878292 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 16:16:17 +0800 Subject: [PATCH 21/22] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 30 +++++++-------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 3e8eb3157f2..1f011aa3ff6 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -97,42 +97,28 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & cont Stopwatch watch; // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched std::this_thread::sleep_for(std::chrono::seconds(1)); - UInt64 remaining_wait_ms = 0; + bool all_tasks_finished = false; while (true) { auto elapsed_ms = watch.elapsedMilliseconds(); + if (!all_tasks_finished) { std::unique_lock lock(mu); if (monitored_tasks.empty()) + all_tasks_finished = true; + } + if (all_tasks_finished) + { + // Also needs to check if all MPP gRPC connections are finished + if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) { LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); - remaining_wait_ms - = graceful_wait_before_shutdown_ms > elapsed_ms ? graceful_wait_before_shutdown_ms - elapsed_ms : 0; break; } } if (elapsed_ms >= graceful_wait_before_shutdown_ms) { LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms); - remaining_wait_ms = 0; - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } - // Also waits for all MPP gRPC connections to finish, with a minimum wait of 10 seconds - UInt64 connection_wait_ms = std::max(remaining_wait_ms, 10 * 1000); - watch.restart(); - while (true) - { - auto elapsed_ms = watch.elapsedMilliseconds(); - if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) - { - LOG_INFO(log, "All MPP gRPC connections have finished after {}ms", elapsed_ms); - break; - } - if (elapsed_ms >= connection_wait_ms) - { - LOG_WARNING(log, "Timed out waiting for MPP gRPC connections to finish after {}ms", elapsed_ms); break; } std::this_thread::sleep_for(std::chrono::milliseconds(200)); From 7919d586ef6f2684a4a7fe716261df75ad0c4950 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 22 Jul 2025 16:40:00 +0800 Subject: [PATCH 22/22] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 1f011aa3ff6..6c091c6b5c0 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -112,7 +112,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & cont // Also needs to check if all MPP gRPC connections are finished if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) { - LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); + LOG_INFO(log, "All MPP tasks have finished after {}ms", elapsed_ms); break; } }