Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion src/config_format/flb_cf_yaml.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ enum section {
SECTION_STREAM_PROCESSOR,
SECTION_PLUGINS,
SECTION_UPSTREAM_SERVERS,
SECTION_METADATA,
SECTION_OTHER,
};

Expand All @@ -84,6 +85,7 @@ static char *section_names[] = {
"stream_processor",
"plugins",
"upstream_servers",
"metadata",
"other"
};

Expand Down Expand Up @@ -123,6 +125,7 @@ enum state {

STATE_SERVICE, /* 'service' section */
STATE_INCLUDE, /* 'includes' section */
STATE_METADATA, /* 'metadata' section */
STATE_OTHER, /* any other unknown section */

STATE_CUSTOM, /* custom plugins */
Expand Down Expand Up @@ -272,6 +275,8 @@ static char *state_str(enum state val)
return "service";
case STATE_INCLUDE:
return "include";
case STATE_METADATA:
return "metadata";
case STATE_OTHER:
return "other";
case STATE_CUSTOM:
Expand Down Expand Up @@ -1778,9 +1783,22 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
return YAML_FAILURE;
}
}
else if (strcasecmp(value, "metadata") == 0) {
state = state_push_section(ctx, STATE_METADATA, SECTION_METADATA);

if (state == NULL) {
flb_error("unable to allocate state");
return YAML_FAILURE;
}

if (state_create_section(conf, state, value) == -1) {
flb_error("unable to allocate section: %s", value);
return YAML_FAILURE;
}
}
else {
/* any other main section definition (e.g: similar to STATE_SERVICE) */
state = state_push(ctx, STATE_OTHER);
state = state_push_section(ctx, STATE_OTHER, SECTION_OTHER);

if (state == NULL) {
flb_error("unable to allocate state");
Expand Down Expand Up @@ -1818,6 +1836,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
/* service or others */
case STATE_ENV:
case STATE_SERVICE:
case STATE_METADATA:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
case STATE_OTHER:
switch(event->type) {
case YAML_MAPPING_START_EVENT:
Expand Down Expand Up @@ -1876,6 +1895,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
switch (state->state) {
case STATE_SERVICE:
case STATE_ENV:
case STATE_METADATA:
case STATE_OTHER:
break;
default:
Expand All @@ -1898,6 +1918,23 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,

case STATE_SECTION_VAL:
switch(event->type) {
case YAML_MAPPING_START_EVENT:
case YAML_SEQUENCE_START_EVENT:
if (state->section == SECTION_ENV) {
yaml_error_event(ctx, state, event);
return YAML_FAILURE;
}
if (state->section != SECTION_METADATA) {
yaml_error_event(ctx, state, event);
return YAML_FAILURE;
}

state = state_push_variant(ctx, state, event->type == YAML_MAPPING_START_EVENT);
if (state == NULL) {
flb_error("unable to allocate state");
return YAML_FAILURE;
}
break;
case YAML_SCALAR_EVENT:
value = (char *) event->data.scalar.value;

Expand All @@ -1913,6 +1950,23 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
return YAML_FAILURE;
}
}
else if (state->section == SECTION_METADATA) {
variant = state_variant_parse_scalar(event);
if (variant == NULL) {
flb_error("unable to allocate memory for variant");
return YAML_FAILURE;
}

if (flb_cf_section_property_add_variant(conf,
state->cf_section->properties,
state->key,
flb_sds_len(state->key),
variant) == NULL) {
cfl_variant_destroy(variant);
flb_error("unable to insert variant");
return YAML_FAILURE;
}
}
else {

/* register key/value pair as a property */
Expand Down Expand Up @@ -2370,6 +2424,34 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,

break;
}
else if (state->state == STATE_SECTION_VAL) {
if (state->section == SECTION_ENV) {
flb_error("invalid type for env entry");
return YAML_FAILURE;
}
if (state->section != SECTION_METADATA) {
flb_error("variant values are only valid in metadata section");
return YAML_FAILURE;
}

if (flb_cf_section_property_add_variant(conf,
state->cf_section->properties,
state->key,
flb_sds_len(state->key),
variant) == NULL) {
cfl_variant_destroy(variant);
flb_error("unable to insert variant");
return YAML_FAILURE;
}

state = state_pop(ctx);
if (state == NULL) {
flb_error("no state left");
return YAML_FAILURE;
}

break;
}

if (state->variant->type == CFL_VARIANT_KVLIST && state->variant_kvlist_key == NULL) {
flb_error("invalid state, should have a variant key");
Expand Down
97 changes: 96 additions & 1 deletion tests/internal/config_format_yaml.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#define FLB_004 FLB_TESTS_CONF_PATH "/stream_processor.yaml"
#define FLB_005 FLB_TESTS_CONF_PATH "/plugins.yaml"
#define FLB_006 FLB_TESTS_CONF_PATH "/upstream.yaml"
#define FLB_007 FLB_TESTS_CONF_PATH "/metadata.yaml"
#define FLB_008 FLB_TESTS_CONF_PATH "/other_with_nested_map.yaml"

#define FLB_000_WIN FLB_TESTS_CONF_PATH "\\fluent-bit-windows.yaml"
#define FLB_BROKEN_PLUGIN_VARIANT FLB_TESTS_CONF_PATH "/broken_plugin_variant.yaml"
Expand Down Expand Up @@ -364,8 +366,12 @@ static void test_processors()
struct cfl_variant *v;
struct cfl_variant *logs;
struct cfl_variant *record_modifier_filter;
struct cfl_variant *second_processor;
struct cfl_variant *records;
struct cfl_variant *record;
struct cfl_variant *sampling_type;
struct cfl_variant *sampling_settings;
struct cfl_variant *sampling_percentage;
int idx = 0;

cf = flb_cf_yaml_create(NULL, FLB_002, NULL, 0);
Expand Down Expand Up @@ -432,7 +438,7 @@ static void test_processors()

TEST_CHECK(logs->type == CFL_VARIANT_ARRAY);
if (logs->type == CFL_VARIANT_ARRAY) {
TEST_CHECK(logs->data.as_array->entry_count == 1);
TEST_CHECK(logs->data.as_array->entry_count == 2);

record_modifier_filter = cfl_array_fetch_by_index(logs->data.as_array, 0);
TEST_CHECK(record_modifier_filter != NULL);
Expand Down Expand Up @@ -462,6 +468,31 @@ static void test_processors()
}
}
}

second_processor = cfl_array_fetch_by_index(logs->data.as_array, 1);
TEST_CHECK(second_processor != NULL);
TEST_CHECK(second_processor->type == CFL_VARIANT_KVLIST);

if (second_processor != NULL && second_processor->type == CFL_VARIANT_KVLIST) {
sampling_type = cfl_kvlist_fetch(second_processor->data.as_kvlist, "type");
TEST_CHECK(sampling_type != NULL);
TEST_CHECK(sampling_type->type == CFL_VARIANT_STRING);
TEST_CHECK(strcmp(sampling_type->data.as_string, "probabilistic") == 0);
Comment thread
cosmo0920 marked this conversation as resolved.

sampling_settings = cfl_kvlist_fetch(second_processor->data.as_kvlist,
"sampling_settings");
TEST_CHECK(sampling_settings != NULL);
TEST_CHECK(sampling_settings->type == CFL_VARIANT_KVLIST);

if (sampling_settings != NULL &&
sampling_settings->type == CFL_VARIANT_KVLIST) {
sampling_percentage = cfl_kvlist_fetch(sampling_settings->data.as_kvlist,
"sampling_percentage");
TEST_CHECK(sampling_percentage != NULL);
TEST_CHECK(sampling_percentage->type == CFL_VARIANT_UINT);
TEST_CHECK(sampling_percentage->data.as_int64 == 25);
}
}
}
}

Expand Down Expand Up @@ -835,6 +866,68 @@ static void test_upstream_servers()
flb_cf_destroy(cf);
}

static void test_metadata_section()
{
struct flb_cf *cf;
struct flb_cf_section *s;
struct cfl_variant *v;
struct cfl_variant *nested;
struct cfl_variant *tags;

cf = flb_cf_yaml_create(NULL, FLB_007, NULL, 0);
TEST_CHECK(cf != NULL);
if (!cf) {
exit(EXIT_FAILURE);
}

s = flb_cf_section_get_by_name(cf, "metadata");
TEST_CHECK(s != NULL);

TEST_CHECK(mk_list_size(&cf->others) == 1);

v = flb_cf_section_property_get(cf, s, "usecase");
TEST_CHECK(v != NULL);
TEST_CHECK(v->type == CFL_VARIANT_STRING);
TEST_CHECK(strcmp(v->data.as_string, "monitor own Fluent Bit") == 0);

v = flb_cf_section_property_get(cf, s, "config_version");
TEST_CHECK(v != NULL);
TEST_CHECK(v->type == CFL_VARIANT_UINT);
if (v != NULL && v->type == CFL_VARIANT_UINT) {
TEST_CHECK(v->data.as_uint64 == 12345);
}

v = flb_cf_section_property_get(cf, s, "annotations");
TEST_CHECK(v != NULL);
TEST_CHECK(v->type == CFL_VARIANT_KVLIST);
if (v != NULL && v->type == CFL_VARIANT_KVLIST) {
nested = cfl_kvlist_fetch(v->data.as_kvlist, "enabled");
TEST_CHECK(nested != NULL);
TEST_CHECK(nested->type == CFL_VARIANT_BOOL);
TEST_CHECK(nested->data.as_bool == CFL_TRUE);

tags = cfl_kvlist_fetch(v->data.as_kvlist, "tags");
TEST_CHECK(tags != NULL);
TEST_CHECK(tags->type == CFL_VARIANT_ARRAY);
if (tags != NULL && tags->type == CFL_VARIANT_ARRAY) {
TEST_CHECK(tags->data.as_array->entry_count == 2);
}
}

flb_cf_destroy(cf);
}

static void test_other_nested_map_rejected()
{
struct flb_cf *cf;

cf = flb_cf_yaml_create(NULL, FLB_008, NULL, 0);
TEST_CHECK(cf == NULL);
if (cf != NULL) {
flb_cf_destroy(cf);
}
}

static void test_invalid_property()
{
char* test_cases[] = {
Expand Down Expand Up @@ -876,6 +969,8 @@ TEST_LIST = {
{ "stream_processor", test_stream_processor},
{ "plugins", test_plugins},
{ "upstream_servers", test_upstream_servers},
{ "metadata_section", test_metadata_section},
{ "other_nested_map_rejected", test_other_nested_map_rejected},
{ "invalid_input_property", test_invalid_property},
{ 0 }
};
24 changes: 24 additions & 0 deletions tests/internal/data/config_format/yaml/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
env:
fb_env_name: test

service:
flush: 1
log_level: info

metadata:
usecase: monitor own Fluent Bit
config_version: 12345
annotations:
owner: sre
tags:
- edge
- metrics
enabled: true

pipeline:
inputs:
- name: dummy
tag: test
outputs:
- name: stdout
match: "*"
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
service:
flush: 1

custom_info:
nested:
key: value
4 changes: 4 additions & 0 deletions tests/internal/data/config_format/yaml/processors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ pipeline:
record:
- filtered_by record_modifier
- powered_by calyptia
- name: sampling
type: probabilistic
sampling_settings:
sampling_percentage: 25
outputs:
- name: stdout
match: "*"
Expand Down
Loading