66#include <string.h>
77
88#include <cmetrics/cmt_counter.h>
9+ #include <cmetrics/cmt_exp_histogram.h>
910#include <cmetrics/cmt_decode_msgpack.h>
1011#include <cmetrics/cmt_encode_opentelemetry.h>
1112#include <cmetrics/cmt_encode_text.h>
2324#define MAX_CAPTURED_VALUES 32
2425#define MAX_CAPTURE_METRICS 8
2526
27+ #define CAPTURE_TYPE_VALUE 0
28+ #define CAPTURE_TYPE_EXP_HIST_FIELD 1
29+
2630struct http_client_ctx {
2731 struct flb_upstream * upstream ;
2832 struct flb_connection * connection ;
@@ -40,6 +44,8 @@ struct rt_ctx {
4044
4145struct metric_capture {
4246 const char * line_prefix ;
47+ const char * field_name ;
48+ int capture_type ;
4349 double values [MAX_CAPTURED_VALUES ];
4450 int value_count ;
4551};
@@ -107,6 +113,60 @@ static int find_metric_value(const char *text,
107113 return -1 ;
108114}
109115
116+ static int find_exp_hist_field_value (const char * text ,
117+ const char * line_prefix ,
118+ const char * field_name ,
119+ double * value )
120+ {
121+ const char * field_match ;
122+ const char * line_start ;
123+ const char * line_end ;
124+ char needle [64 ];
125+
126+ line_start = text ;
127+
128+ snprintf (needle , sizeof (needle ) - 1 , "%s=" , field_name );
129+
130+ while (line_start != NULL && * line_start != '\0' ) {
131+ line_end = strchr (line_start , '\n' );
132+ if (line_end == NULL ) {
133+ line_end = line_start + strlen (line_start );
134+ }
135+
136+ if (strstr (line_start , line_prefix ) != NULL ) {
137+ field_match = line_start ;
138+
139+ while (field_match != NULL && field_match < line_end ) {
140+ field_match = strstr (field_match , needle );
141+
142+ if (field_match == NULL || field_match >= line_end ) {
143+ break ;
144+ }
145+
146+ if (field_match == line_start ||
147+ (* (field_match - 1 ) != '_' &&
148+ (* (field_match - 1 ) < '0' || * (field_match - 1 ) > '9' ) &&
149+ (* (field_match - 1 ) < 'A' || * (field_match - 1 ) > 'Z' ) &&
150+ (* (field_match - 1 ) < 'a' || * (field_match - 1 ) > 'z' ))) {
151+ field_match += strlen (needle );
152+ * value = strtod (field_match , NULL );
153+ return 0 ;
154+ }
155+
156+ field_match ++ ;
157+ }
158+ }
159+
160+ if (* line_end == '\0' ) {
161+ break ;
162+ }
163+
164+ line_start = line_end + 1 ;
165+ }
166+
167+ return -1 ;
168+ }
169+
110170static void observation_reset (void )
111171{
112172 pthread_mutex_lock (& state_mutex );
@@ -128,6 +188,33 @@ static int observation_add_capture(const char *line_prefix)
128188 }
129189
130190 observed .captures [index ].line_prefix = line_prefix ;
191+ observed .captures [index ].field_name = NULL ;
192+ observed .captures [index ].capture_type = CAPTURE_TYPE_VALUE ;
193+ observed .captures [index ].value_count = 0 ;
194+ observed .capture_count ++ ;
195+
196+ pthread_mutex_unlock (& state_mutex );
197+
198+ return index ;
199+ }
200+
201+ static int observation_add_exp_hist_capture (const char * line_prefix ,
202+ const char * field_name )
203+ {
204+ int index ;
205+
206+ pthread_mutex_lock (& state_mutex );
207+
208+ index = observed .capture_count ;
209+
210+ if (index >= MAX_CAPTURE_METRICS ) {
211+ pthread_mutex_unlock (& state_mutex );
212+ return -1 ;
213+ }
214+
215+ observed .captures [index ].line_prefix = line_prefix ;
216+ observed .captures [index ].field_name = field_name ;
217+ observed .captures [index ].capture_type = CAPTURE_TYPE_EXP_HIST_FIELD ;
131218 observed .captures [index ].value_count = 0 ;
132219 observed .capture_count ++ ;
133220
@@ -309,49 +396,62 @@ static int cb_capture_metrics(void *record, size_t size, void *data)
309396
310397 (void ) data ;
311398
399+ pthread_mutex_lock (& state_mutex );
400+ observed .callback_count ++ ;
401+ pthread_mutex_unlock (& state_mutex );
402+
312403 offset = 0 ;
313- text = NULL ;
314- context = NULL ;
315404
316- ret = cmt_decode_msgpack_create (& context , (char * ) record , size , & offset );
317- if (ret != CMT_DECODE_MSGPACK_SUCCESS ) {
318- if (record != NULL ) {
319- flb_free (record );
405+ while (offset < size ) {
406+ text = NULL ;
407+ context = NULL ;
408+
409+ ret = cmt_decode_msgpack_create (& context , (char * ) record , size , & offset );
410+ if (ret != CMT_DECODE_MSGPACK_SUCCESS ) {
411+ if (context != NULL ) {
412+ cmt_destroy (context );
413+ }
414+
415+ return -1 ;
320416 }
321417
322- return -1 ;
323- }
418+ text = cmt_encode_text_create (context );
324419
325- text = cmt_encode_text_create (context );
420+ if (text != NULL ) {
421+ pthread_mutex_lock (& state_mutex );
326422
327- pthread_mutex_lock (& state_mutex );
423+ for (index = 0 ; index < observed .capture_count ; index ++ ) {
424+ ret = -1 ;
328425
329- observed .callback_count ++ ;
426+ if (observed .captures [index ].capture_type ==
427+ CAPTURE_TYPE_EXP_HIST_FIELD ) {
428+ ret = find_exp_hist_field_value (
429+ text ,
430+ observed .captures [index ].line_prefix ,
431+ observed .captures [index ].field_name ,
432+ & value );
433+ }
434+ else {
435+ ret = find_metric_value (text ,
436+ observed .captures [index ].line_prefix ,
437+ & value );
438+ }
330439
331- if (text != NULL ) {
332- for (index = 0 ; index < observed .capture_count ; index ++ ) {
333- if (find_metric_value (text ,
334- observed .captures [index ].line_prefix ,
335- & value ) == 0 ) {
336- if (observed .captures [index ].value_count < MAX_CAPTURED_VALUES ) {
337- observed .captures [index ].values [
338- observed .captures [index ].value_count ] = value ;
339- observed .captures [index ].value_count ++ ;
440+ if (ret == 0 ) {
441+ if (observed .captures [index ].value_count <
442+ MAX_CAPTURED_VALUES ) {
443+ observed .captures [index ].values [
444+ observed .captures [index ].value_count ] = value ;
445+ observed .captures [index ].value_count ++ ;
446+ }
340447 }
341448 }
342- }
343- }
344-
345- pthread_mutex_unlock (& state_mutex );
346449
347- if (text != NULL ) {
348- cmt_encode_text_destroy (text );
349- }
350-
351- cmt_destroy (context );
450+ pthread_mutex_unlock (& state_mutex );
451+ cmt_encode_text_destroy (text );
452+ }
352453
353- if (record != NULL ) {
354- flb_free (record );
454+ cmt_destroy (context );
355455 }
356456
357457 return 0 ;
@@ -464,6 +564,7 @@ static struct rt_ctx *rt_ctx_create(const char *drop_first,
464564 ret = flb_output_set (context -> flb ,
465565 context -> output_ffd ,
466566 "match" , "*" ,
567+ "data_mode" , "chunk" ,
467568 NULL );
468569 if (ret != 0 ) {
469570 flb_destroy (context -> flb );
@@ -554,6 +655,66 @@ static struct cmt *create_counter_context(const char *name,
554655 return context ;
555656}
556657
658+ static struct cmt * create_exp_histogram_context (const char * name ,
659+ uint64_t timestamp ,
660+ int32_t scale ,
661+ uint64_t zero_count ,
662+ double zero_threshold ,
663+ int32_t positive_offset ,
664+ size_t positive_count ,
665+ uint64_t * positive_buckets ,
666+ int32_t negative_offset ,
667+ size_t negative_count ,
668+ uint64_t * negative_buckets ,
669+ int sum_set ,
670+ double sum ,
671+ uint64_t count )
672+ {
673+ struct cmt * context ;
674+ struct cmt_exp_histogram * exp_histogram ;
675+
676+ context = cmt_create ();
677+ if (context == NULL ) {
678+ return NULL ;
679+ }
680+
681+ exp_histogram = cmt_exp_histogram_create (context ,
682+ "" ,
683+ "" ,
684+ (char * ) name ,
685+ "help" ,
686+ 0 ,
687+ NULL );
688+ if (exp_histogram == NULL ) {
689+ cmt_destroy (context );
690+ return NULL ;
691+ }
692+
693+ exp_histogram -> aggregation_type = CMT_AGGREGATION_TYPE_CUMULATIVE ;
694+
695+ if (cmt_exp_histogram_set_default (exp_histogram ,
696+ timestamp ,
697+ scale ,
698+ zero_count ,
699+ zero_threshold ,
700+ positive_offset ,
701+ positive_count ,
702+ positive_buckets ,
703+ negative_offset ,
704+ negative_count ,
705+ negative_buckets ,
706+ sum_set ,
707+ sum ,
708+ count ,
709+ 0 ,
710+ NULL ) != 0 ) {
711+ cmt_destroy (context );
712+ return NULL ;
713+ }
714+
715+ return context ;
716+ }
717+
557718static int send_metrics_context (struct rt_ctx * context , struct cmt * metrics_context )
558719{
559720 int ret ;
@@ -802,10 +963,83 @@ static void flb_test_runtime_non_monotonic_sum_passthrough(void)
802963 rt_ctx_destroy (rt );
803964}
804965
966+ static void flb_test_runtime_exp_histogram_scale_change (void )
967+ {
968+ int count_index ;
969+ int sum_index ;
970+ uint64_t positive_a [2 ];
971+ uint64_t positive_b [4 ];
972+ struct cmt * context ;
973+ struct rt_ctx * rt ;
974+
975+ positive_a [0 ] = 4 ;
976+ positive_a [1 ] = 8 ;
977+
978+ positive_b [0 ] = 5 ;
979+ positive_b [1 ] = 8 ;
980+ positive_b [2 ] = 11 ;
981+ positive_b [3 ] = 15 ;
982+
983+ observation_reset ();
984+ count_index = observation_add_exp_hist_capture ("rt_exp_hist" , "count" );
985+ sum_index = observation_add_exp_hist_capture ("rt_exp_hist" , "sum" );
986+ TEST_CHECK (count_index >= 0 );
987+ TEST_CHECK (sum_index >= 0 );
988+
989+ rt = rt_ctx_create ("true" , "true" );
990+ TEST_CHECK (rt != NULL );
991+
992+ context = create_exp_histogram_context ("rt_exp_hist" ,
993+ 100 ,
994+ 1 ,
995+ 2 ,
996+ 0.0 ,
997+ 0 ,
998+ 2 ,
999+ positive_a ,
1000+ 0 ,
1001+ 0 ,
1002+ NULL ,
1003+ CMT_TRUE ,
1004+ 40.0 ,
1005+ 10 );
1006+ TEST_CHECK (context != NULL );
1007+ TEST_CHECK (send_metrics_context (rt , context ) == 0 );
1008+ cmt_destroy (context );
1009+ flb_time_msleep (500 );
1010+ TEST_CHECK (observation_get_value_count (count_index ) == 0 );
1011+
1012+ context = create_exp_histogram_context ("rt_exp_hist" ,
1013+ 200 ,
1014+ 2 ,
1015+ 3 ,
1016+ 0.0 ,
1017+ 0 ,
1018+ 4 ,
1019+ positive_b ,
1020+ 0 ,
1021+ 0 ,
1022+ NULL ,
1023+ CMT_TRUE ,
1024+ 58.0 ,
1025+ 18 );
1026+ TEST_CHECK (context != NULL );
1027+ TEST_CHECK (send_metrics_context (rt , context ) == 0 );
1028+ cmt_destroy (context );
1029+
1030+ TEST_CHECK (wait_for_value_count (count_index , 1 , 2000 ) == 0 );
1031+ TEST_CHECK (wait_for_value_count (sum_index , 1 , 2000 ) == 0 );
1032+ TEST_CHECK (fabs (observation_get_value (count_index , 0 ) - 8.0 ) < 0.0001 );
1033+ TEST_CHECK (fabs (observation_get_value (sum_index , 0 ) - 18.0 ) < 0.0001 );
1034+
1035+ rt_ctx_destroy (rt );
1036+ }
1037+
8051038TEST_LIST = {
8061039 {"counter_default_behaviors" , flb_test_runtime_counter_default_behaviors },
8071040 {"counter_reset_keep_and_first_sample" , flb_test_runtime_counter_reset_keep_and_first_sample },
8081041 {"multi_series" , flb_test_runtime_multi_series },
8091042 {"non_monotonic_sum_passthrough" , flb_test_runtime_non_monotonic_sum_passthrough },
1043+ {"exp_histogram_scale_change" , flb_test_runtime_exp_histogram_scale_change },
8101044 {NULL , NULL }
8111045};
0 commit comments