From af46b723b0eb1ef3a65963b349206dbc91d60e4b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Sat, 21 May 2016 18:55:56 +0900 Subject: [PATCH 1/2] add Fluent::Plugin::BareOutput to support v0.12 MultiOutput plugins --- lib/fluent/compat/output.rb | 11 ++- lib/fluent/plugin/bare_output.rb | 58 +++++++++++++++ lib/fluent/plugin/multi_output.rb | 9 +-- test/plugin/test_bare_output.rb | 118 ++++++++++++++++++++++++++++++ test/plugin/test_multi_output.rb | 36 +-------- 5 files changed, 186 insertions(+), 46 deletions(-) create mode 100644 lib/fluent/plugin/bare_output.rb create mode 100644 test/plugin/test_bare_output.rb diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 6851d5ac84..72d28c2989 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -16,7 +16,7 @@ require 'fluent/plugin' require 'fluent/plugin/output' -require 'fluent/plugin/multi_output' +require 'fluent/plugin/bare_output' require 'fluent/compat/call_super_mixin' require 'fluent/compat/output_chain' require 'fluent/timezone' @@ -149,11 +149,10 @@ def initialize end end - class MultiOutput < Fluent::Plugin::MultiOutput - def initialize - super - @compat = true - end + class MultiOutput < Fluent::Plugin::BareOutput + # TODO: warn when deprecated + + helpers :event_emitter def process(tag, es) emit(tag, es, NULL_OUTPUT_CHAIN) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb new file mode 100644 index 0000000000..56b5d9778b --- /dev/null +++ b/lib/fluent/plugin/bare_output.rb @@ -0,0 +1,58 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/base' + +require 'fluent/log' +require 'fluent/plugin_id' +require 'fluent/plugin_helper' + +module Fluent + module Plugin + class BareOutput < Base + include PluginId + include PluginLoggerMixin + include PluginHelper::Mixin + + attr_reader :num_errors, :emit_count, :emit_records + + def process(tag, es) + raise NotImplementedError, "BUG: output plugins MUST implement this method" + end + + def initialize + super + @counters_monitor = Monitor.new + # TODO: well organized counters + @num_errors = 0 + @emit_count = 0 + @emit_records = 0 + end + + def emit_sync(tag, es) + @counters_monitor.synchronize{ @emit_count += 1 } + begin + process(tag, es) + @counters_monitor.synchronize{ @emit_records += es.size } + rescue + @counters_monitor.synchronize{ @num_errors += 1 } + raise + end + end + alias :emit_events :emit_sync + end + end +end diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 3f0254b536..51a6ef625c 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -34,6 +34,10 @@ class MultiOutput < Base attr_reader :outputs + def process(tag, es) + raise NotImplementedError, "BUG: output plugins MUST implement this method" + end + def initialize super @outputs = [] @@ -52,9 +56,6 @@ def initialize def configure(conf) super - # v0.12 MultiOutput does nothing about initializing stores, and plugin implementation did it. - return if @compat - @stores.each do |store| store_conf = store.corresponding_config_element type = store_conf['@type'] @@ -89,8 +90,6 @@ def emit_sync(tag, es) end end alias :emit_events :emit_sync - - # def process(tag, es) end end end diff --git a/test/plugin/test_bare_output.rb b/test/plugin/test_bare_output.rb new file mode 100644 index 0000000000..3dab2b66c8 --- /dev/null +++ b/test/plugin/test_bare_output.rb @@ -0,0 +1,118 @@ +require_relative '../helper' +require 'fluent/plugin/bare_output' +require 'fluent/event' + +module FluentPluginBareOutputTest + class DummyPlugin < Fluent::Plugin::BareOutput + attr_reader :store + def initialize + super + @store = [] + end + def process(tag, es) + es.each do |time, record| + @store << [tag, time, record] + end + end + end +end + +class BareOutputTest < Test::Unit::TestCase + setup do + Fluent::Test.setup + @p = FluentPluginBareOutputTest::DummyPlugin.new + end + + test 'has healthy lifecycle' do + assert !@p.configured? + @p.configure(config_element()) + assert @p.configured? + + assert !@p.started? + @p.start + assert @p.start + + assert !@p.stopped? + @p.stop + assert @p.stopped? + + assert !@p.before_shutdown? + @p.before_shutdown + assert @p.before_shutdown? + + assert !@p.shutdown? + @p.shutdown + assert @p.shutdown? + + assert !@p.after_shutdown? + @p.after_shutdown + assert @p.after_shutdown? + + assert !@p.closed? + @p.close + assert @p.closed? + + assert !@p.terminated? + @p.terminate + assert @p.terminated? + end + + test 'has plugin_id automatically generated' do + assert @p.respond_to?(:plugin_id_configured?) + assert @p.respond_to?(:plugin_id) + + @p.configure(config_element()) + + assert !@p.plugin_id_configured? + assert @p.plugin_id + assert{ @p.plugin_id != 'mytest' } + end + + test 'has plugin_id manually configured' do + @p.configure(config_element('ROOT', '', {'@id' => 'mytest'})) + assert @p.plugin_id_configured? + assert_equal 'mytest', @p.plugin_id + end + + test 'has plugin logger' do + assert @p.respond_to?(:log) + assert @p.log + + # default logger + original_logger = @p.log + + @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + + assert{ @p.log.object_id != original_logger.object_id } + assert_equal Fluent::Log::LEVEL_DEBUG, @p.log.level + end + + test 'can load plugin helpers' do + assert_nothing_raised do + class FluentPluginBareOutputTest::DummyPlugin2 < Fluent::Plugin::BareOutput + helpers :storage + end + end + end + + test 'can get input event stream to write' do + @p.configure(config_element('ROOT')) + @p.start + + es1 = Fluent::OneEventStream.new(event_time('2016-05-21 18:37:31 +0900'), {'k1' => 'v1'}) + es2 = Fluent::ArrayEventStream.new([ + [event_time('2016-05-21 18:38:33 +0900'), {'k2' => 'v2'}], + [event_time('2016-05-21 18:39:10 +0900'), {'k3' => 'v3'}], + ]) + @p.emit_events('mytest1', es1) + @p.emit_events('mytest2', es2) + + all_events = [ + ['mytest1', event_time('2016-05-21 18:37:31 +0900'), {'k1' => 'v1'}], + ['mytest2', event_time('2016-05-21 18:38:33 +0900'), {'k2' => 'v2'}], + ['mytest2', event_time('2016-05-21 18:39:10 +0900'), {'k3' => 'v3'}], + ] + + assert_equal all_events, @p.store + end +end diff --git a/test/plugin/test_multi_output.rb b/test/plugin/test_multi_output.rb index 7e2e4ef313..e07d6353b7 100644 --- a/test/plugin/test_multi_output.rb +++ b/test/plugin/test_multi_output.rb @@ -1,5 +1,5 @@ require_relative '../helper' -require 'fluent/plugin/output' +require 'fluent/plugin/multi_output' require 'fluent/event' require 'json' @@ -177,38 +177,4 @@ def create_output(type=:multi) assert_equal 2, @i.events.size end end - - sub_test_case 'compat multi output plugin' do - setup do - Fluent::Test.setup - @i = create_output(:compat_multi) - end - - teardown do - @i.log.out.reset - end - - test '#configure raises error if sections are missing' do - conf = config_element('ROOT', '', { '@type' => 'dummy_test_multi_output' }, []) - assert_raise Fluent::ConfigError do - @i.configure(conf) - end - end - - test '#configure does NOT initialize child plugins' do - assert_equal [], @i.outputs - - conf = config_element('ROOT', '', { '@type' => 'dummy_test_multi_output' }, - [ - config_element('store', '', { '@type' => 'dummy_test_multi_output_1' }), - config_element('store', '', { '@type' => 'dummy_test_multi_output_2' }), - config_element('store', '', { '@type' => 'dummy_test_multi_output_3' }), - config_element('store', '', { '@type' => 'dummy_test_multi_output_4' }), - ] - ) - @i.configure(conf) - - assert_equal [], @i.outputs - end - end end From fd18dbb9f9ca112b0a7961bdcd9da3b9ee57aed0 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 23 May 2016 11:09:02 +0900 Subject: [PATCH 2/2] add comment for abnormality of BareOutput --- lib/fluent/plugin/bare_output.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index 56b5d9778b..cef034e9e3 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -23,6 +23,11 @@ module Fluent module Plugin class BareOutput < Base + # DO NOT USE THIS plugin for normal output plugin. Use Output instead. + # This output plugin base class is only for meta-output plugins + # which cannot be implemented on MultiOutput. + # E.g,: forest, config-expander + include PluginId include PluginLoggerMixin include PluginHelper::Mixin