Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,33 +84,41 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
return ptr;
}

void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context)
void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & context)
{
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown";
// The default value of flash.graceful_wait_before_shutdown
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600;
auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(
GRACEFUL_WIAT_BEFORE_SHUTDOWN,
DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
auto graceful_wait_before_shutdown
= context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown);
UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000;
Stopwatch watch;
// The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched
std::this_thread::sleep_for(std::chrono::seconds(1));
bool all_tasks_finished = false;
while (true)
{
auto elapsed_ms = watch.elapsedMilliseconds();
if (!all_tasks_finished)
{
std::unique_lock lock(mu);
if (monitored_tasks.empty())
all_tasks_finished = true;
}
if (all_tasks_finished)
{
// Also needs to check if all MPP gRPC connections are finished
if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0)
{
LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms);
LOG_INFO(log, "All MPP tasks have finished after {}ms", elapsed_ms);
break;
}
}
if (elapsed_ms >= graceful_wait_before_shutdown * 1000)
if (elapsed_ms >= graceful_wait_before_shutdown_ms)
{
LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms);
LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ struct MPPTaskMonitor
return monitored_tasks.find(task_unique_id) != monitored_tasks.end();
}

void waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context);
void waitAllMPPTasksFinish(const std::unique_ptr<Context> & context);

std::mutex mu;
std::condition_variable cv;
Expand Down
20 changes: 4 additions & 16 deletions dbms/src/Server/FlashGrpcServerHolder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void handleRpcs(grpc::ServerCompletionQueue * curcq, const LoggerPtr & log)
// tells us whether there is any kind of event or cq is shutting down.
if (!curcq->Next(&tag, &ok))
{
LOG_INFO(log, "CQ is fully drained and shut down");
LOG_DEBUG(log, "CQ is fully drained and shut down");
break;
}
GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Increment();
Expand Down Expand Up @@ -220,21 +220,9 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
{
/// Shut down grpc server.
LOG_INFO(log, "Begin to shut down flash grpc server");
flash_grpc_server->Shutdown();
Stopwatch watch;
*is_shutdown = true;
// Wait all existed MPPTunnels done to prevent crash.
// If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done.
const int max_wait_cnt = 300;
int wait_cnt = 0;
while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt))
std::this_thread::sleep_for(std::chrono::seconds(1));
if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1)
LOG_WARNING(
log,
"Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak",
wait_cnt);
else
LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt);
flash_grpc_server->Shutdown();

for (auto & cq : cqs)
cq->Shutdown();
Expand All @@ -252,7 +240,7 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
GRPCCompletionQueuePool::global_instance->markShutdown();

GRPCCompletionQueuePool::global_instance = nullptr;
LOG_INFO(log, "Shut down flash grpc server");
LOG_INFO(log, "Shut down flash grpc server after {}ms", watch.elapsedMilliseconds());

/// Close flash service.
LOG_INFO(log, "Begin to shut down flash service");
Expand Down