Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions lib/fluent/plugin/out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def configure(conf)
@data_stream_template_name = "#{@data_stream_name}_template" if @data_stream_template_name.nil?

# ref. https://opensearch.org/docs/latest/opensearch/data-streams/
unless placeholder?(:data_stream_name_placeholder, @data_stream_name)
unless placeholder_substitution_needed?
validate_data_stream_parameters
else
@use_placeholder = true
Expand Down Expand Up @@ -67,12 +67,20 @@ def validate_data_stream_parameters
end
end

def create_index_template(datastream_name, template_name, host = nil)
def placeholder_substitution_needed?
need_substitution = placeholder?(:data_stream_name_placeholder, @data_stream_name) ||
@customize_template&.values&.any? { |value| placeholder?(:customize_template, value.to_s) } ||
placeholder?(:data_stream_template_name, @data_stream_template_name)
log.debug("Needs substitution: #{need_substitution}")
need_substitution
end

def create_index_template(datastream_name, template_name, customize_template = nil, host = nil)
# Create index template from file
if !dry_run?
if @template_file
return if data_stream_exist?(datastream_name, host) or template_exists?(template_name, host)
template_installation_actual(template_name, @customize_template, @application_name, datastream_name, host)
template_installation_actual(template_name, customize_template, @application_name, datastream_name, host)
else # Create default index template
return if data_stream_exist?(datastream_name, host) or template_exists?(template_name, host)
body = {
Expand Down Expand Up @@ -162,8 +170,11 @@ def write(chunk)
end
data_stream_name = extract_placeholders(@data_stream_name, chunk).downcase
data_stream_template_name = extract_placeholders(@data_stream_template_name, chunk).downcase
if @customize_template
customize_template = @customize_template.each_with_object({}) { |(key, value), hash| hash[key] = extract_placeholders(value, chunk) }
end
begin
create_index_template(data_stream_name, data_stream_template_name, host)
create_index_template(data_stream_name, data_stream_template_name, customize_template, host)
rescue => e
raise Fluent::ConfigError, "Failed to create data stream: <#{data_stream_name}> #{e.message}"
end
Expand Down
30 changes: 30 additions & 0 deletions test/plugin/test_out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -743,4 +743,34 @@ def test_record_with_remove_keys
assert(!index_cmds[1].has_key?('remove_me'))
end

def test_custom_data_stream_template_create_with_placeholders
cwd = File.dirname(__FILE__)
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_name_placeholder' => 'foo',
'data_stream_template_name' => '${tag}_template',
'template_file' => File.join(cwd, 'datastream_template.json'),
'customize_template' => '{"foo*": "${tag}--*"}',
})

stub_default
stub_bulk_feed('foo', 'test_templata')
stub_nonexistent_template?('test_template')

stub_request(:put, "http://localhost:9200/_index_template/test_template")
.to_return(:status => 200, :body => "", :headers => {})

driver(conf).run(default_tag: 'test') do
driver.feed(sample_record)
end

assert_requested(
:put,
"http://localhost:9200/_index_template/test_template",
body: {"index_patterns" => ["test--*"], "data_stream" => {}}
)

end
end
Loading