Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions src/Cluster/LocalLog/Log/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,18 @@ int64_t Log::sequenceForTimestamp(int64_t ts, bool append_time) const
return log_start_sn;
}

size_t Log::deleteOldSegments(int64_t applied_sn)
size_t Log::deleteOldSegments(int64_t applied_sn, bool disk_pressure)
{
LOG_DEBUG(logger, "Garbage collecting applied_sn={} {}", applied_sn, log_config->string());
return deleteLogStartSequenceBreachedSegments(applied_sn) + deleteRetentionSizeBreachedSegments(applied_sn)
LOG_DEBUG(logger, "Garbage collecting applied_sn={} disk_pressure={} {}", applied_sn, disk_pressure, log_config->string());

auto total = deleteLogStartSequenceBreachedSegments(applied_sn)
+ deleteRetentionSizeBreachedSegments(applied_sn)
+ deleteRetentionTimeBreachedSegments(applied_sn);

if (disk_pressure)
total += deleteDiskPressureBreachedSegments(applied_sn);

return total;
}

size_t Log::deleteLogStartSequenceBreachedSegments(int64_t applied_sn)
Expand Down Expand Up @@ -737,6 +744,26 @@ size_t Log::deleteRetentionTimeBreachedSegments(int64_t applied_sn)
return deleteOldSegments(should_delete, applied_sn, "retention_time_breached");
}

size_t Log::deleteDiskPressureBreachedSegments(int64_t applied_sn)
{
auto total_bytes = loglet->size();
if (total_bytes == 0)
return 0;

/// Under disk pressure, bypass min_size_to_keep
auto should_delete = [total_bytes](LogSegmentPtr prev_segment, [[maybe_unused]] LogSegmentPtr current_segment) mutable {
auto prev_seg_size = prev_segment->size();
if (total_bytes > prev_seg_size)
{
total_bytes -= prev_seg_size;
return true;
}
return false;
};

return deleteOldSegments(should_delete, applied_sn, "disk_pressure_breached");
}

size_t Log::deleteOldSegments(std::function<bool(LogSegmentPtr, LogSegmentPtr)> should_delete, int64_t applied_sn, std::string_view reason)
{
auto deletable = loglet->deletableSegments(should_delete, applied_sn);
Expand Down
3 changes: 2 additions & 1 deletion src/Cluster/LocalLog/Log/Log.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class Log final
/// If stream deletion is enabled, delete any local log segments that either expired due to time based
/// retention or because the log size > retention_size
/// Whether or not deletion is enabled, delete any local log segments that are before the log start offset
size_t deleteOldSegments(int64_t applied_sn);
size_t deleteOldSegments(int64_t applied_sn, bool disk_pressure = false);

int64_t appliedSequence() const
{
Expand Down Expand Up @@ -315,6 +315,7 @@ class Log final
size_t deleteLogStartSequenceBreachedSegments(int64_t applied_sn);
size_t deleteRetentionSizeBreachedSegments(int64_t applied_sn);
size_t deleteRetentionTimeBreachedSegments(int64_t applied_sn);
size_t deleteDiskPressureBreachedSegments(int64_t applied_sn);

private:
/// API calls forwarding to Loglet
Expand Down
9 changes: 7 additions & 2 deletions src/Cluster/LocalLog/Log/LogConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ void LogConfig::validate()

if (log_position_index_log_size == 0 || log_position_index_log_size > 8 * 1024 * 1024)
log_position_index_log_size = 8 * 1024 * 1024;

if (disk_usage_threshold_percent == 0 || disk_usage_threshold_percent > 100)
disk_usage_threshold_percent = DEFAULT_DISK_USAGE_THRESHOLD_PERCENT;
}

std::string LogConfig::string() const
Expand All @@ -62,7 +65,8 @@ std::string LogConfig::string() const
"max_cached_bytes_per_shard={} codec={} min_size_to_keep={} preallocate={} inmemory={} incremental_flush={} "
"hard_state_ckpt_log_size={} "
"hard_state_ckpt_log_preallocate={} leader_epoch_ckpt_log_size={} leader_epoch_ckpt_log_preallocate={} timestamp_index_log_size={} "
"timestamp_index_log_preallocate={} log_position_index_log_size={} log_position_index_log_preallocate={}",
"timestamp_index_log_preallocate={} log_position_index_log_size={} log_position_index_log_preallocate={} "
"disk_usage_threshold_percent={}",
max_entry_size,
segment_size,
segment_ms,
Expand All @@ -88,6 +92,7 @@ std::string LogConfig::string() const
timestamp_index_log_size,
timestamp_index_log_preallocate,
log_position_index_log_size,
log_position_index_log_preallocate);
log_position_index_log_preallocate,
disk_usage_threshold_percent);
}
}
2 changes: 2 additions & 0 deletions src/Cluster/LocalLog/Log/LogConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct LogConfig
static const uint64_t DEFAULT_INDEX_INTERVAL_ENTRIES = 1000ull;
static const uint64_t DEFAULT_MAX_CACHED_ENTRIES_PER_SHARD = 100ull;
static const uint64_t DEFAULT_MAX_CACHED_BYTES_PER_SHARD = 4194304ull;
static const uint64_t DEFAULT_DISK_USAGE_THRESHOLD_PERCENT = 90;

uint64_t max_entry_size = DEFAULT_MAX_ENTRY_SIZE;
uint64_t segment_size = DEFAULT_SEGMENT_SIZE;
Expand Down Expand Up @@ -58,6 +59,7 @@ struct LogConfig
bool preallocate = true;
bool inmemory = false;
bool incremental_flush = false;
uint64_t disk_usage_threshold_percent = DEFAULT_DISK_USAGE_THRESHOLD_PERCENT;

void validate();

Expand Down
25 changes: 24 additions & 1 deletion src/Cluster/LocalLog/Log/LogManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Common/setThreadName.h>

#include <fcntl.h>
#include <filesystem>

namespace CurrentMetrics
{
Expand Down Expand Up @@ -537,13 +538,35 @@ void LogManager::cleanupLogs()
{
LOG_DEBUG(logger, "Beginning log cleanup...");

/// Check disk pressure
bool disk_pressure = false;
for (const auto & root_dir : root_dirs)
{
std::error_code ec;
auto space_info = std::filesystem::space(root_dir, ec);
if (!ec && space_info.capacity > 0)
{
auto usage_percent = 100 * (space_info.capacity - space_info.available) / space_info.capacity;
if (usage_percent >= default_log_config->disk_usage_threshold_percent)
{
LOG_WARNING(
logger,
"Disk pressure detected on {}: {}% used (threshold: {}%)",
root_dir.string(),
usage_percent,
default_log_config->disk_usage_threshold_percent);
disk_pressure = true;
}
}
}

size_t total = 0;
auto all_logs = current_logs.values();

try
{
for (auto & log : all_logs)
total += log->deleteOldSegments(log->appliedSequence());
total += log->deleteOldSegments(log->appliedSequence(), disk_pressure);
}
catch (...)
{
Expand Down