es_out: support Upstream Servers with configuration overriding#7608

Open
mabrarov wants to merge 15 commits into fluent:master from mabrarov:feature/out_es_upstream_support_extended

@mabrarov
Contributor

@mabrarov mabrarov commented Jun 25, 2023

Implementation of Upstream feature for the Elasticsearch output plugin.

This pull request is based on pull request #1560 and Forward output plugin.

It was tested in a local setup with:

  1. Fluent Bit without the Upstream feature, connected to a single node of an Elasticsearch cluster consisting of 3 master-eligible/data nodes and 1 coordinating node.

    Refer to the elastic-cluster directory of the mabrarov/elastic-stack repository for the Docker Compose project used to create the target Elasticsearch cluster and Kibana.

    The Fluent Bit configuration file used for the test is fluent-bit-es/fluent-bit.conf (the same configuration in YAML format is fluent-bit-es/fluent-bit.yaml) in the mabrarov/elastic-stack repository.

    The debug log is available at flb_es.log.

  2. Fluent Bit with the Upstream feature, connected to all data nodes of an Elasticsearch cluster consisting of 3 master-eligible/data nodes and 1 coordinating node.

    Refer to the elastic-cluster directory of the mabrarov/elastic-stack repository for the Docker Compose project used to create the target Elasticsearch cluster and Kibana.

    The Fluent Bit configuration file used for the test is fluent-bit-es-cluster/fluent-bit.conf (the same configuration in YAML format is fluent-bit-es-cluster/fluent-bit.yaml) in the mabrarov/elastic-stack repository.

    The debug log is available at flb_es_upstream.log.


Testing

  • Example configuration files for the change can be found in mabrarov/elastic-stack repository under fluent-bit-es-cluster directory.
  • Debug log output from testing the change - see above.
  • Attached Valgrind output that shows no leaks or memory corruption was found - refer to flb_run_code_analysis.log for the output of command
    TEST_PRESET=valgrind SKIP_TESTS='flb-rt-out_td flb-it-network' ./run_code_analysis.sh
  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • HA-aware Elasticsearch output with per-upstream/per-node configuration and upstream-aware formatter flush-context callbacks.
  • Integrations

    • Cloud ID parsing for Elasticsearch endpoints.
    • Optional AWS SigV4 support with STS and unsigned-headers handling.
  • Configuration

    • Consolidated ES property names, added upstream option, and corrected default Elasticsearch port to 9200.
  • Quality

    • Ownership-aware resource handling and improved lifecycle/cleanup.
  • Tests

    • Expanded upstream-focused tests covering formatter, flush-context, and config scenarios.

@mabrarov
Contributor Author

Hi reviewers,

Is it possible to approve only the workflow for this pull request, so that the automated checks and build can start?

Thank you.

@mabrarov mabrarov temporarily deployed to pr June 28, 2023 17:55 — with GitHub Actions Inactive
@PettitWesley
Contributor

@mabrarov sure

@mabrarov mabrarov temporarily deployed to pr June 28, 2023 18:22 — with GitHub Actions Inactive
@mabrarov
Contributor Author

mabrarov commented Jun 29, 2023

Hi @PettitWesley,

It looks like all failed checks are around run-macos-unit-tests jobs and caused by the following failed unit tests:

  1. flb-rt-in_event_test
  2. flb-rt-out_tcp

Other pull requests appear to have the same failures, so the failed checks do not seem to be caused by the changes in this pull request.

Help from the maintainers would be appreciated.

Thank you.

@mabrarov mabrarov force-pushed the feature/out_es_upstream_support_extended branch from ba3382a to b7cd81b Compare July 8, 2023 10:22
@mabrarov
Contributor Author

Hi @PettitWesley,

Is it possible to trigger the automated workflow (build) for this pull request one more time? Since the last build I found and fixed one issue and added tests for the new code.

Thank you.

@mabrarov mabrarov temporarily deployed to pr July 10, 2023 12:05 — with GitHub Actions Inactive
@mabrarov mabrarov temporarily deployed to pr July 10, 2023 12:28 — with GitHub Actions Inactive
@mabrarov
Contributor Author

Hi dear reviewers,

Is it possible to get this pull request reviewed and accepted sooner? Is anything pending on my side before the review can start?

Thank you.

@mabrarov mabrarov force-pushed the feature/out_es_upstream_support_extended branch from b7cd81b to b81d3f7 Compare July 20, 2023 19:38
@mabrarov
Contributor Author

Hi @PettitWesley and @edsiper,

It looks like you are the code owners for the Elasticsearch output plugin. Is anything pending on my side before the review of this pull request can start? This feature was requested 4 years ago, and I believe multiple Fluent Bit users (not just my team) would like to have it.

Thank you.

@mabrarov mabrarov force-pushed the feature/out_es_upstream_support_extended branch from b81d3f7 to f6431c2 Compare September 30, 2023 13:32

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@plugins/out_es/es_conf_parse.c`:
- Around line 51-53: Guard against flb_utils_split returning NULL: check the
returned pointer (toks) immediately after calling flb_utils_split((const char
*)cloud_auth, ':', -1) and if it is NULL, log/return an error (or appropriate
failure code from the surrounding function) instead of proceeding; this prevents
mk_list_foreach(head, toks) from dereferencing a NULL pointer. Update the code
around the flb_utils_split call to perform the NULL check and handle the failure
path cleanly before using toks.
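
The guard described above can be sketched in isolation. This is a minimal, self-contained illustration, not the plugin's code: `split_once` is a hypothetical stand-in for the splitter (it returns NULL on bad input or allocation failure), and `parse_cloud_auth` is a hypothetical caller that checks the result before touching it.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a splitter: returns a heap-allocated array of
 * two strings (text before and after the first separator), or NULL when the
 * input is NULL, has no separator, or an allocation fails. */
static char **split_once(const char *s, char sep)
{
    char **toks;
    const char *p;
    size_t n;

    if (s == NULL) {
        return NULL;
    }
    p = strchr(s, sep);
    if (p == NULL) {
        return NULL;
    }
    toks = calloc(2, sizeof(*toks));
    if (toks == NULL) {
        return NULL;
    }
    n = (size_t) (p - s);
    toks[0] = malloc(n + 1);
    toks[1] = malloc(strlen(p + 1) + 1);
    if (toks[0] == NULL || toks[1] == NULL) {
        free(toks[0]);
        free(toks[1]);
        free(toks);
        return NULL;
    }
    memcpy(toks[0], s, n);
    toks[0][n] = '\0';
    strcpy(toks[1], p + 1);
    return toks;
}

/* Parses "user:password". Returns 0 on success, -1 on failure.
 * On success the caller owns *user and *passwd. */
static int parse_cloud_auth(const char *cloud_auth, char **user, char **passwd)
{
    char **toks = split_once(cloud_auth, ':');

    if (toks == NULL) {
        /* Guard: fail early instead of iterating over a NULL result. */
        return -1;
    }
    *user = toks[0];
    *passwd = toks[1];
    free(toks);
    return 0;
}
```

The point of the sketch is only the ordering: the NULL check sits between the split call and the first use of its result, so every failure path returns before any dereference.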

In `@plugins/out_es/es_type.h`:
- Around line 224-233: The doc comment for flb_es_slist_destroy incorrectly
references the type flb_es_list; update the parameter description to mention the
correct type flb_es_slist instead. Locate the comment block above the function
declaration flb_es_slist_destroy and replace "flb_es_list" with "flb_es_slist"
so the `@param` description and explanatory text consistently refer to the struct
flb_es_slist.
♻️ Duplicate comments (1)
tests/runtime/out_elasticsearch.c (1)

22-38: Avoid tmpnam(NULL); prefer a caller‑supplied buffer (or mkstemp).

tmpnam(NULL) relies on a static buffer and is prone to race conditions; if POSIX is acceptable, mkstemp is safer, otherwise use tmpnam(buf).

🔧 Suggested fix (C‑standard buffer usage)
-    upstream_conf_filename = tmpnam(NULL);
-    if (upstream_conf_filename == NULL) {
-        return NULL;
-    }
-    upstream_conf_filename = flb_strdup(upstream_conf_filename);
-    if (upstream_conf_filename == NULL) {
-        return NULL;
-    }
+    upstream_conf_filename = flb_malloc(L_tmpnam);
+    if (upstream_conf_filename == NULL) {
+        return NULL;
+    }
+    if (tmpnam(upstream_conf_filename) == NULL) {
+        flb_free(upstream_conf_filename);
+        return NULL;
+    }
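
For comparison, the POSIX alternative mentioned in the comment could look roughly like the sketch below. It is not part of the pull request: the function name `create_temp_conf_file` and the `/tmp/flb_upstream_XXXXXX` pattern are assumptions made for illustration, and `mkstemp` is unavailable on non-POSIX platforms, so this only applies where POSIX is acceptable.

```c
#define _POSIX_C_SOURCE 200809L

#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Creates an empty temporary file and returns its heap-allocated path,
 * or NULL on failure. The caller unlinks the file and frees the path.
 * The name pattern is hypothetical and assumes /tmp exists and is writable. */
static char *create_temp_conf_file(void)
{
    static const char pattern[] = "/tmp/flb_upstream_XXXXXX";
    char *path;
    int fd;

    path = malloc(sizeof(pattern));
    if (path == NULL) {
        return NULL;
    }
    memcpy(path, pattern, sizeof(pattern));

    /* mkstemp replaces the XXXXXX suffix and creates the file atomically,
     * which avoids the name race inherent in tmpnam. */
    fd = mkstemp(path);
    if (fd == -1) {
        free(path);
        return NULL;
    }
    close(fd);
    return path;
}
```

Because the file already exists when the function returns, a test would reopen it by name to write the upstream configuration and remove it with `unlink` afterwards.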

@mabrarov
Contributor Author

Hi dear reviewers,

I have addressed all comments in this pull request and I consider it ready for review.

Thank you.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@plugins/out_es/es_conf.c`:
- Around line 369-506: Several boolean config assignments call flb_utils_bool()
and assign its result directly to flags (e.g., ec->suppress_type_name,
ec->generate_id, ec->has_aws_auth, ec->logstash_format, ec->time_key_nanos,
ec->include_tag_key, ec->replace_dots, ec->current_time_index, ec->trace_output,
ec->trace_error); flb_utils_bool() returns -1 on invalid input so you must treat
that as a config error. For each place where flb_utils_bool(tmp) is used, store
the result in a temporary int (e.g., int val = flb_utils_bool(tmp)); if val ==
-1 return -1 (abort); otherwise assign ec->field = val; apply this pattern to
all boolean overrides read via flb_upstream_node_get_property in this diff.
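
The pattern requested above can be sketched without any Fluent Bit headers. In this illustration `parse_bool` is a hypothetical stand-in that follows the contract described in the comment (1 for true, 0 for false, -1 for unrecognized input), and `apply_bool_override` is a hypothetical helper showing the temporary-variable check.

```c
#include <string.h>

/* Hypothetical stand-in for a boolean parser: 1 for true, 0 for false,
 * -1 for input that is not a recognized boolean. */
static int parse_bool(const char *val)
{
    if (val == NULL) {
        return -1;
    }
    if (strcmp(val, "true") == 0 || strcmp(val, "on") == 0 ||
        strcmp(val, "yes") == 0) {
        return 1;
    }
    if (strcmp(val, "false") == 0 || strcmp(val, "off") == 0 ||
        strcmp(val, "no") == 0) {
        return 0;
    }
    return -1;
}

/* Applies an optional per-node override to *field.
 * raw == NULL means the node does not override the property.
 * Returns 0 on success, -1 when the value is not a valid boolean;
 * *field is left untouched on error. */
static int apply_bool_override(const char *raw, int *field)
{
    int val;

    if (raw == NULL) {
        return 0;
    }
    val = parse_bool(raw);
    if (val == -1) {
        return -1;
    }
    *field = val;
    return 0;
}
```

Routing every boolean property through one helper like this keeps the error check in a single place instead of repeating it for each flag.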


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@plugins/out_es/es.c`:
- Around line 1279-1282: The config_map entry for logstash_dateformat is writing
the pointer at the wrong place because logstash_dateformat is a flb_es_sds_t and
its entry is missing the "+ offsetof(struct flb_es_sds_t, value)" adjustment;
update the config_map entry for logstash_dateformat (in the block where
FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT is defined) to use offsetof(struct
flb_elasticsearch_config, logstash_dateformat) + offsetof(struct flb_es_sds_t,
value) so it matches the other flb_es_sds_t fields and prevents corrupting the
flb_es_sds_t wrapper.
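
The offset arithmetic in question can be demonstrated with plain C. The structs below are hypothetical analogues, not the plugin's definitions (the real field layout of `flb_es_sds_t` is not shown in this thread); `set_string_at` imitates a generic setter that writes a `char *` at a byte offset from a base pointer.

```c
#include <stddef.h>

/* Hypothetical wrapper: an ownership flag followed by the string pointer. */
struct sds_wrapper {
    int owned;
    char *value;
};

/* Hypothetical configuration struct embedding the wrapper. */
struct es_config {
    int port;
    struct sds_wrapper logstash_dateformat;
};

/* Writes a string pointer at `offset` bytes from `base`, the way a generic
 * offset-driven setter would. The offset must address a char * slot. */
static void set_string_at(void *base, size_t offset, char *str)
{
    char **slot = (char **) ((char *) base + offset);

    *slot = str;
}
```

With this layout, `offsetof(struct es_config, logstash_dateformat)` alone addresses the start of the wrapper, so a write there would overwrite `owned`. Adding `offsetof(struct sds_wrapper, value)` moves the write to the pointer slot and leaves the rest of the wrapper intact.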
🧹 Nitpick comments (5)
tests/runtime/out_elasticsearch.c (1)

22-112: create_upstream_conf_file helper is functional but has a minor portability note.

The tmpnam usage with an owned buffer (per past review discussion) is acceptable for test code. The varargs-based config generation with arg_idx ^= 1 toggling is a compact approach.

One minor note: if first_property is non-NULL but the varargs list has an odd number of entries (missing value for a property name), the loop will write a bare newline for the missing value and continue. This is unlikely in practice since tests use explicit NULL terminators, but documenting the contract (pairs of name/value, NULL-terminated) in a comment would help future test authors.
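
A stricter version of the name/value contract might be sketched as follows. This is not the test helper from the pull request: `format_properties` is a hypothetical function that writes into a caller-supplied buffer and, unlike the behavior described above, rejects a name that has no value instead of emitting a bare newline.

```c
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>

/* Formats NULL-terminated (name, value) string pairs as "name value\n"
 * lines into buf. Contract: arguments after `size` come in pairs and the
 * list ends with a NULL in name position.
 * Returns the number of bytes written (excluding the terminator), or -1 if
 * a name has no value or the buffer is too small. */
static int format_properties(char *buf, size_t size, const char *first_name, ...)
{
    va_list args;
    const char *name = first_name;
    const char *value;
    size_t used = 0;
    int failed = 0;
    int n;

    if (buf == NULL || size == 0) {
        return -1;
    }
    buf[0] = '\0';

    va_start(args, first_name);
    while (name != NULL) {
        value = va_arg(args, const char *);
        if (value == NULL) {
            failed = 1;     /* odd argument count: name without value */
            break;
        }
        n = snprintf(buf + used, size - used, "%s %s\n", name, value);
        if (n < 0 || (size_t) n >= size - used) {
            failed = 1;     /* formatting error or truncated output */
            break;
        }
        used += (size_t) n;
        name = va_arg(args, const char *);
    }
    va_end(args);

    return failed ? -1 : (int) used;
}
```

A call would look like `format_properties(buf, sizeof(buf), "host", "es-1", "port", "9200", (const char *) NULL)`. The cast on the terminator matters in variadic calls, where a bare `NULL` is not guaranteed to be passed as a pointer.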

src/flb_lib.c (1)

617-649: flb_output_set_test_flush_ctx_callback overwrites rt_ctx/rt_ffd but not rt_out_callback/rt_data.

This function sets test_mode, rt_ctx, rt_ffd, flush_ctx, and flush_ctx_callback — but it does not set rt_out_callback or rt_data. This means it's only safe to call after flb_output_set_test has been called (which is the current usage in all tests). If called standalone, the formatter test would run but the runtime result checker would be missing.

Consider either documenting that flb_output_set_test must be called first, or simplifying flb_output_set_test_flush_ctx_callback to only set the two fields it's responsible for (flush_ctx and flush_ctx_callback) without touching other test_formatter members.

Simplified version that only sets the callback-specific fields
 int flb_output_set_test_flush_ctx_callback(flb_ctx_t *ctx, int ffd,
                                            char *test_name,
                                            void *(*flush_ctx_callback) (struct flb_config *,
                                                                         struct flb_input_instance *,
                                                                         void *, void *),
                                            void *flush_ctx)
 {
     struct flb_output_instance *o_ins;
 
     o_ins = out_instance_get(ctx, ffd);
     if (!o_ins) {
         return -1;
     }
 
-    /*
-     * Enabling a test, set the output instance in 'test' mode, so no real
-     * flush callback is invoked, only the desired implemented test.
-     */
-
     /* Formatter test */
     if (strcmp(test_name, "formatter") == 0) {
-        o_ins->test_mode = FLB_TRUE;
-        o_ins->test_formatter.rt_ctx = ctx;
-        o_ins->test_formatter.rt_ffd = ffd;
         o_ins->test_formatter.flush_ctx = flush_ctx;
         o_ins->test_formatter.flush_ctx_callback = flush_ctx_callback;
     }
     else {
         return -1;
     }
 
     return 0;
 }
plugins/out_es/es_conf.c (1)

100-107: size_t compared with -1 — works but is implicit.

buffer_size == -1 where buffer_size is size_t compiles to buffer_size == SIZE_MAX. This is correct but may trigger -Wsign-compare warnings on some compilers. Consider using an explicit constant or cast for clarity.

Optional: make the sentinel explicit
 static size_t config_adjust_buffer_size(size_t buffer_size)
 {
-    /* HTTP Payload (response) maximum buffer size (0 == unlimited) */
-    if (buffer_size == -1) {
+    /* HTTP Payload (response) maximum buffer size (0 == unlimited).
+     * flb_utils_size_to_bytes returns -1 on error, which when cast
+     * to size_t becomes SIZE_MAX. */
+    if (buffer_size == (size_t) -1) {
         return 0;
     }
     return buffer_size;
 }
plugins/out_es/es.h (1)

53-147: Ownership model for flb_elasticsearch_config fields is clear but mixing raw and wrapper types warrants a brief note.

Fields like http_user, http_passwd, http_api_key are plain char * (owned by flb_config_map), while cloud_user/cloud_passwd use struct flb_es_str and logstash fields use struct flb_es_sds_t (shared-ownership wrappers). This asymmetry is intentional per the PR discussion, but a short comment near the raw char * group explaining why they don't need wrappers (e.g., "owned by config_map, never shared across upstream nodes") would help future maintainers.

plugins/out_es/es.c (1)

312-315: Remove commented-out variable declarations.

These four commented-out lines (msgpack_unpacked result, msgpack_object root, msgpack_object *obj) appear to be leftover from refactoring.

♻️ Proposed cleanup
-    // msgpack_unpacked result;
-    // msgpack_object root;
     msgpack_object map;
-    // msgpack_object *obj;


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@plugins/out_es/es_conf_parse.c`:
- Around line 58-70: The code currently checks ec->cloud_user.value and
ec->cloud_passwd.value after calling flb_es_str_copy_str(), but
flb_es_str_copy_str() returns an error on OOM while preserving the old dest
value, so change the logic in the parsing branch that handles items==1 and
items==2 to capture and check the return value of
flb_es_str_copy_str(&ec->cloud_user, entry->value) and
flb_es_str_copy_str(&ec->cloud_passwd, entry->value); if the call returns
non‑zero, call flb_utils_split_free(toks) and return -1 to reliably detect
allocation failure instead of inspecting dest->value.

In `@plugins/out_es/es_conf.c`:
- Around line 508-511: The code currently obtains tmp via
flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE) and calls
config_adjust_buffer_size(flb_utils_size_to_bytes(tmp)), which masks invalid
inputs because flb_utils_size_to_bytes() returns -1 for invalid strings; detect
that case before casting/adjusting: call flb_utils_size_to_bytes(tmp), check for
a negative return (e.g. -1) and treat it as a configuration error (log an error
and return/fail the config load for the upstream/node) instead of proceeding to
config_adjust_buffer_size; update the logic around tmp, flb_utils_size_to_bytes,
and ec->buffer_size so only valid positive byte sizes are passed to
config_adjust_buffer_size and assigned to ec->buffer_size.
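
The validation order suggested above (check the signed result first, convert to `size_t` afterwards) can be sketched as below. The helper name `buffer_size_from_parsed` is hypothetical, and the sketch assumes the size parser yields a signed 64-bit value with -1 meaning "invalid", as the comment describes.

```c
#include <stddef.h>
#include <stdint.h>

/* Converts a signed parse result into a buffer size.
 * Returns 0 and stores the size in *out when `parsed` is a valid,
 * representable byte count; returns -1 (leaving *out untouched) when the
 * parser reported an error or the value does not fit in size_t. */
static int buffer_size_from_parsed(int64_t parsed, size_t *out)
{
    if (parsed < 0) {
        /* Error sentinel from the parser: reject before any cast, so it
         * cannot turn into SIZE_MAX and be mistaken for "unlimited". */
        return -1;
    }
    if ((uint64_t) parsed > SIZE_MAX) {
        return -1;
    }
    *out = (size_t) parsed;
    return 0;
}
```

A caller would treat a -1 return as a configuration error for the node and only pass the stored value on to any later adjustment step.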

In `@plugins/out_es/es.c`:
- Around line 917-923: The code currently treats flb_http_client(...) failure as
permanent by setting ret = FLB_ERROR and dropping the chunk; instead treat
allocation/connect failures as transient and return a retry code so Fluent Bit
will requeue the data. In the block that checks if (!c) (the flb_http_client
call creating variable c with u_conn and ec->uri, using pack/pack_size), change
the error handling to log the failure via flb_plg_error(ctx->ins, ...) and set
ret = FLB_RETRY (not FLB_ERROR) before jumping to cleanup_and_return so the
chunk is retried rather than discarded.

@mabrarov
Contributor Author

mabrarov commented Feb 9, 2026

Hi @cosmo0920 and @patrick-stephens,

This PR has been pending for more than 2.5 years. I would appreciate it if you could find time to review it again. I believe I have addressed the comment from @cosmo0920, which was the only concern at the time it was added.

My winter vacation has ended and I won't be able to spend much time on this PR until summer, especially considering the new pending PRs for the same plugin (e.g. #11422 and #11375); rebasing such changes can be hard.

Thank you.

@mabrarov
Contributor Author

mabrarov commented Feb 9, 2026

I believe that similar changes could be implemented for, and would be helpful in, the OpenSearch output plugin (in a dedicated PR).

mabrarov added 15 commits April 10, 2026 16:33
Signed-off-by: Marat Abrarov <abrarov@gmail.com>
Signed-off-by: Marat Abrarov <abrarov@gmail.com>
Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…wn to parser of Upstream node configuration section are implemented, e.g. "host" and "port"

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…o the test callback based on configuration of Fluent Bit and based on configuration of plugin

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…llback contract

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…with Upstream node configuration

For Elastic cloud authentication these parameters are always taken from plugin configuration and never from Upstream node configuration: cloud_id.

For AWS authentication these parameters are always taken from plugin configuration and never from Upstream node configuration: http_proxy, no_proxy, tls*.

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…o the test callback based on configuration of Fluent Bit and based on configuration of plugin

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…icate username or password used for AWS authentication

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…on and retrying buffer flush

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…ent creation failed

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…iguration data

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
…tials

Signed-off-by: Marat Abrarov <abrarov@gmail.com>
Labels

docs-required ok-package-test Run PR packaging tests

8 participants