diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 38fefd7a8fd..dc72ca4346b 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -2777,6 +2777,7 @@ static int parse_partial_success_response(struct flb_http_client* c, msgpack_object logEntryError_map; msgpack_object logEntryCode; msgpack_object at_type; + int partial_success_found = FLB_FALSE; if (c->resp.status != 400 && c->resp.status != 403) { return -1; @@ -2866,15 +2867,20 @@ static int parse_partial_success_response(struct flb_http_client* c, "@type", 5, MSGPACK_OBJECT_STR, &at_type); + if (ret != 0 || + at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE) { + continue; + } + strncpy(at_type_str, at_type.via.str.ptr, PARTIAL_SUCCESS_GRPC_TYPE_SIZE); - if (ret != 0 || - at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE || - strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE, - PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) { + if (strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE, + PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) { continue; } + partial_success_found = FLB_TRUE; + ret = extract_msgpack_obj_from_msgpack_map(&details_map.via.map, "logEntryErrors", 14, MSGPACK_OBJECT_MAP, @@ -2925,7 +2931,7 @@ static int parse_partial_success_response(struct flb_http_client* c, } flb_free(buffer); msgpack_unpacked_destroy(&result); - return 0; + return partial_success_found == FLB_TRUE ? 0 : -1; } static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, @@ -2937,7 +2943,8 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, (void) config; int ret; int code; - int ret_partial_success; + int failed_records = 0; + int ret_partial_success = -1; int ret_code = FLB_RETRY; int formatted_records = 0; int grpc_status_counts[GRPC_STATUS_CODES_SIZE] = {0}; @@ -2953,9 +2960,9 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, uint64_t write_entries_start = 0; uint64_t write_entries_end = 0; float write_entries_latency = 0.0; + uint64_t ts = cfl_time_now(); #ifdef FLB_HAVE_METRICS char *name = (char *) flb_output_name(ctx->ins); - uint64_t ts = cfl_time_now(); #endif /* Reformat msgpack to stackdriver JSON payload */ @@ -3069,7 +3076,6 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, ret_code = FLB_OK; } else { -#ifdef FLB_HAVE_METRICS /* check partial success */ ret_partial_success = parse_partial_success_response(c, @@ -3077,45 +3083,53 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, ts, grpc_status_counts); - int failed_records = 0; - if (ret_partial_success == 0) { - for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) { - if (grpc_status_counts[code] != 0) { - failed_records += grpc_status_counts[code]; - } - } - cmt_counter_add(ctx->ins->cmt_dropped_records, ts, - failed_records, 1, (char* []) {name}); - int successful_records = - formatted_records - failed_records; - if (successful_records != 0) { - add_record_metrics(ctx, ts, successful_records, 200, 0); + if (ret_partial_success == 0) { + ret_code = FLB_OK; + + for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) { + if (grpc_status_counts[code] != 0) { + failed_records += grpc_status_counts[code]; } } - else { - add_record_metrics(ctx, ts, formatted_records, - c->resp.status, -1); - cmt_counter_add(ctx->ins->cmt_dropped_records, ts, - formatted_records, 1, - (char* []) {name}); + + flb_plg_warn(ctx->ins, "partial success: %d of %d records dropped", + failed_records, formatted_records); + +#ifdef FLB_HAVE_METRICS + cmt_counter_add(ctx->ins->cmt_dropped_records, ts, + failed_records, 1, (char* []) {name}); + int successful_records = + formatted_records - failed_records; + if (successful_records != 0) { + add_record_metrics(ctx, ts, successful_records, 200, 0); } #endif - if (c->resp.status >= 400 && c->resp.status < 500) { - ret_code = FLB_ERROR; - flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag, - c->resp.payload); } +#ifdef FLB_HAVE_METRICS else { - if (c->resp.payload_size > 0) { - /* we got an error */ + add_record_metrics(ctx, ts, formatted_records, + c->resp.status, -1); + } +#endif + /* Partial success is set FLB_OK, with manual update of failure metrics */ + if (ret_partial_success != 0) { + if (c->resp.status >= 400 && c->resp.status < 500) { + ret_code = FLB_ERROR; flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag, c->resp.payload); } else { - flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag, - c->resp.payload); + if (c->resp.payload_size > 0) { + /* we got an error */ + flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag, + c->resp.payload); + } + else { + flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag, + c->resp.payload); + } + ret_code = FLB_RETRY; } - ret_code = FLB_RETRY; } } } @@ -3127,7 +3141,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, if (write_entries_latency > 0.0) { cmt_histogram_observe(ctx->cmt_write_entries_latency, ts, write_entries_latency, 1, (char *[]) {name}); } - add_record_metrics(ctx, ts, formatted_records, 200, 0); + /* Only add full batch metrics if this was NOT a partial success */ + if (ret_partial_success != 0) { + add_record_metrics(ctx, ts, formatted_records, 200, 0); + } /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);