From 811c57f5605b7a4edd7e3907145f423af6094041 Mon Sep 17 00:00:00 2001 From: pdewilde Date: Mon, 6 Apr 2026 14:56:05 -0700 Subject: [PATCH] out_stackdriver: fix metric accounting for dropped records and decouple logic This patch refactors the response handling in the stackdriver output plugin to resolve incorrect metric accounting and inconsistent behavior between builds with and without Prometheus metrics enabled. 1. Defer Metric Accounting to Core Engine: Removed manual increment of cmt_dropped_records on standard 4xx errors. The core engine automatically increments drop counters when the plugin returns FLB_ERROR. Manual increments caused double counting. This aligns with how other plugins (like out_cloudwatch_logs) defer core accounting to the engine. 2. Enable Partial Success Handling for All Builds: Moved the parse_partial_success_response call outside of #ifdef FLB_HAVE_METRICS. Previously, partial success was only checked if metrics were enabled. Now, all builds will correctly identify partial successes and return FLB_OK to clear the chunk, preventing unnecessary retries and ensuring consistent behavior across different build configurations. 3. Added Visibility Logging: Added a warning log for partial successes that reports the exact number of failed records, providing operator visibility into silent drops. 4. Style Guide Compliance: Moved the declaration of failed_records to the beginning of the function scope as required by Fluent Bit style guidelines. Moved the declaration of ts outside of #ifdef FLB_HAVE_METRICS to support its use on all builds. 5. Fixed Double Counting on Partial Success: Guarded the full-batch success metrics update in the if (ret_code == FLB_OK) block with a ret_partial_success != 0 check to ensure we don't double count successful records that were already accounted for in the partial success block. 6. 
Fixed Partial Success Parsing Bug and Safety Issue: Fixed parse_partial_success_response to only return 0 when a partial success detail is actually found, preventing accidental dropping of logs on standard 400/403 errors with other details. Also split the type check to ensure ret == 0 before accessing string pointers, avoiding potential crashes. Signed-off-by: pdewilde --- plugins/out_stackdriver/stackdriver.c | 91 ++++++++++++++++----------- 1 file changed, 54 insertions(+), 37 deletions(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 38fefd7a8fd..dc72ca4346b 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -2777,6 +2777,7 @@ static int parse_partial_success_response(struct flb_http_client* c, msgpack_object logEntryError_map; msgpack_object logEntryCode; msgpack_object at_type; + int partial_success_found = FLB_FALSE; if (c->resp.status != 400 && c->resp.status != 403) { return -1; @@ -2866,15 +2867,20 @@ static int parse_partial_success_response(struct flb_http_client* c, "@type", 5, MSGPACK_OBJECT_STR, &at_type); + if (ret != 0 || + at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE) { + continue; + } + strncpy(at_type_str, at_type.via.str.ptr, PARTIAL_SUCCESS_GRPC_TYPE_SIZE); - if (ret != 0 || - at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE || - strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE, - PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) { + if (strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE, + PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) { continue; } + partial_success_found = FLB_TRUE; + ret = extract_msgpack_obj_from_msgpack_map(&details_map.via.map, "logEntryErrors", 14, MSGPACK_OBJECT_MAP, @@ -2925,7 +2931,7 @@ static int parse_partial_success_response(struct flb_http_client* c, } flb_free(buffer); msgpack_unpacked_destroy(&result); - return 0; + return partial_success_found == FLB_TRUE ? 
0 : -1; } static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, @@ -2937,7 +2943,8 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, (void) config; int ret; int code; - int ret_partial_success; + int failed_records = 0; + int ret_partial_success = -1; int ret_code = FLB_RETRY; int formatted_records = 0; int grpc_status_counts[GRPC_STATUS_CODES_SIZE] = {0}; @@ -2953,9 +2960,9 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, uint64_t write_entries_start = 0; uint64_t write_entries_end = 0; float write_entries_latency = 0.0; + uint64_t ts = cfl_time_now(); #ifdef FLB_HAVE_METRICS char *name = (char *) flb_output_name(ctx->ins); - uint64_t ts = cfl_time_now(); #endif /* Reformat msgpack to stackdriver JSON payload */ @@ -3069,7 +3076,6 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, ret_code = FLB_OK; } else { -#ifdef FLB_HAVE_METRICS /* check partial success */ ret_partial_success = parse_partial_success_response(c, @@ -3077,45 +3083,53 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, ts, grpc_status_counts); - int failed_records = 0; - if (ret_partial_success == 0) { - for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) { - if (grpc_status_counts[code] != 0) { - failed_records += grpc_status_counts[code]; - } - } - cmt_counter_add(ctx->ins->cmt_dropped_records, ts, - failed_records, 1, (char* []) {name}); - int successful_records = - formatted_records - failed_records; - if (successful_records != 0) { - add_record_metrics(ctx, ts, successful_records, 200, 0); + if (ret_partial_success == 0) { + ret_code = FLB_OK; + + for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) { + if (grpc_status_counts[code] != 0) { + failed_records += grpc_status_counts[code]; } } - else { - add_record_metrics(ctx, ts, formatted_records, - c->resp.status, -1); - cmt_counter_add(ctx->ins->cmt_dropped_records, ts, - formatted_records, 1, - 
(char* []) {name}); + + flb_plg_warn(ctx->ins, "partial success: %d of %d records dropped", + failed_records, formatted_records); + +#ifdef FLB_HAVE_METRICS + cmt_counter_add(ctx->ins->cmt_dropped_records, ts, + failed_records, 1, (char* []) {name}); + int successful_records = + formatted_records - failed_records; + if (successful_records != 0) { + add_record_metrics(ctx, ts, successful_records, 200, 0); } #endif - if (c->resp.status >= 400 && c->resp.status < 500) { - ret_code = FLB_ERROR; - flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag, - c->resp.payload); } +#ifdef FLB_HAVE_METRICS else { - if (c->resp.payload_size > 0) { - /* we got an error */ + add_record_metrics(ctx, ts, formatted_records, + c->resp.status, -1); + } +#endif + /* Partial success is set FLB_OK, with manual update of failure metrics */ + if (ret_partial_success != 0) { + if (c->resp.status >= 400 && c->resp.status < 500) { + ret_code = FLB_ERROR; flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag, c->resp.payload); } else { - flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag, - c->resp.payload); + if (c->resp.payload_size > 0) { + /* we got an error */ + flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag, + c->resp.payload); + } + else { + flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag, + c->resp.payload); + } + ret_code = FLB_RETRY; } - ret_code = FLB_RETRY; } } } @@ -3127,7 +3141,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, if (write_entries_latency > 0.0) { cmt_histogram_observe(ctx->cmt_write_entries_latency, ts, write_entries_latency, 1, (char *[]) {name}); } - add_record_metrics(ctx, ts, formatted_records, 200, 0); + /* Only add full batch metrics if this was NOT a partial success */ + if (ret_partial_success != 0) { + add_record_metrics(ctx, ts, formatted_records, 200, 0); + } /* OLD 
api */ flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);