diff --git a/conf/fluent-bit.conf b/conf/fluent-bit.conf
index bf3269f38e2..c766bacfc58 100644
--- a/conf/fluent-bit.conf
+++ b/conf/fluent-bit.conf
@@ -4,6 +4,57 @@
     # set an interval of seconds before to flush records to a destination
     flush        1
 
+    # flush.adaptive
+    # --------------
+    # Enable adaptive flush interval adjustments based on output chunk
+    # backpressure.
+    #
+    # flush.adaptive off
+
+    # flush.adaptive.min_interval
+    # ---------------------------
+    # Lower bound for the adaptive flush interval, in seconds.
+    #
+    # flush.adaptive.min_interval 0.5
+
+    # flush.adaptive.max_interval
+    # ---------------------------
+    # Upper bound for the adaptive flush interval, in seconds.
+    #
+    # flush.adaptive.max_interval 2.0
+
+    # flush.adaptive.low_pressure
+    # ---------------------------
+    # Output chunk pressure (%) at or below which outputs are considered mostly idle.
+    #
+    # flush.adaptive.low_pressure 25
+
+    # flush.adaptive.medium_pressure
+    # ------------------------------
+    # Output chunk pressure (%) at or above which pressure is considered moderate.
+    #
+    # flush.adaptive.medium_pressure 50
+
+    # flush.adaptive.high_pressure
+    # ----------------------------
+    # Output chunk pressure (%) at or above which pressure is considered sustained.
+    #
+    # flush.adaptive.high_pressure 75
+
+    # flush.adaptive.up_steps
+    # -----------------------
+    # Consecutive pressure samples required before moving to a faster flush
+    # step. Minimum: 1
+    #
+    # flush.adaptive.up_steps 2
+
+    # flush.adaptive.down_steps
+    # -------------------------
+    # Consecutive idle samples required before moving to a slower flush step.
+    # Minimum: 1
+    #
+    # flush.adaptive.down_steps 3
+
     # Daemon
     # ======
     # instruct Fluent Bit to run in foreground or background mode.
diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h
index d171ef08167..faa92366438 100644
--- a/include/fluent-bit/flb_config.h
+++ b/include/fluent-bit/flb_config.h
@@ -64,6 +64,18 @@ struct flb_config {
     int is_shutting_down;     /* is the service shutting down ? */
     int is_running;           /* service running ?              */
     double flush;             /* Flush timeout                  */
+    int flush_adaptive;                      /* Enable adaptive flush interval */
+    double flush_adaptive_min_interval;      /* Lower bound for the interval (seconds) */
+    double flush_adaptive_max_interval;      /* Upper bound for the interval (seconds) */
+    double flush_adaptive_low_pressure;      /* Pressure (%) at or below: level 0 */
+    double flush_adaptive_medium_pressure;   /* Pressure (%) at or above: level 2 */
+    double flush_adaptive_high_pressure;     /* Pressure (%) at or above: level 3 */
+    int flush_adaptive_up_steps;             /* Samples needed to step to a faster flush */
+    int flush_adaptive_down_steps;           /* Samples needed to step to a slower flush */
+    int flush_adaptive_level;                /* Current level, 0 (slowest) .. 3 (fastest) */
+    int flush_adaptive_hits;                 /* Consecutive samples in current direction */
+    int flush_adaptive_direction;            /* 1: faster, -1: slower, 0: steady */
+    double flush_adaptive_current_interval;  /* Interval the flush timer is armed with */
 
     /*
      * Maximum grace time on shutdown. If set to -1, the engine will
@@ -368,6 +380,14 @@ enum conf_type {
 };
 
 #define FLB_CONF_STR_FLUSH           "Flush"
+#define FLB_CONF_STR_FLUSH_ADAPTIVE            "flush.adaptive"
+#define FLB_CONF_STR_FLUSH_ADAPTIVE_MIN        "flush.adaptive.min_interval"
+#define FLB_CONF_STR_FLUSH_ADAPTIVE_MAX        "flush.adaptive.max_interval"
+#define FLB_CONF_STR_FLUSH_ADAPTIVE_LOW        "flush.adaptive.low_pressure"
+#define FLB_CONF_STR_FLUSH_ADAPTIVE_MEDIUM     "flush.adaptive.medium_pressure"
+#define FLB_CONF_STR_FLUSH_ADAPTIVE_HIGH       "flush.adaptive.high_pressure"
+#define FLB_CONF_STR_FLUSH_ADAPTIVE_UP_STEPS   "flush.adaptive.up_steps"
+#define FLB_CONF_STR_FLUSH_ADAPTIVE_DOWN_STEPS "flush.adaptive.down_steps"
 #define FLB_CONF_STR_GRACE           "Grace"
 #define FLB_CONF_STR_DAEMON          "Daemon"
 #define FLB_CONF_STR_LOGFILE         "Log_File"
diff --git a/include/fluent-bit/flb_engine.h b/include/fluent-bit/flb_engine.h
index 0fcd522ca53..02cefaf5cba 100644
--- a/include/fluent-bit/flb_engine.h
+++ b/include/fluent-bit/flb_engine.h
@@ -39,6 +39,19 @@ int flb_engine_destroy_tasks(struct mk_list *tasks);
 void flb_engine_reschedule_retries(struct flb_config *config);
 void flb_engine_stop_ingestion(struct flb_config *config);
 
+/*
+ * Adaptive flush helpers (also used by internal tests):
+ *
+ * - flb_engine_adaptive_flush_target_level() maps an output chunk pressure
+ *   percentage to a flush level between 0 (slowest) and 3 (fastest).
+ * - flb_engine_adaptive_flush_interval() returns the flush interval, in
+ *   seconds, that corresponds to a level.
+ */
+int flb_engine_adaptive_flush_target_level(struct flb_config *config,
+                                           double pressure);
+double flb_engine_adaptive_flush_interval(struct flb_config *config,
+                                          int level);
+
 /* Engine event loop */
 void flb_engine_evl_init();
 struct mk_event_loop *flb_engine_evl_get();
diff --git a/src/flb_config.c b/src/flb_config.c
index b2c33312b2b..a9b9c3581b2 100644
--- a/src/flb_config.c
+++ b/src/flb_config.c
@@ -55,6 +55,38 @@ struct flb_service_config service_configs[] = {
      FLB_CONF_TYPE_DOUBLE,
      offsetof(struct flb_config, flush)},
 
+    {FLB_CONF_STR_FLUSH_ADAPTIVE,
+     FLB_CONF_TYPE_BOOL,
+     offsetof(struct flb_config, flush_adaptive)},
+
+    {FLB_CONF_STR_FLUSH_ADAPTIVE_MIN,
+     FLB_CONF_TYPE_DOUBLE,
+     offsetof(struct flb_config, flush_adaptive_min_interval)},
+
+    {FLB_CONF_STR_FLUSH_ADAPTIVE_MAX,
+     FLB_CONF_TYPE_DOUBLE,
+     offsetof(struct flb_config, flush_adaptive_max_interval)},
+
+    {FLB_CONF_STR_FLUSH_ADAPTIVE_LOW,
+     FLB_CONF_TYPE_DOUBLE,
+     offsetof(struct flb_config, flush_adaptive_low_pressure)},
+
+    {FLB_CONF_STR_FLUSH_ADAPTIVE_MEDIUM,
+     FLB_CONF_TYPE_DOUBLE,
+     offsetof(struct flb_config, flush_adaptive_medium_pressure)},
+
+    {FLB_CONF_STR_FLUSH_ADAPTIVE_HIGH,
+     FLB_CONF_TYPE_DOUBLE,
+     offsetof(struct flb_config, flush_adaptive_high_pressure)},
+
+    {FLB_CONF_STR_FLUSH_ADAPTIVE_UP_STEPS,
+     FLB_CONF_TYPE_INT,
+     offsetof(struct flb_config, flush_adaptive_up_steps)},
+
+    {FLB_CONF_STR_FLUSH_ADAPTIVE_DOWN_STEPS,
+     FLB_CONF_TYPE_INT,
+     offsetof(struct flb_config, flush_adaptive_down_steps)},
+
     {FLB_CONF_STR_GRACE,
      FLB_CONF_TYPE_INT,
      offsetof(struct flb_config, grace)},
@@ -270,6 +302,18 @@ struct flb_config *flb_config_init()
 
     /* Flush */
     config->flush = FLB_CONFIG_FLUSH_SECS;
+    config->flush_adaptive = FLB_FALSE;
+    config->flush_adaptive_min_interval = 0.5;
+    config->flush_adaptive_max_interval = 2.0;
+    config->flush_adaptive_low_pressure = 25.0;
+    config->flush_adaptive_medium_pressure = 50.0;
+    config->flush_adaptive_high_pressure = 75.0;
+    config->flush_adaptive_up_steps = 2;
+    config->flush_adaptive_down_steps = 3;
+    config->flush_adaptive_level = 1;
+    config->flush_adaptive_hits = 0;
+    config->flush_adaptive_direction = 0;
+    config->flush_adaptive_current_interval = FLB_CONFIG_FLUSH_SECS;
     config->daemon = FLB_FALSE;
     config->init_time = time(NULL);
     config->kernel = flb_kernel_info();
diff --git a/src/flb_engine.c b/src/flb_engine.c
index 9d998b1b702..3b01bc0c70e 100644
--- a/src/flb_engine.c
+++ b/src/flb_engine.c
@@ -18,6 +18,8 @@
  */
 
 #include
+#include <float.h>
+#include <math.h>
 #include
 #include
 
@@ -232,6 +234,338 @@ static inline double calculate_chunk_capacity_percent(struct flb_output_instance
             ((double)ins->total_limit_size));
 }
 
+/* Adaptive flush levels: 0 is the slowest flush step, 3 the fastest one */
+#define ADAPTIVE_FLUSH_LEVEL_MIN   0
+#define ADAPTIVE_FLUSH_LEVEL_MAX   3
+
+/* Restrict 'value' to the closed range [min, max] */
+static inline double adaptive_flush_clamp(double value, double min, double max)
+{
+    if (value < min) {
+        return min;
+    }
+
+    if (value > max) {
+        return max;
+    }
+
+    return value;
+}
+
+/*
+ * Tell whether two intervals are equal within a relative tolerance of
+ * DBL_EPSILON. Returns non-zero when they are, zero otherwise.
+ */
+static inline int adaptive_flush_interval_equals(double a, double b)
+{
+    return fabs(a - b) <= (DBL_EPSILON * fmax(fabs(a), fabs(b)));
+}
+
+/*
+ * Return the highest chunk usage found among the output instances, as a
+ * percentage (0..100) of the instance 'total_limit_size'. Instances without
+ * a positive 'total_limit_size' are skipped, so the result is 0 when no
+ * output defines a limit.
+ */
+static double flb_engine_get_chunk_backpressure_percent(struct flb_config *config)
+{
+    double pressure;
+    double max_pressure;
+    struct mk_list *head;
+    struct flb_output_instance *ins;
+
+    max_pressure = 0.0;
+
+    mk_list_foreach(head, &config->outputs) {
+        ins = mk_list_entry(head, struct flb_output_instance, _head);
+
+        if (ins->total_limit_size <= 0) {
+            continue;
+        }
+
+        pressure = ((double) (ins->fs_backlog_chunks_size + ins->fs_chunks_size) * 100.0)
+                   / ((double) ins->total_limit_size);
+
+        pressure = adaptive_flush_clamp(pressure, 0.0, 100.0);
+
+        if (pressure > max_pressure) {
+            max_pressure = pressure;
+        }
+    }
+
+    return max_pressure;
+}
+
+/*
+ * Arm the flush timer on config->event_flush so it fires every 'interval'
+ * seconds. Returns 0 on success, -1 if the timer could not be created.
+ */
+static int flb_engine_flush_timer_create(struct flb_config *config,
+                                         double interval)
+{
+    struct mk_event *event;
+    struct flb_time t_flush;
+
+    event = &config->event_flush;
+
+    flb_time_from_double(&t_flush, interval);
+
+    config->flush_fd = mk_event_timeout_create(config->evl,
+                                               t_flush.tm.tv_sec,
+                                               t_flush.tm.tv_nsec,
+                                               event);
+    event->priority = FLB_ENGINE_PRIORITY_FLUSH;
+
+    if (config->flush_fd == -1) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * (Re)create the flush timer so it fires every 'interval' seconds.
+ *
+ * A timer already registered on config->event_flush is destroyed first. If
+ * the new timer cannot be created, one more attempt is made with the
+ * interval that was in use before the call.
+ *
+ * On success 'flush_adaptive_current_interval' holds the interval the timer
+ * was really armed with, which is the previous one when the fallback was
+ * needed. Returns 0 if a timer is armed, -1 if there is none.
+ */
+static int flb_engine_flush_timer_reset(struct flb_config *config, double interval)
+{
+    double previous;
+    struct mk_event *event;
+
+    event = &config->event_flush;
+    previous = config->flush_adaptive_current_interval;
+
+    if (event->status != MK_EVENT_NONE) {
+        mk_event_timeout_destroy(config->evl, event);
+    }
+
+    if (flb_engine_flush_timer_create(config, interval) == 0) {
+        config->flush_adaptive_current_interval = interval;
+        return 0;
+    }
+
+    flb_utils_error(FLB_ERR_CFG_FLUSH_CREATE);
+
+    /* retrying with the very same interval would be pointless */
+    if (previous <= 0.0 ||
+        adaptive_flush_interval_equals(previous, interval)) {
+        return -1;
+    }
+
+    /* 'flush_adaptive_current_interval' still holds the previous value */
+    return flb_engine_flush_timer_create(config, previous);
+}
+
+/*
+ * Map a chunk pressure percentage to the flush level it calls for:
+ *
+ *   pressure >= high_pressure    -> 3
+ *   pressure >= medium_pressure  -> 2
+ *   pressure <= low_pressure     -> 0
+ *   anything else                -> 1
+ *
+ * The thresholds are used as found in 'config'.
+ */
+int flb_engine_adaptive_flush_target_level(struct flb_config *config,
+                                           double pressure)
+{
+    if (pressure >= config->flush_adaptive_high_pressure) {
+        return ADAPTIVE_FLUSH_LEVEL_MAX;
+    }
+    else if (pressure >= config->flush_adaptive_medium_pressure) {
+        return 2;
+    }
+    else if (pressure <= config->flush_adaptive_low_pressure) {
+        return ADAPTIVE_FLUSH_LEVEL_MIN;
+    }
+
+    return 1;
+}
+
+/*
+ * Return the flush interval, in seconds, for the given level. The base
+ * 'flush' value is scaled by a per-level multiplier (2.0, 1.0, 0.75 and 0.5
+ * for levels 0 to 3) and the result is bounded by
+ * [flush_adaptive_min_interval, flush_adaptive_max_interval]. A level
+ * outside 0..3 is treated as the nearest valid one.
+ */
+double flb_engine_adaptive_flush_interval(struct flb_config *config,
+                                          int level)
+{
+    double interval;
+    static const double multipliers[] = {2.0, 1.0, 0.75, 0.5};
+
+    if (level < ADAPTIVE_FLUSH_LEVEL_MIN) {
+        level = ADAPTIVE_FLUSH_LEVEL_MIN;
+    }
+    else if (level > ADAPTIVE_FLUSH_LEVEL_MAX) {
+        level = ADAPTIVE_FLUSH_LEVEL_MAX;
+    }
+
+    interval = config->flush * multipliers[level];
+
+    return adaptive_flush_clamp(interval,
+                                config->flush_adaptive_min_interval,
+                                config->flush_adaptive_max_interval);
+}
+
+/*
+ * Sample the chunk backpressure and move the flush level one step at a time
+ * towards the level that pressure calls for. A step is taken only after
+ * 'up_steps' (towards a faster flush) or 'down_steps' (towards a slower
+ * flush) consecutive samples pointing in the same direction. The flush
+ * timer is re-created when the interval of the resulting level differs from
+ * the current one. Does nothing when adaptive flush is disabled.
+ */
+static void flb_engine_adaptive_flush_update(struct flb_config *config)
+{
+    int target_level;
+    double pressure;
+    double interval;
+
+    if (config->flush_adaptive == FLB_FALSE) {
+        return;
+    }
+
+    pressure = flb_engine_get_chunk_backpressure_percent(config);
+
+    target_level = flb_engine_adaptive_flush_target_level(config, pressure);
+
+    if (target_level > config->flush_adaptive_level) {
+        /* a change of direction restarts the count of consecutive samples */
+        if (config->flush_adaptive_direction != 1) {
+            config->flush_adaptive_direction = 1;
+            config->flush_adaptive_hits = 0;
+        }
+
+        config->flush_adaptive_hits++;
+
+        if (config->flush_adaptive_hits >= config->flush_adaptive_up_steps) {
+            config->flush_adaptive_level++;
+            config->flush_adaptive_hits = 0;
+        }
+    }
+    else if (target_level < config->flush_adaptive_level) {
+        if (config->flush_adaptive_direction != -1) {
+            config->flush_adaptive_direction = -1;
+            config->flush_adaptive_hits = 0;
+        }
+
+        config->flush_adaptive_hits++;
+
+        if (config->flush_adaptive_hits >= config->flush_adaptive_down_steps) {
+            config->flush_adaptive_level--;
+            config->flush_adaptive_hits = 0;
+        }
+    }
+    else {
+        config->flush_adaptive_direction = 0;
+        config->flush_adaptive_hits = 0;
+    }
+
+    if (config->flush_adaptive_level < ADAPTIVE_FLUSH_LEVEL_MIN) {
+        config->flush_adaptive_level = ADAPTIVE_FLUSH_LEVEL_MIN;
+    }
+    else if (config->flush_adaptive_level > ADAPTIVE_FLUSH_LEVEL_MAX) {
+        config->flush_adaptive_level = ADAPTIVE_FLUSH_LEVEL_MAX;
+    }
+
+    interval = flb_engine_adaptive_flush_interval(config,
+                                                  config->flush_adaptive_level);
+
+    if (adaptive_flush_interval_equals(interval,
+                                       config->flush_adaptive_current_interval)) {
+        return;
+    }
+
+    if (flb_engine_flush_timer_reset(config, interval) == -1) {
+        flb_error("[engine] adaptive flush could not re-create the flush timer "
+                  "(interval=%.3f sec)", interval);
+        return;
+    }
+
+    flb_debug("[engine] adaptive flush interval %.3f sec (pressure=%.2f%%, level=%i)",
+              config->flush_adaptive_current_interval,
+              pressure,
+              config->flush_adaptive_level);
+}
+
+/*
+ * Validate the adaptive flush settings and set the initial flush interval
+ * in 'flush_adaptive_current_interval'.
+ *
+ * When adaptive flush is disabled the interval is the plain 'flush' value.
+ * Otherwise settings out of range are corrected in place:
+ *
+ *   - min_interval must be positive, 0.1 is used if it is not
+ *   - max_interval is raised to min_interval if it is lower
+ *   - up_steps and down_steps are at least 1
+ *   - pressure thresholds are kept within 0..100 and ordered as
+ *     low <= medium <= high
+ */
+static void flb_engine_adaptive_flush_init(struct flb_config *config)
+{
+    if (config->flush_adaptive == FLB_FALSE) {
+        config->flush_adaptive_current_interval = config->flush;
+        return;
+    }
+
+    if (config->flush_adaptive_min_interval <= 0.0) {
+        config->flush_adaptive_min_interval = 0.1;
+    }
+
+    if (config->flush_adaptive_max_interval <
+        config->flush_adaptive_min_interval) {
+        config->flush_adaptive_max_interval =
+            config->flush_adaptive_min_interval;
+    }
+
+    if (config->flush_adaptive_up_steps < 1) {
+        config->flush_adaptive_up_steps = 1;
+    }
+
+    if (config->flush_adaptive_down_steps < 1) {
+        config->flush_adaptive_down_steps = 1;
+    }
+
+    config->flush_adaptive_low_pressure =
+        adaptive_flush_clamp(config->flush_adaptive_low_pressure, 0.0, 100.0);
+    config->flush_adaptive_medium_pressure =
+        adaptive_flush_clamp(config->flush_adaptive_medium_pressure, 0.0, 100.0);
+    config->flush_adaptive_high_pressure =
+        adaptive_flush_clamp(config->flush_adaptive_high_pressure, 0.0, 100.0);
+
+    if (config->flush_adaptive_low_pressure >
+        config->flush_adaptive_medium_pressure) {
+        config->flush_adaptive_medium_pressure =
+            config->flush_adaptive_low_pressure;
+    }
+
+    if (config->flush_adaptive_medium_pressure >
+        config->flush_adaptive_high_pressure) {
+        config->flush_adaptive_medium_pressure =
+            config->flush_adaptive_high_pressure;
+    }
+
+    /* only reachable when low > high: medium was just lowered to high */
+    if (config->flush_adaptive_low_pressure >
+        config->flush_adaptive_medium_pressure) {
+        config->flush_adaptive_low_pressure =
+            config->flush_adaptive_medium_pressure;
+    }
+
+    config->flush_adaptive_current_interval =
+        flb_engine_adaptive_flush_interval(config,
+                                           config->flush_adaptive_level);
+}
+
 static void handle_dlq_if_available(struct flb_config *config,
                                     struct flb_task *task,
                                     struct flb_output_instance *ins,
@@ -678,6 +1012,7 @@ static FLB_INLINE int flb_engine_handle_event(flb_pipefd_t fd, int mask,
     if (config->flush_fd == fd) {
         flb_utils_timer_consume(fd);
         flb_engine_flush(config, NULL);
+        flb_engine_adaptive_flush_update(config);
         return 0;
     }
     else if (config->shutdown_fd == fd) {
@@ -809,7 +1144,6 @@ int flb_engine_start(struct flb_config *config)
     uint64_t ts;
     char tmp[16];
     int rb_flush_flag;
-    struct flb_time t_flush;
     struct mk_event *event;
     struct mk_event_loop *evl;
     struct flb_bucket_queue *evl_bktq;
@@ -983,14 +1317,13 @@ int flb_engine_start(struct flb_config *config)
     event->mask = MK_EVENT_EMPTY;
     event->status = MK_EVENT_NONE;
 
-    flb_time_from_double(&t_flush, config->flush);
-    config->flush_fd = mk_event_timeout_create(evl,
-                                               t_flush.tm.tv_sec,
-                                               t_flush.tm.tv_nsec,
-                                               event);
-    event->priority = FLB_ENGINE_PRIORITY_FLUSH;
-    if (config->flush_fd == -1) {
-        flb_utils_error(FLB_ERR_CFG_FLUSH_CREATE);
+    flb_engine_adaptive_flush_init(config);
+
+    if (flb_engine_flush_timer_reset(config,
+                                     config->flush_adaptive_current_interval) == -1) {
+        flb_error("[engine] could not initialize flush timer (interval=%.3f sec)",
+                  config->flush_adaptive_current_interval);
+        return -1;
     }
 
 
diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt
index 9b5b8fdd3d7..77c1986006c 100644
--- a/tests/internal/CMakeLists.txt
+++ b/tests/internal/CMakeLists.txt
@@ -57,6 +57,7 @@ set(UNIT_TESTS_FILES
   unicode.c
   opentelemetry.c
   storage_dlq.c
+  engine_adaptive_flush.c
   )
 
 # TLS helpers
diff --git a/tests/internal/engine_adaptive_flush.c b/tests/internal/engine_adaptive_flush.c
new file mode 100644
index 00000000000..2cc5b58793f
--- /dev/null
+++ b/tests/internal/engine_adaptive_flush.c
@@ -0,0 +1,72 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_engine.h>
+#include "flb_tests_internal.h"
+
+/* Pressure values on each side of every threshold map to the right level */
+static void test_target_level_thresholds(void)
+{
+    struct flb_config *config;
+
+    config = flb_config_init();
+    TEST_CHECK(config != NULL);
+    if (config == NULL) {
+        return;
+    }
+
+    config->flush_adaptive_low_pressure = 25.0;
+    config->flush_adaptive_medium_pressure = 50.0;
+    config->flush_adaptive_high_pressure = 75.0;
+
+    TEST_CHECK(flb_engine_adaptive_flush_target_level(config, 0.0) == 0);
+    TEST_CHECK(flb_engine_adaptive_flush_target_level(config, 25.0) == 0);
+    TEST_CHECK(flb_engine_adaptive_flush_target_level(config, 25.1) == 1);
+    TEST_CHECK(flb_engine_adaptive_flush_target_level(config, 49.9) == 1);
+    TEST_CHECK(flb_engine_adaptive_flush_target_level(config, 50.0) == 2);
+    TEST_CHECK(flb_engine_adaptive_flush_target_level(config, 74.9) == 2);
+    TEST_CHECK(flb_engine_adaptive_flush_target_level(config, 75.0) == 3);
+    TEST_CHECK(flb_engine_adaptive_flush_target_level(config, 100.0) == 3);
+
+    flb_config_exit(config);
+}
+
+/* Each level scales 'flush' by its multiplier, bounded by min/max interval */
+static void test_interval_levels_and_bounds(void)
+{
+    struct flb_config *config;
+
+    config = flb_config_init();
+    TEST_CHECK(config != NULL);
+    if (config == NULL) {
+        return;
+    }
+
+    config->flush = 1.0;
+    config->flush_adaptive_min_interval = 0.5;
+    config->flush_adaptive_max_interval = 2.0;
+
+    TEST_CHECK(flb_engine_adaptive_flush_interval(config, 0) == 2.0);
+    TEST_CHECK(flb_engine_adaptive_flush_interval(config, 1) == 1.0);
+    TEST_CHECK(flb_engine_adaptive_flush_interval(config, 2) == 0.75);
+    TEST_CHECK(flb_engine_adaptive_flush_interval(config, 3) == 0.5);
+
+    /* clamp low/high out-of-range levels */
+    TEST_CHECK(flb_engine_adaptive_flush_interval(config, -1) == 2.0);
+    TEST_CHECK(flb_engine_adaptive_flush_interval(config, 9) == 0.5);
+
+    /* clamp by min/max bounds */
+    config->flush = 10.0;
+    TEST_CHECK(flb_engine_adaptive_flush_interval(config, 3) == 2.0);
+
+    config->flush = 0.1;
+    TEST_CHECK(flb_engine_adaptive_flush_interval(config, 3) == 0.5);
+
+    flb_config_exit(config);
+}
+
+TEST_LIST = {
+    { "target_level_thresholds", test_target_level_thresholds },
+    { "interval_levels_and_bounds", test_interval_levels_and_bounds },
+    { 0 }
+};