Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 54 additions & 37 deletions plugins/out_stackdriver/stackdriver.c
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,7 @@ static int parse_partial_success_response(struct flb_http_client* c,
msgpack_object logEntryError_map;
msgpack_object logEntryCode;
msgpack_object at_type;
int partial_success_found = FLB_FALSE;

if (c->resp.status != 400 && c->resp.status != 403) {
return -1;
Expand Down Expand Up @@ -2866,15 +2867,20 @@ static int parse_partial_success_response(struct flb_http_client* c,
"@type", 5,
MSGPACK_OBJECT_STR,
&at_type);
if (ret != 0 ||
at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE) {
continue;
}

strncpy(at_type_str, at_type.via.str.ptr,
PARTIAL_SUCCESS_GRPC_TYPE_SIZE);
if (ret != 0 ||
at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE ||
strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE,
PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) {
if (strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE,
PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) {
continue;
}

partial_success_found = FLB_TRUE;

ret = extract_msgpack_obj_from_msgpack_map(&details_map.via.map,
"logEntryErrors", 14,
MSGPACK_OBJECT_MAP,
Expand Down Expand Up @@ -2925,7 +2931,7 @@ static int parse_partial_success_response(struct flb_http_client* c,
}
flb_free(buffer);
msgpack_unpacked_destroy(&result);
return 0;
return partial_success_found == FLB_TRUE ? 0 : -1;
}
static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
Expand All @@ -2937,7 +2943,8 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
(void) config;
int ret;
int code;
int ret_partial_success;
int failed_records = 0;
int ret_partial_success = -1;
int ret_code = FLB_RETRY;
int formatted_records = 0;
int grpc_status_counts[GRPC_STATUS_CODES_SIZE] = {0};
Expand All @@ -2953,9 +2960,9 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
uint64_t write_entries_start = 0;
uint64_t write_entries_end = 0;
float write_entries_latency = 0.0;
uint64_t ts = cfl_time_now();
#ifdef FLB_HAVE_METRICS
char *name = (char *) flb_output_name(ctx->ins);
uint64_t ts = cfl_time_now();
#endif

/* Reformat msgpack to stackdriver JSON payload */
Expand Down Expand Up @@ -3069,53 +3076,60 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
ret_code = FLB_OK;
}
else {
#ifdef FLB_HAVE_METRICS
/* check partial success */
ret_partial_success =
parse_partial_success_response(c,
ctx,
ts,
grpc_status_counts);

int failed_records = 0;
if (ret_partial_success == 0) {
for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) {
if (grpc_status_counts[code] != 0) {
failed_records += grpc_status_counts[code];
}
}
cmt_counter_add(ctx->ins->cmt_dropped_records, ts,
failed_records, 1, (char* []) {name});
int successful_records =
formatted_records - failed_records;
if (successful_records != 0) {
add_record_metrics(ctx, ts, successful_records, 200, 0);
if (ret_partial_success == 0) {
ret_code = FLB_OK;

for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) {
if (grpc_status_counts[code] != 0) {
failed_records += grpc_status_counts[code];
}
}
else {
add_record_metrics(ctx, ts, formatted_records,
c->resp.status, -1);
cmt_counter_add(ctx->ins->cmt_dropped_records, ts,
formatted_records, 1,
(char* []) {name});

flb_plg_warn(ctx->ins, "partial success: %d of %d records dropped",
failed_records, formatted_records);

#ifdef FLB_HAVE_METRICS
cmt_counter_add(ctx->ins->cmt_dropped_records, ts,
failed_records, 1, (char* []) {name});
int successful_records =
formatted_records - failed_records;
if (successful_records != 0) {
add_record_metrics(ctx, ts, successful_records, 200, 0);
}
#endif
if (c->resp.status >= 400 && c->resp.status < 500) {
ret_code = FLB_ERROR;
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
c->resp.payload);
}
#ifdef FLB_HAVE_METRICS
else {
if (c->resp.payload_size > 0) {
/* we got an error */
add_record_metrics(ctx, ts, formatted_records,
c->resp.status, -1);
}
#endif
/* Partial success is set FLB_OK, with manual update of failure metrics */
if (ret_partial_success != 0) {
if (c->resp.status >= 400 && c->resp.status < 500) {
ret_code = FLB_ERROR;
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
c->resp.payload);
}
else {
flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag,
c->resp.payload);
if (c->resp.payload_size > 0) {
/* we got an error */
flb_plg_warn(ctx->ins, "tag=%s error sending to Cloud Logging: %s", event_chunk->tag,
c->resp.payload);
}
else {
flb_plg_debug(ctx->ins, "tag=%s response from Cloud Logging: %s", event_chunk->tag,
c->resp.payload);
}
ret_code = FLB_RETRY;
}
ret_code = FLB_RETRY;
}
}
}
Expand All @@ -3127,7 +3141,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
if (write_entries_latency > 0.0) {
cmt_histogram_observe(ctx->cmt_write_entries_latency, ts, write_entries_latency, 1, (char *[]) {name});
}
add_record_metrics(ctx, ts, formatted_records, 200, 0);
/* Only add full batch metrics if this was NOT a partial success */
if (ret_partial_success != 0) {
add_record_metrics(ctx, ts, formatted_records, 200, 0);
}

/* OLD api */
flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics);
Expand Down