From ab382cbc40275018fa1810fdc75c69f54560e166 Mon Sep 17 00:00:00 2001 From: SBALAVIGNESH123 Date: Fri, 20 Mar 2026 19:16:37 +0530 Subject: [PATCH] nlog: add disk-pressure-aware retention to fix #721 --- src/Cluster/LocalLog/Log/Log.cpp | 33 ++++++++++++++++++++++--- src/Cluster/LocalLog/Log/Log.h | 3 ++- src/Cluster/LocalLog/Log/LogConfig.cpp | 9 +++++-- src/Cluster/LocalLog/Log/LogConfig.h | 2 ++ src/Cluster/LocalLog/Log/LogManager.cpp | 25 ++++++++++++++++++- 5 files changed, 65 insertions(+), 7 deletions(-) diff --git a/src/Cluster/LocalLog/Log/Log.cpp b/src/Cluster/LocalLog/Log/Log.cpp index b43edcd720a..bffdf634341 100644 --- a/src/Cluster/LocalLog/Log/Log.cpp +++ b/src/Cluster/LocalLog/Log/Log.cpp @@ -664,11 +664,18 @@ int64_t Log::sequenceForTimestamp(int64_t ts, bool append_time) const return log_start_sn; } -size_t Log::deleteOldSegments(int64_t applied_sn) +size_t Log::deleteOldSegments(int64_t applied_sn, bool disk_pressure) { - LOG_DEBUG(logger, "Garbage collecting applied_sn={} {}", applied_sn, log_config->string()); - return deleteLogStartSequenceBreachedSegments(applied_sn) + deleteRetentionSizeBreachedSegments(applied_sn) + LOG_DEBUG(logger, "Garbage collecting applied_sn={} disk_pressure={} {}", applied_sn, disk_pressure, log_config->string()); + + auto total = deleteLogStartSequenceBreachedSegments(applied_sn) + + deleteRetentionSizeBreachedSegments(applied_sn) + deleteRetentionTimeBreachedSegments(applied_sn); + + if (disk_pressure) + total += deleteDiskPressureBreachedSegments(applied_sn); + + return total; } size_t Log::deleteLogStartSequenceBreachedSegments(int64_t applied_sn) @@ -737,6 +744,26 @@ size_t Log::deleteRetentionTimeBreachedSegments(int64_t applied_sn) return deleteOldSegments(should_delete, applied_sn, "retention_time_breached"); } +size_t Log::deleteDiskPressureBreachedSegments(int64_t applied_sn) +{ + auto total_bytes = loglet->size(); + if (total_bytes == 0) + return 0; + + /// Under disk pressure, bypass min_size_to_keep + auto should_delete = [total_bytes](LogSegmentPtr prev_segment, [[maybe_unused]] LogSegmentPtr current_segment) mutable { + auto prev_seg_size = prev_segment->size(); + if (total_bytes > prev_seg_size) + { + total_bytes -= prev_seg_size; + return true; + } + return false; + }; + + return deleteOldSegments(should_delete, applied_sn, "disk_pressure_breached"); +} + size_t Log::deleteOldSegments(std::function should_delete, int64_t applied_sn, std::string_view reason) { auto deletable = loglet->deletableSegments(should_delete, applied_sn); diff --git a/src/Cluster/LocalLog/Log/Log.h b/src/Cluster/LocalLog/Log/Log.h index 0634b8f2b7e..e84fedb63ac 100644 --- a/src/Cluster/LocalLog/Log/Log.h +++ b/src/Cluster/LocalLog/Log/Log.h @@ -253,7 +253,7 @@ class Log final /// If stream deletion is enabled, delete any local log segments that either expired due to time based /// retention or because the log size > retention_size /// Whether or not deletion is enabled, delete any local log segments that are before the log start offset - size_t deleteOldSegments(int64_t applied_sn); + size_t deleteOldSegments(int64_t applied_sn, bool disk_pressure = false); int64_t appliedSequence() const { @@ -315,6 +315,7 @@ class Log final size_t deleteLogStartSequenceBreachedSegments(int64_t applied_sn); size_t deleteRetentionSizeBreachedSegments(int64_t applied_sn); size_t deleteRetentionTimeBreachedSegments(int64_t applied_sn); + size_t deleteDiskPressureBreachedSegments(int64_t applied_sn); private: /// API calls forwarding to Loglet diff --git a/src/Cluster/LocalLog/Log/LogConfig.cpp b/src/Cluster/LocalLog/Log/LogConfig.cpp index 8597474fd0a..32b79b3e8f7 100644 --- a/src/Cluster/LocalLog/Log/LogConfig.cpp +++ b/src/Cluster/LocalLog/Log/LogConfig.cpp @@ -52,6 +52,9 @@ void LogConfig::validate() if (log_position_index_log_size == 0 || log_position_index_log_size > 8 * 1024 * 1024) log_position_index_log_size = 8 * 1024 * 1024; + + if (disk_usage_threshold_percent == 0 || disk_usage_threshold_percent > 100) + disk_usage_threshold_percent = DEFAULT_DISK_USAGE_THRESHOLD_PERCENT; } std::string LogConfig::string() const @@ -62,7 +65,8 @@ std::string LogConfig::string() const "max_cached_bytes_per_shard={} codec={} min_size_to_keep={} preallocate={} inmemory={} incremental_flush={} " "hard_state_ckpt_log_size={} " "hard_state_ckpt_log_preallocate={} leader_epoch_ckpt_log_size={} leader_epoch_ckpt_log_preallocate={} timestamp_index_log_size={} " - "timestamp_index_log_preallocate={} log_position_index_log_size={} log_position_index_log_preallocate={}", + "timestamp_index_log_preallocate={} log_position_index_log_size={} log_position_index_log_preallocate={} " + "disk_usage_threshold_percent={}", max_entry_size, segment_size, segment_ms, @@ -88,6 +92,7 @@ std::string LogConfig::string() const timestamp_index_log_size, timestamp_index_log_preallocate, log_position_index_log_size, - log_position_index_log_preallocate); + log_position_index_log_preallocate, + disk_usage_threshold_percent); } } diff --git a/src/Cluster/LocalLog/Log/LogConfig.h b/src/Cluster/LocalLog/Log/LogConfig.h index 75f0f3b0cf3..8c8772ab05a 100644 --- a/src/Cluster/LocalLog/Log/LogConfig.h +++ b/src/Cluster/LocalLog/Log/LogConfig.h @@ -25,6 +25,7 @@ struct LogConfig static const uint64_t DEFAULT_INDEX_INTERVAL_ENTRIES = 1000ull; static const uint64_t DEFAULT_MAX_CACHED_ENTRIES_PER_SHARD = 100ull; static const uint64_t DEFAULT_MAX_CACHED_BYTES_PER_SHARD = 4194304ull; + static const uint64_t DEFAULT_DISK_USAGE_THRESHOLD_PERCENT = 90; uint64_t max_entry_size = DEFAULT_MAX_ENTRY_SIZE; uint64_t segment_size = DEFAULT_SEGMENT_SIZE; @@ -58,6 +59,7 @@ struct LogConfig bool preallocate = true; bool inmemory = false; bool incremental_flush = false; + uint64_t disk_usage_threshold_percent = DEFAULT_DISK_USAGE_THRESHOLD_PERCENT; void validate(); diff --git a/src/Cluster/LocalLog/Log/LogManager.cpp b/src/Cluster/LocalLog/Log/LogManager.cpp index 50eddcea903..d0c969a7a24 100644 --- a/src/Cluster/LocalLog/Log/LogManager.cpp +++ b/src/Cluster/LocalLog/Log/LogManager.cpp @@ -7,6 +7,7 @@ #include #include +#include namespace CurrentMetrics { @@ -537,13 +538,35 @@ void LogManager::cleanupLogs() { LOG_DEBUG(logger, "Beginning log cleanup..."); + /// Check disk pressure + bool disk_pressure = false; + for (const auto & root_dir : root_dirs) + { + std::error_code ec; + auto space_info = std::filesystem::space(root_dir, ec); + if (!ec && space_info.capacity > 0) + { + auto usage_percent = 100 * (space_info.capacity - space_info.available) / space_info.capacity; + if (usage_percent >= default_log_config->disk_usage_threshold_percent) + { + LOG_WARNING( + logger, + "Disk pressure detected on {}: {}% used (threshold: {}%)", + root_dir.string(), + usage_percent, + default_log_config->disk_usage_threshold_percent); + disk_pressure = true; + } + } + } + size_t total = 0; auto all_logs = current_logs.values(); try { for (auto & log : all_logs) - total += log->deleteOldSegments(log->appliedSequence()); + total += log->deleteOldSegments(log->appliedSequence(), disk_pressure); } catch (...) {