From 6756ce8b2bb48544ab45bee60c1e7112d5e1b4b7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 1 Sep 2016 19:42:20 +0900 Subject: [PATCH 1/6] merge out_buffered_null into out_null --- lib/fluent/plugin/out_buffered_null.rb | 59 -------------------------- lib/fluent/plugin/out_null.rb | 38 +++++++++++++++++ 2 files changed, 38 insertions(+), 59 deletions(-) delete mode 100644 lib/fluent/plugin/out_buffered_null.rb diff --git a/lib/fluent/plugin/out_buffered_null.rb b/lib/fluent/plugin/out_buffered_null.rb deleted file mode 100644 index e605dd1fa8..0000000000 --- a/lib/fluent/plugin/out_buffered_null.rb +++ /dev/null @@ -1,59 +0,0 @@ -# -# Fluentd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -require 'fluent/plugin/output' - -module Fluent::Plugin - class BufferedNullOutput < Output - # This plugin is for tests of buffer plugins - - Fluent::Plugin.register_output('buffered_null', self) - - config_section :buffer do - config_set_default :chunk_keys, ['tag'] - config_set_default :flush_at_shutdown, true - config_set_default :chunk_limit_size, 10 * 1024 - end - - attr_accessor :feed_proc, :delayed - - def initialize - super - @delayed = false - @feed_proc = nil - end - - def prefer_delayed_commit - @delayed - end - - def write(chunk) - if @feed_proc - @feed_proc.call(chunk) - else - # ignore chunk.read - end - end - - def try_write(chunk) - if @feed_proc - @feed_proc.call(chunk) - else - # ignore chunk.read - end - end - end -end diff --git a/lib/fluent/plugin/out_null.rb b/lib/fluent/plugin/out_null.rb index e95bacf366..2ccee4936d 100644 --- a/lib/fluent/plugin/out_null.rb +++ b/lib/fluent/plugin/out_null.rb @@ -18,10 +18,48 @@ module Fluent::Plugin class NullOutput < Output + # This plugin is for tests of non-buffered/buffered plugins Fluent::Plugin.register_output('null', self) + Fluent::Plugin.register_output('buffered_null', self) + + config_section :buffer do + config_set_default :chunk_keys, ['tag'] + config_set_default :flush_at_shutdown, true + config_set_default :chunk_limit_size, 10 * 1024 + end + + def prefer_buffered_processing + false + end + + def prefer_delayed_commit + @delayed + end + + attr_accessor :feed_proc, :delayed + + def initialize + super + @delayed = false + @feed_proc = nil + end def process(tag, es) # Do nothing end + + def write(chunk) + if @feed_proc + @feed_proc.call(chunk) + end + end + + def try_write(chunk) + if @feed_proc + @feed_proc.call(chunk) + end + # not to commit chunks for testing + # commit_write(chunk.unique_id) + end end end From 86b42924ea9605b62880949c7504455f36031f42 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 1 Sep 2016 19:42:41 +0900 Subject: [PATCH 2/6] merge out_buffered_stdout into out_stdout --- lib/fluent/plugin/out_buffered_stdout.rb | 70 ------------------------ lib/fluent/plugin/out_stdout.rb | 41 +++++++++++++- 2 files changed, 39 insertions(+), 72 deletions(-) delete mode 100644 lib/fluent/plugin/out_buffered_stdout.rb diff --git a/lib/fluent/plugin/out_buffered_stdout.rb b/lib/fluent/plugin/out_buffered_stdout.rb deleted file mode 100644 index 1630b15024..0000000000 --- a/lib/fluent/plugin/out_buffered_stdout.rb +++ /dev/null @@ -1,70 +0,0 @@ -# -# Fluentd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -require 'fluent/plugin/output' - -module Fluent::Plugin - class BufferedStdoutOutput < Output - Fluent::Plugin.register_output('buffered_stdout', self) - - helpers :formatter, :inject, :compat_parameters - - config_section :buffer do - config_set_default :chunk_keys, ['tag'] - config_set_default :flush_at_shutdown, true - config_set_default :chunk_limit_size, 10 * 1024 - end - - DEFAULT_FORMAT_TYPE = 'json' - - config_section :format do - config_set_default :@type, DEFAULT_FORMAT_TYPE - end - - attr_accessor :delayed - - def initialize - super - @delayed = false - end - - def prefer_delayed_commit - @delayed - end - - def configure(conf) - if conf['output_type'] && !conf['format'] - conf['format'] = conf['output_type'] - end - compat_parameters_convert(conf, :inject, :formatter) - super - @formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE) - end - - def write(chunk) - chunk.write_to($log) - end - - def try_write(chunk) - chunk.write_to($log) - end - - def format(tag, time, record) - record = inject_values_to_record(tag, time, record) - "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n" - end - end -end diff --git a/lib/fluent/plugin/out_stdout.rb b/lib/fluent/plugin/out_stdout.rb index 8d187fd83e..03defd5552 100644 --- a/lib/fluent/plugin/out_stdout.rb +++ b/lib/fluent/plugin/out_stdout.rb @@ -19,30 +19,67 @@ module Fluent::Plugin class StdoutOutput < Output Fluent::Plugin.register_output('stdout', self) + Fluent::Plugin.register_output('buffered_stdout', self) helpers :inject, :formatter, :compat_parameters DEFAULT_FORMAT_TYPE = 'json' + config_section :buffer do + config_set_default :chunk_keys, ['tag'] + config_set_default :flush_at_shutdown, true + config_set_default :chunk_limit_size, 10 * 1024 + end + config_section :format do config_set_default :@type, DEFAULT_FORMAT_TYPE end + def prefer_buffered_processing + false + end + + def prefer_delayed_commit + @delayed + end + + attr_accessor :delayed + + def initialize + super + @delayed = false + end + def configure(conf) if conf['output_type'] && !conf['format'] conf['format'] = conf['output_type'] end compat_parameters_convert(conf, :inject, :formatter) + super + @formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE) end def process(tag, es) es.each {|time,record| - r = inject_values_to_record(tag, time, record) - $log.write "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, r).chomp}\n" + $log.write(format(tag, time, record)) } $log.flush end + + def format(tag, time, record) + record = inject_values_to_record(tag, time, record) + "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n" + end + + def write(chunk) + chunk.write_to($log) + end + + def try_write(chunk) + chunk.write_to($log) + commit_write(chunk.unique_id) + end end end From 3dd4a70ad2457e7545a2664e4d16ee23104dc0f1 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 1 Sep 2016 19:43:11 +0900 Subject: [PATCH 3/6] fix tests for merging buffered outputs into outputs --- test/plugin/test_in_monitor_agent.rb | 4 + test/plugin/test_out_buffered_null.rb | 79 ---------- test/plugin/test_out_buffered_stdout.rb | 122 --------------- test/plugin/test_out_null.rb | 94 ++++++++++-- test/plugin/test_out_stdout.rb | 188 ++++++++++++++++++------ 5 files changed, 232 insertions(+), 255 deletions(-) delete mode 100644 test/plugin/test_out_buffered_null.rb delete mode 100644 test/plugin/test_out_buffered_stdout.rb diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index f201d92f89..232938438f 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -119,6 +119,8 @@ def test_configure "@id"=>"null", "@type" => "null" }, + "buffer_queue_length" => 0, + "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", @@ -285,6 +287,8 @@ def get(uri, header = {}) "@id" => "null", "@type" => "null" }, + "buffer_queue_length" => 0, + "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "null", diff --git a/test/plugin/test_out_buffered_null.rb b/test/plugin/test_out_buffered_null.rb deleted file mode 100644 index 269074b44f..0000000000 --- a/test/plugin/test_out_buffered_null.rb +++ /dev/null @@ -1,79 +0,0 @@ -require_relative '../helper' -require 'fluent/test/driver/output' -require 'fluent/plugin/out_buffered_null' - -class BufferedNullOutputTestCase < Test::Unit::TestCase - sub_test_case 'BufferedNullOutput' do - test 'default chunk limit size is 100' do - d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('') - assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size - assert d.instance.buffer_config.flush_at_shutdown - assert_equal ['tag'], d.instance.buffer_config.chunk_keys - assert d.instance.chunk_key_tag - assert !d.instance.chunk_key_time - assert_equal [], d.instance.chunk_keys - end - - test 'writes standard formattted chunks' do - d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('') - t = event_time("2016-05-23 00:22:13 -0800") - d.run(default_tag: 'test', flush: true) do - d.feed(t, {"message" => "null null null"}) - d.feed(t, {"message" => "null null"}) - d.feed(t, {"message" => "null"}) - end - - assert_equal 3, d.instance.emit_count - assert_equal 3, d.instance.emit_records - end - - test 'check for chunk passed to #write' do - d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('') - data = [] - d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] } - - t = event_time("2016-05-23 00:22:13 -0800") - d.run(default_tag: 'test', flush: true) do - d.feed(t, {"message" => "null null null"}) - d.feed(t, {"message" => "null null"}) - d.feed(t, {"message" => "null"}) - end - - assert_equal 1, data.size - _, tag, binary = data.first - events = [] - Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj } - assert_equal 'test', tag - assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events - end - - test 'check for chunk passed to #try_write' do - d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('') - data = [] - d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] } - d.instance.delayed = true - - t = event_time("2016-05-23 00:22:13 -0800") - d.run(default_tag: 'test', flush: true, shutdown: false) do - d.feed(t, {"message" => "null null null"}) - d.feed(t, {"message" => "null null"}) - d.feed(t, {"message" => "null"}) - end - - assert_equal 1, data.size - chunk_id, tag, binary = data.first - events = [] - Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj } - assert_equal 'test', tag - assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events - - assert_equal [chunk_id], d.instance.buffer.dequeued.keys - - d.instance.commit_write(chunk_id) - - assert_equal [], d.instance.buffer.dequeued.keys - - d.instance_shutdown - end - end -end diff --git a/test/plugin/test_out_buffered_stdout.rb b/test/plugin/test_out_buffered_stdout.rb deleted file mode 100644 index 6f36a4643e..0000000000 --- a/test/plugin/test_out_buffered_stdout.rb +++ /dev/null @@ -1,122 +0,0 @@ -require_relative '../helper' -require 'fluent/test/driver/output' -require 'fluent/plugin/out_buffered_stdout' - -class BufferedStdoutOutputTest < Test::Unit::TestCase - def setup - Fluent::Test.setup - end - - CONFIG = %[ - ] - - def create_driver(conf = CONFIG) - Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedStdoutOutput).configure(conf) - end - - test 'default configure' do - d = create_driver - assert_equal [], d.instance.formatter_configs - assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size - assert d.instance.buffer_config.flush_at_shutdown - assert_equal ['tag'], d.instance.buffer_config.chunk_keys - assert d.instance.chunk_key_tag - assert !d.instance.chunk_key_time - assert_equal [], d.instance.chunk_keys - end - - test 'configure with output_type' do - d = create_driver(CONFIG + "\noutput_type json") - assert_equal 'json', d.instance.formatter_configs.first[:@type] - - d = create_driver(CONFIG + "\noutput_type hash") - assert_equal 'hash', d.instance.formatter_configs.first[:@type] - - assert_raise(Fluent::ConfigError) do - d = create_driver(CONFIG + "\noutput_type foo") - end - end - - sub_test_case "emit with default config" do - test '#write(synchronous)' do - d = create_driver - time = event_time() - - out = capture_log do - d.run(default_tag: 'test', flush: true) do - d.feed(time, {'test' => 'test'}) - end - end - assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out - end - end - - sub_test_case "emit json" do - data('oj' => 'oj', 'yajl' => 'yajl') - test '#write(synchronous)' do |data| - d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}") - time = event_time() - - out = capture_log do - d.run(default_tag: 'test', flush: true) do - d.feed(time, {'test' => 'test'}) - end - end - assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out - end - - data('oj' => 'oj', 'yajl' => 'yajl') - test '#try_write(asynchronous)' do |data| - d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}") - time = event_time() - d.instance.delayed = true - - out = capture_log do - d.run(default_tag: 'test', flush: true, shutdown: false) do - d.feed(time, {'test' => 'test'}) - end - end - - assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out - end - end - - sub_test_case 'emit hash' do - test '#write(synchronous)' do - d = create_driver(CONFIG + "\noutput_type hash") - time = event_time() - - out = capture_log do - d.run(default_tag: 'test', flush: true) do - d.feed(time, {'test' => 'test'}) - end - end - - assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out - end - - test '#try_write(asynchronous)' do - d = create_driver(CONFIG + "\noutput_type hash") - time = event_time() - d.instance.delayed = true - - out = capture_log do - d.run(default_tag: 'test', flush: true, shutdown: false) do - d.feed(time, {'test' => 'test'}) - end - end - - assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out - end - end - - # Capture the log output of the block given - def capture_log(&block) - tmp = $log - $log = StringIO.new - yield - return $log.string - ensure - $log = tmp - end -end diff --git a/test/plugin/test_out_null.rb b/test/plugin/test_out_null.rb index 87c8119621..dd1edd6ec5 100644 --- a/test/plugin/test_out_null.rb +++ b/test/plugin/test_out_null.rb @@ -11,19 +11,95 @@ def create_driver(conf = "") Fluent::Test::Driver::Output.new(Fluent::Plugin::NullOutput).configure(conf) end - def test_configure - assert_nothing_raised do - create_driver + sub_test_case 'non-buffered' do + test 'configure' do + assert_nothing_raised do + create_driver + end + end + + test 'process' do + d = create_driver + assert_nothing_raised do + d.run do + d.feed("test", Fluent::EventTime.now, {"test" => "null"}) + end + end + assert_equal([], d.events(tag: "test")) end end - def test_process - d = create_driver - assert_nothing_raised do - d.run do - d.feed("test", Fluent::EventTime.now, {"test" => "null"}) + sub_test_case 'buffered' do + test 'default chunk limit size is 100' do + d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")])) + assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size + assert d.instance.buffer_config.flush_at_shutdown + assert_equal ['tag'], d.instance.buffer_config.chunk_keys + assert d.instance.chunk_key_tag + assert !d.instance.chunk_key_time + assert_equal [], d.instance.chunk_keys + end + + test 'writes standard formattted chunks' do + d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")])) + t = event_time("2016-05-23 00:22:13 -0800") + d.run(default_tag: 'test', flush: true) do + d.feed(t, {"message" => "null null null"}) + d.feed(t, {"message" => "null null"}) + d.feed(t, {"message" => "null"}) end + + assert_equal 3, d.instance.emit_count + assert_equal 3, d.instance.emit_records + end + + test 'check for chunk passed to #write' do + d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")])) + data = [] + d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] } + + t = event_time("2016-05-23 00:22:13 -0800") + d.run(default_tag: 'test', flush: true) do + d.feed(t, {"message" => "null null null"}) + d.feed(t, {"message" => "null null"}) + d.feed(t, {"message" => "null"}) + end + + assert_equal 1, data.size + _, tag, binary = data.first + events = [] + Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj } + assert_equal 'test', tag + assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events + end + + test 'check for chunk passed to #try_write' do + d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")])) + data = [] + d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] } + d.instance.delayed = true + + t = event_time("2016-05-23 00:22:13 -0800") + d.run(default_tag: 'test', flush: true, shutdown: false) do + d.feed(t, {"message" => "null null null"}) + d.feed(t, {"message" => "null null"}) + d.feed(t, {"message" => "null"}) + end + + assert_equal 1, data.size + chunk_id, tag, binary = data.first + events = [] + Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj } + assert_equal 'test', tag + assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events + + assert_equal [chunk_id], d.instance.buffer.dequeued.keys + + d.instance.commit_write(chunk_id) + + assert_equal [], d.instance.buffer.dequeued.keys + + d.instance_shutdown end - assert_equal([], d.events(tag: "test")) end end diff --git a/test/plugin/test_out_stdout.rb b/test/plugin/test_out_stdout.rb index 4b4c20e4d5..8fd02cb06e 100644 --- a/test/plugin/test_out_stdout.rb +++ b/test/plugin/test_out_stdout.rb @@ -14,70 +14,168 @@ def create_driver(conf = CONFIG) Fluent::Test::Driver::Output.new(Fluent::Plugin::StdoutOutput).configure(conf) end - def test_configure - d = create_driver - assert_equal [], d.instance.formatter_configs - end + sub_test_case 'non-buffered' do + test 'configure' do + d = create_driver + assert_equal [], d.instance.formatter_configs + end - def test_configure_output_type - d = create_driver(CONFIG + "\noutput_type json") - assert_equal 'json', d.instance.formatter_configs.first[:@type] + test 'configure output_type' do + d = create_driver(CONFIG + "\noutput_type json") + assert_equal 'json', d.instance.formatter_configs.first[:@type] - d = create_driver(CONFIG + "\noutput_type hash") - assert_equal 'hash', d.instance.formatter_configs.first[:@type] + d = create_driver(CONFIG + "\noutput_type hash") + assert_equal 'hash', d.instance.formatter_configs.first[:@type] - assert_raise(Fluent::ConfigError) do - d = create_driver(CONFIG + "\noutput_type foo") + assert_raise(Fluent::ConfigError) do + d = create_driver(CONFIG + "\noutput_type foo") + end end - end - def test_emit_in_default - d = create_driver - time = event_time() - out = capture_log do - d.run(default_tag: 'test') do - d.feed(time, {'test' => 'test1'}) + test 'emit with default configuration' do + d = create_driver + time = event_time() + out = capture_log do + d.run(default_tag: 'test') do + d.feed(time, {'test' => 'test1'}) + end end + assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out end - assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out - end - data('oj' => 'oj', 'yajl' => 'yajl') - def test_emit_json(data) - d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}") - time = event_time() - out = capture_log do - d.run(default_tag: 'test') do - d.feed(time, {'test' => 'test1'}) + data('oj' => 'oj', 'yajl' => 'yajl') + test 'emit in json format' do |data| + d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}") + time = event_time() + out = capture_log do + d.run(default_tag: 'test') do + d.feed(time, {'test' => 'test1'}) + end + end + assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out + + if data == 'yajl' + # NOTE: Float::NAN is not jsonable + assert_raise(Yajl::EncodeError) { d.feed('test', time, {'test' => Float::NAN}) } + else + out = capture_log { d.feed('test', time, {'test' => Float::NAN}) } + assert_equal "#{Time.at(time).localtime} test: {\"test\":NaN}\n", out end end - assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out - if data == 'yajl' - # NOTE: Float::NAN is not jsonable - assert_raise(Yajl::EncodeError) { d.feed('test', time, {'test' => Float::NAN}) } - else + test 'emit in hash format' do + d = create_driver(CONFIG + "\noutput_type hash") + time = event_time() + out = capture_log do + d.run(default_tag: 'test') do + d.feed(time, {'test' => 'test2'}) + end + end + assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test2\"}\n", out + + # NOTE: Float::NAN is not jsonable, but hash string can output it. out = capture_log { d.feed('test', time, {'test' => Float::NAN}) } - assert_equal "#{Time.at(time).localtime} test: {\"test\":NaN}\n", out + assert_equal "#{Time.at(time).localtime} test: {\"test\"=>NaN}\n", out end end - def test_emit_hash - d = create_driver(CONFIG + "\noutput_type hash") - time = event_time() - out = capture_log do - d.run(default_tag: 'test') do - d.feed(time, {'test' => 'test2'}) + sub_test_case 'buffered' do + test 'configure' do + d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")])) + assert_equal [], d.instance.formatter_configs + assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size + assert d.instance.buffer_config.flush_at_shutdown + assert_equal ['tag'], d.instance.buffer_config.chunk_keys + assert d.instance.chunk_key_tag + assert !d.instance.chunk_key_time + assert_equal [], d.instance.chunk_keys + end + + test 'configure with output_type' do + d = create_driver(config_element("ROOT", "", {"output_type" => "json"}, [config_element("buffer")])) + assert_equal 'json', d.instance.formatter_configs.first[:@type] + + d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")])) + assert_equal 'hash', d.instance.formatter_configs.first[:@type] + + assert_raise(Fluent::ConfigError) do + create_driver(config_element("ROOT", "", {"output_type" => "foo"}, [config_element("buffer")])) end end - assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test2\"}\n", out - # NOTE: Float::NAN is not jsonable, but hash string can output it. - out = capture_log { d.feed('test', time, {'test' => Float::NAN}) } - assert_equal "#{Time.at(time).localtime} test: {\"test\"=>NaN}\n", out - end + sub_test_case "emit with default config" do + test '#write(synchronous)' do + d = create_driver(config_element("ROOT", "", {}, [config_element("buffer")])) + time = event_time() + + out = capture_log do + d.run(default_tag: 'test', flush: true) do + d.feed(time, {'test' => 'test'}) + end + end + assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out + end + end - private + sub_test_case "emit json" do + data('oj' => 'oj', 'yajl' => 'yajl') + test '#write(synchronous)' do |data| + d = create_driver(config_element("ROOT", "", {"output_type" => "json", "json_parser" => data}, [config_element("buffer")])) + time = event_time() + + out = capture_log do + d.run(default_tag: 'test', flush: true) do + d.feed(time, {'test' => 'test'}) + end + end + assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out + end + + data('oj' => 'oj', 'yajl' => 'yajl') + test '#try_write(asynchronous)' do |data| + d = create_driver(config_element("ROOT", "", {"output_type" => "json", "json_parser" => data}, [config_element("buffer")])) + time = event_time() + d.instance.delayed = true + + out = capture_log do + d.run(default_tag: 'test', flush: true, shutdown: false) do + d.feed(time, {'test' => 'test'}) + end + end + + assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out + end + end + + sub_test_case 'emit hash' do + test '#write(synchronous)' do + d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")])) + time = event_time() + + out = capture_log do + d.run(default_tag: 'test', flush: true) do + d.feed(time, {'test' => 'test'}) + end + end + + assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out + end + + test '#try_write(asynchronous)' do + d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")])) + time = event_time() + d.instance.delayed = true + + out = capture_log do + d.run(default_tag: 'test', flush: true, shutdown: false) do + d.feed(time, {'test' => 'test'}) + end + end + + assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out + end + end + end # Capture the log output of the block given def capture_log(&block) From fe7a4d50baf08e7c91a762c265fd58cebff5fed6 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 1 Sep 2016 19:43:25 +0900 Subject: [PATCH 4/6] fix example configurations for merged outputs --- example/in_forward.conf | 3 +++ example/out_buffered_null.conf | 16 ++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/example/in_forward.conf b/example/in_forward.conf index ff7598075f..a639738f14 100644 --- a/example/in_forward.conf +++ b/example/in_forward.conf @@ -8,4 +8,7 @@ @type stdout + # + # flush_interval 10s + # diff --git a/example/out_buffered_null.conf b/example/out_buffered_null.conf index a6f2e928e0..0c13dd4769 100644 --- a/example/out_buffered_null.conf +++ b/example/out_buffered_null.conf @@ -13,16 +13,20 @@ - @type buffered_null - try_flush_interval 60 - flush_interval 60 - buffer_chunk_limit 1k - buffer_queue_limit 2 + @type null + + flush_interval 60s + chunk_limit_size 1k + total_limit_size 4k + From 73686a9c8378f0d409cfeec6ac0402f86af8686b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 2 Sep 2016 09:37:56 +0900 Subject: [PATCH 5/6] rename to plugin name --- example/{out_buffered_null.conf => out_null.conf} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename example/{out_buffered_null.conf => out_null.conf} (100%) diff --git a/example/out_buffered_null.conf b/example/out_null.conf similarity index 100% rename from example/out_buffered_null.conf rename to example/out_null.conf From 11199442ccff993fcb9ca034206011823f84c37f Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 2 Sep 2016 09:40:07 +0900 Subject: [PATCH 6/6] remove old "buffered_" prefixed names --- lib/fluent/plugin/out_null.rb | 1 - lib/fluent/plugin/out_stdout.rb | 1 - 2 files changed, 2 deletions(-) diff --git a/lib/fluent/plugin/out_null.rb b/lib/fluent/plugin/out_null.rb index 2ccee4936d..5d9a27aec0 100644 --- a/lib/fluent/plugin/out_null.rb +++ b/lib/fluent/plugin/out_null.rb @@ -20,7 +20,6 @@ module Fluent::Plugin class NullOutput < Output # This plugin is for tests of non-buffered/buffered plugins Fluent::Plugin.register_output('null', self) - Fluent::Plugin.register_output('buffered_null', self) config_section :buffer do config_set_default :chunk_keys, ['tag'] diff --git a/lib/fluent/plugin/out_stdout.rb b/lib/fluent/plugin/out_stdout.rb index 03defd5552..9604772346 100644 --- a/lib/fluent/plugin/out_stdout.rb +++ b/lib/fluent/plugin/out_stdout.rb @@ -19,7 +19,6 @@ module Fluent::Plugin class StdoutOutput < Output Fluent::Plugin.register_output('stdout', self) - Fluent::Plugin.register_output('buffered_stdout', self) helpers :inject, :formatter, :compat_parameters