55#include <fluent-bit/flb_parser.h>
66#include <fluent-bit/flb_mem.h>
77#include <fluent-bit/multiline/flb_ml.h>
8+ #include <fluent-bit/multiline/flb_ml_group.h>
89#include <fluent-bit/multiline/flb_ml_rule.h>
910#include <fluent-bit/multiline/flb_ml_parser.h>
1011
12+ #ifndef FLB_SYSTEM_WINDOWS
13+ #include <sys/wait.h>
14+ #include <unistd.h>
15+ #endif
16+
1117#include "flb_tests_internal.h"
1218
1319struct record_check {
@@ -20,6 +26,11 @@ struct expected_result {
2026 struct record_check * out_records ;
2127};
2228
29+ struct captured_logs {
30+ int flushes ;
31+ char logs [8 ][256 ];
32+ };
33+
2334/* Docker */
2435struct record_check docker_input [] = {
2536 {"{\"log\": \"aa\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01231z\"}" },
@@ -465,6 +476,139 @@ static int flush_callback(struct flb_ml_parser *parser,
465476 return 0 ;
466477}
467478
479+ static int capture_logs_callback (struct flb_ml_parser * parser ,
480+ struct flb_ml_stream * mst ,
481+ void * data , char * buf_data , size_t buf_size )
482+ {
483+ int i ;
484+ int ret ;
485+ int len ;
486+ size_t off = 0 ;
487+ msgpack_unpacked result ;
488+ msgpack_object * map ;
489+ msgpack_object key ;
490+ msgpack_object val ;
491+ struct flb_time tm ;
492+ struct captured_logs * captured = data ;
493+
494+ if (!captured || captured -> flushes >= 8 ) {
495+ return 0 ;
496+ }
497+
498+ msgpack_unpacked_init (& result );
499+ ret = msgpack_unpack_next (& result , buf_data , buf_size , & off );
500+ TEST_CHECK (ret == MSGPACK_UNPACK_SUCCESS );
501+
502+ flb_time_pop_from_msgpack (& tm , & result , & map );
503+ TEST_CHECK (flb_time_to_nanosec (& tm ) != 0L );
504+
505+ for (i = 0 ; i < map -> via .map .size ; i ++ ) {
506+ key = map -> via .map .ptr [i ].key ;
507+ val = map -> via .map .ptr [i ].val ;
508+
509+ if (key .type != MSGPACK_OBJECT_STR || val .type != MSGPACK_OBJECT_STR ) {
510+ continue ;
511+ }
512+
513+ if (key .via .str .size == 3 &&
514+ strncmp (key .via .str .ptr , "log" , 3 ) == 0 ) {
515+ len = val .via .str .size ;
516+ if (len > 255 ) {
517+ len = 255 ;
518+ }
519+
520+ memcpy (captured -> logs [captured -> flushes ], val .via .str .ptr , len );
521+ captured -> logs [captured -> flushes ][len ] = '\0' ;
522+ break ;
523+ }
524+ }
525+
526+ captured -> flushes ++ ;
527+ msgpack_unpacked_destroy (& result );
528+ return 0 ;
529+ }
530+
531+ static int log_matches (char * actual , const char * expected )
532+ {
533+ size_t actual_len ;
534+ size_t expected_len ;
535+
536+ actual_len = strlen (actual );
537+ expected_len = strlen (expected );
538+
539+ if (strcmp (actual , expected ) == 0 ) {
540+ return FLB_TRUE ;
541+ }
542+
543+ if (actual_len == expected_len + 1 &&
544+ memcmp (actual , expected , expected_len ) == 0 &&
545+ actual [actual_len - 1 ] == '\n' ) {
546+ return FLB_TRUE ;
547+ }
548+
549+ return FLB_FALSE ;
550+ }
551+
552+ static struct flb_ml_parser * create_regex_test_parser (struct flb_config * config ,
553+ const char * name ,
554+ const char * pattern )
555+ {
556+ int ret ;
557+ struct flb_ml_parser * mlp ;
558+ struct flb_ml_parser_params params ;
559+
560+ params = flb_ml_parser_params_default (name );
561+ params .type = FLB_ML_REGEX ;
562+ params .negate = FLB_FALSE ;
563+ params .flush_ms = 1000 ;
564+
565+ mlp = flb_ml_parser_create_params (config , & params );
566+ TEST_CHECK (mlp != NULL );
567+
568+ ret = flb_ml_rule_create (mlp , "start_state" , (char * ) pattern , "cont" , NULL );
569+ TEST_CHECK (ret == 0 );
570+ ret = flb_ml_rule_create (mlp , "cont" , (char * ) pattern , "cont" , NULL );
571+ TEST_CHECK (ret == 0 );
572+ ret = flb_ml_parser_init (mlp );
573+ TEST_CHECK (ret == 0 );
574+
575+ return mlp ;
576+ }
577+
578+ #ifndef FLB_SYSTEM_WINDOWS
579+ static void assert_child_crashes (void (* fn )(void ))
580+ {
581+ pid_t pid ;
582+ int wstatus ;
583+
584+ pid = fork ();
585+ if (pid == 0 ) {
586+ fn ();
587+ exit (0 );
588+ }
589+ else if (pid > 0 ) {
590+ if (waitpid (pid , & wstatus , 0 ) == -1 ) {
591+ TEST_CHECK (0 );
592+ TEST_MSG ("waitpid failed" );
593+ return ;
594+ }
595+
596+ TEST_CHECK (WIFSIGNALED (wstatus ));
597+ if (!WIFSIGNALED (wstatus )) {
598+ TEST_MSG ("expected child crash, got status %d" , wstatus );
599+ return ;
600+ }
601+
602+ TEST_CHECK (WTERMSIG (wstatus ) == SIGSEGV );
603+ TEST_MSG ("expected SIGSEGV, got signal %d" , WTERMSIG (wstatus ));
604+ }
605+ else {
606+ TEST_CHECK (0 );
607+ TEST_MSG ("fork() failed" );
608+ }
609+ }
610+ #endif
611+
468612static void test_parser_docker ()
469613{
470614 int i ;
@@ -1643,6 +1787,158 @@ static void test_buffer_limit_disabled()
16431787 flb_config_exit (config );
16441788}
16451789
1790+ static void test_known_bug_multi_group_flush_only_first_group ()
1791+ {
1792+ int ret ;
1793+ int saw_a = FLB_FALSE ;
1794+ int saw_b = FLB_FALSE ;
1795+ int i ;
1796+ uint64_t stream_id ;
1797+ struct flb_config * config ;
1798+ struct flb_ml * ml ;
1799+ struct flb_time tm ;
1800+ struct captured_logs captured = {0 };
1801+
1802+ config = flb_config_init ();
1803+ ml = flb_ml_create (config , "known-bug-multi-group" );
1804+ TEST_CHECK (ml != NULL );
1805+
1806+ create_regex_test_parser (config , "bug_group_a" , "/^A/" );
1807+ create_regex_test_parser (config , "bug_group_b" , "/^B/" );
1808+
1809+ TEST_CHECK (flb_ml_parser_instance_create (ml , "bug_group_a" ) != NULL );
1810+ TEST_CHECK (flb_ml_group_create (ml ) != NULL );
1811+ TEST_CHECK (flb_ml_parser_instance_create (ml , "bug_group_b" ) != NULL );
1812+
1813+ ret = flb_ml_stream_create (ml , "bug-stream" , -1 ,
1814+ capture_logs_callback , & captured , & stream_id );
1815+ TEST_CHECK (ret == 0 );
1816+
1817+ flb_time_get (& tm );
1818+ ret = flb_ml_append_text (ml , stream_id , & tm , "A buffered" , 10 );
1819+ TEST_CHECK (ret == FLB_MULTILINE_OK );
1820+
1821+ flb_time_get (& tm );
1822+ ret = flb_ml_append_text (ml , stream_id , & tm , "B buffered" , 10 );
1823+ TEST_CHECK (ret == FLB_MULTILINE_OK );
1824+
1825+ flb_ml_flush_pending_now (ml );
1826+
1827+ for (i = 0 ; i < captured .flushes ; i ++ ) {
1828+ if (log_matches (captured .logs [i ], "A buffered" )) {
1829+ saw_a = FLB_TRUE ;
1830+ }
1831+ if (log_matches (captured .logs [i ], "B buffered" )) {
1832+ saw_b = FLB_TRUE ;
1833+ }
1834+ }
1835+
1836+ TEST_CHECK (captured .flushes == 1 );
1837+ TEST_CHECK (saw_a == FLB_TRUE );
1838+ TEST_CHECK (saw_b == FLB_FALSE );
1839+
1840+ flb_ml_destroy (ml );
1841+ flb_config_exit (config );
1842+ }
1843+
1844+ static void test_known_bug_truncation_drops_overflow_line ()
1845+ {
1846+ int ret ;
1847+ int i ;
1848+ int saw_overflow_line = FLB_FALSE ;
1849+ uint64_t stream_id ;
1850+ struct flb_config * config ;
1851+ struct flb_ml * ml ;
1852+ struct flb_time tm ;
1853+ struct captured_logs captured = {0 };
1854+
1855+ config = flb_config_init ();
1856+ if (config -> multiline_buffer_limit ) {
1857+ flb_free (config -> multiline_buffer_limit );
1858+ }
1859+ config -> multiline_buffer_limit = flb_strdup ("8" );
1860+
1861+ ml = flb_ml_create (config , "known-bug-truncation" );
1862+ TEST_CHECK (ml != NULL );
1863+
1864+ create_regex_test_parser (config , "bug_truncation" , "/./" );
1865+ TEST_CHECK (flb_ml_parser_instance_create (ml , "bug_truncation" ) != NULL );
1866+
1867+ ret = flb_ml_stream_create (ml , "bug-stream" , -1 ,
1868+ capture_logs_callback , & captured , & stream_id );
1869+ TEST_CHECK (ret == 0 );
1870+
1871+ flb_time_get (& tm );
1872+ ret = flb_ml_append_text (ml , stream_id , & tm , "AAAAA" , 5 );
1873+ TEST_CHECK (ret == FLB_MULTILINE_OK );
1874+
1875+ flb_time_get (& tm );
1876+ ret = flb_ml_append_text (ml , stream_id , & tm , "BBBBB" , 5 );
1877+ TEST_CHECK (ret == FLB_MULTILINE_TRUNCATED );
1878+
1879+ flb_ml_flush_pending_now (ml );
1880+
1881+ for (i = 0 ; i < captured .flushes ; i ++ ) {
1882+ if (log_matches (captured .logs [i ], "BBBBB" )) {
1883+ saw_overflow_line = FLB_TRUE ;
1884+ }
1885+ }
1886+
1887+ TEST_CHECK (captured .flushes == 1 );
1888+ TEST_CHECK (saw_overflow_line == FLB_FALSE );
1889+
1890+ flb_ml_destroy (ml );
1891+ flb_config_exit (config );
1892+ }
1893+
1894+ #ifndef FLB_SYSTEM_WINDOWS
1895+ static void crash_empty_context_flush ()
1896+ {
1897+ struct flb_config * config ;
1898+ struct flb_ml * ml ;
1899+
1900+ config = flb_config_init ();
1901+ ml = flb_ml_create (config , "known-bug-empty-flush" );
1902+ flb_ml_flush_pending_now (ml );
1903+ flb_ml_destroy (ml );
1904+ flb_config_exit (config );
1905+ }
1906+
1907+ static void crash_empty_context_append ()
1908+ {
1909+ struct flb_config * config ;
1910+ struct flb_ml * ml ;
1911+ struct flb_time tm ;
1912+
1913+ config = flb_config_init ();
1914+ ml = flb_ml_create (config , "known-bug-empty-append" );
1915+ flb_time_get (& tm );
1916+ flb_ml_append_text (ml , 1 , & tm , "hello" , 5 );
1917+ flb_ml_destroy (ml );
1918+ flb_config_exit (config );
1919+ }
1920+
1921+ static void test_known_bug_empty_context_flush_crashes ()
1922+ {
1923+ assert_child_crashes (crash_empty_context_flush );
1924+ }
1925+
1926+ static void test_known_bug_empty_context_append_crashes ()
1927+ {
1928+ assert_child_crashes (crash_empty_context_append );
1929+ }
1930+ #else
1931+ static void test_known_bug_empty_context_flush_crashes ()
1932+ {
1933+ TEST_MSG ("skipped on Windows" );
1934+ }
1935+
1936+ static void test_known_bug_empty_context_append_crashes ()
1937+ {
1938+ TEST_MSG ("skipped on Windows" );
1939+ }
1940+ #endif
1941+
16461942TEST_LIST = {
16471943 /* Normal features tests */
16481944 { "parser_docker" , test_parser_docker },
@@ -1657,6 +1953,14 @@ TEST_LIST = {
16571953 { "endswith" , test_endswith },
16581954 { "buffer_limit_truncation" , test_buffer_limit_truncation },
16591955 { "buffer_limit_disabled" , test_buffer_limit_disabled },
1956+ { "known_bug_multi_group_flush_only_first_group" ,
1957+ test_known_bug_multi_group_flush_only_first_group },
1958+ { "known_bug_truncation_drops_overflow_line" ,
1959+ test_known_bug_truncation_drops_overflow_line },
1960+ { "known_bug_empty_context_flush_crashes" ,
1961+ test_known_bug_empty_context_flush_crashes },
1962+ { "known_bug_empty_context_append_crashes" ,
1963+ test_known_bug_empty_context_append_crashes },
16601964
16611965 /* Issues reported on Github */
16621966 { "issue_3817_1" , test_issue_3817_1 },
0 commit comments