22
33#include <fluent-bit.h>
44#include "flb_tests_runtime.h"
5+ #include <stdio.h>
6+ #include <unistd.h>
57
68/* Test data */
79#include "data/es/json_es.h" /* JSON_ES */
@@ -1058,6 +1060,271 @@ void flb_test_response_partially_success()
10581060 flb_destroy (ctx );
10591061}
10601062
1063+ static int create_upstream_file (char * path , size_t size )
1064+ {
1065+ int fd ;
1066+ int ret ;
1067+ const char * content =
1068+ "[UPSTREAM]\n"
1069+ " name es_cluster\n"
1070+ "\n"
1071+ "[NODE]\n"
1072+ " name es_node_1\n"
1073+ " host 127.0.0.1\n"
1074+ " port 9200\n" ;
1075+
1076+ snprintf (path , size , "/tmp/flb-es-upstream-XXXXXX" );
1077+ fd = mkstemp (path );
1078+ if (fd == -1 ) {
1079+ return -1 ;
1080+ }
1081+
1082+ ret = write (fd , content , strlen (content ));
1083+ close (fd );
1084+ if (ret <= 0 ) {
1085+ unlink (path );
1086+ return -1 ;
1087+ }
1088+
1089+ return 0 ;
1090+ }
1091+
1092+ static flb_ctx_t * create_upstream_test_ctx (char * upstream_file ,
1093+ size_t upstream_file_size ,
1094+ int * in_ffd ,
1095+ int * out_ffd )
1096+ {
1097+ int ret ;
1098+ flb_ctx_t * ctx ;
1099+
1100+ ret = create_upstream_file (upstream_file , upstream_file_size );
1101+ TEST_CHECK (ret == 0 );
1102+ if (ret != 0 ) {
1103+ return NULL ;
1104+ }
1105+
1106+ ctx = flb_create ();
1107+ flb_service_set (ctx , "flush" , "1" , "grace" , "1" , NULL );
1108+
1109+ * in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
1110+ flb_input_set (ctx , * in_ffd , "tag" , "test" , NULL );
1111+
1112+ * out_ffd = flb_output (ctx , (char * ) "es" , NULL );
1113+ flb_output_set (ctx , * out_ffd ,
1114+ "match" , "test" ,
1115+ "upstream" , upstream_file ,
1116+ NULL );
1117+
1118+ return ctx ;
1119+ }
1120+
1121+ void flb_test_upstream_write_operation ()
1122+ {
1123+ int ret ;
1124+ int size = sizeof (JSON_ES ) - 1 ;
1125+ int in_ffd ;
1126+ int out_ffd ;
1127+ char upstream_file [64 ];
1128+ flb_ctx_t * ctx ;
1129+
1130+ ctx = create_upstream_test_ctx (upstream_file , sizeof (upstream_file ),
1131+ & in_ffd , & out_ffd );
1132+ if (!ctx ) {
1133+ return ;
1134+ }
1135+ (void ) in_ffd ;
1136+
1137+ flb_output_set (ctx , out_ffd ,
1138+ "write_operation" , "index" ,
1139+ NULL );
1140+
1141+ ret = flb_output_set_test (ctx , out_ffd , "formatter" ,
1142+ cb_check_write_op_index ,
1143+ NULL , NULL );
1144+ TEST_CHECK (ret == 0 );
1145+
1146+ ret = flb_start (ctx );
1147+ TEST_CHECK (ret == 0 );
1148+
1149+ flb_lib_push (ctx , in_ffd , (char * ) JSON_ES , size );
1150+
1151+ sleep (2 );
1152+ flb_stop (ctx );
1153+ flb_destroy (ctx );
1154+ unlink (upstream_file );
1155+ }
1156+
1157+ void flb_test_upstream_null_index ()
1158+ {
1159+ int ret ;
1160+ int in_ffd ;
1161+ int out_ffd ;
1162+ char upstream_file [64 ];
1163+ flb_ctx_t * ctx ;
1164+
1165+ ctx = create_upstream_test_ctx (upstream_file , sizeof (upstream_file ),
1166+ & in_ffd , & out_ffd );
1167+ if (!ctx ) {
1168+ return ;
1169+ }
1170+
1171+ flb_output_set (ctx , out_ffd ,
1172+ "index" , "" ,
1173+ "type" , "type_test" ,
1174+ NULL );
1175+
1176+ ret = flb_output_set_test (ctx , out_ffd , "formatter" ,
1177+ cb_check_index_type ,
1178+ NULL , NULL );
1179+ TEST_CHECK (ret == 0 );
1180+
1181+ ret = flb_start (ctx );
1182+ TEST_CHECK (ret == -1 );
1183+
1184+ flb_stop (ctx );
1185+ flb_destroy (ctx );
1186+ unlink (upstream_file );
1187+ }
1188+
1189+ void flb_test_upstream_index_type ()
1190+ {
1191+ int ret ;
1192+ int size = sizeof (JSON_ES ) - 1 ;
1193+ int in_ffd ;
1194+ int out_ffd ;
1195+ char upstream_file [64 ];
1196+ flb_ctx_t * ctx ;
1197+
1198+ ctx = create_upstream_test_ctx (upstream_file , sizeof (upstream_file ),
1199+ & in_ffd , & out_ffd );
1200+ if (!ctx ) {
1201+ return ;
1202+ }
1203+
1204+ flb_output_set (ctx , out_ffd ,
1205+ "index" , "index_test" ,
1206+ "type" , "type_test" ,
1207+ NULL );
1208+
1209+ ret = flb_output_set_test (ctx , out_ffd , "formatter" ,
1210+ cb_check_index_type ,
1211+ NULL , NULL );
1212+ TEST_CHECK (ret == 0 );
1213+
1214+ ret = flb_start (ctx );
1215+ TEST_CHECK (ret == 0 );
1216+ flb_lib_push (ctx , in_ffd , (char * ) JSON_ES , size );
1217+
1218+ sleep (2 );
1219+ flb_stop (ctx );
1220+ flb_destroy (ctx );
1221+ unlink (upstream_file );
1222+ }
1223+
1224+ void flb_test_upstream_logstash_format ()
1225+ {
1226+ int ret ;
1227+ int size = sizeof (JSON_ES ) - 1 ;
1228+ int in_ffd ;
1229+ int out_ffd ;
1230+ char upstream_file [64 ];
1231+ flb_ctx_t * ctx ;
1232+
1233+ ctx = create_upstream_test_ctx (upstream_file , sizeof (upstream_file ),
1234+ & in_ffd , & out_ffd );
1235+ if (!ctx ) {
1236+ return ;
1237+ }
1238+
1239+ flb_output_set (ctx , out_ffd ,
1240+ "logstash_format" , "on" ,
1241+ "logstash_prefix" , "prefix" ,
1242+ "logstash_dateformat" , "%Y-%m-%d" ,
1243+ NULL );
1244+
1245+ ret = flb_output_set_test (ctx , out_ffd , "formatter" ,
1246+ cb_check_logstash_format ,
1247+ NULL , NULL );
1248+ TEST_CHECK (ret == 0 );
1249+
1250+ ret = flb_start (ctx );
1251+ TEST_CHECK (ret == 0 );
1252+ flb_lib_push (ctx , in_ffd , (char * ) JSON_ES , size );
1253+
1254+ sleep (2 );
1255+ flb_stop (ctx );
1256+ flb_destroy (ctx );
1257+ unlink (upstream_file );
1258+ }
1259+
1260+ void flb_test_upstream_replace_dots ()
1261+ {
1262+ int ret ;
1263+ int size = sizeof (JSON_DOTS ) - 1 ;
1264+ int in_ffd ;
1265+ int out_ffd ;
1266+ char upstream_file [64 ];
1267+ flb_ctx_t * ctx ;
1268+
1269+ ctx = create_upstream_test_ctx (upstream_file , sizeof (upstream_file ),
1270+ & in_ffd , & out_ffd );
1271+ if (!ctx ) {
1272+ return ;
1273+ }
1274+
1275+ flb_output_set (ctx , out_ffd ,
1276+ "replace_dots" , "on" ,
1277+ NULL );
1278+
1279+ ret = flb_output_set_test (ctx , out_ffd , "formatter" ,
1280+ cb_check_replace_dots ,
1281+ NULL , NULL );
1282+ TEST_CHECK (ret == 0 );
1283+
1284+ ret = flb_start (ctx );
1285+ TEST_CHECK (ret == 0 );
1286+ flb_lib_push (ctx , in_ffd , (char * ) JSON_DOTS , size );
1287+
1288+ sleep (2 );
1289+ flb_stop (ctx );
1290+ flb_destroy (ctx );
1291+ unlink (upstream_file );
1292+ }
1293+
1294+ void flb_test_upstream_id_key ()
1295+ {
1296+ int ret ;
1297+ int size = sizeof (JSON_ES ) - 1 ;
1298+ int in_ffd ;
1299+ int out_ffd ;
1300+ char upstream_file [64 ];
1301+ flb_ctx_t * ctx ;
1302+
1303+ ctx = create_upstream_test_ctx (upstream_file , sizeof (upstream_file ),
1304+ & in_ffd , & out_ffd );
1305+ if (!ctx ) {
1306+ return ;
1307+ }
1308+
1309+ flb_output_set (ctx , out_ffd ,
1310+ "id_key" , "key_2" ,
1311+ NULL );
1312+
1313+ ret = flb_output_set_test (ctx , out_ffd , "formatter" ,
1314+ cb_check_id_key ,
1315+ NULL , NULL );
1316+ TEST_CHECK (ret == 0 );
1317+
1318+ ret = flb_start (ctx );
1319+ TEST_CHECK (ret == 0 );
1320+ flb_lib_push (ctx , in_ffd , (char * ) JSON_ES , size );
1321+
1322+ sleep (2 );
1323+ flb_stop (ctx );
1324+ flb_destroy (ctx );
1325+ unlink (upstream_file );
1326+ }
1327+
10611328/* Test list */
10621329TEST_LIST = {
10631330 {"long_index" , flb_test_long_index },
@@ -1077,5 +1344,11 @@ TEST_LIST = {
10771344 {"response_success" , flb_test_response_success },
10781345 {"response_successes" , flb_test_response_successes },
10791346 {"response_partially_success" , flb_test_response_partially_success },
1347+ {"upstream_write_operation" , flb_test_upstream_write_operation },
1348+ {"upstream_null_index" , flb_test_upstream_null_index },
1349+ {"upstream_index_type" , flb_test_upstream_index_type },
1350+ {"upstream_logstash_format" , flb_test_upstream_logstash_format },
1351+ {"upstream_replace_dots" , flb_test_upstream_replace_dots },
1352+ {"upstream_id_key" , flb_test_upstream_id_key },
10801353 {NULL , NULL }
10811354};
0 commit comments