diff --git a/README.md b/README.md index 01926eb..1c8e57d 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,11 @@ message: `metrics { "f1":"100", "f2":"200", "f3":"300" }` - Setting for rewriting the tag. - For more information: https://github.com/y-ken/fluent-mixin-rewrite-tag-name +##### buffer + +- Inherits from buffer parameters. + - Please refer to the article: http://docs.fluentd.org/v0.12/articles/buffer-plugin-overview + ## Contributing 1. Fork it ( http://github.com/studio3104/fluent-plugin-graphite/fork ) diff --git a/lib/fluent/plugin/out_graphite.rb b/lib/fluent/plugin/out_graphite.rb index c72b84c..7d30d77 100644 --- a/lib/fluent/plugin/out_graphite.rb +++ b/lib/fluent/plugin/out_graphite.rb @@ -1,6 +1,6 @@ require 'fluent/mixin/rewrite_tag_name' -class Fluent::GraphiteOutput < Fluent::Output +class Fluent::GraphiteOutput < Fluent::BufferedOutput Fluent::Plugin.register_output('graphite', self) include Fluent::HandleTagNameMixin @@ -49,10 +49,19 @@ def configure(conf) end # How many times to retry the call if timeout raised @max_retries ||= 3 + + if @flush_interval < 10 + log.info("flush_interval less than 10s is not allowed and overwritten to 10s") + @flush_interval = 10 + end end - def emit(tag, es, chain) - es.each do |time, record| + def format(tag, time, record) + [tag, time, record].to_msgpack + end + + def write(chunk) + chunk.msgpack_each do |tag, time, record| emit_tag = tag.dup filter_record(emit_tag, time, record) next unless metrics = format_metrics(emit_tag, record) @@ -60,8 +69,6 @@ def emit(tag, es, chain) # implemented to immediate call post method in this loop, because graphite-api.gem has the buffers. post(metrics, time) end - - chain.next end def format_metrics(tag, record) diff --git a/test/plugin/test_out_graphite.rb b/test/plugin/test_out_graphite.rb index 09b7809..ac64af4 100644 --- a/test/plugin/test_out_graphite.rb +++ b/test/plugin/test_out_graphite.rb @@ -36,6 +36,12 @@ class GraphiteOutputTest < Test::Unit::TestCase name_keys dstat.total cpu usage.usr,dstat.total cpu usage.sys,dstat.total cpu usage.idl name_key_pattern ^((?!hostname).)*$ ] + CONFIG_NAME_KEY_PATTERN_FLUSH_INTERVAL_LESS_THAN_10_SECONDS = %[ + host localhost + port #{TCP_PORT} + name_key_pattern ^((?!hostname).)*$ + flush_interval 5s + ] def setup Fluent::Test.setup @@ -57,6 +63,7 @@ def test_configure assert_equal d.instance.tag_for, 'prefix' assert_equal d.instance.name_keys, nil assert_equal d.instance.name_key_pattern, /^((?!hostname).)*$/ + assert_equal d.instance.flush_interval, 60 d = create_driver(CONFIG_NAME_KEYS) assert_equal d.instance.host, 'localhost' @@ -83,6 +90,12 @@ def test_configure assert_raise(Fluent::ConfigError) { d = create_driver(CONFIG_SPECIFY_BOTH_NAME_KEYS_AND_NAME_KEY_PATTERN) } end + def test_configure_flush_interval_less_than_10s + d = create_driver(CONFIG_NAME_KEY_PATTERN_FLUSH_INTERVAL_LESS_THAN_10_SECONDS) + # should be overwritten with 10s which is minimum graphite metrics resolution + assert_equal d.instance.flush_interval, 10 + end + def test_format_metrics_strings record = { 'hostname' => 'localhost.localdomain',