From 1a99a98329e67af1790781e1a8ed61b85f73f842 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 1 Nov 2016 19:49:24 +0900 Subject: [PATCH 01/41] fix to raise error for nonsense #commit_write call --- lib/fluent/plugin/output.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 504f113d95..2b0b2fdead 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -889,9 +889,10 @@ def handle_stream_simple(tag, es, enqueue: false) def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed if delayed - @dequeued_chunks_mutex.synchronize do - @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id } + r = @dequeued_chunks_mutex.synchronize do + @dequeued_chunks.reject!{ |info| info.chunk_id == chunk_id } end + raise "BUG: chunk is not committed, does plugin call #commit_write in #try_write ?" if r.nil? # nothing deleted from @dequeued_chunks end @buffer.purge_chunk(chunk_id) From 323f7e1f15dd068d42d7d35b38fcb100a5471e5b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 26 Oct 2016 18:25:54 +0900 Subject: [PATCH 02/41] add extract support and fix time handling --- lib/fluent/plugin_helper/compat_parameters.rb | 59 ++++++++++++ test/plugin_helper/test_compat_parameters.rb | 89 ++++++++++++++++++- 2 files changed, 147 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index c5b2f3a3c6..107cf69dbd 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -57,6 +57,8 @@ module CompatParameters "keys" => "keys", # CSVParser, TSVParser (old ValuesParser) "time_key" => "time_key", "time_format" => "time_format", + "localtim" => nil, + "utc" => nil, "delimiter" => "delimiter", "json_parser" => "json_parser", # JSONParser "label_delimiter" => "label_delimiter", # LabeledTSVParser @@ -77,10 +79,20 @@ module CompatParameters "utc" => nil, } + EXTRACT_PARAMS = { + "time_key" => "time_key", + "time_format" => "time_format", + "timezone" => "timezone", + "tag_key" => "tag_key", + "localtime" => nil, + "utc" => nil, + } + FORMATTER_PARAMS = { "format" => "@type", "delimiter" => "delimiter", "force_quotes" => "force_quotes", # CsvFormatter + "keys" => "keys", # TSVFormatter "fields" => "fields", # CsvFormatter "json_parser" => "json_parser", # JSONFormatter "label_delimiter" => "label_delimiter", # LabeledTSVFormatter @@ -101,6 +113,8 @@ def compat_parameters_convert(conf, *types, **kwargs) compat_parameters_buffer(conf, **kwargs) when :inject compat_parameters_inject(conf) + when :extract + compat_parameters_extract(conf) when :parser compat_parameters_parser(conf) when :formatter @@ -173,6 +187,7 @@ def compat_parameters_inject(conf) hash['time_type'] ||= 'string' end if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) + hash['time_key'] ||= 'time' hash['time_type'] = 'unixtime' end if conf.has_key?('localtime') || conf.has_key?('utc') @@ -199,6 +214,37 @@ def compat_parameters_inject(conf) conf end + def compat_parameters_extract(conf) + return unless conf.elements('extract').empty? + return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) } + + # TODO: warn obsolete parameters if these are deprecated + hash = compat_parameters_copy_to_subsection_attributes(conf, EXTRACT_PARAMS) + + if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) + hash['time_key'] ||= 'time' + hash['time_type'] = 'unixtime' + end + if conf.has_key?('localtime') || conf.has_key?('utc') + if conf.has_key?('localtime') && conf.has_key?('utc') + raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them" + elsif conf.has_key?('localtime') + hash['localtime'] = Fluent::Config.bool_value(conf['localtime']) + elsif conf.has_key?('utc') + hash['localtime'] = !(Fluent::Config.bool_value(conf['utc'])) + # Specifying "localtime false" means using UTC in TimeFormatter + # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). + # There are difference between "Z" and "+0000" in timezone formatting. + # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way + end + end + + e = Fluent::Config::Element.new('extract', '', hash, []) + conf.elements << e + + conf + end + def compat_parameters_parser(conf) return unless conf.elements('parse').empty? return if PARSER_PARAMS.keys.all?{|k| !conf.has_key?(k) } @@ -216,6 +262,19 @@ def compat_parameters_parser(conf) end hash["types"] = JSON.dump(types) end + if conf.has_key?('localtime') || conf.has_key?('utc') + if conf.has_key?('localtime') && conf.has_key?('utc') + raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them" + elsif conf.has_key?('localtime') + hash['localtime'] = Fluent::Config.bool_value(conf['localtime']) + elsif conf.has_key?('utc') + hash['localtime'] = !(Fluent::Config.bool_value(conf['utc'])) + # Specifying "localtime false" means using UTC in TimeFormatter + # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). + # There are difference between "Z" and "+0000" in timezone formatting. + # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way + end + end e = Fluent::Config::Element.new('parse', '', hash, []) conf.elements << e diff --git a/test/plugin_helper/test_compat_parameters.rb b/test/plugin_helper/test_compat_parameters.rb index 6e353d2dc5..54830adba7 100644 --- a/test/plugin_helper/test_compat_parameters.rb +++ b/test/plugin_helper/test_compat_parameters.rb @@ -1,6 +1,8 @@ require_relative '../helper' require 'fluent/plugin_helper/compat_parameters' -require 'fluent/plugin/base' +require 'fluent/plugin/input' +require 'fluent/plugin/output' +require 'fluent/time' require 'time' @@ -21,6 +23,26 @@ class CompatParameterTest < Test::Unit::TestCase end end + class DummyI0 < Fluent::Plugin::Input + helpers :compat_parameters, :parser, :extract + attr_reader :parser + def configure(conf) + compat_parameters_convert(conf, :extract, :parser) + super + end + def start + super + @parser = parser_create + end + def produce_events(input_data) + emit_events = [] # tag, time, record + @parser.parse(input_data) do |time, record| + tag = extract_tag_from_record(record) || 'dummy_tag' + emit_events << [tag, time, record] + end + emit_events + end + end class DummyO0 < Fluent::Plugin::Output helpers :compat_parameters def configure(conf) @@ -239,4 +261,69 @@ def write(chunk); end # dummy assert_equal "value%1,tag%tag.test,time%#{iso8601str}\n", formatted end end + + sub_test_case 'input plugins' do + test 'plugin helper converts parameters into plugin configuration parameters for extract and parser' do + hash = { + 'format' => 'ltsv', + 'delimiter' => ',', + 'label_delimiter' => '%', + 'tag_key' => 't2', + 'time_key' => 't', + 'time_format' => '%Y-%m-%d.%H:%M:%S.%N', + 'utc' => 'yes', + 'types' => 'A integer|B string|C bool', + 'types_delimiter' => '|', + 'types_label_delimiter' => ' ', + } + conf = config_element('ROOT', '', hash) + @i = DummyI0.new + @i.configure(conf) + @i.start + @i.after_start + + parser = @i.parser + assert{ parser.is_a? Fluent::Plugin::LabeledTSVParser } + assert_equal ',', parser.delimiter + assert_equal '%', parser.label_delimiter + + events = @i.produce_events("A%1,B%x,C%true,t2%mytag,t%2016-10-20.03:50:11.987654321") + assert_equal 1, events.size + + tag, time, record = events.first + assert_equal 'mytag', tag + assert_equal_event_time event_time("2016-10-20 03:50:11.987654321 +0000"), time + assert_equal 3, record.keys.size + assert_equal ['A','B','C'], record.keys.sort + assert_equal 1, record['A'] + assert_equal 'x', record['B'] + assert_equal true, record['C'] + end + + test 'plugin helper converts parameters into plugin configuration parameters for extract and parser, using numeric time' do + hash = { + 'format' => 'ltsv', + 'delimiter' => ',', + 'label_delimiter' => '%', + 'tag_key' => 't2', + 'time_key' => 't', + 'time_type' => 'float', + 'localtime' => 'yes', + } + conf = config_element('ROOT', '', hash) + @i = DummyI0.new + @i.configure(conf) + @i.start + @i.after_start + + parser = @i.parser + assert{ parser.is_a? Fluent::Plugin::LabeledTSVParser } + assert_equal ',', parser.delimiter + assert_equal '%', parser.label_delimiter + end + + test 'plugin helper setups time extraction as unix time (integer from epoch)' do + # TODO: + end + end end From c7a197c5ac159d8647884d2dd7cbefbcdfeb7cb4 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 26 Oct 2016 18:28:03 +0900 Subject: [PATCH 03/41] fix to wait child processes to run actually, and fix timeout feature to show situation clearly --- lib/fluent/plugin_helper/child_process.rb | 3 ++- lib/fluent/test/driver/base.rb | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin_helper/child_process.rb b/lib/fluent/plugin_helper/child_process.rb index f36b68e947..eede92bd27 100644 --- a/lib/fluent/plugin_helper/child_process.rb +++ b/lib/fluent/plugin_helper/child_process.rb @@ -265,6 +265,7 @@ def child_process_execute_once( m.lock # run after plugin thread get pid, thread instance and i/o m.unlock begin + @_child_process_processes[pid].alive = true block.call(*io_objects) rescue EOFError => e log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments @@ -286,7 +287,7 @@ def child_process_execute_once( end thread[:_fluentd_plugin_helper_child_process_running] = true thread[:_fluentd_plugin_helper_child_process_pid] = pid - pinfo = ProcessInfo.new(title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, true, nil) + pinfo = ProcessInfo.new(title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, false, nil) @_child_process_mutex.synchronize do @_child_process_processes[pid] = pinfo end diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb index 6f63203531..f72639991f 100644 --- a/lib/fluent/test/driver/base.rb +++ b/lib/fluent/test/driver/base.rb @@ -149,11 +149,15 @@ def instance_shutdown def run_actual(timeout: DEFAULT_TIMEOUT, &block) if @instance.respond_to?(:_threads) - sleep 0.01 until @instance._threads.values.all?(&:alive?) + sleep 0.1 until @instance._threads.values.all?(&:alive?) end if @instance.respond_to?(:event_loop_running?) - sleep 0.01 until @instance.event_loop_running? + sleep 0.1 until @instance.event_loop_running? + end + + if @instance.respond_to?(:_child_process_processes) + sleep 0.1 until @instance._child_process_processes.values.all?{|pinfo| pinfo.alive } end return_value = nil From ac5c6f93a83e9835947734828af4f66dd4b8da53 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 26 Oct 2016 18:28:32 +0900 Subject: [PATCH 04/41] show plugin helper name explicitly --- lib/fluent/plugin_helper.rb | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index fda30bf6e3..8977f64913 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -36,7 +36,14 @@ def self.included(mod) end def helpers(*snake_case_symbols) - helper_modules = snake_case_symbols.map{|name| Fluent::PluginHelper.const_get(name.to_s.split('_').map(&:capitalize).join) } + helper_modules = [] + snake_case_symbols.each do |name| + begin + helper_modules << Fluent::PluginHelper.const_get(name.to_s.split('_').map(&:capitalize).join) + rescue NameError + raise "Unknown plugin helper:#{name}" + end + end include(*helper_modules) end end From d9857789405a2d9d6912f61e9e5120482a076067 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 26 Oct 2016 18:34:54 +0900 Subject: [PATCH 05/41] add keep_tag_key/keep_time_key support and warning for compatibility w/ v0.12 --- lib/fluent/plugin_helper/extract.rb | 20 ++++++++++++++++---- lib/fluent/plugin_helper/inject.rb | 4 ++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin_helper/extract.rb b/lib/fluent/plugin_helper/extract.rb index 2fa08568ad..79144c8de1 100644 --- a/lib/fluent/plugin_helper/extract.rb +++ b/lib/fluent/plugin_helper/extract.rb @@ -25,7 +25,8 @@ def extract_tag_from_record(record) return nil unless @_extract_enabled if @_extract_tag_key && record.has_key?(@_extract_tag_key) - return record[@_extract_tag_key].to_s + v = @_extract_keep_tag_key ? record[@_extract_tag_key] : record.delete(@_extract_tag_key) + return v.to_s end nil @@ -35,7 +36,8 @@ def extract_time_from_record(record) return nil unless @_extract_enabled if @_extract_time_key && record.has_key?(@_extract_time_key) - return @_extract_time_parser.call(record[@_extract_time_key]) + v = @_extract_keep_time_key ? record[@_extract_time_key] : record.delete(@_extract_time_key) + return @_extract_time_parser.call(v) end nil @@ -45,7 +47,9 @@ module ExtractParams include Fluent::Configurable config_section :extract, required: false, multi: false, param_name: :extract_config do config_param :tag_key, :string, default: nil + config_param :keep_tag_key, :bool, default: false config_param :time_key, :string, default: nil + config_param :keep_time_key, :bool, default: false # To avoid defining :time_type twice config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :float @@ -64,7 +68,9 @@ def initialize super @_extract_enabled = false @_extract_tag_key = nil + @_extract_keep_tag_key = nil @_extract_time_key = nil + @_extract_keep_time_key = nil @_extract_time_parser = nil end @@ -73,15 +79,21 @@ def configure(conf) if @extract_config @_extract_tag_key = @extract_config.tag_key + @_extract_keep_tag_key = @extract_config.keep_tag_key @_extract_time_key = @extract_config.time_key if @_extract_time_key + @_extract_keep_time_key = @extract_config.keep_time_key @_extract_time_parser = case @extract_config.time_type - when :float then ->(v){ Fluent::EventTime.new(v.to_i, ((v.to_f - v.to_i) * 1_000_000_000).to_i) } - when :unixtime then ->(v){ Fluent::EventTime.new(v.to_i, 0) } + when :float then Fluent::NumericTimeParser.new(:float) + when :unixtime then Fluent::NumericTimeParser.new(:unixtime) else localtime = @extract_config.localtime && !@extract_config.utc Fluent::TimeParser.new(@extract_config.time_format, localtime, @extract_config.timezone) end + else + if @extract_config.time_format + log.warn "'time_format' specified without 'time_key', will be ignored" + end end @_extract_enabled = @_extract_tag_key || @_extract_time_key diff --git a/lib/fluent/plugin_helper/inject.rb b/lib/fluent/plugin_helper/inject.rb index cc1f285983..cec2cff7fc 100644 --- a/lib/fluent/plugin_helper/inject.rb +++ b/lib/fluent/plugin_helper/inject.rb @@ -113,6 +113,10 @@ def configure(conf) localtime = @inject_config.localtime && !@inject_config.utc Fluent::TimeFormatter.new(@inject_config.time_format, localtime, @inject_config.timezone) end + else + if @inject_config.time_format + log.warn "'time_format' specified without 'time_key', will be ignored" + end end @_inject_enabled = @_inject_hostname_key || @_inject_tag_key || @_inject_time_key From 0195f608ebaad0b53fbd92f37e00287b0134bef5 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 26 Oct 2016 18:36:02 +0900 Subject: [PATCH 06/41] Add new API methods to show characteristic of data and how to feed data. --- lib/fluent/plugin/formatter.rb | 5 +++++ lib/fluent/plugin/parser.rb | 23 +++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/lib/fluent/plugin/formatter.rb b/lib/fluent/plugin/formatter.rb index 630310573a..2df69765de 100644 --- a/lib/fluent/plugin/formatter.rb +++ b/lib/fluent/plugin/formatter.rb @@ -26,6 +26,11 @@ class Formatter < Base configured_in :format + PARSER_TYPES = [:text_per_line, :text, :binary] + def formatter_type + :text_per_line + end + def format(tag, time, record) raise NotImplementedError, "Implement this method in child class" end diff --git a/lib/fluent/plugin/parser.rb b/lib/fluent/plugin/parser.rb index 02e4c3a716..e03fcd2daa 100644 --- a/lib/fluent/plugin/parser.rb +++ b/lib/fluent/plugin/parser.rb @@ -53,6 +53,11 @@ class ParserError < StandardError; end # for tests attr_reader :type_converters + PARSER_TYPES = [:text_per_line, :text, :binary] + def parser_type + :text_per_line + end + def configure(conf) super @@ -72,6 +77,24 @@ def call(*a, &b) parse(*a, &b) end + def implement?(feature) + methods_of_plugin = self.class.instance_methods(false) + case feature + when :parse_io then methods_of_plugin.include?(:parse_io) + when :parse_partial_data then methods_of_plugin.include?(:parse_partial_data) + else + raise ArgumentError, "Unknown feature for parser plugin: #{feature}" + end + end + + def parse_io(io, &block) + raise NotImplementedError, "Optional API #parse_io is not implemented" + end + + def parse_partial_data(data, &block) + raise NotImplementedError, "Optional API #parse_partial_data is not implemented" + end + def parse_time(record) if @time_key && record.has_key?(@time_key) src = if @keep_time_key From ddf20a5df24c1e3fe61c2d4dad9d67c907e09661 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 26 Oct 2016 18:38:44 +0900 Subject: [PATCH 07/41] Fix in/out_exec plugins to use standard Formatter/Parser plugins, and make them free from ExecUtil::* And add missing formatter/parser which exist in ExecUtil::*. --- lib/fluent/plugin/formatter_msgpack.rb | 4 + lib/fluent/plugin/formatter_tsv.rb | 34 ++++++ lib/fluent/plugin/in_exec.rb | 148 ++++++++++--------------- lib/fluent/plugin/out_exec.rb | 69 +++--------- lib/fluent/plugin/parser_csv.rb | 5 +- lib/fluent/plugin/parser_json.rb | 12 ++ lib/fluent/plugin/parser_msgpack.rb | 50 +++++++++ test/plugin/test_in_exec.rb | 124 ++++++++------------- test/plugin/test_out_exec.rb | 10 +- 9 files changed, 225 insertions(+), 231 deletions(-) create mode 100644 lib/fluent/plugin/formatter_tsv.rb create mode 100644 lib/fluent/plugin/parser_msgpack.rb diff --git a/lib/fluent/plugin/formatter_msgpack.rb b/lib/fluent/plugin/formatter_msgpack.rb index a817a7da1f..b3b0d2c7c6 100644 --- a/lib/fluent/plugin/formatter_msgpack.rb +++ b/lib/fluent/plugin/formatter_msgpack.rb @@ -21,6 +21,10 @@ module Plugin class MessagePackFormatter < Formatter Plugin.register_formatter('msgpack', self) + def formatter_type + :binary + end + def format(tag, time, record) record.to_msgpack end diff --git a/lib/fluent/plugin/formatter_tsv.rb b/lib/fluent/plugin/formatter_tsv.rb new file mode 100644 index 0000000000..ef7c1262c8 --- /dev/null +++ b/lib/fluent/plugin/formatter_tsv.rb @@ -0,0 +1,34 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/formatter' + +module Fluent + module Plugin + class TSVFormatter < Formatter + Plugin.register_formatter('tsv', self) + + desc 'Names of fields included in each lines' + config_param :keys, :array, value_type: :string + desc 'The delimiter character (or string) of TSV values' + config_param :delimiter, :string, default: "\t" + + def format(tag, time, record) + @keys.map{|k| record[k].to_s }.join(@delimiter) + end + end + end +end diff --git a/lib/fluent/plugin/in_exec.rb b/lib/fluent/plugin/in_exec.rb index 1875331011..1510ba57c1 100644 --- a/lib/fluent/plugin/in_exec.rb +++ b/lib/fluent/plugin/in_exec.rb @@ -14,136 +14,104 @@ # limitations under the License. # -require 'strptime' -require 'yajl' - require 'fluent/plugin/input' -require 'fluent/time' -require 'fluent/timezone' -require 'fluent/config/error' +require 'yajl' module Fluent::Plugin class ExecInput < Fluent::Plugin::Input Fluent::Plugin.register_input('exec', self) - helpers :child_process - - def initialize - super - require 'fluent/plugin/exec_util' - end + helpers :compat_parameters, :extract, :parser, :child_process desc 'The command (program) to execute.' config_param :command, :string - desc 'The format used to map the program output to the incoming event.(tsv,json,msgpack)' - config_param :format, :string, default: 'tsv' - desc 'Specify the comma-separated keys when using the tsv format.' - config_param :keys, default: [] do |val| - val.split(',') + + config_section :parse do + config_set_default :@type, 'tsv' + config_set_default :time_type, :float + config_set_default :time_key, nil + config_set_default :estimate_current_event, false + end + + config_section :extract do + config_set_default :time_type, :float end + desc 'Tag of the output events.' config_param :tag, :string, default: nil - desc 'The key to use as the event tag instead of the value in the event record. ' - config_param :tag_key, :string, default: nil - desc 'The key to use as the event time instead of the value in the event record.' - config_param :time_key, :string, default: nil - desc 'The format of the event time used for the time_key parameter.' - config_param :time_format, :string, default: nil desc 'The interval time between periodic program runs.' config_param :run_interval, :time, default: nil + desc 'The default block size to read if parser requires partial read.' + config_param :read_block_size, :size, default: 10240 # 10k - def configure(conf) - super - - if conf['localtime'] - @localtime = true - elsif conf['utc'] - @localtime = false - end + attr_reader :parser - if conf['timezone'] - @timezone = conf['timezone'] - Fluent::Timezone.validate!(@timezone) - end - - if !@tag && !@tag_key - raise Fleunt::ConfigError, "'tag' or 'tag_key' option is required on exec input" - end - - if @time_key - if @time_format - f = @time_format - @time_parse_proc = - begin - strptime = Strptime.new(f) - Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) } - rescue - Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) } - end - else - @time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) } + def configure(conf) + compat_parameters_convert(conf, :extract, :parser) + ['parse', 'extract'].each do |subsection_name| + if subsection = conf.elements(subsection_name).first + if subsection.has_key?('time_format') + subsection['time_type'] ||= 'string' + end end end - @parser = setup_parser(conf) - end + super - def setup_parser(conf) - case @format - when 'tsv' - if @keys.empty? - raise Fluent::ConfigError, "keys option is required on exec input for tsv format" - end - Fluent::ExecUtil::TSVParser.new(@keys, method(:on_message)) - when 'json' - Fluent::ExecUtil::JSONParser.new(method(:on_message)) - when 'msgpack' - Fluent::ExecUtil::MessagePackParser.new(method(:on_message)) - else - Fluent::ExecUtil::TextParserWrapperParser.new(conf, method(:on_message)) + if !@tag && (!@extract_config || !@extract_config.tag_key) + raise Fleunt::ConfigError, "'tag' or 'tag_key' option is required on exec input" end + @parser = parser_create end def start super + receiver = case + when @parser.implement?(:parse_io) then method(:run_with_io) + when @parser.implement?(:parse_partial_data) then method(:run_with_partial_read) + else method(:run) + end if @run_interval - child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read]) do |io| - run(io) - end + child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read], &receiver) else - child_process_execute(:exec_input, @command, immediate: true, mode: [:read]) do |io| - run(io) - end + child_process_execute(:exec_input, @command, immediate: true, mode: [:read], &receiver) end end - def run(io) - @parser.call(io) + def run_with_io(io) + @parser.parse_io(io, &method(:on_record)) end - private - - def on_message(record, parsed_time = nil) - if val = record.delete(@tag_key) - tag = val - else - tag = @tag + def run_with_partial_read(io) + until io.eof? + @parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record)) end + rescue EOFError + # ignore + end - if parsed_time - time = parsed_time - else - if val = record.delete(@time_key) - time = @time_parse_proc.call(val) - else - time = Fluent::EventTime.now + def run(io) + if @parser.parser_type == :text_per_line + io.each_line do |line| + @parser.parse(line.chomp, &method(:on_record)) end + else + @parser.parse(io.read, &method(:on_record)) end + rescue EOFError + # ignore + end + def on_record(time, record) + tag = nil + tag = extract_tag_from_record(record) + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now router.emit(tag, time, record) rescue => e - log.error "exec failed to emit", error: e, tag: tag, record: Yajl.dump(record) + log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: e + router.emit_error_event(tag, time, record, "exec failed to emit") end end end diff --git a/lib/fluent/plugin/out_exec.rb b/lib/fluent/plugin/out_exec.rb index 2e096c13ea..cb19d8a93a 100644 --- a/lib/fluent/plugin/out_exec.rb +++ b/lib/fluent/plugin/out_exec.rb @@ -18,77 +18,40 @@ require 'fluent/plugin/output' require 'fluent/config/error' -require 'fluent/plugin/exec_util' -require 'fluent/mixin' # for TimeFormatter module Fluent::Plugin class ExecOutput < Output Fluent::Plugin.register_output('exec', self) - helpers :compat_parameters + helpers :inject, :formatter, :compat_parameters desc 'The command (program) to execute. The exec plugin passes the path of a TSV file as the last argumen' config_param :command, :string - desc 'Specify the comma-separated keys when using the tsv format.' - config_param :keys, default: [] do |val| - val.split(',') - end - desc 'The name of the key to use as the event tag. This replaces the value in the event record.' - config_param :tag_key, :string, default: nil - desc 'The name of the key to use as the event time. This replaces the the value in the event record.' - config_param :time_key, :string, default: nil - desc 'The format for event time used when the time_key parameter is specified. The default is UNIX time (integer).' - config_param :time_format, :string, default: nil - desc "The format used to map the incoming events to the program input. (#{Fluent::ExecUtil::SUPPORTED_FORMAT.keys.join(',')})" - config_param :format, default: :tsv, skip_accessor: true do |val| - f = Fluent::ExecUtil::SUPPORTED_FORMAT[val] - raise Fluent::ConfigError, "Unsupported format '#{val}'" unless f - f + + config_section :format do + config_set_default :@type, 'tsv' end - config_param :localtime, :bool, default: false - config_param :timezone, :string, default: nil - def compat_parameters_default_chunk_key - 'time' + config_section :inject do + config_set_default :time_type, :string + config_set_default :localtime, false end - def configure(conf) - compat_parameters_convert(conf, :buffer, default_chunk_key: 'time') + attr_reader :formatter # for tests + def configure(conf) + compat_parameters_convert(conf, :inject, :formatter, :buffer, default_chunk_key: 'time') super - - @formatter = case @format - when :tsv - if @keys.empty? - raise Fluent::ConfigError, "keys option is required on exec output for tsv format" - end - Fluent::ExecUtil::TSVFormatter.new(@keys) - when :json - Fluent::ExecUtil::JSONFormatter.new - when :msgpack - Fluent::ExecUtil::MessagePackFormatter.new - end - - if @time_key - if @time_format - tf = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone) - @time_format_proc = tf.method(:format) - else - @time_format_proc = Proc.new { |time| time.to_s } - end - end + @formatter = formatter_create end def format(tag, time, record) - out = '' - if @time_key - record[@time_key] = @time_format_proc.call(time) - end - if @tag_key - record[@tag_key] = tag + record = inject_values_to_record(tag, time, record) + if @formatter.formatter_type == :text_per_line + @formatter.format(tag, time, record).chomp + "\n" + else + @formatter.format(tag, time, record) end - @formatter.call(record, out) - out end def write(chunk) diff --git a/lib/fluent/plugin/parser_csv.rb b/lib/fluent/plugin/parser_csv.rb index 2afdc8291e..0229ec1e56 100644 --- a/lib/fluent/plugin/parser_csv.rb +++ b/lib/fluent/plugin/parser_csv.rb @@ -28,11 +28,10 @@ class CSVParser < Parser desc 'The delimiter character (or string) of CSV values' config_param :delimiter, :string, default: ',' - def parse(text) + def parse(text, &block) values = CSV.parse_line(text, col_sep: @delimiter) r = Hash[@keys.zip(values)] - time, record = convert_values(parse_time(r), r) - yield time, record + convert_values(parse_time(r), r, &block) end end end diff --git a/lib/fluent/plugin/parser_json.rb b/lib/fluent/plugin/parser_json.rb index 82e5e5e04c..5548b7b45e 100644 --- a/lib/fluent/plugin/parser_json.rb +++ b/lib/fluent/plugin/parser_json.rb @@ -64,6 +64,18 @@ def parse(text) rescue @error_class yield nil, nil end + + def parser_type + :text + end + + def parse_io(io, &block) + y = Yajl::Parser.new + y.on_parse_complete = ->(record){ + block.call(parse_time(record), record) + } + y.parse(io) + end end end end diff --git a/lib/fluent/plugin/parser_msgpack.rb b/lib/fluent/plugin/parser_msgpack.rb new file mode 100644 index 0000000000..96967065c1 --- /dev/null +++ b/lib/fluent/plugin/parser_msgpack.rb @@ -0,0 +1,50 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/parser' +require 'fluent/msgpack_factory' + +module Fluent + module Plugin + class MessagePackParser < Parser + Plugin.register_parser('msgpack', self) + + def configure(conf) + super + @unpacker = Fluent::MessagePackFactory.engine_factory.unpacker + end + + def parser_type + :binary + end + + def parse(data) + @unpacker.feed_each(data) do |obj| + yield convert_values(parse_time(obj), obj) + end + end + alias parse_partial_data parse + + def parse_io(io) + u = Fluent::MessagePackFactory.engine_factory.unpacker(io) + u.each do |obj| + time, record = convert_values(parse_time(obj), obj) + yield time, record + end + end + end + end +end diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index 6e179193f3..7cbe660abe 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -4,130 +4,94 @@ require 'net/http' class ExecInputTest < Test::Unit::TestCase + SCRIPT_PATH = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) + TEST_TIME = "2011-01-02 13:14:15" + TEST_UNIX_TIME = Time.parse(TEST_TIME) + def setup Fluent::Test.setup - @test_time = event_time("2011-01-02 13:14:15") - @script = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) + @test_time = event_time() end - def create_driver(conf = tsv_config) + def create_driver(conf = TSV_CONFIG) Fluent::Test::Driver::Input.new(Fluent::Plugin::ExecInput).configure(conf) end - def tsv_config - %[ - command ruby #{@script} "2011-01-02 13:14:15" 0 + TSV_CONFIG = %[ + command ruby #{SCRIPT_PATH} "#{TEST_TIME}" 0 keys time,tag,k1 time_key time tag_key tag time_format %Y-%m-%d %H:%M:%S run_interval 1s - ] - end + ] - def json_config - %[ - command ruby #{@script} #{@test_time} 1 + JSON_CONFIG = %[ + command ruby #{SCRIPT_PATH} #{TEST_UNIX_TIME.to_i} 1 format json tag_key tag time_key time run_interval 1s - ] - end + ] - def msgpack_config - %[ - command ruby #{@script} #{@test_time} 2 + MSGPACK_CONFIG = %[ + command ruby #{SCRIPT_PATH} #{TEST_UNIX_TIME.to_i} 2 format msgpack tag_key tagger time_key datetime run_interval 1s - ] - end + ] - def regexp_config - %[ - command ruby #{@script} "2011-01-02 13:14:15" 3 + REGEXP_CONFIG = %[ + command ruby #{SCRIPT_PATH} "#{TEST_TIME}" 3 format /(?