diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index bcf9c35a366..6c091c6b5c0 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -84,33 +84,41 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } -void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) +void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & context) { // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; // The default value of flash.graceful_wait_before_shutdown static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; - auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( - GRACEFUL_WIAT_BEFORE_SHUTDOWN, - DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); + auto graceful_wait_before_shutdown + = context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); + UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000; Stopwatch watch; // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched std::this_thread::sleep_for(std::chrono::seconds(1)); + bool all_tasks_finished = false; while (true) { auto elapsed_ms = watch.elapsedMilliseconds(); + if (!all_tasks_finished) { std::unique_lock lock(mu); if (monitored_tasks.empty()) + all_tasks_finished = true; + } + if (all_tasks_finished) + { + // Also needs to check if all MPP gRPC connections are finished + if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0) { - LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); + LOG_INFO(log, "All MPP tasks have finished after {}ms", elapsed_ms); break; } } - if (elapsed_ms >= graceful_wait_before_shutdown * 1000) + if (elapsed_ms >= graceful_wait_before_shutdown_ms) { - LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms); + LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms); break; } std::this_thread::sleep_for(std::chrono::milliseconds(200)); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 35fb030f7e9..118632fc0c8 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -194,7 +194,7 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } - void waitAllMPPTasksFinish(const std::unique_ptr & global_context); + void waitAllMPPTasksFinish(const std::unique_ptr & context); std::mutex mu; std::condition_variable cv; diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 95fd21ac9aa..f007418924f 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -52,7 +52,7 @@ void handleRpcs(grpc::ServerCompletionQueue * curcq, const LoggerPtr & log) // tells us whether there is any kind of event or cq is shutting down. if (!curcq->Next(&tag, &ok)) { - LOG_INFO(log, "CQ is fully drained and shut down"); + LOG_DEBUG(log, "CQ is fully drained and shut down"); break; } GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Increment(); @@ -220,21 +220,9 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() { /// Shut down grpc server. LOG_INFO(log, "Begin to shut down flash grpc server"); - flash_grpc_server->Shutdown(); + Stopwatch watch; *is_shutdown = true; - // Wait all existed MPPTunnels done to prevent crash. - // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. - const int max_wait_cnt = 300; - int wait_cnt = 0; - while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) - std::this_thread::sleep_for(std::chrono::seconds(1)); - if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) - LOG_WARNING( - log, - "Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak", - wait_cnt); - else - LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt); + flash_grpc_server->Shutdown(); for (auto & cq : cqs) cq->Shutdown(); @@ -252,7 +240,7 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() GRPCCompletionQueuePool::global_instance->markShutdown(); GRPCCompletionQueuePool::global_instance = nullptr; - LOG_INFO(log, "Shut down flash grpc server"); + LOG_INFO(log, "Shut down flash grpc server after {}ms", watch.elapsedMilliseconds()); /// Close flash service. LOG_INFO(log, "Begin to shut down flash service");