diff --git a/example/out_exec_filter.conf b/example/out_exec_filter.conf new file mode 100644 index 0000000000..5ec0f26c5d --- /dev/null +++ b/example/out_exec_filter.conf @@ -0,0 +1,42 @@ + + @type dummy + @label @exec + tag exec_input + rate 10 + auto_increment_key num + dummy {"data":"mydata"} + + + + + + diff --git a/lib/fluent/plugin/formatter.rb b/lib/fluent/plugin/formatter.rb index 630310573a..2df69765de 100644 --- a/lib/fluent/plugin/formatter.rb +++ b/lib/fluent/plugin/formatter.rb @@ -26,6 +26,11 @@ class Formatter < Base configured_in :format + PARSER_TYPES = [:text_per_line, :text, :binary] + def formatter_type + :text_per_line + end + def format(tag, time, record) raise NotImplementedError, "Implement this method in child class" end diff --git a/lib/fluent/plugin/formatter_msgpack.rb b/lib/fluent/plugin/formatter_msgpack.rb index a817a7da1f..b3b0d2c7c6 100644 --- a/lib/fluent/plugin/formatter_msgpack.rb +++ b/lib/fluent/plugin/formatter_msgpack.rb @@ -21,6 +21,10 @@ module Plugin class MessagePackFormatter < Formatter Plugin.register_formatter('msgpack', self) + def formatter_type + :binary + end + def format(tag, time, record) record.to_msgpack end diff --git a/lib/fluent/plugin/formatter_tsv.rb b/lib/fluent/plugin/formatter_tsv.rb new file mode 100644 index 0000000000..ab8204b3b5 --- /dev/null +++ b/lib/fluent/plugin/formatter_tsv.rb @@ -0,0 +1,34 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/formatter' + +module Fluent + module Plugin + class TSVFormatter < Formatter + Plugin.register_formatter('tsv', self) + + desc 'Field names included in each lines' + config_param :keys, :array, value_type: :string + desc 'The delimiter character (or string) of TSV values' + config_param :delimiter, :string, default: "\t" + + def format(tag, time, record) + @keys.map{|k| record[k].to_s }.join(@delimiter) + end + end + end +end diff --git a/lib/fluent/plugin/in_exec.rb b/lib/fluent/plugin/in_exec.rb index 1875331011..404f877d4e 100644 --- a/lib/fluent/plugin/in_exec.rb +++ b/lib/fluent/plugin/in_exec.rb @@ -14,136 +14,91 @@ # limitations under the License. # -require 'strptime' -require 'yajl' - require 'fluent/plugin/input' -require 'fluent/time' -require 'fluent/timezone' -require 'fluent/config/error' +require 'yajl' module Fluent::Plugin class ExecInput < Fluent::Plugin::Input Fluent::Plugin.register_input('exec', self) - helpers :child_process - - def initialize - super - require 'fluent/plugin/exec_util' - end + helpers :compat_parameters, :extract, :parser, :child_process desc 'The command (program) to execute.' config_param :command, :string - desc 'The format used to map the program output to the incoming event.(tsv,json,msgpack)' - config_param :format, :string, default: 'tsv' - desc 'Specify the comma-separated keys when using the tsv format.' - config_param :keys, default: [] do |val| - val.split(',') + + config_section :parse do + config_set_default :@type, 'tsv' + config_set_default :time_type, :float + config_set_default :time_key, nil + config_set_default :estimate_current_event, false + end + + config_section :extract do + config_set_default :time_type, :float end + desc 'Tag of the output events.' config_param :tag, :string, default: nil - desc 'The key to use as the event tag instead of the value in the event record. ' - config_param :tag_key, :string, default: nil - desc 'The key to use as the event time instead of the value in the event record.' - config_param :time_key, :string, default: nil - desc 'The format of the event time used for the time_key parameter.' - config_param :time_format, :string, default: nil desc 'The interval time between periodic program runs.' config_param :run_interval, :time, default: nil + desc 'The default block size to read if parser requires partial read.' + config_param :read_block_size, :size, default: 10240 # 10k - def configure(conf) - super - - if conf['localtime'] - @localtime = true - elsif conf['utc'] - @localtime = false - end - - if conf['timezone'] - @timezone = conf['timezone'] - Fluent::Timezone.validate!(@timezone) - end - - if !@tag && !@tag_key - raise Fleunt::ConfigError, "'tag' or 'tag_key' option is required on exec input" - end + attr_reader :parser - if @time_key - if @time_format - f = @time_format - @time_parse_proc = - begin - strptime = Strptime.new(f) - Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) } - rescue - Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) } - end - else - @time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) } + def configure(conf) + compat_parameters_convert(conf, :extract, :parser) + ['parse', 'extract'].each do |subsection_name| + if subsection = conf.elements(subsection_name).first + if subsection.has_key?('time_format') + subsection['time_type'] ||= 'string' + end end end - @parser = setup_parser(conf) - end + super - def setup_parser(conf) - case @format - when 'tsv' - if @keys.empty? - raise Fluent::ConfigError, "keys option is required on exec input for tsv format" - end - Fluent::ExecUtil::TSVParser.new(@keys, method(:on_message)) - when 'json' - Fluent::ExecUtil::JSONParser.new(method(:on_message)) - when 'msgpack' - Fluent::ExecUtil::MessagePackParser.new(method(:on_message)) - else - Fluent::ExecUtil::TextParserWrapperParser.new(conf, method(:on_message)) + if !@tag && (!@extract_config || !@extract_config.tag_key) + raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input" end + @parser = parser_create end def start super if @run_interval - child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read]) do |io| - run(io) - end + child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read], &method(:run)) else - child_process_execute(:exec_input, @command, immediate: true, mode: [:read]) do |io| - run(io) - end + child_process_execute(:exec_input, @command, immediate: true, mode: [:read], &method(:run)) end end def run(io) - @parser.call(io) - end - - private - - def on_message(record, parsed_time = nil) - if val = record.delete(@tag_key) - tag = val - else - tag = @tag - end - - if parsed_time - time = parsed_time - else - if val = record.delete(@time_key) - time = @time_parse_proc.call(val) - else - time = Fluent::EventTime.now + case + when @parser.implement?(:parse_io) + @parser.parse_io(io, &method(:on_record)) + when @parser.implement?(:parse_partial_data) + until io.eof? + @parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record)) + end + when @parser.parser_type == :text_per_line + io.each_line do |line| + @parser.parse(line.chomp, &method(:on_record)) end + else + @parser.parse(io.read, &method(:on_record)) end + end + def on_record(time, record) + tag = extract_tag_from_record(record) + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now router.emit(tag, time, record) rescue => e - log.error "exec failed to emit", error: e, tag: tag, record: Yajl.dump(record) + log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: e + router.emit_error_event(tag, time, record, e) if tag && time && record end end end diff --git a/lib/fluent/plugin/out_exec.rb b/lib/fluent/plugin/out_exec.rb index 2e096c13ea..a5bc0ea79c 100644 --- a/lib/fluent/plugin/out_exec.rb +++ b/lib/fluent/plugin/out_exec.rb @@ -18,97 +18,84 @@ require 'fluent/plugin/output' require 'fluent/config/error' -require 'fluent/plugin/exec_util' -require 'fluent/mixin' # for TimeFormatter module Fluent::Plugin class ExecOutput < Output Fluent::Plugin.register_output('exec', self) - helpers :compat_parameters + helpers :inject, :formatter, :compat_parameters, :child_process - desc 'The command (program) to execute. The exec plugin passes the path of a TSV file as the last argumen' + desc 'The command (program) to execute. The exec plugin passes the path of a TSV file as the last argument' config_param :command, :string - desc 'Specify the comma-separated keys when using the tsv format.' - config_param :keys, default: [] do |val| - val.split(',') + + config_param :command_timeout, :time, default: 270 # 4min 30sec + + config_section :format do + config_set_default :@type, 'tsv' end - desc 'The name of the key to use as the event tag. This replaces the value in the event record.' - config_param :tag_key, :string, default: nil - desc 'The name of the key to use as the event time. This replaces the the value in the event record.' - config_param :time_key, :string, default: nil - desc 'The format for event time used when the time_key parameter is specified. The default is UNIX time (integer).' - config_param :time_format, :string, default: nil - desc "The format used to map the incoming events to the program input. (#{Fluent::ExecUtil::SUPPORTED_FORMAT.keys.join(',')})" - config_param :format, default: :tsv, skip_accessor: true do |val| - f = Fluent::ExecUtil::SUPPORTED_FORMAT[val] - raise Fluent::ConfigError, "Unsupported format '#{val}'" unless f - f + + config_section :inject do + config_set_default :time_type, :string + config_set_default :localtime, false end - config_param :localtime, :bool, default: false - config_param :timezone, :string, default: nil - def compat_parameters_default_chunk_key - 'time' + config_section :buffer do + config_set_default :delayed_commit_timeout, 300 # 5 min end - def configure(conf) - compat_parameters_convert(conf, :buffer, default_chunk_key: 'time') + attr_reader :formatter # for tests + def configure(conf) + compat_parameters_convert(conf, :inject, :formatter, :buffer, default_chunk_key: 'time') super - - @formatter = case @format - when :tsv - if @keys.empty? - raise Fluent::ConfigError, "keys option is required on exec output for tsv format" - end - Fluent::ExecUtil::TSVFormatter.new(@keys) - when :json - Fluent::ExecUtil::JSONFormatter.new - when :msgpack - Fluent::ExecUtil::MessagePackFormatter.new - end - - if @time_key - if @time_format - tf = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone) - @time_format_proc = tf.method(:format) - else - @time_format_proc = Proc.new { |time| time.to_s } - end - end + @formatter = formatter_create end - def format(tag, time, record) - out = '' - if @time_key - record[@time_key] = @time_format_proc.call(time) - end - if @tag_key - record[@tag_key] = tag - end - @formatter.call(record, out) - out - end + NEWLINE = "\n" - def write(chunk) - if chunk.respond_to?(:path) - prog = "#{@command} #{chunk.path}" + def format(tag, time, record) + record = inject_values_to_record(tag, time, record) + if @formatter.formatter_type == :text_per_line + @formatter.format(tag, time, record).chomp + NEWLINE else - tmpfile = Tempfile.new("fluent-plugin-exec-") - tmpfile.binmode - chunk.write_to(tmpfile) - tmpfile.close - prog = "#{@command} #{tmpfile.path}" + @formatter.format(tag, time, record) end + end - system(prog) - ecode = $?.to_i - tmpfile.delete if tmpfile - - if ecode != 0 - raise "command returns #{ecode}: #{prog}" - end + def try_write(chunk) + tmpfile = nil + prog = if chunk.respond_to?(:path) + "#{@command} #{chunk.path}" + else + tmpfile = Tempfile.new("fluent-plugin-out-exec-") + tmpfile.binmode + chunk.write_to(tmpfile) + tmpfile.close + "#{@command} #{tmpfile.path}" + end + chunk_id = chunk.unique_id + callback = ->(status){ + begin + if tmpfile + tmpfile.delete rescue nil + end + if status && status.success? + commit_write(chunk_id) + elsif status + # #rollback_write will be done automatically if it isn't called at here. + # But it's after command_timeout, and this timeout should be longer than users expectation. + # So here, this plugin calls it explicitly. + rollback_write(chunk_id) + log.warn "command exits with error code", prog: prog, status: status.exitstatus, signal: status.termsig + else + rollback_write(chunk_id) + log.warn "command unexpectedly exits without exit status", prog: prog + end + rescue => e + log.error "unexpected error in child process callback", error: e + end + } + child_process_execute(:out_exec_process, prog, stderr: :connect, immediate: true, parallel: true, mode: [], wait_timeout: @command_timeout, on_exit_callback: callback) end end end diff --git a/lib/fluent/plugin/out_exec_filter.rb b/lib/fluent/plugin/out_exec_filter.rb index c8045e9f1e..b839143cb6 100644 --- a/lib/fluent/plugin/out_exec_filter.rb +++ b/lib/fluent/plugin/out_exec_filter.rb @@ -13,148 +13,142 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -require 'yajl' - -require 'fluent/output' +require 'fluent/plugin/output' require 'fluent/env' -require 'fluent/time' -require 'fluent/timezone' -require 'fluent/plugin/exec_util' require 'fluent/config/error' -module Fluent - class ExecFilterOutput < BufferedOutput - Plugin.register_output('exec_filter', self) +require 'yajl' - def initialize - super - require 'fluent/timezone' - end +module Fluent::Plugin + class ExecFilterOutput < Output + Fluent::Plugin.register_output('exec_filter', self) + + helpers :compat_parameters, :inject, :formatter, :parser, :extract, :child_process, :event_emitter desc 'The command (program) to execute.' config_param :command, :string - config_param :remove_prefix, :string, default: nil - config_param :add_prefix, :string, default: nil + config_param :remove_prefix, :string, default: nil, deprecated: "use @label instead for event routing" + config_param :add_prefix, :string, default: nil, deprecated: "use @label instead for event routing" + + config_section :inject do + config_set_default :time_type, :unixtime + end - desc "The format used to map the incoming event to the program input.(#{Fluent::ExecUtil::SUPPORTED_FORMAT.keys.join(',')})" - config_param :in_format, default: :tsv do |val| - f = Fluent::ExecUtil::SUPPORTED_FORMAT[val] - raise ConfigError, "Unsupported in_format '#{val}'" unless f - f + config_section :format do + config_set_default :@type, 'tsv' + config_set_default :localtime, true end - desc 'Specify comma-separated values for tsv format.' - config_param :in_keys, default: [] do |val| - val.split(',') + + config_section :parse do + config_set_default :@type, 'tsv' + config_set_default :time_key, nil + config_set_default :time_format, nil + config_set_default :localtime, true + config_set_default :estimate_current_event, false end - desc 'The name of the key to use as the event tag.' - config_param :in_tag_key, default: nil - desc 'The name of the key to use as the event time.' - config_param :in_time_key, default: nil - desc 'The format for event time used when the in_time_key parameter is specified.(Defauls is UNIX time)' - config_param :in_time_format, default: nil - - desc "The format used to process the program output.(#{Fluent::ExecUtil::SUPPORTED_FORMAT.keys.join(',')})" - config_param :out_format, default: :tsv do |val| - f = Fluent::ExecUtil::SUPPORTED_FORMAT[val] - raise ConfigError, "Unsupported out_format '#{val}'" unless f - f + + config_section :extract do + config_set_default :time_type, :float end - desc 'Specify comma-separated values for tsv format.' - config_param :out_keys, default: [] do |val| # for tsv format - val.split(',') + + config_section :buffer do + config_set_default :flush_mode, :interval + config_set_default :flush_interval, 1 end - desc 'The name of the key to use as the event tag.' - config_param :out_tag_key, default: nil - desc 'The name of the key to use as the event time.' - config_param :out_time_key, default: nil - desc 'The format for event time used when the in_time_key parameter is specified.(Defauls is UNIX time)' - config_param :out_time_format, default: nil config_param :tag, :string, default: nil - config_param :time_key, :string, default: nil - config_param :time_format, :string, default: nil + config_param :tag_key, :string, default: nil, deprecated: "use 'tag_key' in / instead" + config_param :time_key, :string, default: nil, deprecated: "use 'time_key' in / instead" + config_param :time_format, :string, default: nil, deprecated: "use 'time_format' in / instead" + + desc 'The default block size to read if parser requires partial read.' + config_param :read_block_size, :size, default: 10240 # 10k - desc 'If true, use localtime with in_time_format.' - config_param :localtime, :bool, default: true - desc 'If true, use timezone with in_time_format.' - config_param :timezone, :string, default: nil desc 'The number of spawned process for command.' config_param :num_children, :integer, default: 1 - desc 'Respawn command when command exit.' + desc 'Respawn command when command exit. ["none", "inf" or positive integer for times to respawn (defaut: none)]' # nil, 'none' or 0: no respawn, 'inf' or -1: infinite times, positive integer: try to respawn specified times only config_param :child_respawn, :string, default: nil # 0: output logs for all of messages to emit config_param :suppress_error_log_interval, :time, default: 0 - config_set_default :flush_interval, 1 - - def configure(conf) - if tag_key = conf['tag_key'] - # TODO obsoleted? - conf['in_tag_key'] = tag_key - conf['out_tag_key'] = tag_key - end - - if time_key = conf['time_key'] - # TODO obsoleted? - conf['in_time_key'] = time_key - conf['out_time_key'] = time_key + attr_reader :formatter, :parser # for tests + + KEYS_FOR_IN_AND_OUT = { + 'tag_key' => ['in_tag_key', 'out_tag_key'], + 'time_key' => ['in_time_key', 'out_time_key'], + 'time_format' => ['in_time_format', 'out_time_format'], + } + COMPAT_INJECT_PARAMS = { + 'in_tag_key' => 'tag_key', + 'in_time_key' => 'time_key', + 'in_time_format' => 'time_format', + } + COMPAT_FORMAT_PARAMS = { + 'in_format' => '@type', + 'in_keys' => 'keys', + } + COMPAT_PARSE_PARAMS = { + 'out_format' => '@type', + 'out_keys' => 'keys', + } + COMPAT_EXTRACT_PARAMS = { + 'out_tag_key' => 'tag_key', + 'out_time_key' => 'time_key', + 'out_time_format' => 'time_format', + } + + def exec_filter_compat_parameters_copy_to_subsection!(conf, subsection_name, params) + return unless conf.elements(subsection_name).empty? + return unless params.keys.any?{|k| conf.has_key?(k) } + hash = {} + params.each_pair do |compat, current| + hash[current] = conf[compat] if conf.has_key?(compat) end + conf.elements << Fluent::Config::Element.new(subsection_name, '', hash, []) + end - if time_format = conf['time_format'] - # TODO obsoleted? - conf['in_time_format'] = time_format - conf['out_time_format'] = time_format + def exec_filter_compat_parameters_convert!(conf) + KEYS_FOR_IN_AND_OUT.each_pair do |inout, keys| + if conf.has_key?(inout) + keys.each do |k| + conf[k] = conf[inout] + end + end end + exec_filter_compat_parameters_copy_to_subsection!(conf, 'inject', COMPAT_INJECT_PARAMS) + exec_filter_compat_parameters_copy_to_subsection!(conf, 'format', COMPAT_FORMAT_PARAMS) + exec_filter_compat_parameters_copy_to_subsection!(conf, 'parse', COMPAT_PARSE_PARAMS) + exec_filter_compat_parameters_copy_to_subsection!(conf, 'extract', COMPAT_EXTRACT_PARAMS) + end - super + def configure(conf) + exec_filter_compat_parameters_convert!(conf) + compat_parameters_convert(conf, :buffer) - if conf['localtime'] - @localtime = true - elsif conf['utc'] - @localtime = false + if inject_section = conf.elements('inject').first + if inject_section.has_key?('time_format') + inject_section['time_type'] ||= 'string' + end end - - if conf['timezone'] - @timezone = conf['timezone'] - Fluent::Timezone.validate!(@timezone) + if extract_section = conf.elements('extract').first + if extract_section.has_key?('time_format') + extract_section['time_type'] ||= 'string' + end end - if !@tag && !@out_tag_key - raise ConfigError, "'tag' or 'out_tag_key' option is required on exec_filter output" - end + super - if @in_time_key - if f = @in_time_format - tf = TimeFormatter.new(f, @localtime, @timezone) - @time_format_proc = tf.method(:format) - else - @time_format_proc = Proc.new {|time| time.to_s } - end - elsif @in_time_format - log.warn "in_time_format effects nothing when in_time_key is not specified: #{conf}" + if !@tag && (!@extract_config || !@extract_config.tag_key) + raise Fluent::ConfigError, "'tag' or ' tag_key ' option is required on exec_filter output" end - if @out_time_key - if f = @out_time_format - @time_parse_proc = - begin - strptime = Strptime.new(f) - Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) } - rescue - Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) } - end - else - @time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) } - end - elsif @out_time_format - log.warn "out_time_format effects nothing when out_time_key is not specified: #{conf}" - end + @formatter = formatter_create + @parser = parser_create if @remove_prefix @removed_prefix_string = @remove_prefix + '.' @@ -164,30 +158,6 @@ def configure(conf) @added_prefix_string = @add_prefix + '.' end - case @in_format - when :tsv - if @in_keys.empty? - raise ConfigError, "in_keys option is required on exec_filter output for tsv in_format" - end - @formatter = Fluent::ExecUtil::TSVFormatter.new(@in_keys) - when :json - @formatter = Fluent::ExecUtil::JSONFormatter.new - when :msgpack - @formatter = Fluent::ExecUtil::MessagePackFormatter.new - end - - case @out_format - when :tsv - if @out_keys.empty? - raise ConfigError, "out_keys option is required on exec_filter output for tsv in_format" - end - @parser = Fluent::ExecUtil::TSVParser.new(@out_keys, method(:on_message)) - when :json - @parser = Fluent::ExecUtil::JSONParser.new(method(:on_message)) - when :msgpack - @parser = Fluent::ExecUtil::MessagePackParser.new(method(:on_message)) - end - @respawns = if @child_respawn.nil? or @child_respawn == 'none' or @child_respawn == '0' 0 elsif @child_respawn == 'inf' or @child_respawn == '-1' @@ -202,192 +172,142 @@ def configure(conf) @next_log_time = Time.now.to_i end + ExecutedProcess = Struct.new(:mutex, :pid, :respawns, :readio, :writeio) + def start super + @children_mutex = Mutex.new @children = [] @rr = 0 - begin - @num_children.times do - c = ChildProcess.new(@parser, @respawns, log) - c.start(@command) - @children << c - end - rescue - shutdown - raise - end - end - def before_shutdown - log.debug "out_exec_filter#before_shutdown called" - @children.each {|c| - c.finished = true - } - sleep 0.5 # TODO wait time before killing child process - - super - end - - def shutdown - @children.reject! {|c| - c.shutdown - true + exit_callback = ->(status){ + c = @children.select{|child| child.pid == status.pid }.first + if c + unless self.stopped? + log.warn "child process exits with error code", code: status.to_i, status: status.exitstatus, signal: status.termsig + end + c.mutex.synchronize do + (c.writeio && c.writeio.close) rescue nil + (c.readio && c.readio.close) rescue nil + c.pid = c.readio = c.writeio = nil + end + end } - - super - end - - def format_stream(tag, es) - if @remove_prefix - if (tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length) or tag == @removed_prefix - tag = tag[@removed_length..-1] || '' + child_process_callback = ->(index, readio, writeio){ + pid = child_process_id + c = @children[index] + writeio.sync = true + c.mutex.synchronize do + c.pid = pid + c.respawns = @respawns + c.readio = readio + c.writeio = writeio end - end - out = '' - - es.each {|time,record| - if @in_time_key - record[@in_time_key] = @time_format_proc.call(time) - end - if @in_tag_key - record[@in_tag_key] = tag + run(readio) + } + execute_child_process = ->(index){ + child_process_execute("out_exec_filter_child#{index}".to_sym, @command, on_exit_callback: exit_callback) do |readio, writeio| + child_process_callback.call(index, readio, writeio) end - @formatter.call(record, out) } - out - end - - def write(chunk) - r = @rr = (@rr + 1) % @children.length - @children[r].write chunk - end - - class ChildProcess - attr_accessor :finished - - def initialize(parser, respawns=0, log = $log) - @pid = nil - @thread = nil - @parser = parser - @respawns = respawns - @mutex = Mutex.new - @finished = nil - @log = log - end - - def start(command) - @command = command - @mutex.synchronize do - @io = IO.popen(command, "r+") - @pid = @io.pid - @io.sync = true - @thread = Thread.new(&method(:run)) + @children_mutex.synchronize do + @num_children.times do |i| + @children << ExecutedProcess.new(Mutex.new, nil, 0, nil, nil) + execute_child_process.call(i) end - @finished = false end - def kill_child(join_wait) - begin - signal = Fluent.windows? ? :KILL : :TERM - Process.kill(signal, @pid) - rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM - # Errno::ESRCH 'No such process', ignore - # child process killed by signal chained from fluentd process - end - if @thread.join(join_wait) - # @thread successfully shutdown - return - end - begin - Process.kill(:KILL, @pid) - rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM - # ignore if successfully killed by :TERM + if @respawns != 0 + thread_create(:out_exec_filter_respawn_monitor) do + while thread_current_running? + @children.each_with_index do |c, i| + if c.mutex && c.mutex.synchronize{ c.pid.nil? && c.respawns != 0 } + respawns = c.mutex.synchronize do + c.respawns -= 1 if c.respawns > 0 + c.respawns + end + log.info "respawning child process", num: i, respawn_counter: respawns + execute_child_process.call(i) + end + end + sleep 0.2 + end end - @thread.join end + end - def shutdown - @finished = true - @mutex.synchronize do - kill_child(60) # TODO wait time - end - end + def terminate + @children = [] + super + end - def write(chunk) - begin - chunk.write_to(@io) - rescue Errno::EPIPE => e - # Broken pipe (child process unexpectedly exited) - @log.warn "exec_filter Broken pipe, child process maybe exited.", command: @command - if try_respawn - retry # retry chunk#write_to with child respawned - else - raise e # to retry #write with other ChildProcess instance (when num_children > 1) - end + def tag_remove_prefix(tag) + if @remove_prefix + if (tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length) or tag == @removed_prefix_string + tag = tag[@removed_length..-1] || '' end end + tag + end - def try_respawn - return false if @respawns == 0 - @mutex.synchronize do - return false if @respawns == 0 - - kill_child(5) # TODO wait time - - @io = IO.popen(@command, "r+") - @pid = @io.pid - @io.sync = true - @thread = Thread.new(&method(:run)) + NEWLINE = "\n" - @respawns -= 1 if @respawns > 0 - end - @log.warn "exec_filter child process successfully respawned.", command: @command, respawns: @respawns - true + def format(tag, time, record) + tag = tag_remove_prefix(tag) + record = inject_values_to_record(tag, time, record) + if @formatter.formatter_type == :text_per_line + @formatter.format(tag, time, record).chomp + NEWLINE + else + @formatter.format(tag, time, record) end + end - def run - @parser.call(@io) - rescue - @log.error "exec_filter thread unexpectedly failed with an error.", command: @command, error: $!.to_s - @log.warn_backtrace $!.backtrace - ensure - _pid, stat = Process.waitpid2(@pid) - unless @finished - @log.error "exec_filter process unexpectedly exited.", command: @command, ecode: stat.to_i - unless @respawns == 0 - @log.warn "exec_filter child process will respawn for next input data (respawns #{@respawns})." - end + def write(chunk) + try_times = 0 + while true + r = @rr = (@rr + 1) % @children.length + if @children[r].pid && writeio = @children[r].writeio + chunk.write_to(writeio) + break end + try_times += 1 + raise "no healthy child processes exist" if try_times >= @children.length end end - def on_message(record) - if val = record.delete(@out_time_key) - time = @time_parse_proc.call(val) - else - time = Engine.now - end - - if val = record.delete(@out_tag_key) - tag = if @add_prefix - @added_prefix_string + val - else - val - end + def run(io) + case + when @parser.implement?(:parse_io) + @parser.parse_io(io, &method(:on_record)) + when @parser.implement?(:parse_partial_data) + until io.eof? + @parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record)) + end + when @parser.parser_type == :text_per_line + io.each_line do |line| + @parser.parse(line.chomp, &method(:on_record)) + end else - tag = @tag + @parser.parse(io.read, &method(:on_record)) end + end + def on_record(time, record) + tag = extract_tag_from_record(record) + tag = @added_prefix_string + tag if tag && @add_prefix + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now router.emit(tag, time, record) - rescue + rescue => e if @suppress_error_log_interval == 0 || Time.now.to_i > @next_log_time - log.error "exec_filter failed to emit", error: $!, record: Yajl.dump(record) - log.warn_backtrace $!.backtrace + log.error "exec_filter failed to emit", record: Yajl.dump(record), error: e + log.error_backtrace e.backtrace @next_log_time = Time.now.to_i + @suppress_error_log_interval end + router.emit_error_event(tag, time, record, e) if tag && time && record end end end diff --git a/lib/fluent/plugin/parser.rb b/lib/fluent/plugin/parser.rb index 02e4c3a716..e03fcd2daa 100644 --- a/lib/fluent/plugin/parser.rb +++ b/lib/fluent/plugin/parser.rb @@ -53,6 +53,11 @@ class ParserError < StandardError; end # for tests attr_reader :type_converters + PARSER_TYPES = [:text_per_line, :text, :binary] + def parser_type + :text_per_line + end + def configure(conf) super @@ -72,6 +77,24 @@ def call(*a, &b) parse(*a, &b) end + def implement?(feature) + methods_of_plugin = self.class.instance_methods(false) + case feature + when :parse_io then methods_of_plugin.include?(:parse_io) + when :parse_partial_data then methods_of_plugin.include?(:parse_partial_data) + else + raise ArgumentError, "Unknown feature for parser plugin: #{feature}" + end + end + + def parse_io(io, &block) + raise NotImplementedError, "Optional API #parse_io is not implemented" + end + + def parse_partial_data(data, &block) + raise NotImplementedError, "Optional API #parse_partial_data is not implemented" + end + def parse_time(record) if @time_key && record.has_key?(@time_key) src = if @keep_time_key diff --git a/lib/fluent/plugin/parser_csv.rb b/lib/fluent/plugin/parser_csv.rb index 2afdc8291e..529fa246f4 100644 --- a/lib/fluent/plugin/parser_csv.rb +++ b/lib/fluent/plugin/parser_csv.rb @@ -28,7 +28,7 @@ class CSVParser < Parser desc 'The delimiter character (or string) of CSV values' config_param :delimiter, :string, default: ',' - def parse(text) + def parse(text, &block) values = CSV.parse_line(text, col_sep: @delimiter) r = Hash[@keys.zip(values)] time, record = convert_values(parse_time(r), r) diff --git a/lib/fluent/plugin/parser_json.rb b/lib/fluent/plugin/parser_json.rb index 82e5e5e04c..5548b7b45e 100644 --- a/lib/fluent/plugin/parser_json.rb +++ b/lib/fluent/plugin/parser_json.rb @@ -64,6 +64,18 @@ def parse(text) rescue @error_class yield nil, nil end + + def parser_type + :text + end + + def parse_io(io, &block) + y = Yajl::Parser.new + y.on_parse_complete = ->(record){ + block.call(parse_time(record), record) + } + y.parse(io) + end end end end diff --git a/lib/fluent/plugin/parser_msgpack.rb b/lib/fluent/plugin/parser_msgpack.rb new file mode 100644 index 0000000000..b35f04b9f6 --- /dev/null +++ b/lib/fluent/plugin/parser_msgpack.rb @@ -0,0 +1,50 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/parser' +require 'fluent/msgpack_factory' + +module Fluent + module Plugin + class MessagePackParser < Parser + Plugin.register_parser('msgpack', self) + + def configure(conf) + super + @unpacker = Fluent::MessagePackFactory.engine_factory.unpacker + end + + def parser_type + :binary + end + + def parse(data) + @unpacker.feed_each(data) do |obj| + yield convert_values(parse_time(obj), obj) + end + end + alias parse_partial_data parse + + def parse_io(io, &block) + u = Fluent::MessagePackFactory.engine_factory.unpacker(io) + u.each do |obj| + time, record = convert_values(parse_time(obj), obj) + yield time, record + end + end + end + end +end diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index fda30bf6e3..8977f64913 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -36,7 +36,14 @@ def self.included(mod) end def helpers(*snake_case_symbols) - helper_modules = snake_case_symbols.map{|name| Fluent::PluginHelper.const_get(name.to_s.split('_').map(&:capitalize).join) } + helper_modules = [] + snake_case_symbols.each do |name| + begin + helper_modules << Fluent::PluginHelper.const_get(name.to_s.split('_').map(&:capitalize).join) + rescue NameError + raise "Unknown plugin helper:#{name}" + end + end include(*helper_modules) end end diff --git a/lib/fluent/plugin_helper/child_process.rb b/lib/fluent/plugin_helper/child_process.rb index f36b68e947..c62a2f548d 100644 --- a/lib/fluent/plugin_helper/child_process.rb +++ b/lib/fluent/plugin_helper/child_process.rb @@ -49,39 +49,60 @@ def child_process_id ::Thread.current[:_fluentd_plugin_helper_child_process_pid] end - def child_process_exit_status - ::Thread.current[:_fluentd_plugin_helper_child_process_exit_status] + def child_process_exist?(pid) + pinfo = @_child_process_processes[pid] + return false unless pinfo + + return false if pinfo.exit_status + + true end + # on_exit_callback = ->(status){ ... } + # status is an instance of Process::Status + # On Windows, exitstatus=0 and termsig=nil even when child process was killed. def child_process_execute( title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, + wait_timeout: nil, on_exit_callback: nil, &block ) raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol raise ArgumentError, "BUG: arguments required if subprocess name is replaced" if subprocess_name && !arguments + mode ||= [] + mode = [] unless block raise ArgumentError, "BUG: invalid mode specification" unless mode.all?{|m| MODE_PARAMS.include?(m) } raise ArgumentError, "BUG: read_with_stderr is exclusive with :read and :stderr" if mode.include?(:read_with_stderr) && (mode.include?(:read) || mode.include?(:stderr)) raise ArgumentError, "BUG: invalid stderr handling specification" unless STDERR_OPTIONS.include?(stderr) - raise ArgumentError, "BUG: block not specified which receive i/o object" unless block_given? - raise ArgumentError, "BUG: number of block arguments are different from size of mode" unless block.arity == mode.size + raise ArgumentError, "BUG: number of block arguments are different from size of mode" if block && block.arity != mode.size running = false callback = ->(*args) { running = true begin - block.call(*args) + block && block.call(*args) ensure running = false end } + retval = nil + execute_child_process = ->(){ + child_process_execute_once( + title, command, arguments, + subprocess_name, mode, stderr, env, unsetenv, chdir, + internal_encoding, external_encoding, scrub, replace_string, + wait_timeout, on_exit_callback, + &callback + ) + } + if immediate || !interval - child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &callback) + retval = execute_child_process.call end if interval @@ -89,10 +110,12 @@ def child_process_execute( if !parallel && running log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel else - child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &callback) + execute_child_process.call end end end + + retval # nil if interval end def initialize @@ -101,11 +124,8 @@ def initialize @_child_process_exit_timeout = CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT @_child_process_kill_timeout = CHILD_PROCESS_DEFAULT_KILL_TIMEOUT @_child_process_mutex = Mutex.new - end - - def start - super @_child_process_processes = {} # pid => ProcessInfo + @_child_process_clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC end def stop @@ -115,86 +135,102 @@ def stop process_info.thread[:_fluentd_plugin_helper_child_process_running] = false end end + + super end def shutdown @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid| process_info = @_child_process_processes[pid] - next if !process_info || !process_info.writeio_in_use - begin - Timeout.timeout(@_child_process_exit_timeout) do - process_info.writeio.close - end - rescue Timeout::Error - log.debug "External process #{process_info.title} doesn't exist after STDIN close in timeout #{@_child_process_exit_timeout}sec" - end + next if !process_info + process_info.writeio && process_info.writeio.close rescue nil + end + + super + @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid| + process_info = @_child_process_processes[pid] + next if !process_info child_process_kill(process_info) end - super + exit_wait_timeout = Process.clock_gettime(@_child_process_clock_id) + @_child_process_exit_timeout + while Process.clock_gettime(@_child_process_clock_id) < exit_wait_timeout + process_exists = false + @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid| + unless @_child_process_processes[pid].exit_status + process_exists = true + break + end + end + if process_exists + sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL + else + break + end + end end def close - while (pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys }).size > 0 + while true + pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys } + break if pids.size < 1 + + living_process_exist = false pids.each do |pid| process_info = @_child_process_processes[pid] - if !process_info || !process_info.alive - @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) } - next - end + next if !process_info || process_info.exit_status - process_info.killed_at ||= Time.now # for illegular case (e.g., created after shutdown) - next if Time.now < process_info.killed_at + @_child_process_kill_timeout + living_process_exist = true + + process_info.killed_at ||= Process.clock_gettime(@_child_process_clock_id) # for illegular case (e.g., created after shutdown) + timeout_at = process_info.killed_at + @_child_process_kill_timeout + now = Process.clock_gettime(@_child_process_clock_id) + next if now < timeout_at child_process_kill(process_info, force: true) - @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) } end + break if living_process_exist + sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL end super end - def child_process_kill(process_info, force: false) - if !process_info || !process_info.alive - return - end + def terminate + @_child_process_processes = {} - process_info.killed_at = Time.now unless force + super + end - begin - pid, status = Process.waitpid2(process_info.pid, Process::WNOHANG) - if pid && status - process_info.thread[:_fluentd_plugin_helper_child_process_exit_status] = status - process_info.alive = false - end - rescue Errno::ECHILD, Errno::ESRCH, Errno::EPERM - process_info.alive = false - rescue - # ignore - end - if !process_info.alive - return - end + def child_process_kill(pinfo, force: false) + return if !pinfo + pinfo.killed_at = Process.clock_gettime(@_child_process_clock_id) unless force + pid = pinfo.pid begin - signal = (Fluent.windows? || force) ? :KILL : :TERM - Process.kill(signal, process_info.pid) - if force - process_info.alive = false + if !pinfo.exit_status && child_process_exist?(pid) + signal = (Fluent.windows? || force) ? :KILL : :TERM + Process.kill(signal, pinfo.pid) end rescue Errno::ECHILD, Errno::ESRCH - process_info.alive = false + # ignore end end - ProcessInfo = Struct.new(:title, :thread, :pid, :readio, :readio_in_use, :writeio, :writeio_in_use, :stderrio, :stderrio_in_use, :wait_thread, :alive, :killed_at) + ProcessInfo = Struct.new( + :title, + :thread, :pid, + :readio, :readio_in_use, :writeio, :writeio_in_use, :stderrio, :stderrio_in_use, + :wait_thread, :alive, :killed_at, :exit_status, + :on_exit_callback, :on_exit_callback_mutex, + ) def child_process_execute_once( title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, - internal_encoding, external_encoding, scrub, replace_string, &block + internal_encoding, external_encoding, scrub, replace_string, wait_timeout, on_exit_callback, &block ) spawn_args = if arguments || subprocess_name [ env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || []) ] @@ -227,36 +263,40 @@ def child_process_execute_once( writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts) else writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts) - if !mode.include?(:stderr) # stderr == :discard - stderrio.reopen(IO::NULL) - end end if mode.include?(:write) writeio.set_encoding(external_encoding, internal_encoding, encoding_options) writeio_in_use = true + else + writeio.reopen(IO::NULL) if writeio end if mode.include?(:read) || mode.include?(:read_with_stderr) readio.set_encoding(external_encoding, internal_encoding, encoding_options) readio_in_use = true + else + readio.reopen(IO::NULL) if readio end if mode.include?(:stderr) stderrio.set_encoding(external_encoding, internal_encoding, encoding_options) stderrio_in_use = true + else + stderrio.reopen(IO::NULL) if stderrio && stderrio == :discard end pid = wait_thread.pid # wait_thread => Process::Waiter io_objects = [] mode.each do |m| - io_objects << case m - when :read then readio - when :write then writeio - when :read_with_stderr then readio - when :stderr then stderrio - else - raise "BUG: invalid mode must be checked before here: '#{m}'" - end + io_obj = case m + when :read then readio + when :write then writeio + when :read_with_stderr then readio + when :stderr then stderrio + else + raise "BUG: invalid mode must be checked before here: '#{m}'" + end + io_objects << io_obj end m = Mutex.new @@ -265,28 +305,54 @@ def child_process_execute_once( m.lock # run after plugin thread get pid, thread instance and i/o m.unlock begin - block.call(*io_objects) + @_child_process_processes[pid].alive = true + block.call(*io_objects) if block_given? + writeio.close if writeio rescue EOFError => e log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments rescue IOError => e - if e.message == 'stream closed' + if e.message == 'stream closed' || e.message == 'closed stream' # "closed stream" is of ruby 2.1 log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments else log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e end + rescue Errno::EPIPE => e + log.debug "Broken pipe, child process unexpectedly exits", title: title, pid: pid, command: command, arguments: arguments rescue => e log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e end - process_info = @_child_process_mutex.synchronize do - process_info = @_child_process_processes[pid] - @_child_process_processes.delete(pid) - process_info + + if wait_timeout + if wait_thread.join(wait_timeout) # Thread#join returns nil when limit expires + # wait_thread successfully exits + @_child_process_processes[pid].exit_status = wait_thread.value + else + log.warn "child process timed out", title: title, pid: pid, command: command, arguments: arguments + child_process_kill(@_child_process_processes[pid], force: true) + @_child_process_processes[pid].exit_status = wait_thread.value + end + else + @_child_process_processes[pid].exit_status = wait_thread.value # with join + end + process_info = @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) } + + cb = process_info.on_exit_callback_mutex.synchronize do + cback = process_info.on_exit_callback + process_info.on_exit_callback = nil + cback + end + if cb + cb.call(process_info.exit_status) rescue nil end - child_process_kill(process_info, force: true) if process_info && process_info.alive && ::Thread.current[:_fluentd_plugin_helper_child_process_running] end thread[:_fluentd_plugin_helper_child_process_running] = true thread[:_fluentd_plugin_helper_child_process_pid] = pid - pinfo = ProcessInfo.new(title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, true, nil) + pinfo = ProcessInfo.new( + title, thread, pid, + readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, + wait_thread, false, nil, nil, on_exit_callback, Mutex.new + ) + @_child_process_mutex.synchronize do @_child_process_processes[pid] = pinfo end diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 879bff8f8e..1c9de44854 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -55,6 +55,8 @@ module CompatParameters "keys" => "keys", # CSVParser, TSVParser (old ValuesParser) "time_key" => "time_key", "time_format" => "time_format", + "localtim" => nil, + "utc" => nil, "delimiter" => "delimiter", "keep_time_key" => "keep_time_key", "null_empty_string" => "null_empty_string", @@ -69,19 +71,29 @@ module CompatParameters INJECT_PARAMS = { "include_time_key" => nil, - "time_key" => "time_key", - "time_format" => "time_format", - "timezone" => "timezone", + "time_key" => "time_key", + "time_format" => "time_format", + "timezone" => "timezone", "include_tag_key" => nil, "tag_key" => "tag_key", "localtime" => nil, "utc" => nil, } + EXTRACT_PARAMS = { + "time_key" => "time_key", + "time_format" => "time_format", + "timezone" => "timezone", + "tag_key" => "tag_key", + "localtime" => nil, + "utc" => nil, + } + FORMATTER_PARAMS = { "format" => "@type", "delimiter" => "delimiter", "force_quotes" => "force_quotes", # CsvFormatter + "keys" => "keys", # TSVFormatter "fields" => "fields", # CsvFormatter "json_parser" => "json_parser", # JSONFormatter "label_delimiter" => "label_delimiter", # LabeledTSVFormatter @@ -102,6 +114,8 @@ def compat_parameters_convert(conf, *types, **kwargs) compat_parameters_buffer(conf, **kwargs) when :inject compat_parameters_inject(conf) + when :extract + compat_parameters_extract(conf) when :parser compat_parameters_parser(conf) when :formatter @@ -174,6 +188,7 @@ def compat_parameters_inject(conf) hash['time_type'] ||= 'string' end if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) + hash['time_key'] ||= 'time' hash['time_type'] = 'unixtime' end if conf.has_key?('localtime') || conf.has_key?('utc') @@ -200,6 +215,37 @@ def compat_parameters_inject(conf) conf end + def compat_parameters_extract(conf) + return unless conf.elements('extract').empty? + return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) } + + # TODO: warn obsolete parameters if these are deprecated + hash = compat_parameters_copy_to_subsection_attributes(conf, EXTRACT_PARAMS) + + if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) + hash['time_key'] ||= 'time' + hash['time_type'] = 'unixtime' + end + if conf.has_key?('localtime') || conf.has_key?('utc') + if conf.has_key?('localtime') && conf.has_key?('utc') + raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them" + elsif conf.has_key?('localtime') + hash['localtime'] = Fluent::Config.bool_value(conf['localtime']) + elsif conf.has_key?('utc') + hash['localtime'] = !(Fluent::Config.bool_value(conf['utc'])) + # Specifying "localtime false" means using UTC in TimeFormatter + # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). + # There are difference between "Z" and "+0000" in timezone formatting. + # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way + end + end + + e = Fluent::Config::Element.new('extract', '', hash, []) + conf.elements << e + + conf + end + def compat_parameters_parser(conf) return unless conf.elements('parse').empty? return if PARSER_PARAMS.keys.all?{|k| !conf.has_key?(k) } @@ -217,6 +263,19 @@ def compat_parameters_parser(conf) end hash["types"] = JSON.dump(types) end + if conf.has_key?('localtime') || conf.has_key?('utc') + if conf.has_key?('localtime') && conf.has_key?('utc') + raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them" + elsif conf.has_key?('localtime') + hash['localtime'] = Fluent::Config.bool_value(conf['localtime']) + elsif conf.has_key?('utc') + hash['localtime'] = !(Fluent::Config.bool_value(conf['utc'])) + # Specifying "localtime false" means using UTC in TimeFormatter + # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). + # There are difference between "Z" and "+0000" in timezone formatting. + # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way + end + end e = Fluent::Config::Element.new('parse', '', hash, []) conf.elements << e diff --git a/lib/fluent/plugin_helper/extract.rb b/lib/fluent/plugin_helper/extract.rb index 2fa08568ad..79144c8de1 100644 --- a/lib/fluent/plugin_helper/extract.rb +++ b/lib/fluent/plugin_helper/extract.rb @@ -25,7 +25,8 @@ def extract_tag_from_record(record) return nil unless @_extract_enabled if @_extract_tag_key && record.has_key?(@_extract_tag_key) - return record[@_extract_tag_key].to_s + v = @_extract_keep_tag_key ? record[@_extract_tag_key] : record.delete(@_extract_tag_key) + return v.to_s end nil @@ -35,7 +36,8 @@ def extract_time_from_record(record) return nil unless @_extract_enabled if @_extract_time_key && record.has_key?(@_extract_time_key) - return @_extract_time_parser.call(record[@_extract_time_key]) + v = @_extract_keep_time_key ? record[@_extract_time_key] : record.delete(@_extract_time_key) + return @_extract_time_parser.call(v) end nil @@ -45,7 +47,9 @@ module ExtractParams include Fluent::Configurable config_section :extract, required: false, multi: false, param_name: :extract_config do config_param :tag_key, :string, default: nil + config_param :keep_tag_key, :bool, default: false config_param :time_key, :string, default: nil + config_param :keep_time_key, :bool, default: false # To avoid defining :time_type twice config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :float @@ -64,7 +68,9 @@ def initialize super @_extract_enabled = false @_extract_tag_key = nil + @_extract_keep_tag_key = nil @_extract_time_key = nil + @_extract_keep_time_key = nil @_extract_time_parser = nil end @@ -73,15 +79,21 @@ def configure(conf) if @extract_config @_extract_tag_key = @extract_config.tag_key + @_extract_keep_tag_key = @extract_config.keep_tag_key @_extract_time_key = @extract_config.time_key if @_extract_time_key + @_extract_keep_time_key = @extract_config.keep_time_key @_extract_time_parser = case @extract_config.time_type - when :float then ->(v){ Fluent::EventTime.new(v.to_i, ((v.to_f - v.to_i) * 1_000_000_000).to_i) } - when :unixtime then ->(v){ Fluent::EventTime.new(v.to_i, 0) } + when :float then Fluent::NumericTimeParser.new(:float) + when :unixtime then Fluent::NumericTimeParser.new(:unixtime) else localtime = @extract_config.localtime && !@extract_config.utc Fluent::TimeParser.new(@extract_config.time_format, localtime, @extract_config.timezone) end + else + if @extract_config.time_format + log.warn "'time_format' specified without 'time_key', will be ignored" + end end @_extract_enabled = @_extract_tag_key || @_extract_time_key diff --git a/lib/fluent/plugin_helper/inject.rb b/lib/fluent/plugin_helper/inject.rb index cc1f285983..cec2cff7fc 100644 --- a/lib/fluent/plugin_helper/inject.rb +++ b/lib/fluent/plugin_helper/inject.rb @@ -113,6 +113,10 @@ def configure(conf) localtime = @inject_config.localtime && !@inject_config.utc Fluent::TimeFormatter.new(@inject_config.time_format, localtime, @inject_config.timezone) end + else + if @inject_config.time_format + log.warn "'time_format' specified without 'time_key', will be ignored" + end end @_inject_enabled = @_inject_hostname_key || @_inject_tag_key || @_inject_time_key diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb index e81185f71f..6b7d5af237 100644 --- a/lib/fluent/test/driver/base.rb +++ b/lib/fluent/test/driver/base.rb @@ -150,11 +150,15 @@ def instance_shutdown def run_actual(timeout: DEFAULT_TIMEOUT, &block) if @instance.respond_to?(:_threads) - sleep 0.01 until @instance._threads.values.all?(&:alive?) + sleep 0.1 until @instance._threads.values.all?(&:alive?) end if @instance.respond_to?(:event_loop_running?) - sleep 0.01 until @instance.event_loop_running? + sleep 0.1 until @instance.event_loop_running? + end + + if @instance.respond_to?(:_child_process_processes) + sleep 0.1 until @instance._child_process_processes.values.all?{|pinfo| pinfo.alive } end return_value = nil diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index 6e179193f3..e4631f4102 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -1,133 +1,245 @@ require_relative '../helper' require 'fluent/test/driver/input' require 'fluent/plugin/in_exec' -require 'net/http' +require 'timecop' class ExecInputTest < Test::Unit::TestCase + SCRIPT_PATH = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) + TEST_TIME = "2011-01-02 13:14:15" + TEST_UNIX_TIME = Time.parse(TEST_TIME) + def setup Fluent::Test.setup - @test_time = event_time("2011-01-02 13:14:15") - @script = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) + @test_time = event_time() end - def create_driver(conf = tsv_config) + def create_driver(conf) Fluent::Test::Driver::Input.new(Fluent::Plugin::ExecInput).configure(conf) end - def tsv_config - %[ - command ruby #{@script} "2011-01-02 13:14:15" 0 + DEFAULT_CONFIG_ONLY_WITH_KEYS = %[ + command ruby #{SCRIPT_PATH} "#{TEST_TIME}" 0 + run_interval 1s + tag "my.test.data" + + keys ["k1", "k2", "k3"] + + ] + + TSV_CONFIG = %[ + command ruby #{SCRIPT_PATH} "#{TEST_TIME}" 0 + run_interval 1s + + @type tsv + keys time, tag, k1 + + + tag_key tag + time_key time + time_type string + time_format %Y-%m-%d %H:%M:%S + + ] + + JSON_CONFIG = %[ + command ruby #{SCRIPT_PATH} #{TEST_UNIX_TIME.to_i} 1 + run_interval 1s + + @type json + + + tag_key tag + time_key time + time_type unixtime + + ] + + MSGPACK_CONFIG = %[ + command ruby #{SCRIPT_PATH} #{TEST_UNIX_TIME.to_i} 2 + run_interval 1s + + @type msgpack + + + tag_key tagger + time_key datetime + time_type unixtime + + ] + + # here document for not de-quoting backslashes + REGEXP_CONFIG = %[ + command ruby #{SCRIPT_PATH} "#{TEST_TIME}" 3 + run_interval 1s + tag regex_tag +] + <<'EOC' + + @type regexp + expression "(? +EOC + + sub_test_case 'with configuration with sections' do + test 'configure with default tsv format without extract' do + d = create_driver DEFAULT_CONFIG_ONLY_WITH_KEYS + assert{ d.instance.parser.is_a? Fluent::Plugin::TSVParser } + assert_equal "my.test.data", d.instance.tag + assert_equal ["k1", "k2", "k3"], d.instance.parser.keys + end + + test 'configure raises error if both of tag and extract.tag_key are missing' do + assert_raise Fluent::ConfigError.new("'tag' or 'tag_key' option is required on exec input") do + create_driver %[ + command ruby -e 'puts "yay"' + + keys y1 + + ] + end + end + + test 'configure for tsv' do + d = create_driver TSV_CONFIG + assert{ d.instance.parser.is_a? Fluent::Plugin::TSVParser } + assert_equal ["time", "tag", "k1"], d.instance.parser.keys + assert_equal "tag", d.instance.extract_config.tag_key + assert_equal "time", d.instance.extract_config.time_key + assert_equal :string, d.instance.extract_config.time_type + assert_equal "%Y-%m-%d %H:%M:%S", d.instance.extract_config.time_format + end + + test 'configure for json' do + d = create_driver JSON_CONFIG + assert{ d.instance.parser.is_a? Fluent::Plugin::JSONParser } + assert_equal "tag", d.instance.extract_config.tag_key + assert_equal "time", d.instance.extract_config.time_key + assert_equal :unixtime, d.instance.extract_config.time_type + end + + test 'configure for msgpack' do + d = create_driver MSGPACK_CONFIG + assert{ d.instance.parser.is_a? Fluent::Plugin::MessagePackParser } + assert_equal "tagger", d.instance.extract_config.tag_key + assert_equal "datetime", d.instance.extract_config.time_key + assert_equal :unixtime, d.instance.extract_config.time_type + end + + test 'configure for regexp' do + d = create_driver REGEXP_CONFIG + assert{ d.instance.parser.is_a? Fluent::Plugin::RegexpParser } + assert_equal "regex_tag", d.instance.tag + expression = <<'EXP'.chomp +(?