From 18c4684f408b06c1a1cfebb73247d310910b0d65 Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Tue, 15 Jul 2025 20:26:06 +0800 Subject: [PATCH 1/4] Support graceful shutdown in TiFlash again (#10299) close pingcap/tiflash#10266 Support graceful shutdown in TiFlash again Signed-off-by: gengliqi --- dbms/src/Flash/FlashService.cpp | 2 +- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 34 +++++++++++++++++++++++++++ dbms/src/Flash/Mpp/MPPTaskManager.h | 7 ++++++ dbms/src/Server/Server.cpp | 6 +++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index aad0105d34f..7f8c98aa76a 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -533,7 +533,7 @@ grpc::Status FlashService::IsAlive( return check_result; auto & tmt_context = context->getTMTContext(); - response->set_available(tmt_context.checkRunning()); + response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable()); response->set_mpp_version(DB::GetMppVersion()); return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index bb42cd3871d..3419613187e 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -82,6 +83,39 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } +void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) +{ + // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down + static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; + // The default value of flash.graceful_wait_before_shutdown + static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; + auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( + GRACEFUL_WIAT_BEFORE_SHUTDOWN, + DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); + LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); + Stopwatch watch; + // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched + std::this_thread::sleep_for(std::chrono::seconds(1)); + while (true) + { + auto elapsed_ms = watch.elapsedMilliseconds(); + { + std::unique_lock lock(mu); + if (monitored_tasks.empty()) + { + LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); + break; + } + } + if (elapsed_ms >= graceful_wait_before_shutdown * 1000) + { + LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms); + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } +} + MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) : scheduler(std::move(scheduler_)) , aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 78090e9ab0a..03f4dfbf5cf 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -193,6 +193,8 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } + void waitAllMPPTasksFinish(const std::unique_ptr & global_context); + std::mutex mu; std::condition_variable cv; bool is_shutdown = false; @@ -220,6 +222,8 @@ class MPPTaskManager : private boost::noncopyable std::shared_ptr monitor; + std::atomic is_available{true}; + public: explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler); @@ -270,6 +274,9 @@ class MPPTaskManager : private boost::noncopyable bool isTaskExists(const MPPTaskId & id); + void setUnavailable() { is_available = false; } + bool isAvailable() { return is_available; } + private: MPPQueryPtr addMPPQuery( const MPPQueryId & query_id, diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 1823a23df01..9e6e4a24870 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1920,6 +1920,12 @@ try LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); + // Note: `waitAllMPPTasksFinish` must be called before stopping the proxy. + // Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully. + LOG_INFO(log, "Set unavailable for MPPTask"); + tmt_context.getMPPTaskManager()->setUnavailable(); + tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); + { // Set limiters stopping and wakeup threads in waitting queue. global_context->getIORateLimiter().setStop(); From 821266e81e699be44f224a9fc3a72c76175de5fc Mon Sep 17 00:00:00 2001 From: gengliqi Date: Wed, 23 Jul 2025 00:49:42 +0800 Subject: [PATCH 2/4] add https://github.com/pingcap/tiflash/pull/10312 Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 22 +++++++++++++++------- dbms/src/Flash/Mpp/MPPTaskManager.h | 2 +- dbms/src/Server/FlashGrpcServerHolder.cpp | 20 ++++---------------- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 3419613187e..00fe04cfb2f 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -83,33 +83,41 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } -void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) +void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & context) { // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; // The default value of flash.graceful_wait_before_shutdown static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; - auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( - GRACEFUL_WIAT_BEFORE_SHUTDOWN, - DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); + auto graceful_wait_before_shutdown + = context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); + UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000; Stopwatch watch; // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched std::this_thread::sleep_for(std::chrono::seconds(1)); + bool all_tasks_finished = false; while (true) { auto elapsed_ms = watch.elapsedMilliseconds(); + if (!all_tasks_finished) { std::unique_lock lock(mu); if (monitored_tasks.empty()) + all_tasks_finished = true; + } + if (all_tasks_finished) + { + // Also needs to check if all MPP gRPC connections are finished + if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) { - LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); + LOG_INFO(log, "All MPP tasks have finished after {}ms", elapsed_ms); break; } } - if (elapsed_ms >= graceful_wait_before_shutdown * 1000) + if (elapsed_ms >= graceful_wait_before_shutdown_ms) { - LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms); + LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms); break; } std::this_thread::sleep_for(std::chrono::milliseconds(200)); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 03f4dfbf5cf..7d8c7f64f71 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -193,7 +193,7 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } - void waitAllMPPTasksFinish(const std::unique_ptr & global_context); + void waitAllMPPTasksFinish(const std::unique_ptr & context); std::mutex mu; std::condition_variable cv; diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 146e73180d4..5a029292f05 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -49,7 +49,7 @@ void handleRpcs(grpc::ServerCompletionQueue * curcq, const LoggerPtr & log) // tells us whether there is any kind of event or cq is shutting down. if (!curcq->Next(&tag, &ok)) { - LOG_INFO(log, "CQ is fully drained and shut down"); + LOG_DEBUG(log, "CQ is fully drained and shut down"); break; } GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Increment(); @@ -217,21 +217,9 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() { /// Shut down grpc server. LOG_INFO(log, "Begin to shut down flash grpc server"); - flash_grpc_server->Shutdown(); + Stopwatch watch; *is_shutdown = true; - // Wait all existed MPPTunnels done to prevent crash. - // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. - const int max_wait_cnt = 300; - int wait_cnt = 0; - while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) - std::this_thread::sleep_for(std::chrono::seconds(1)); - if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) - LOG_WARNING( - log, - "Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak", - wait_cnt); - else - LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt); + flash_grpc_server->Shutdown(); for (auto & cq : cqs) cq->Shutdown(); @@ -249,7 +237,7 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() GRPCCompletionQueuePool::global_instance->markShutdown(); GRPCCompletionQueuePool::global_instance = nullptr; - LOG_INFO(log, "Shut down flash grpc server"); + LOG_INFO(log, "Shut down flash grpc server after {}ms", watch.elapsedMilliseconds()); /// Close flash service. LOG_INFO(log, "Begin to shut down flash service"); From 5c021e207d20042e5ce099afcd5f55390c12bfdc Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 24 Jul 2025 16:31:51 +0800 Subject: [PATCH 3/4] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 00fe04cfb2f..6f99c2bf2ab 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -91,7 +91,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & cont static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; auto graceful_wait_before_shutdown = context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); - LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); + LOG_INFO(log, "Start to wait all MPP tasks to finish, timeout={}s", graceful_wait_before_shutdown); UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000; Stopwatch watch; // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched From 63cf6685282eba054b7dd8da380a835ecb5c41aa Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Wed, 30 Jul 2025 17:21:55 +0800 Subject: [PATCH 4/4] cherry-pick Rename graceful_wait_before_shutdown to graceful_wait_shutdown_timeout #10317 Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 6f99c2bf2ab..87d4cca914b 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -86,13 +86,13 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & context) { // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down - static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; - // The default value of flash.graceful_wait_before_shutdown - static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; - auto graceful_wait_before_shutdown - = context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); - LOG_INFO(log, "Start to wait all MPP tasks to finish, timeout={}s", graceful_wait_before_shutdown); - UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000; + static constexpr const char * GRACEFUL_WAIT_SHUTDOWN_TIMEOUT = "flash.graceful_wait_shutdown_timeout"; + // The default value of flash.graceful_wait_shutdown_timeout + static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_SHUTDOWN_TIMEOUT = 600; + auto graceful_wait_shutdown_timeout + = context->getUsersConfig()->getUInt64(GRACEFUL_WAIT_SHUTDOWN_TIMEOUT, DEFAULT_GRACEFUL_WAIT_SHUTDOWN_TIMEOUT); + LOG_INFO(log, "Start to wait all MPP tasks to finish, timeout={}s", graceful_wait_shutdown_timeout); + UInt64 graceful_wait_shutdown_timeout_ms = graceful_wait_shutdown_timeout * 1000; Stopwatch watch; // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -115,7 +115,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & cont break; } } - if (elapsed_ms >= graceful_wait_before_shutdown_ms) + if (elapsed_ms >= graceful_wait_shutdown_timeout_ms) { LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms); break;