From f81fe55ea128ef0b96b967ae51a64302e99aead4 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 6 Aug 2025 13:49:19 +0800 Subject: [PATCH 1/3] refine lac Signed-off-by: guo-shaoge --- .../LocalAdmissionController.cpp | 30 ++++++++++++------- .../LocalAdmissionController.h | 3 +- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 74bdeac114d..5422e896a18 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -141,6 +141,8 @@ double ResourceGroup::getAcquireRUNumWithoutLock(double speed, uint32_t n_sec, d if unlikely (acquire_num == 0.0 && remaining_ru == 0.0) acquire_num = DEFAULT_BUFFER_TOKENS; + // The purpose of subtracting remaining_ru is try to ensure that the number of local tokens + // always stays smae with the amount consumed. acquire_num -= remaining_ru; acquire_num = (acquire_num > 0.0 ? acquire_num : 0.0); return acquire_num; @@ -364,11 +366,9 @@ void LocalAdmissionController::mainLoop() static_assert( tick_interval <= ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL && tick_interval <= DEGRADE_MODE_DURATION && tick_interval <= DEFAULT_TARGET_PERIOD); - auto cur_tick_beg = current_tick; - auto cur_tick_end = cur_tick_beg + tick_interval; + auto cur_tick_end = current_tick + tick_interval; while (!stopped.load()) { - if (current_tick < cur_tick_end) { std::unique_lock lock(mu); if (keyspace_low_token_resource_groups.empty()) @@ -388,19 +388,16 @@ void LocalAdmissionController::mainLoop() try { while (current_tick >= cur_tick_end) - { - updateRUConsumptionSpeed(); - cur_tick_beg = cur_tick_end; cur_tick_end += tick_interval; - } + updateRUConsumptionSpeed(); if (const auto gac_req_opt = buildGACRequest(/*is_final_report=*/false); gac_req_opt.has_value()) { std::lock_guard lock(gac_requests_mu); gac_requests.push_back(gac_req_opt.value()); gac_requests_cv.notify_all(); } - clearCPUTimeWithoutLock(current_tick); + clearCPUTime(current_tick); checkDegradeMode(); } catch (...) @@ -437,13 +434,17 @@ std::optional LocalAdmissionController::b else { std::unordered_set, LACPairHash> local_keyspace_low_token_resource_groups; + std::unordered_map, ResourceGroupPtr, LACPairHash> + local_keyspace_resource_groups; { std::lock_guard lock(mu); local_keyspace_low_token_resource_groups = keyspace_low_token_resource_groups; keyspace_low_token_resource_groups.clear(); + + local_keyspace_resource_groups = keyspace_resource_groups; } - for (const auto & ele : keyspace_resource_groups) + for (const auto & ele : local_keyspace_resource_groups) { const bool need_fetch_token = local_keyspace_low_token_resource_groups.contains(ele.first); const bool need_report = ele.second->shouldReportRUConsumption(current_tick); @@ -544,6 +545,7 @@ void LocalAdmissionController::doRequestGAC() { std::unique_lock lock(gac_requests_mu); gac_requests_cv.wait(lock, [this]() { return stopped.load() || !gac_requests.empty(); }); + // gjt todo: still report if stop? if unlikely (stopped.load()) return; local_gac_requests = gac_requests; @@ -569,6 +571,7 @@ void LocalAdmissionController::doRequestGAC() std::vector> not_found; // not_found includes resource group names that appears in gac_req but not found in resp. // This can happen when the resource group has been deleted. + // Normally delete event will be watched if unlikely (handled.size() != req_rg_names.size()) { for (const auto & req_rg_name : req_rg_names) @@ -653,6 +656,7 @@ std::vector> LocalAdmissionController::handle if unlikely (one_resp.granted_r_u_tokens().size() != 1) { + resource_group->endRequest(); LOG_ERROR( log, "{} unexpected resp.granted_r_u_tokens().size(): {} one_resp: {}", @@ -665,6 +669,7 @@ std::vector> LocalAdmissionController::handle const resource_manager::GrantedRUTokenBucket & granted_token_bucket = one_resp.granted_r_u_tokens()[0]; if unlikely (granted_token_bucket.type() != resource_manager::RequestUnitType::RU) { + resource_group->endRequest(); LOG_ERROR(log, "{} unexpected request type, one_resp: {}", err_msg, one_resp.ShortDebugString()); continue; } @@ -672,6 +677,7 @@ std::vector> LocalAdmissionController::handle const auto trickle_ms = granted_token_bucket.trickle_time_ms(); if unlikely (trickle_ms < 0) { + resource_group->endRequest(); LOG_ERROR( log, "{} unexpected trickle_ms: {} one_resp: {}", @@ -686,6 +692,7 @@ std::vector> LocalAdmissionController::handle double added_tokens = granted_token_bucket.granted_tokens().tokens(); if unlikely (!std::isfinite(added_tokens) || added_tokens < 0.0) { + resource_group->endRequest(); LOG_ERROR( log, "{} invalid added_tokens: {} one_resp: {}", @@ -697,6 +704,7 @@ std::vector> LocalAdmissionController::handle auto trickle_left_tokens = resource_group->getTrickleLeftTokens(now); if unlikely (!std::isfinite(trickle_left_tokens) || trickle_left_tokens < 0.0) { + // gjt todo: output trickle_deadline and tp and fillrate. LOG_ERROR( log, "{} invalid trickle_left_tokens: {} one_resp: {}, reset to zero", @@ -858,7 +866,7 @@ bool LocalAdmissionController::handleDeleteEvent( std::lock_guard lock(mu); erase_num = deleteResourceGroupWithoutLock(keyspace_id, name); } - LOG_DEBUG(log, "delete resource group {}(keyspace={}), erase_num: {}", name, keyspace_id, erase_num); + LOG_INFO(log, "delete resource group {}(keyspace={}), erase_num: {}", name, keyspace_id, erase_num); return true; } @@ -896,7 +904,7 @@ bool LocalAdmissionController::handlePutEvent( updateMaxRUPerSecAfterDeleteWithoutLock(rg->user_ru_per_sec); } } - LOG_DEBUG(log, "modify resource group {}(keyspace={}) to: {}", name, keyspace_id, group_pb.ShortDebugString()); + LOG_INFO(log, "modify resource group {}(keyspace={}) to: {}", name, keyspace_id, group_pb.ShortDebugString()); return true; } diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 5319a37dcf7..889de84a3b2 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -614,9 +614,10 @@ class LocalAdmissionController final : private boost::noncopyable std::string & err_msg); void updateMaxRUPerSecAfterDeleteWithoutLock(uint64_t deleted_user_ru_per_sec); - void clearCPUTimeWithoutLock(const SteadyClock::time_point & now) + void clearCPUTime(const SteadyClock::time_point & now) { static_assert(CLEAR_CPU_TIME_DURATION > ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL); + std::lock_guard lock(mu); if (now - last_clear_cpu_time >= CLEAR_CPU_TIME_DURATION) { for (auto & ele : keyspace_resource_groups) From 0c9bd9a12871cbb238468391a334a3d848269b96 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 6 Aug 2025 14:13:58 +0800 Subject: [PATCH 2/3] fix Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 5422e896a18..2e9c3fb9281 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -545,7 +545,6 @@ void LocalAdmissionController::doRequestGAC() { std::unique_lock lock(gac_requests_mu); gac_requests_cv.wait(lock, [this]() { return stopped.load() || !gac_requests.empty(); }); - // gjt todo: still report if stop? if unlikely (stopped.load()) return; local_gac_requests = gac_requests; @@ -647,7 +646,7 @@ std::vector> LocalAdmissionController::handle const String err_msg = fmt::format("handle acquire token resp failed: rg: {}(keyspace={})", name, keyspace_id); // It's possible for one_resp.granted_r_u_tokens() to be empty - // when the acquire_token_req is only for report RU consumption. + // when the acquire_token_req is only for report RU consumption or GAC got error(like nan token). if (one_resp.granted_r_u_tokens().empty()) { resource_group->endRequest(); @@ -704,7 +703,6 @@ std::vector> LocalAdmissionController::handle auto trickle_left_tokens = resource_group->getTrickleLeftTokens(now); if unlikely (!std::isfinite(trickle_left_tokens) || trickle_left_tokens < 0.0) { - // gjt todo: output trickle_deadline and tp and fillrate. LOG_ERROR( log, "{} invalid trickle_left_tokens: {} one_resp: {}, reset to zero", From 14c36f9a71a244407fecaa69d6c26b74a8bec35a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 6 Aug 2025 14:24:12 +0800 Subject: [PATCH 3/3] fix Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 2e9c3fb9281..a920c1add19 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -142,7 +142,7 @@ double ResourceGroup::getAcquireRUNumWithoutLock(double speed, uint32_t n_sec, d acquire_num = DEFAULT_BUFFER_TOKENS; // The purpose of subtracting remaining_ru is try to ensure that the number of local tokens - // always stays smae with the amount consumed. + // always stays same with the amount consumed. acquire_num -= remaining_ru; acquire_num = (acquire_num > 0.0 ? acquire_num : 0.0); return acquire_num; @@ -570,7 +570,6 @@ void LocalAdmissionController::doRequestGAC() std::vector> not_found; // not_found includes resource group names that appears in gac_req but not found in resp. // This can happen when the resource group has been deleted. - // Normally delete event will be watched if unlikely (handled.size() != req_rg_names.size()) { for (const auto & req_rg_name : req_rg_names)