From 356722a4bd76021f9bf15a8e95c791712f762f3a Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 23 Jun 2016 18:56:08 +0900 Subject: [PATCH 01/23] separate and rename code about buffers, to add code about parser/formatter later --- lib/fluent/plugin_helper/compat_parameters.rb | 87 +++++++++++-------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index c223f43bd9..9fd4f9f01b 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -24,7 +24,7 @@ module CompatParameters # This helper is mainly to convert plugins from v0.12 API # to v0.14 API safely, without breaking user deployment. - PARAMS_MAP = { + BUFFER_PARAMS_MAP = { "buffer_type" => "@type", "buffer_path" => "path", "num_threads" => "flush_thread_count", @@ -40,59 +40,70 @@ module CompatParameters "flush_at_shutdown" => "flush_at_shutdown", } - TIME_SLICED_PARAMS = { + BUFFER_TIME_SLICED_PARAMS = { "time_slice_format" => nil, "time_slice_wait" => "timekey_wait", } - def compat_parameters_default_chunk_key + def compat_parameters_buffer_default_chunk_key # '', 'time' or 'tag' raise NotImplementedError, "return one of '', 'time' or 'tag'" end def configure(conf) - if conf.elements('buffer').empty? && PARAMS_MAP.keys.any?{|k| conf.has_key?(k) } || TIME_SLICED_PARAMS.keys.any?{|k| conf.has_key?(k) } - # TODO: warn obsolete parameters if these are deprecated - attr = {} - PARAMS_MAP.each do |compat, current| - next unless current - if conf.has_key?(compat) - if compat == 'buffer_queue_full_action' && conf[compat] == 'exception' - attr[current] = 'throw_exception' - else - attr[current] = conf[compat] - end - end - end - TIME_SLICED_PARAMS.each do |compat, current| - next unless current - attr[current] = conf[compat] if conf.has_key?(compat) - end + case self + when Fluent::Plugin::Output + compat_parameters_buffer(conf) + end + + super + end - chunk_key = nil + def compat_parameters_buffer(conf) + # return immediately if <buffer> section exists, or any buffer-related parameters don't exist + return unless conf.elements('buffer').empty? + return unless (BUFFER_PARAMS_MAP.keys + BUFFER_TIME_SLICED_PARAMS.keys).all?{|k| !conf.has_key?(k) } - if conf.has_key?('time_slice_format') - chunk_key = 'time' - attr['timekey'] = case conf['time_slice_format'] - when /\%S/ then 1 - when /\%M/ then 60 - when /\%H/ then 3600 - when /\%d/ then 86400 - else - raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long" - end - else - chunk_key = compat_parameters_default_chunk_key - if chunk_key == 'time' - attr['timekey'] = 86400 # TimeSliceOutput.time_slice_format default value is '%Y%m%d' + # TODO: warn obsolete parameters if these are deprecated + attr = {} + BUFFER_PARAMS_MAP.each do |compat, current| + next unless current + if conf.has_key?(compat) + if compat == 'buffer_queue_full_action' && conf[compat] == 'exception' + attr[current] = 'throw_exception' + else + attr[current] = conf[compat] end end + end + BUFFER_TIME_SLICED_PARAMS.each do |compat, current| + next unless current + attr[current] = conf[compat] if conf.has_key?(compat) + end + + chunk_key = nil - e = Fluent::Config::Element.new('buffer', chunk_key, attr, []) - conf.elements << e + if conf.has_key?('time_slice_format') + chunk_key = 'time' + attr['timekey'] = case conf['time_slice_format'] + when /\%S/ then 1 + when /\%M/ then 60 + when /\%H/ then 3600 + when /\%d/ then 86400 + else + raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long" + end + else + chunk_key = compat_parameters_buffer_default_chunk_key + if chunk_key == 'time' + attr['timekey'] = 86400 # TimeSliceOutput.time_slice_format default value is '%Y%m%d' + end end - super + e = Fluent::Config::Element.new('buffer', chunk_key, attr, []) + conf.elements << e + + conf end end end From 30b138b5190369c547f8cf40c03a87f277c455bd Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 23 Jun 2016 19:56:35 +0900 Subject: [PATCH 02/23] roughly implement for parser/formatter and shrink code --- lib/fluent/plugin_helper/compat_parameters.rb | 103 +++++++++++++++--- 1 file changed, 90 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 9fd4f9f01b..c1da9a9bab 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -24,7 +24,7 @@ module CompatParameters # This helper is mainly to convert plugins from v0.12 API # to v0.14 API safely, without breaking user deployment. - BUFFER_PARAMS_MAP = { + BUFFER_PARAMS = { "buffer_type" => "@type", "buffer_path" => "path", "num_threads" => "flush_thread_count", @@ -45,6 +45,45 @@ module CompatParameters "time_slice_wait" => "timekey_wait", } + PARSER_PARAMS = { + "format" => "@type", + "time_key" => "time_key", + "time_format" => "time_format", + "delimiter" => "delimiter", + "json_parser" => "json_parser", # JSONParser + "label_delimiter" => "label_delimiter", # LabeledTSVParser + "format_firstline" => "format_firstline", # MultilineParser + "message_key" => "message_key", # NoneParser + "with_priority" => "with_priority", # SyslogParser + } + + FORMATTER_PARAMS = { + "format" => "@type", + + # TODO: convert to use 'inject' + "include_time_key" => "include_time_key", + "include_tag_key" => "include_tag_key", + "time_key" => "time_key", + "time_format" => "time_format", + "time_as_epoch" => "time_as_epoch", + "localtime" => "localtime", + "utc" => "utc", + "timezone" => "timezone", + "tag_key" => "tag_key", + + "delimiter" => "delimiter", + + "force_quotes" => "force_quotes", # CsvFormatter + "fields" => "fields", # CsvFormatter + "json_parser" => "json_parser", # JSONFormatter + "label_delimiter" => "label_delimiter", # LabeledTSVFormatter + "output_time" => "output_time", # OutFileFormatter + "output_tag" => "output_tag", # OutFileFormatter + "message_key" => "message_key", # SingleValueFormatter + "add_newline" => "add_newline", # SingleValueFormatter + "output_type" => "output_type", # StdoutFormatter + } + def compat_parameters_buffer_default_chunk_key # '', 'time' or 'tag' raise NotImplementedError, "return one of '', 'time' or 'tag'" @@ -54,31 +93,43 @@ def configure(conf) case self when Fluent::Plugin::Output compat_parameters_buffer(conf) + when Fluent::Plugin::Parser + compat_parameters_parser(conf) + when Fluent::Plugin::Formatter + compat_parameters_formatter(conf) end super end - def compat_parameters_buffer(conf) - # return immediately if <buffer> section exists, or any buffer-related parameters don't exist - return unless conf.elements('buffer').empty? - return unless (BUFFER_PARAMS_MAP.keys + BUFFER_TIME_SLICED_PARAMS.keys).all?{|k| !conf.has_key?(k) } - - # TODO: warn obsolete parameters if these are deprecated + def compat_parameters_copy_to_subsection_attributes(conf, params, &block) attr = {} - BUFFER_PARAMS_MAP.each do |compat, current| + params.each do |compat, current| next unless current if conf.has_key?(compat) - if compat == 'buffer_queue_full_action' && conf[compat] == 'exception' - attr[current] = 'throw_exception' + if block_given? + attr[current] = block.call(compat, conf[compat]) else attr[current] = conf[compat] end end end - BUFFER_TIME_SLICED_PARAMS.each do |compat, current| - next unless current - attr[current] = conf[compat] if conf.has_key?(compat) + attr + end + + def compat_parameters_buffer(conf) + # return immediately if <buffer> section exists, or any buffer-related parameters don't exist + return unless conf.elements('buffer').empty? + return if (BUFFER_PARAMS.keys + BUFFER_TIME_SLICED_PARAMS.keys).all?{|k| !conf.has_key?(k) } + + # TODO: warn obsolete parameters if these are deprecated + buffer_params = BUFFER_PARAMS.merge(BUFFER_TIME_SLICED_PARAMS) + attr = compat_parameters_copy_to_subsection_attributes(conf, buffer_params) do |compat_key, value| + if compat_key == 'buffer_queue_full_action' && value == 'exception' + 'throw_exception' + else + value + end end chunk_key = nil @@ -105,6 +156,32 @@ def compat_parameters_buffer(conf) conf end + + def compat_parameters_parser(conf) + return unless conf.elements('parse').empty? + return if PARSER_PARAMS.keys.all?{|k| !conf.has_key?(k) } + + # TODO: warn obsolete parameters if these are deprecated + attr = compat_parameters_copy_to_subsection_attributes(conf, PARSER_PARAMS) + + e = Fluent::Config::Element.new('parse', '', attr, []) + conf.elements << e + + conf + end + + def compat_parameters_formatter(conf) + return unless conf.elements('format').empty? + return if FORMATTER_PARAMS.keys.all?{|k| !conf.has_key?(k) } + + # TODO: warn obsolete parameters if these are deprecated + attr = compat_parameters_copy_to_subsection_attributes(conf, FORMATTER_PARAMS) + + e = Fluent::Config::Element.new('format', '', attr, []) + conf.elements << e + + conf + end end end end From eb4e3111667e2616be863e67c74a69b0666d7712 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 23 Jun 2016 20:42:54 +0900 Subject: [PATCH 03/23] fix to support "localtime" to use localtime always (including DST) --- lib/fluent/plugin_helper/inject.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin_helper/inject.rb b/lib/fluent/plugin_helper/inject.rb index aaf4d02028..bbe0fc88f3 100644 --- a/lib/fluent/plugin_helper/inject.rb +++ b/lib/fluent/plugin_helper/inject.rb @@ -68,7 +68,8 @@ module InjectParams config_param :time_key, :string, default: nil config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :float config_param :time_format, :string, default: nil - config_param :timezone, :string, default: "#{Time.now.strftime('%z')}" # localtime + config_param :localtime, :bool, default: true # if localtime is false and timezone is nil, then utc + config_param :timezone, :string, default: nil end end @@ -105,7 +106,7 @@ def configure(conf) when :float then ->(time){ time.to_r.truncate(+6).to_f } # microsecond floating point value when :unixtime then ->(time){ time.to_i } else - Fluent::TimeFormatter.new(@inject_config.time_format, false, @inject_config.timezone) + Fluent::TimeFormatter.new(@inject_config.time_format, @inject_config.localtime, @inject_config.timezone) end end From 0b0d73d2c2b22be00c54167174ea74423bdd7110 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 23 Jun 2016 20:43:34 +0900 Subject: [PATCH 04/23] fix to compat parameter helper for inject helper (formatter do NOT manage it in v0.14) --- lib/fluent/plugin_helper/compat_parameters.rb | 60 +++++++++++++++---- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index c1da9a9bab..cb4a23fc1d 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -57,22 +57,19 @@ module CompatParameters "with_priority" => "with_priority", # SyslogParser } - FORMATTER_PARAMS = { - "format" => "@type", - - # TODO: convert to use 'inject' - "include_time_key" => "include_time_key", - "include_tag_key" => "include_tag_key", + INJECT_PARAMS = { "time_key" => "time_key", "time_format" => "time_format", - "time_as_epoch" => "time_as_epoch", - "localtime" => "localtime", - "utc" => "utc", + # "time_as_epoch" => "time_as_epoch", + # "localtime" => "localtime", + # "utc" => "utc", "timezone" => "timezone", "tag_key" => "tag_key", + } + FORMATTER_PARAMS = { + "format" => "@type", "delimiter" => "delimiter", - "force_quotes" => "force_quotes", # CsvFormatter "fields" => "fields", # CsvFormatter "json_parser" => "json_parser", # JSONFormatter @@ -92,10 +89,12 @@ def compat_parameters_buffer_default_chunk_key def configure(conf) case self when Fluent::Plugin::Output + compat_parameters_inject(conf) compat_parameters_buffer(conf) when Fluent::Plugin::Parser compat_parameters_parser(conf) when Fluent::Plugin::Formatter + compat_parameters_inject(conf) compat_parameters_formatter(conf) end @@ -117,6 +116,47 @@ def compat_parameters_copy_to_subsection_attributes(conf, params, &block) attr end + def compat_parameters_inject(conf) + return unless conf.elements('inject').empty? + return if INJECT_PARAMS.keys.all?{|k| !conf.has_key?(k) } + + "time_key" => "time_key", + "time_format" => "time_format", + "timezone" => "timezone", + "tag_key" => "tag_key", + + # TODO: warn obsolete parameters if these are deprecated + attr = compat_parameters_copy_to_subsection_attributes(conf, INJECT_PARAMS) do |compat_key, value| + value + end + + if conf.has_key?('include_time_key') && Fluent::Config.bool_value(conf['include_time_key']) + attr['time_key'] ||= 'time' + end + if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) + attr['time_type'] = 'unixtime' + end + if conf.has_key?('localtime') || conf.has_key?('utc') + if conf.has_key?('localtime') && Fluent::Config.bool_value(conf['localtime']) + attr['localtime'] = true + elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc']) + attr['localtime'] = false + attr['timezone'] ||= "+0000" + else + log.warn "both of localtime and utc are specified as false" + end + end + + if conf.has_key?('include_tag_key') && Fluent::Config.bool_value(conf['include_tag_key']) + attr['tag_key'] ||= 'tag' + end + + e = Fluent::Config::Element.new('inject', '', attr, []) + conf.elements << e + + conf + end + def compat_parameters_buffer(conf) # return immediately if <buffer> section exists, or any buffer-related parameters don't exist return unless conf.elements('buffer').empty? From ab9b2b50e0abb1b3dd6c4cc96eda0e28822ef1eb Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Fri, 24 Jun 2016 15:25:37 +0900 Subject: [PATCH 05/23] refactor code --- lib/fluent/plugin_helper/compat_parameters.rb | 122 +++++++----------- 1 file changed, 47 insertions(+), 75 deletions(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index cb4a23fc1d..691d785db2 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -14,6 +14,7 @@ # limitations under the License. # +require 'fluent/config/types' require 'fluent/config/element' module Fluent @@ -60,9 +61,6 @@ module CompatParameters INJECT_PARAMS = { "time_key" => "time_key", "time_format" => "time_format", - # "time_as_epoch" => "time_as_epoch", - # "localtime" => "localtime", - # "utc" => "utc", "timezone" => "timezone", "tag_key" => "tag_key", } @@ -81,50 +79,49 @@ module CompatParameters "output_type" => "output_type", # StdoutFormatter } - def compat_parameters_buffer_default_chunk_key - # '', 'time' or 'tag' - raise NotImplementedError, "return one of '', 'time' or 'tag'" - end + def compat_parameters_buffer(conf, default_chunk_key: '') + # return immediately if <buffer> section exists, or any buffer-related parameters don't exist + return unless conf.elements('buffer').empty? + return if (BUFFER_PARAMS.keys + BUFFER_TIME_SLICED_PARAMS.keys).all?{|k| !conf.has_key?(k) } - def configure(conf) - case self - when Fluent::Plugin::Output - compat_parameters_inject(conf) - compat_parameters_buffer(conf) - when Fluent::Plugin::Parser - compat_parameters_parser(conf) - when Fluent::Plugin::Formatter - compat_parameters_inject(conf) - compat_parameters_formatter(conf) + # TODO: warn obsolete parameters if these are deprecated + buffer_params = BUFFER_PARAMS.merge(BUFFER_TIME_SLICED_PARAMS) + attr = compat_parameters_copy_to_subsection_attributes(conf, buffer_params) do |compat_key, value| + if compat_key == 'buffer_queue_full_action' && value == 'exception' + 'throw_exception' + else + value + end end - super - end + chunk_key = default_chunk_key - def compat_parameters_copy_to_subsection_attributes(conf, params, &block) - attr = {} - params.each do |compat, current| - next unless current - if conf.has_key?(compat) - if block_given? - attr[current] = block.call(compat, conf[compat]) - else - attr[current] = conf[compat] - end + if conf.has_key?('time_slice_format') + chunk_key = 'time' + attr['timekey'] = case conf['time_slice_format'] + when /\%S/ then 1 + when /\%M/ then 60 + when /\%H/ then 3600 + when /\%d/ then 86400 + else + raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long" + end + else + if chunk_key == 'time' + attr['timekey'] = 86400 # TimeSliceOutput.time_slice_format default value is '%Y%m%d' end end - attr + + e = Fluent::Config::Element.new('buffer', chunk_key, attr, []) + conf.elements << e + + conf end def compat_parameters_inject(conf) return unless conf.elements('inject').empty? return if INJECT_PARAMS.keys.all?{|k| !conf.has_key?(k) } - "time_key" => "time_key", - "time_format" => "time_format", - "timezone" => "timezone", - "tag_key" => "tag_key", - # TODO: warn obsolete parameters if these are deprecated attr = compat_parameters_copy_to_subsection_attributes(conf, INJECT_PARAMS) do |compat_key, value| value @@ -157,46 +154,6 @@ def compat_parameters_inject(conf) conf end - def compat_parameters_buffer(conf) - # return immediately if <buffer> section exists, or any buffer-related parameters don't exist - return unless conf.elements('buffer').empty? - return if (BUFFER_PARAMS.keys + BUFFER_TIME_SLICED_PARAMS.keys).all?{|k| !conf.has_key?(k) } - - # TODO: warn obsolete parameters if these are deprecated - buffer_params = BUFFER_PARAMS.merge(BUFFER_TIME_SLICED_PARAMS) - attr = compat_parameters_copy_to_subsection_attributes(conf, buffer_params) do |compat_key, value| - if compat_key == 'buffer_queue_full_action' && value == 'exception' - 'throw_exception' - else - value - end - end - - chunk_key = nil - - if conf.has_key?('time_slice_format') - chunk_key = 'time' - attr['timekey'] = case conf['time_slice_format'] - when /\%S/ then 1 - when /\%M/ then 60 - when /\%H/ then 3600 - when /\%d/ then 86400 - else - raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long" - end - else - chunk_key = compat_parameters_buffer_default_chunk_key - if chunk_key == 'time' - attr['timekey'] = 86400 # TimeSliceOutput.time_slice_format default value is '%Y%m%d' - end - end - - e = Fluent::Config::Element.new('buffer', chunk_key, attr, []) - conf.elements << e - - conf - end - def compat_parameters_parser(conf) return unless conf.elements('parse').empty? return if PARSER_PARAMS.keys.all?{|k| !conf.has_key?(k) } @@ -222,6 +179,21 @@ def compat_parameters_formatter(conf) conf end + + def compat_parameters_copy_to_subsection_attributes(conf, params, &block) + attr = {} + params.each do |compat, current| + next unless current + if conf.has_key?(compat) + if block_given? + attr[current] = block.call(compat, conf[compat]) + else + attr[current] = conf[compat] + end + end + end + attr + end end end end From 0f0da1c620314316a1ab11de112ea1cd008dd94c Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Fri, 24 Jun 2016 15:26:04 +0900 Subject: [PATCH 06/23] fix const names --- lib/fluent/compat/output.rb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 817dd29537..b1af9dcbae 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -210,10 +210,10 @@ def support_in_v12_style?(feature) config_param :flush_at_shutdown, :bool, default: true - PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP + BUFFER_PARAMS = Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS def self.propagate_default_params - PARAMS_MAP + BUFFER_PARAMS end include PropagateDefault @@ -225,7 +225,7 @@ def configure(conf) "flush_mode" => "interval", "retry_type" => "exponential_backoff", } - PARAMS_MAP.each do |older, newer| + BUFFER_PARAMS.each do |older, newer| next unless newer if conf.has_key?(older) if older == 'buffer_queue_full_action' && conf[older] == 'exception' @@ -399,10 +399,10 @@ def support_in_v12_style?(feature) config_set_default :time_as_integer, true - PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP + BUFFER_PARAMS = Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS def self.propagate_default_params - PARAMS_MAP + BUFFER_PARAMS end include PropagateDefault @@ -414,7 +414,7 @@ def configure(conf) "flush_mode" => "interval", "retry_type" => "exponential_backoff", } - PARAMS_MAP.each do |older, newer| + BUFFER_PARAMS.each do |older, newer| next unless newer if conf.has_key?(older) if older == 'buffer_queue_full_action' && conf[older] == 'exception' @@ -513,7 +513,7 @@ def support_in_v12_style?(feature) config_set_default :@type, 'file' end - PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP.merge(Fluent::PluginHelper::CompatParameters::TIME_SLICED_PARAMS) + BUFFER_PARAMS = Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.merge(Fluent::PluginHelper::CompatParameters::BUFFER_TIME_SLICED_PARAMS) def initialize super @@ -525,7 +525,7 @@ def initialize end def self.propagate_default_params - PARAMS_MAP + BUFFER_PARAMS end include PropagateDefault @@ -537,7 +537,7 @@ def configure(conf) "flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"), "retry_type" => "exponential_backoff", } - PARAMS_MAP.each do |older, newer| + BUFFER_PARAMS.each do |older, newer| next unless newer if conf.has_key?(older) if older == 'buffer_queue_full_action' && conf[older] == 'exception' From 7cbac5e3c19b2c8150fe882e68ce3b68bf37b1e6 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Fri, 24 Jun 2016 15:26:24 +0900 Subject: [PATCH 07/23] rename dummy plugins to add other types of plugins --- test/plugin_helper/test_compat_parameters.rb | 44 +++++++++++--------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/test/plugin_helper/test_compat_parameters.rb b/test/plugin_helper/test_compat_parameters.rb index ab176b9ed4..c4cde34785 100644 --- a/test/plugin_helper/test_compat_parameters.rb +++ b/test/plugin_helper/test_compat_parameters.rb @@ -19,44 +19,50 @@ class CompatParameterTest < Test::Unit::TestCase end end - class Dummy0 < Fluent::Plugin::Output + class DummyO0 < Fluent::Plugin::Output helpers :compat_parameters - def compat_parameters_default_chunk_key - '' + def configure(conf) + compat_parameters_buffer(conf, default_chunk_key: '') + super end def write(chunk) # dummy end end - class Dummy1 < Fluent::Plugin::Output + class DummyO1 < Fluent::Plugin::Output helpers :compat_parameters - def compat_parameters_default_chunk_key - 'time' + def configure(conf) + compat_parameters_buffer(conf, default_chunk_key: 'time') + super end def write(chunk) # dummy end end - class Dummy2 < Fluent::Plugin::Output + class DummyO2 < Fluent::Plugin::Output helpers :compat_parameters - # for test to assume default key time by 'time_slice_format' + def configure(conf) + compat_parameters_buffer(conf, default_chunk_key: 'time') + super + end def write(chunk) # dummy end end - class Dummy3 < Fluent::Plugin::Output + class DummyO3 < Fluent::Plugin::Output helpers :compat_parameters - def compat_parameters_default_chunk_key - 'tag' + def configure(conf) + compat_parameters_buffer(conf, default_chunk_key: 'tag') + super end def write(chunk) # dummy end end - sub_test_case 'plugins which does not have default chunk key' do + sub_test_case 'output plugins which does not have default chunk key' do setup do - @p = Dummy0 + @p = DummyO0 end test 'plugin helper converts parameters into plugin configuration parameters' do @@ -82,9 +88,9 @@ def write(chunk) end end - sub_test_case 'plugins which has default chunk key: time' do + sub_test_case 'output plugins which has default chunk key: time' do setup do - @p = Dummy1 + @p = DummyO1 end test 'plugin helper converts parameters into plugin configuration parameters' do @@ -112,9 +118,9 @@ def write(chunk) end end - sub_test_case 'plugins which does not have default chunk key' do + sub_test_case 'output plugins which does not have default chunk key' do setup do - @p = Dummy2 + @p = DummyO2 end test 'plugin helper converts parameters into plugin configuration parameters' do @@ -144,9 +150,9 @@ def write(chunk) end end - sub_test_case 'plugins which has default chunk key: tag' do + sub_test_case 'output plugins which has default chunk key: tag' do setup do - @p = Dummy3 + @p = DummyO3 end test 'plugin helper converts parameters into plugin configuration parameters' do From 55f217e673409012a9dad2ab4b4f753405d027d7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Fri, 24 Jun 2016 15:41:37 +0900 Subject: [PATCH 08/23] add utility method to call each compat types at once --- lib/fluent/plugin_helper/compat_parameters.rb | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 691d785db2..4b6cb2837b 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -79,6 +79,25 @@ module CompatParameters "output_type" => "output_type", # StdoutFormatter } + def compat_parameters_convert(conf, *types, **kwargs) + types.each do |type| + case type + when :buffer + compat_parameters_buffer(conf, **kwargs) + when :inject + compat_parameters_inject(conf, **kwargs) + when :parser + compat_parameters_parser(conf, **kwargs) + when :formatter + compat_parameters_formatter(conf, **kwargs) + else + raise "BUG: unknown compat_parameters type: #{type}" + end + end + + conf + end + def compat_parameters_buffer(conf, default_chunk_key: '') # return immediately if <buffer> section exists, or any buffer-related parameters don't exist return unless conf.elements('buffer').empty? From d41b107fcd11af7bce026123f8b68f396aebf27c Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Fri, 24 Jun 2016 15:42:15 +0900 Subject: [PATCH 09/23] refactor test code --- test/plugin_helper/test_compat_parameters.rb | 46 +++++++------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/test/plugin_helper/test_compat_parameters.rb b/test/plugin_helper/test_compat_parameters.rb index c4cde34785..5051422276 100644 --- a/test/plugin_helper/test_compat_parameters.rb +++ b/test/plugin_helper/test_compat_parameters.rb @@ -25,9 +25,7 @@ def configure(conf) compat_parameters_buffer(conf, default_chunk_key: '') super end - def write(chunk) - # dummy - end + def write(chunk); end # dummy end class DummyO1 < Fluent::Plugin::Output helpers :compat_parameters @@ -35,9 +33,7 @@ def configure(conf) compat_parameters_buffer(conf, default_chunk_key: 'time') super end - def write(chunk) - # dummy - end + def write(chunk); end # dummy end class DummyO2 < Fluent::Plugin::Output helpers :compat_parameters @@ -45,9 +41,7 @@ def configure(conf) compat_parameters_buffer(conf, default_chunk_key: 'time') super end - def write(chunk) - # dummy - end + def write(chunk); end # dummy end class DummyO3 < Fluent::Plugin::Output helpers :compat_parameters @@ -55,16 +49,18 @@ def configure(conf) compat_parameters_buffer(conf, default_chunk_key: 'tag') super end - def write(chunk) - # dummy + def write(chunk); end # dummy + end + class DummyO4 < Fluent::Plugin::Output + helpers :compat_parameters + def configure(conf) + compat_parameters_convert(conf, :buffer, :inject, :formatter, default_chunk_key: 'tag') + super end + def write(chunk); end # dummy end sub_test_case 'output plugins which does not have default chunk key' do - setup do - @p = DummyO0 - end - test 'plugin helper converts parameters into plugin configuration parameters' do hash = { 'num_threads' => 8, @@ -74,7 +70,7 @@ def write(chunk) 'flush_at_shutdown' => 'yes', } conf = config_element('ROOT', '', hash) - @i = @p.new + @i = DummyO0.new @i.configure(conf) assert_equal 'memory', @i.buffer_config[:@type] @@ -89,10 +85,6 @@ def write(chunk) end sub_test_case 'output plugins which has default chunk key: time' do - setup do - @p = DummyO1 - end - test 'plugin helper converts parameters into plugin configuration parameters' do hash = { 'buffer_type' => 'file', @@ -102,7 +94,7 @@ def write(chunk) 'buffer_queue_full_action' => 'block', } conf = config_element('ROOT', '', hash) - @i = @p.new + @i = DummyO1.new @i.configure(conf) assert_equal 'file', @i.buffer_config[:@type] @@ -119,10 +111,6 @@ def write(chunk) end sub_test_case 'output plugins which does not have default chunk key' do - setup do - @p = DummyO2 - end - test 'plugin helper converts parameters into plugin configuration parameters' do hash = { 'buffer_type' => 'file', @@ -133,7 +121,7 @@ def write(chunk) 'buffer_queue_full_action' => 'drop_oldest_chunk', } conf = config_element('ROOT', '', hash) - @i = @p.new + @i = DummyO2.new @i.configure(conf) assert_equal 'file', @i.buffer_config[:@type] @@ -151,10 +139,6 @@ def write(chunk) end sub_test_case 'output plugins which has default chunk key: tag' do - setup do - @p = DummyO3 - end - test 'plugin helper converts parameters into plugin configuration parameters' do hash = { 'buffer_type' => 'memory', @@ -164,7 +148,7 @@ def write(chunk) 'queued_chunk_flush_interval' => '0.5', } conf = config_element('ROOT', '', hash) - @i = @p.new + @i = DummyO3.new @i.configure(conf) assert_equal 'memory', @i.buffer_config[:@type] From 8cd6261d1d02b0a6f34175f6d84e332aa58bfd99 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 11:55:08 +0900 Subject: [PATCH 10/23] "fields" is required parameter for CSV parser --- test/plugin/test_formatter_csv.rb | 10 +++++++--- test/test_formatter.rb | 3 ++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/test/plugin/test_formatter_csv.rb b/test/plugin/test_formatter_csv.rb index 1f781089de..00d1cf1b99 100644 --- a/test/plugin/test_formatter_csv.rb +++ b/test/plugin/test_formatter_csv.rb @@ -8,7 +8,11 @@ def setup @time = event_time end - def create_driver(conf = "") + CONF = %[ + fields a,b,c + ] + + def create_driver(conf = CONF) Fluent::Test::Driver::Formatter.new(Fluent::Plugin::CsvFormatter).configure(conf) end @@ -20,7 +24,7 @@ def test_config_params d = create_driver assert_equal(',', d.instance.delimiter) assert_equal(true, d.instance.force_quotes) - assert_equal([], d.instance.fields) + assert_equal(['a', 'b', 'c'], d.instance.fields) end data( @@ -29,7 +33,7 @@ def test_config_params 'pipe' => ['|', '|']) def test_config_params_with_customized_delimiters(data) expected, target = data - d = create_driver("delimiter" => target) + d = create_driver("delimiter" => target, 'fields' => 'a,b,c') assert_equal expected, d.instance.delimiter end diff --git a/test/test_formatter.rb b/test/test_formatter.rb index e92c1d60b6..09e008cd2d 100644 --- a/test/test_formatter.rb +++ b/test/test_formatter.rb @@ -270,8 +270,9 @@ def test_config_params 'pipe' => ['|', '|']) def test_config_params_with_customized_delimiters(data) expected, target = data - @formatter.configure('delimiter' => target) + @formatter.configure('delimiter' => target, 'fields' => 'a,b,c') assert_equal expected, @formatter.delimiter + assert_equal ['a', 'b', 'c'], @formatter.fields end def test_format From f90581bf5b958fa26c1f4e6ccba76548a47d7bbc Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 11:56:46 +0900 Subject: [PATCH 11/23] add missing require --- lib/fluent/plugin_helper.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index 38406aa2e4..0407033010 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -22,6 +22,7 @@ require 'fluent/plugin_helper/storage' require 'fluent/plugin_helper/parser' require 'fluent/plugin_helper/formatter' +require 'fluent/plugin_helper/inject' require 'fluent/plugin_helper/retry_state' require 'fluent/plugin_helper/compat_parameters' From f89b0c4bac6e9cae3cf21171a0ae4bca9d020f22 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 11:59:24 +0900 Subject: [PATCH 12/23] add default_type to instantiate plugins with configuration and default_type --- lib/fluent/plugin_helper/formatter.rb | 31 ++++++++++++++++----------- lib/fluent/plugin_helper/parser.rb | 17 +++++++++------ lib/fluent/plugin_helper/storage.rb | 17 +++++++++------ test/plugin_helper/test_formatter.rb | 20 +++++++++++++++++ test/plugin_helper/test_parser.rb | 20 +++++++++++++++++ test/plugin_helper/test_storage.rb | 20 +++++++++++++++++ 6 files changed, 101 insertions(+), 24 deletions(-) diff --git a/lib/fluent/plugin_helper/formatter.rb b/lib/fluent/plugin_helper/formatter.rb index faf046f1ab..1133ad42a5 100644 --- a/lib/fluent/plugin_helper/formatter.rb +++ b/lib/fluent/plugin_helper/formatter.rb @@ -22,15 +22,20 @@ module Fluent module PluginHelper module Formatter - def formatter_create(usage: '', type: nil, conf: nil) + def formatter_create(usage: '', type: nil, conf: nil, default_type: nil) formatter = @_formatters[usage] return formatter if formatter - if !type - raise ArgumentError, "BUG: both type and conf are not specified" unless conf - raise Fluent::ConfigError, "@type is required in <format>" unless conf['@type'] - type = conf['@type'] - end + type = if type + type + elsif conf && conf.respond_to?(:[]) + raise Fluent::ConfigError, "@type is required in <format>" unless conf['@type'] + conf['@type'] + elsif default_type + default_type + else + raise ArgumentError, "BUG: both type and conf are not specified" + end formatter = Fluent::Plugin.new_formatter(type, parent: self) config = case conf when Fluent::Config::Element @@ -77,13 +82,15 @@ def initialize def configure(conf) super - @formatter_configs.each do |section| - if @_formatters[section.usage] - raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}" + if @formatter_configs + @formatter_configs.each do |section| + if @_formatters[section.usage] + raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}" + end + formatter = Plugin.new_formatter(section[:@type], parent: self) + formatter.configure(section.corresponding_config_element) + @_formatters[section.usage] = formatter end - formatter = Plugin.new_formatter(section[:@type], parent: self) - formatter.configure(section.corresponding_config_element) - @_formatters[section.usage] = formatter end end diff --git a/lib/fluent/plugin_helper/parser.rb b/lib/fluent/plugin_helper/parser.rb index 233017ae46..d10ac4b065 100644 --- a/lib/fluent/plugin_helper/parser.rb +++ b/lib/fluent/plugin_helper/parser.rb @@ -22,15 +22,20 @@ module Fluent module PluginHelper module Parser - def parser_create(usage: '', type: nil, conf: nil) + def parser_create(usage: '', type: nil, conf: nil, default_type: nil) parser = @_parsers[usage] return parser if parser - if !type - raise ArgumentError, "BUG: both type and conf are not specified" unless conf - raise Fluent::ConfigError, "@type is required in <parse>" unless conf['@type'] - type = conf['@type'] - end + type = if type + type + elsif conf && conf.respond_to?(:[]) + raise Fluent::ConfigError, "@type is required in <parse>" unless conf['@type'] + conf['@type'] + elsif default_type + default_type + else + raise ArgumentError, "BUG: both type and conf are not specified" + end parser = Fluent::Plugin.new_parser(type, parent: self) config = case conf when Fluent::Config::Element diff --git a/lib/fluent/plugin_helper/storage.rb b/lib/fluent/plugin_helper/storage.rb index a6721ab73b..32b8a1f5c7 100644 --- a/lib/fluent/plugin_helper/storage.rb +++ b/lib/fluent/plugin_helper/storage.rb @@ -29,18 +29,23 @@ module Storage StorageState = Struct.new(:storage, :running) - def storage_create(usage: '', type: nil, conf: nil) + def storage_create(usage: '', type: nil, conf: nil, default_type: nil) s = @_storages[usage] if s && s.running return s.storage elsif s # storage is already created, but not loaded / started else # !s - if !type - raise ArgumentError, "BUG: both type and conf are not specified" unless conf - raise Fluent::ConfigError, "@type is not specified for <storage>" unless conf['@type'] - type = conf['@type'] - end + type = if type + type + elsif conf && conf.respond_to?(:[]) + raise Fluent::ConfigError, "@type is required in <storage>" unless conf['@type'] + conf['@type'] + elsif default_type + default_type + else + raise ArgumentError, "BUG: both type and conf are not specified" + end storage = Plugin.new_storage(type, parent: self) config = case conf when Fluent::Config::Element diff --git a/test/plugin_helper/test_formatter.rb b/test/plugin_helper/test_formatter.rb index 4a59bbdc98..b1b8a85832 100644 --- a/test/plugin_helper/test_formatter.rb +++ b/test/plugin_helper/test_formatter.rb @@ -57,6 +57,26 @@ class Dummy2 < Fluent::Plugin::TestBase assert_equal 'example2', d.formatter_configs.first[:@type] end + test 'creates instance of type specified by conf, or default_type if @type is missing in conf' do + d = Dummy2.new + d.configure(config_element()) + i = d.formatter_create(conf: config_element('format', '', {'@type' => 'example'}), default_type: 'example2') + assert{ i.is_a?(ExampleFormatter) } + + d = Dummy2.new + d.configure(config_element()) + i = d.formatter_create(conf: nil, default_type: 'example2') + assert{ i.is_a?(Example2Formatter) } + end + + test 'raises config error if config section is specified, but @type is not specified' do + d = Dummy2.new + d.configure(config_element()) + assert_raise Fluent::ConfigError.new("@type is required in <format>") do + d.formatter_create(conf: config_element('format', '', {}), default_type: 'example2') + end + end + test 'can be configured without format sections' do d = Dummy.new assert_nothing_raised do diff --git a/test/plugin_helper/test_parser.rb b/test/plugin_helper/test_parser.rb index ad3a1a9193..0d462bc4d6 100644 --- a/test/plugin_helper/test_parser.rb +++ b/test/plugin_helper/test_parser.rb @@ -68,6 +68,26 @@ class Dummy2 < Fluent::Plugin::TestBase assert_equal 'example2', d.parser_configs.first[:@type] end + test 'creates instance of type specified by conf, or default_type if @type is missing in conf' do + d = Dummy2.new + d.configure(config_element()) + i = d.parser_create(conf: config_element('parse', '', {'@type' => 'example'}), default_type: 'example2') + assert{ i.is_a?(ExampleParser) } + + d = Dummy2.new + d.configure(config_element()) + i = d.parser_create(conf: nil, default_type: 'example2') + assert{ i.is_a?(Example2Parser) } + end + + test 'raises config error if config section is specified, but @type is not specified' do + d = Dummy2.new + d.configure(config_element()) + assert_raise Fluent::ConfigError.new("@type is required in <parse>") do + d.parser_create(conf: config_element('parse', '', {}), default_type: 'example2') + end + end + test 'can be configured without parse sections' do d = Dummy.new assert_nothing_raised do diff --git a/test/plugin_helper/test_storage.rb b/test/plugin_helper/test_storage.rb index 835d6ba22c..ead7fa21ba 100644 --- a/test/plugin_helper/test_storage.rb +++ b/test/plugin_helper/test_storage.rb @@ -114,6 +114,26 @@ class Dummy2 < Fluent::Plugin::TestBase assert_equal '/tmp/yay', d.storage_configs.first.dummy_path end + test 'creates instance of type specified by conf, or default_type if @type is missing in conf' do + d = Dummy2.new + d.configure(config_element()) + i = d.storage_create(conf: config_element('format', '', {'@type' => 'example'}), default_type: 'ex2') + assert{ i.is_a?(Fluent::PluginHelper::Storage::SynchronizeWrapper) && i.instance_eval{ @storage }.is_a?(ExampleStorage) } + + d = Dummy2.new + d.configure(config_element()) + i = d.storage_create(conf: nil, default_type: 'ex2') + assert{ i.is_a?(Fluent::PluginHelper::Storage::SynchronizeWrapper) && i.instance_eval{ @storage }.is_a?(Example2Storage) } + end + + test 'raises config error if config section is specified, but @type is not specified' do + d = Dummy2.new + d.configure(config_element()) + assert_raise Fluent::ConfigError.new("@type is required in <storage>") do + d.storage_create(conf: config_element('storage', '', {}), default_type: 'ex2') + end + end + test 'can be configured without storage sections' do d = Dummy.new assert_nothing_raised do From 7892bdb81866a1c154b545396cb78962b532c018 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 11:59:55 +0900 Subject: [PATCH 13/23] fix test subjects to use correct method name --- test/plugin_helper/test_inject.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/plugin_helper/test_inject.rb b/test/plugin_helper/test_inject.rb index d712b4898f..6eb8fb39a1 100644 --- a/test/plugin_helper/test_inject.rb +++ b/test/plugin_helper/test_inject.rb @@ -93,7 +93,7 @@ def config_inject_section(hash = {}) assert_not_nil @d.instance_eval{ @_inject_time_formatter } end - sub_test_case 'using inject_record' do + sub_test_case 'using inject_values_to_record' do test 'injects hostname automatically detected' do detected_hostname = `hostname`.chomp @d.configure(config_inject_section("hostname_key" => "host")) @@ -222,7 +222,7 @@ def config_inject_section(hash = {}) assert_equal record.merge(injected), @d.inject_values_to_record('tag', time, record) end end - sub_test_case 'using inject_event_stream' do + sub_test_case 'using inject_values_to_event_stream' do local_timezone = Time.now.strftime('%z') time_in_unix = Time.parse("2016-06-21 08:10:11 #{local_timezone}").to_i time_subsecond = 320_101_224 From 1ca1ad9888492ddf68f94c88ea9393fdfd95a3c7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 13:44:20 +0900 Subject: [PATCH 14/23] Separate HandleTagAndTimeMixin/StructuredFormatMixin from formatter/parser by using inject plugin helper in filter/output plugin side with conversion of configuration + automated inject_* method invocation for compat plugins --- lib/fluent/compat/filter.rb | 20 +++++ lib/fluent/compat/formatter.rb | 6 +- lib/fluent/compat/formatter_utils.rb | 82 +++++++++++++++++++ .../compat/handle_tag_and_time_mixin.rb | 60 ++++++++++++++ lib/fluent/compat/output.rb | 79 ++++++++++++++++-- lib/fluent/compat/parser_utils.rb | 40 +++++++++ lib/fluent/compat/structured_format_mixin.rb | 26 ++++++ lib/fluent/plugin/formatter.rb | 48 ----------- lib/fluent/plugin/formatter_csv.rb | 10 +-- lib/fluent/plugin/formatter_hash.rb | 5 +- lib/fluent/plugin/formatter_json.rb | 5 +- lib/fluent/plugin/formatter_ltsv.rb | 2 - lib/fluent/plugin/formatter_msgpack.rb | 5 +- lib/fluent/plugin/formatter_out_file.rb | 24 +++++- lib/fluent/plugin/formatter_stdout.rb | 37 ++++++++- lib/fluent/plugin/out_buffered_stdout.rb | 18 +++- lib/fluent/plugin/out_exec.rb | 2 + lib/fluent/plugin/out_stdout.rb | 19 +++-- lib/fluent/plugin/out_stream.rb | 3 + lib/fluent/plugin_helper/compat_parameters.rb | 11 ++- test/plugin/test_out_buffered_stdout.rb | 20 ++++- test/plugin/test_out_stdout.rb | 17 +++- test/plugin_helper/test_compat_parameters.rb | 77 ++++++++++++++++- 23 files changed, 519 insertions(+), 97 deletions(-) create mode 100644 lib/fluent/compat/formatter_utils.rb create mode 100644 lib/fluent/compat/handle_tag_and_time_mixin.rb create mode 100644 lib/fluent/compat/parser_utils.rb create mode 100644 lib/fluent/compat/structured_format_mixin.rb diff --git a/lib/fluent/compat/filter.rb b/lib/fluent/compat/filter.rb index 84785bc9f8..ab542607e6 100644 --- a/lib/fluent/compat/filter.rb +++ b/lib/fluent/compat/filter.rb @@ -17,12 +17,16 @@ require 'fluent/plugin' require 'fluent/plugin/filter' require 'fluent/compat/call_super_mixin' +require 'fluent/compat/formatter_utils' +require 'fluent/compat/parser_utils' module Fluent module Compat class Filter < Fluent::Plugin::Filter # TODO: warn when deprecated + helpers :inject + def initialize super unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin) @@ -30,10 +34,26 @@ def initialize end end + def configure(conf) + ParserUtils.convert_parser_conf(conf) + FormatterUtils.convert_formatter_conf(conf) + + super + end + # These definitions are to get instance methods of superclass of 3rd party plugins # to make it sure to call super def start super + + if instance_variable_defined?(:@formatter) && @inject_config + unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin) + if @formatter.respond_to?(:owner) && !@formatter.owner + @formatter.owner = self + @formatter.singleton_class.prepend FormatterUtils::InjectMixin + end + end + end end def before_shutdown diff --git a/lib/fluent/compat/formatter.rb b/lib/fluent/compat/formatter.rb index c8f105bcb9..bcdda7e4a5 100644 --- a/lib/fluent/compat/formatter.rb +++ b/lib/fluent/compat/formatter.rb @@ -16,6 +16,8 @@ require 'fluent/plugin' require 'fluent/plugin/formatter' +require 'fluent/compat/handle_tag_and_time_mixin' +require 'fluent/compat/structured_format_mixin' require 'fluent/plugin/formatter_out_file' require 'fluent/plugin/formatter_stdout' @@ -66,8 +68,8 @@ def self.create(conf) formatter end - HandleTagAndTimeMixin = Fluent::Plugin::Formatter::HandleTagAndTimeMixin - StructuredFormatMixin = Fluent::Plugin::Formatter::StructuredFormatMixin + HandleTagAndTimeMixin = Fluent::Compat::HandleTagAndTimeMixin + StructuredFormatMixin = Fluent::Compat::StructuredFormatMixin class ProcWrappedFormatter < Fluent::Plugin::ProcWrappedFormatter # TODO: warn when deprecated diff --git a/lib/fluent/compat/formatter_utils.rb b/lib/fluent/compat/formatter_utils.rb new file mode 100644 index 0000000000..d5201103e2 --- /dev/null +++ b/lib/fluent/compat/formatter_utils.rb @@ -0,0 +1,82 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin_helper/compat_parameters' + +module Fluent + module Compat + module FormatterUtils + INJECT_PARAMS = Fluent::PluginHelper::CompatParameters::INJECT_PARAMS + FORMATTER_PARAMS = Fluent::PluginHelper::CompatParameters::FORMATTER_PARAMS + + module InjectMixin + def format(tag, time, record) + r = owner.inject_values_to_record(tag, time, record) + super(tag, time, r) + end + end + + def self.convert_formatter_conf(conf) + return if conf.elements(name: 'inject').first || conf.elements(name: 'format').first + + inject_params = {} + INJECT_PARAMS.each do |older, newer| + next unless newer + if conf.has_key?(older) + inject_params[newer] = conf[older] + end + end + + if conf.has_key?('include_time_key') && Fluent::Config.bool_value(conf['include_time_key']) + inject_params['time_key'] ||= 'time' + inject_params['time_type'] ||= 'string' + end + if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) + inject_params['time_type'] = 'unixtime' + end + if conf.has_key?('localtime') || conf.has_key?('utc') + if conf.has_key?('localtime') && Fluent::Config.bool_value(conf['localtime']) + inject_params['localtime'] = true + elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc']) + inject_params['localtime'] = false + inject_params['timezone'] ||= "+0000" + else + log.warn "both of localtime and utc are specified as false" + end + end + + if conf.has_key?('include_tag_key') && Fluent::Config.bool_value(conf['include_tag_key']) + inject_params['tag_key'] ||= 'tag' + end + + unless inject_params.empty? + conf.elements << Fluent::Config::Element.new('inject', '', inject_params, []) + end + + formatter_params = {} + FORMATTER_PARAMS.each do |older, newer| + next unless newer + if conf.has_key?(older) + formatter_params[newer] = conf[older] + end + end + unless formatter_params.empty? + conf.elements << Fluent::Config::Element.new('format', '', formatter_params, []) + end + end + end + end +end diff --git a/lib/fluent/compat/handle_tag_and_time_mixin.rb b/lib/fluent/compat/handle_tag_and_time_mixin.rb new file mode 100644 index 0000000000..517ddca6a9 --- /dev/null +++ b/lib/fluent/compat/handle_tag_and_time_mixin.rb @@ -0,0 +1,60 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module Compat + module HandleTagAndTimeMixin + def self.included(klass) + klass.instance_eval { + config_param :include_time_key, :bool, default: false + config_param :time_key, :string, default: 'time' + config_param :time_format, :string, default: nil + config_param :time_as_epoch, :bool, default: false + config_param :include_tag_key, :bool, default: false + config_param :tag_key, :string, default: 'tag' + config_param :localtime, :bool, default: true + config_param :timezone, :string, default: nil + } + end + + def configure(conf) + super + + if conf['utc'] + @localtime = false + end + @timef = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone) + if @time_as_epoch && !@include_time_key + log.warn "time_as_epoch will be ignored because include_time_key is false" + end + end + + def filter_record(tag, time, record) + if @include_tag_key + record[@tag_key] = tag + end + if @include_time_key + if @time_as_epoch + record[@time_key] = time.to_i + else + record[@time_key] = @timef.format(time) + end + end + end + end + end +end + diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index b1af9dcbae..d634da69b1 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -18,6 +18,8 @@ require 'fluent/plugin/output' require 'fluent/plugin/bare_output' require 'fluent/compat/call_super_mixin' +require 'fluent/compat/formatter_utils' +require 'fluent/compat/parser_utils' require 'fluent/compat/propagate_default' require 'fluent/compat/output_chain' require 'fluent/timezone' @@ -136,7 +138,7 @@ def write(chunk) class Output < Fluent::Plugin::Output # TODO: warn when deprecated - helpers :event_emitter + helpers :event_emitter, :inject def support_in_v12_style?(feature) case feature @@ -157,6 +159,26 @@ def initialize self.class.prepend Fluent::Compat::CallSuperMixin end end + + def configure(conf) + ParserUtils.convert_parser_conf(conf) + FormatterUtils.convert_formatter_conf(conf) + + super + end + + def start + super + + if instance_variable_defined?(:@formatter) && @inject_config + unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin) + if @formatter.respond_to?(:owner) && !@formatter.owner + @formatter.owner = self + @formatter.singleton_class.prepend FormatterUtils::InjectMixin + end + end + end + end end class MultiOutput < Fluent::Plugin::BareOutput @@ -172,7 +194,7 @@ def process(tag, es) class BufferedOutput < Fluent::Plugin::Output # TODO: warn when deprecated - helpers :event_emitter + helpers :event_emitter, :inject def support_in_v12_style?(feature) case feature @@ -246,6 +268,9 @@ def configure(conf) # RecordFilter mixin uses its own #format_stream method implementation @overrides_format_stream = methods_of_plugin.include?(:format_stream) || @includes_record_filter + ParserUtils.convert_parser_conf(conf) + FormatterUtils.convert_formatter_conf(conf) + super if config_style == :v1 @@ -350,12 +375,25 @@ def initialize self.class.prepend Fluent::Compat::CallSuperMixin end end + + def start + super + + if instance_variable_defined?(:@formatter) && @inject_config + unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin) + if @formatter.respond_to?(:owner) && !@formatter.owner + @formatter.owner = self + @formatter.singleton_class.prepend FormatterUtils::InjectMixin + end + end + end + end end class ObjectBufferedOutput < Fluent::Plugin::Output # TODO: warn when deprecated - helpers :event_emitter + helpers :event_emitter, :inject # This plugin cannot inherit BufferedOutput because #configure sets chunk_key 'tag' # to flush chunks per tags, but BufferedOutput#configure doesn't allow setting chunk_key @@ -428,6 +466,9 @@ def configure(conf) conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, []) end + ParserUtils.convert_parser_conf(conf) + FormatterUtils.convert_formatter_conf(conf) + super if config_style == :v1 @@ -457,12 +498,25 @@ def initialize self.class.prepend Fluent::Compat::CallSuperMixin end end + + def start + super + + if instance_variable_defined?(:@formatter) && @inject_config + unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin) + if @formatter.respond_to?(:owner) && !@formatter.owner + @formatter.owner = self + @formatter.singleton_class.prepend FormatterUtils::InjectMixin + end + end + end + end end class TimeSlicedOutput < Fluent::Plugin::Output # TODO: warn when deprecated - helpers :event_emitter + helpers :event_emitter, :inject def support_in_v12_style?(feature) case feature @@ -576,6 +630,9 @@ def configure(conf) conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, []) end + ParserUtils.convert_parser_conf(conf) + FormatterUtils.convert_formatter_conf(conf) + super if config_style == :v1 @@ -587,6 +644,19 @@ def configure(conf) self.extend TimeSliceChunkMixin end + def start + super + + if instance_variable_defined?(:@formatter) && @inject_config + unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin) + if @formatter.respond_to?(:owner) && !@formatter.owner + @formatter.owner = self + @formatter.singleton_class.prepend FormatterUtils::InjectMixin + end + end + end + end + # Original TimeSlicedOutput#emit doesn't call #format_stream # #format MUST be implemented in plugin @@ -598,4 +668,3 @@ def extract_placeholders(str, metadata) end end end - diff --git a/lib/fluent/compat/parser_utils.rb b/lib/fluent/compat/parser_utils.rb new file mode 100644 index 0000000000..a43f95779a --- /dev/null +++ b/lib/fluent/compat/parser_utils.rb @@ -0,0 +1,40 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin_helper/compat_parameters' + +module Fluent + module Compat + module ParserUtils + PARSER_PARAMS = Fluent::PluginHelper::CompatParameters::PARSER_PARAMS + + def self.convert_parser_conf(conf) + return if conf.elements(name: 'parse').first + + parser_params = {} + PARSER_PARAMS.each do |older, newer| + next unless newer + if conf.has_key?(older) + parser_params[newer] = conf[older] + end + end + unless parser_params.empty? + conf.elements << Fluent::Config::Element.new('parse', '', parser_params, []) + end + end + end + end +end diff --git a/lib/fluent/compat/structured_format_mixin.rb b/lib/fluent/compat/structured_format_mixin.rb new file mode 100644 index 0000000000..e327fa998c --- /dev/null +++ b/lib/fluent/compat/structured_format_mixin.rb @@ -0,0 +1,26 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module Compat + module StructuredFormatMixin + def format(tag, time, record) + filter_record(tag, time, record) + format_record(record) + end + end + end +end diff --git a/lib/fluent/plugin/formatter.rb b/lib/fluent/plugin/formatter.rb index a18c7cd634..8693afbb27 100644 --- a/lib/fluent/plugin/formatter.rb +++ b/lib/fluent/plugin/formatter.rb @@ -29,54 +29,6 @@ class Formatter < Base def format(tag, time, record) raise NotImplementedError, "Implement this method in child class" end - - # Mixins for formatter plugins - module HandleTagAndTimeMixin - def self.included(klass) - klass.instance_eval { - config_param :include_time_key, :bool, default: false - config_param :time_key, :string, default: 'time' - config_param :time_format, :string, default: nil - config_param :time_as_epoch, :bool, default: false - config_param :include_tag_key, :bool, default: false - config_param :tag_key, :string, default: 'tag' - config_param :localtime, :bool, default: true - config_param :timezone, :string, default: nil - } - end - - def configure(conf) - super - - if conf['utc'] - @localtime = false - end - @timef = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone) - if @time_as_epoch && !@include_time_key - log.warn "time_as_epoch will be ignored because include_time_key is false" - end - end - - def filter_record(tag, time, record) - if @include_tag_key - record[@tag_key] = tag - end - if @include_time_key - if @time_as_epoch - record[@time_key] = time.to_i - else - record[@time_key] = @timef.format(time) - end - end - end - end - - module StructuredFormatMixin - def format(tag, time, record) - filter_record(tag, time, record) - format_record(record) - end - end end class ProcWrappedFormatter < Formatter diff --git a/lib/fluent/plugin/formatter_csv.rb b/lib/fluent/plugin/formatter_csv.rb index e0093f7008..d002d298d8 100644 --- a/lib/fluent/plugin/formatter_csv.rb +++ b/lib/fluent/plugin/formatter_csv.rb @@ -22,20 +22,20 @@ module Plugin class CsvFormatter < Formatter Plugin.register_formatter('csv', self) - include HandleTagAndTimeMixin - config_param :delimiter, default: ',' do |val| ['\t', 'TAB'].include?(val) ? "\t" : val end config_param :force_quotes, :bool, default: true # "array" looks good for type of :fields, but this implementation removes tailing comma # TODO: Is it needed to support tailing comma? - config_param :fields, default: [] do |val| - val.split(',').map{|f| f.strip!; f.empty? ? nil : f }.compact + config_param :fields, :array, value_type: :string + + def configure(conf) + super + @fields = fields.select{|f| !f.empty? } end def format(tag, time, record) - filter_record(tag, time, record) row = @fields.map do |key| record[key] end diff --git a/lib/fluent/plugin/formatter_hash.rb b/lib/fluent/plugin/formatter_hash.rb index 91c2791939..dd0182ce22 100644 --- a/lib/fluent/plugin/formatter_hash.rb +++ b/lib/fluent/plugin/formatter_hash.rb @@ -21,10 +21,7 @@ module Plugin class HashFormatter < Formatter Plugin.register_formatter('hash', self) - include HandleTagAndTimeMixin - include StructuredFormatMixin - - def format_record(record) + def format(tag, time, record) "#{record.to_s}\n" end end diff --git a/lib/fluent/plugin/formatter_json.rb b/lib/fluent/plugin/formatter_json.rb index 21bfb161b4..7c7a344bde 100644 --- a/lib/fluent/plugin/formatter_json.rb +++ b/lib/fluent/plugin/formatter_json.rb @@ -21,9 +21,6 @@ module Plugin class JSONFormatter < Formatter Plugin.register_formatter('json', self) - include HandleTagAndTimeMixin - include StructuredFormatMixin - config_param :json_parser, :string, default: 'oj' def configure(conf) @@ -39,7 +36,7 @@ def configure(conf) end end - def format_record(record) + def format(tag, time, record) "#{@dump_proc.call(record)}\n" end end diff --git a/lib/fluent/plugin/formatter_ltsv.rb b/lib/fluent/plugin/formatter_ltsv.rb index 1618c71a72..801675d4a9 100644 --- a/lib/fluent/plugin/formatter_ltsv.rb +++ b/lib/fluent/plugin/formatter_ltsv.rb @@ -22,14 +22,12 @@ class LabeledTSVFormatter < Formatter Plugin.register_formatter('ltsv', self) # http://ltsv.org/ - include HandleTagAndTimeMixin config_param :delimiter, :string, default: "\t" config_param :label_delimiter, :string, default: ":" # TODO: escaping for \t in values def format(tag, time, record) - filter_record(tag, time, record) formatted = "" record.each do |label, value| formatted << @delimiter if formatted.length.nonzero? diff --git a/lib/fluent/plugin/formatter_msgpack.rb b/lib/fluent/plugin/formatter_msgpack.rb index 19caa92c8b..a817a7da1f 100644 --- a/lib/fluent/plugin/formatter_msgpack.rb +++ b/lib/fluent/plugin/formatter_msgpack.rb @@ -21,10 +21,7 @@ module Plugin class MessagePackFormatter < Formatter Plugin.register_formatter('msgpack', self) - include HandleTagAndTimeMixin - include StructuredFormatMixin - - def format_record(record) + def format(tag, time, record) record.to_msgpack end end diff --git a/lib/fluent/plugin/formatter_out_file.rb b/lib/fluent/plugin/formatter_out_file.rb index f5208e6739..4e98506907 100644 --- a/lib/fluent/plugin/formatter_out_file.rb +++ b/lib/fluent/plugin/formatter_out_file.rb @@ -15,14 +15,14 @@ # require 'fluent/plugin/formatter' +require 'fluent/time' +require 'yajl' module Fluent module Plugin class OutFileFormatter < Formatter Plugin.register_formatter('out_file', self) - include HandleTagAndTimeMixin - config_param :output_time, :bool, default: true config_param :output_tag, :bool, default: true config_param :delimiter, default: "\t" do |val| @@ -32,9 +32,27 @@ class OutFileFormatter < Formatter else "\t" end end + config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :string + config_param :time_format, :string, default: nil + config_param :localtime, :bool, default: true # if localtime is false and timezone is nil, then utc + config_param :timezone, :string, default: nil + + def configure(conf) + if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) + conf['localtime'] = 'false' + conf['timezone'] = "+0000" + end + + super + @timef = case @time_type + when :float then ->(time){ time.to_r.to_f } + when :unixtime then ->(time){ time.to_i } + else + Fluent::TimeFormatter.new(@time_format, @localtime, @timezone) + end + end def format(tag, time, record) - filter_record(tag, time, record) header = '' header << "#{@timef.format(time)}#{@delimiter}" if @output_time header << "#{tag}#{@delimiter}" if @output_tag diff --git a/lib/fluent/plugin/formatter_stdout.rb b/lib/fluent/plugin/formatter_stdout.rb index c338d9bdb2..272a4e22a0 100644 --- a/lib/fluent/plugin/formatter_stdout.rb +++ b/lib/fluent/plugin/formatter_stdout.rb @@ -26,14 +26,49 @@ class StdoutFormatter < Formatter def configure(conf) super - @sub_formatter = Plugin.new_formatter(@output_type) + @sub_formatter = Plugin.new_formatter(@output_type, parent: self.owner) @sub_formatter.configure(conf) end + def start + super + @sub_formatter.start + end + def format(tag, time, record) header = "#{Time.now.localtime} #{tag}: " "#{header}#{@sub_formatter.format(tag, time, record)}" end + + def stop + @sub_formatter.stop + super + end + + def before_shutdown + @sub_formatter.before_shutdown + super + end + + def shutdown + @sub_formatter.shutdown + super + end + + def after_shutdown + @sub_formatter.after_shutdown + super + end + + def close + @sub_formatter.close + super + end + + def terminate + @sub_formatter.terminate + super + end end end end diff --git a/lib/fluent/plugin/out_buffered_stdout.rb b/lib/fluent/plugin/out_buffered_stdout.rb index ecbd167117..1630b15024 100644 --- a/lib/fluent/plugin/out_buffered_stdout.rb +++ b/lib/fluent/plugin/out_buffered_stdout.rb @@ -20,14 +20,20 @@ module Fluent::Plugin class BufferedStdoutOutput < Output Fluent::Plugin.register_output('buffered_stdout', self) - desc 'Output format.(json,hash)' - config_param :output_type, default: 'json' + helpers :formatter, :inject, :compat_parameters + config_section :buffer do config_set_default :chunk_keys, ['tag'] config_set_default :flush_at_shutdown, true config_set_default :chunk_limit_size, 10 * 1024 end + DEFAULT_FORMAT_TYPE = 'json' + + config_section :format do + config_set_default :@type, DEFAULT_FORMAT_TYPE + end + attr_accessor :delayed def initialize @@ -40,9 +46,12 @@ def prefer_delayed_commit end def configure(conf) + if conf['output_type'] && !conf['format'] + conf['format'] = conf['output_type'] + end + compat_parameters_convert(conf, :inject, :formatter) super - @formatter = Fluent::Plugin.new_formatter(@output_type, parent: self) - @formatter.configure(conf) + @formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE) end def write(chunk) @@ -54,6 +63,7 @@ def try_write(chunk) end def format(tag, time, record) + record = inject_values_to_record(tag, time, record) "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n" end end diff --git a/lib/fluent/plugin/out_exec.rb b/lib/fluent/plugin/out_exec.rb index 95dc541168..ef7e4be62f 100644 --- a/lib/fluent/plugin/out_exec.rb +++ b/lib/fluent/plugin/out_exec.rb @@ -53,6 +53,8 @@ def compat_parameters_default_chunk_key end def configure(conf) + compat_parameters_convert(conf, :buffer, default_chunk_key: 'time') + super @formatter = case @format diff --git a/lib/fluent/plugin/out_stdout.rb b/lib/fluent/plugin/out_stdout.rb index b45cedaa39..8d187fd83e 100644 --- a/lib/fluent/plugin/out_stdout.rb +++ b/lib/fluent/plugin/out_stdout.rb @@ -20,18 +20,27 @@ module Fluent::Plugin class StdoutOutput < Output Fluent::Plugin.register_output('stdout', self) - desc 'Output format.(json,hash)' - config_param :output_type, default: 'json' + helpers :inject, :formatter, :compat_parameters + + DEFAULT_FORMAT_TYPE = 'json' + + config_section :format do + config_set_default :@type, DEFAULT_FORMAT_TYPE + end def configure(conf) + if conf['output_type'] && !conf['format'] + conf['format'] = conf['output_type'] + end + compat_parameters_convert(conf, :inject, :formatter) super - @formatter = Fluent::Plugin.new_formatter(@output_type, parent: self) - @formatter.configure(conf) + @formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE) end def process(tag, es) es.each {|time,record| - $log.write "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n" + r = inject_values_to_record(tag, time, record) + $log.write "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, r).chomp}\n" } $log.flush end diff --git a/lib/fluent/plugin/out_stream.rb b/lib/fluent/plugin/out_stream.rb index 4be9a4097f..3cc4fbfa72 100644 --- a/lib/fluent/plugin/out_stream.rb +++ b/lib/fluent/plugin/out_stream.rb @@ -25,7 +25,10 @@ module Fluent class StreamOutput < BufferedOutput config_param :send_timeout, :time, default: 60 + helpers :compat_parameters + def configure(conf) + compat_parameters_convert(conf, :buffer) super end diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 4b6cb2837b..7a7d395506 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -59,10 +59,14 @@ module CompatParameters } INJECT_PARAMS = { + "include_time_key" => nil, "time_key" => "time_key", "time_format" => "time_format", "timezone" => "timezone", + "include_tag_key" => nil, "tag_key" => "tag_key", + "localtime" => nil, + "utc" => nil, } FORMATTER_PARAMS = { @@ -85,11 +89,11 @@ def compat_parameters_convert(conf, *types, **kwargs) when :buffer compat_parameters_buffer(conf, **kwargs) when :inject - compat_parameters_inject(conf, **kwargs) + compat_parameters_inject(conf) when :parser - compat_parameters_parser(conf, **kwargs) + compat_parameters_parser(conf) when :formatter - compat_parameters_formatter(conf, **kwargs) + compat_parameters_formatter(conf) else raise "BUG: unknown compat_parameters type: #{type}" end @@ -148,6 +152,7 @@ def compat_parameters_inject(conf) if conf.has_key?('include_time_key') && Fluent::Config.bool_value(conf['include_time_key']) attr['time_key'] ||= 'time' + attr['time_type'] ||= 'string' end if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) attr['time_type'] = 'unixtime' diff --git a/test/plugin/test_out_buffered_stdout.rb b/test/plugin/test_out_buffered_stdout.rb index 616de36ba3..6f36a4643e 100644 --- a/test/plugin/test_out_buffered_stdout.rb +++ b/test/plugin/test_out_buffered_stdout.rb @@ -16,7 +16,7 @@ def create_driver(conf = CONFIG) test 'default configure' do d = create_driver - assert_equal 'json', d.instance.output_type + assert_equal [], d.instance.formatter_configs assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size assert d.instance.buffer_config.flush_at_shutdown assert_equal ['tag'], d.instance.buffer_config.chunk_keys @@ -27,16 +27,30 @@ def create_driver(conf = CONFIG) test 'configure with output_type' do d = create_driver(CONFIG + "\noutput_type json") - assert_equal 'json', d.instance.output_type + assert_equal 'json', d.instance.formatter_configs.first[:@type] d = create_driver(CONFIG + "\noutput_type hash") - assert_equal 'hash', d.instance.output_type + assert_equal 'hash', d.instance.formatter_configs.first[:@type] assert_raise(Fluent::ConfigError) do d = create_driver(CONFIG + "\noutput_type foo") end end + sub_test_case "emit with default config" do + test '#write(synchronous)' do + d = create_driver + time = event_time() + + out = capture_log do + d.run(default_tag: 'test', flush: true) do + d.feed(time, {'test' => 'test'}) + end + end + assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out + end + end + sub_test_case "emit json" do data('oj' => 'oj', 'yajl' => 'yajl') test '#write(synchronous)' do |data| diff --git a/test/plugin/test_out_stdout.rb b/test/plugin/test_out_stdout.rb index c3d0a87bf2..4b4c20e4d5 100644 --- a/test/plugin/test_out_stdout.rb +++ b/test/plugin/test_out_stdout.rb @@ -16,21 +16,32 @@ def create_driver(conf = CONFIG) def test_configure d = create_driver - assert_equal 'json', d.instance.output_type + assert_equal [], d.instance.formatter_configs end def test_configure_output_type d = create_driver(CONFIG + "\noutput_type json") - assert_equal 'json', d.instance.output_type + assert_equal 'json', d.instance.formatter_configs.first[:@type] d = create_driver(CONFIG + "\noutput_type hash") - assert_equal 'hash', d.instance.output_type + assert_equal 'hash', d.instance.formatter_configs.first[:@type] assert_raise(Fluent::ConfigError) do d = create_driver(CONFIG + "\noutput_type foo") end end + def test_emit_in_default + d = create_driver + time = event_time() + out = capture_log do + d.run(default_tag: 'test') do + d.feed(time, {'test' => 'test1'}) + end + end + assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test1\"}\n", out + end + data('oj' => 'oj', 'yajl' => 'yajl') def test_emit_json(data) d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}") diff --git a/test/plugin_helper/test_compat_parameters.rb b/test/plugin_helper/test_compat_parameters.rb index 5051422276..9fcb9f7122 100644 --- a/test/plugin_helper/test_compat_parameters.rb +++ b/test/plugin_helper/test_compat_parameters.rb @@ -2,6 +2,8 @@ require 'fluent/plugin_helper/compat_parameters' require 'fluent/plugin/base' +require 'time' + class CompatParameterTest < Test::Unit::TestCase setup do Fluent::Test.setup @@ -52,11 +54,16 @@ def configure(conf) def write(chunk); end # dummy end class DummyO4 < Fluent::Plugin::Output - helpers :compat_parameters + helpers :compat_parameters, :inject, :formatter + attr_reader :f def configure(conf) compat_parameters_convert(conf, :buffer, :inject, :formatter, default_chunk_key: 'tag') super end + def start + super + @f = formatter_create() + end def write(chunk); end # dummy end @@ -162,4 +169,72 @@ def write(chunk); end # dummy assert_equal [], @i.chunk_keys end end + + sub_test_case 'output plugins which has default chunk key: tag, and enables inject and formatter' do + test 'plugin helper converts parameters into plugin configuration parameters for all of buffer, inject and formatter' do + hash = { + 'buffer_type' => 'file', + 'buffer_path' => File.expand_path('../../tmp/compat_parameters/mybuffer.*.log', __FILE__), + 'num_threads' => '10', + 'format' => 'ltsv', + 'delimiter' => ',', + 'label_delimiter' => '%', + 'include_time_key' => 'true', # default time_key 'time' and default time format (iso8601: 2016-06-24T15:57:38) at localtime + 'include_tag_key' => 'yes', # default tag_key 'tag' + } + conf = config_element('ROOT', '', hash) + @i = DummyO4.new + @i.configure(conf) + @i.start + + assert_equal 'file', @i.buffer_config[:@type] + assert_equal 10, @i.buffer_config.flush_thread_count + formatter = @i.f + assert{ formatter.is_a? Fluent::Plugin::LabeledTSVFormatter } + assert_equal ',', @i.f.delimiter + assert_equal '%', @i.f.label_delimiter + + assert !@i.chunk_key_time + assert @i.chunk_key_tag + assert_equal [], @i.chunk_keys + + t = event_time('2016-06-24 16:05:01') # localtime + iso8601str = Time.at(t.to_i).iso8601 + formatted = @i.f.format('tag.test', t, @i.inject_values_to_record('tag.test', t, {"value" => 1})) + assert_equal "value%1,tag%tag.test,time%#{iso8601str}\n", formatted + end + + test 'plugin helper setups time injecting as unix time (integer from epoch)' do + hash = { + 'buffer_type' => 'file', + 'buffer_path' => File.expand_path('../../tmp/compat_parameters/mybuffer.*.log', __FILE__), + 'num_threads' => '10', + 'format' => 'ltsv', + 'delimiter' => ',', + 'label_delimiter' => '%', + 'include_time_key' => 'true', # default time_key 'time' and default time format (iso8601: 2016-06-24T15:57:38) at localtime + 'include_tag_key' => 'yes', # default tag_key 'tag' + } + conf = config_element('ROOT', '', hash) + @i = DummyO4.new + @i.configure(conf) + @i.start + + assert_equal 'file', @i.buffer_config[:@type] + assert_equal 10, @i.buffer_config.flush_thread_count + formatter = @i.f + assert{ formatter.is_a? Fluent::Plugin::LabeledTSVFormatter } + assert_equal ',', @i.f.delimiter + assert_equal '%', @i.f.label_delimiter + + assert !@i.chunk_key_time + assert @i.chunk_key_tag + assert_equal [], @i.chunk_keys + + t = event_time('2016-06-24 16:05:01') # localtime + iso8601str = Time.at(t.to_i).iso8601 + formatted = @i.f.format('tag.test', t, @i.inject_values_to_record('tag.test', t, {"value" => 1})) + assert_equal "value%1,tag%tag.test,time%#{iso8601str}\n", formatted + end + end end From 1f82d9fad20434cc433c7d15d17b5f0cee9af109 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 14:31:02 +0900 Subject: [PATCH 15/23] fix not to set timezone to use UTC (instead of +0000) implicitly --- lib/fluent/compat/formatter_utils.rb | 5 ++++- lib/fluent/plugin_helper/compat_parameters.rb | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/fluent/compat/formatter_utils.rb b/lib/fluent/compat/formatter_utils.rb index d5201103e2..588d39b5f1 100644 --- a/lib/fluent/compat/formatter_utils.rb +++ b/lib/fluent/compat/formatter_utils.rb @@ -52,7 +52,10 @@ def self.convert_formatter_conf(conf) inject_params['localtime'] = true elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc']) inject_params['localtime'] = false - inject_params['timezone'] ||= "+0000" + # Specifying "localtime false" means using UTC in TimeFormatter + # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). + # There are difference between "Z" and "+0000" in timezone formatting. + # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way else log.warn "both of localtime and utc are specified as false" end diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 7a7d395506..a608067359 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -162,7 +162,10 @@ def compat_parameters_inject(conf) attr['localtime'] = true elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc']) attr['localtime'] = false - attr['timezone'] ||= "+0000" + # Specifying "localtime false" means using UTC in TimeFormatter + # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). + # There are difference between "Z" and "+0000" in timezone formatting. + # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way else log.warn "both of localtime and utc are specified as false" end From 65acf7337380d22ff46f32c29a0749dc59ddbf53 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 14:32:07 +0900 Subject: [PATCH 16/23] add configuration code to handle timezone: formatter_out_file is an unique formatter to handle it by itself --- lib/fluent/plugin/formatter_out_file.rb | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/formatter_out_file.rb b/lib/fluent/plugin/formatter_out_file.rb index 4e98506907..1e2173bde7 100644 --- a/lib/fluent/plugin/formatter_out_file.rb +++ b/lib/fluent/plugin/formatter_out_file.rb @@ -38,12 +38,27 @@ class OutFileFormatter < Formatter config_param :timezone, :string, default: nil def configure(conf) + # TODO: make a utility method in TimeFormatter to handle these conversion + # copies of this code: plugin_helper/compat_parameters, compat/formatter_utils and here if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch']) - conf['localtime'] = 'false' - conf['timezone'] = "+0000" + conf['time_type'] = 'unixtime' + end + if conf.has_key?('localtime') || conf.has_key?('utc') + if conf.has_key?('localtime') && Fluent::Config.bool_value(conf['localtime']) + conf['localtime'] = true + elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc']) + conf['localtime'] = false + # Specifying "localtime false" means using UTC in TimeFormatter + # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). + # There are difference between "Z" and "+0000" in timezone formatting. + # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way + else + log.warn "both of localtime and utc are specified as false" + end end super + @timef = case @time_type when :float then ->(time){ time.to_r.to_f } when :unixtime then ->(time){ time.to_i } From 36d5f25ef72275cc9812753c05217049d716fcb9 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 15:22:56 +0900 Subject: [PATCH 17/23] remove tests about include_tag_key and include_time_key: these were removed from formatter --- test/plugin/test_formatter_csv.rb | 22 ------ test/plugin/test_formatter_json.rb | 30 ------- test/plugin/test_formatter_ltsv.rb | 21 +---- test/plugin/test_formatter_msgpack.rb | 27 ------- test/test_formatter.rb | 108 ++------------------------ 5 files changed, 8 insertions(+), 200 deletions(-) diff --git a/test/plugin/test_formatter_csv.rb b/test/plugin/test_formatter_csv.rb index 00d1cf1b99..f0a40d9827 100644 --- a/test/plugin/test_formatter_csv.rb +++ b/test/plugin/test_formatter_csv.rb @@ -46,28 +46,6 @@ def test_format assert_equal("\"awesome\",\"awesome2\"\n", formatted) end - def test_format_with_tag - d = create_driver("fields" => "tag,message,message2", - "include_tag_key" => true) - formatted = d.instance.format(tag, @time, { - 'message' => 'awesome', - 'message2' => 'awesome2' - }) - assert_equal("\"tag\",\"awesome\",\"awesome2\"\n", formatted) - end - - def test_format_with_time - d = create_driver("fields" => "time,message,message2", - "include_time_key" => true, - "time_format" => "%Y") - formatted = d.instance.format(tag, @time, { - 'message' => 'awesome', - 'message2' => 'awesome2' - }) - assert_equal("\"#{Time.now.year}\",\"awesome\",\"awesome2\"\n", - formatted) - end - def test_format_with_customized_delimiters d = create_driver("fields" => "message,message2", "delimiter" => "\t") diff --git a/test/plugin/test_formatter_json.rb b/test/plugin/test_formatter_json.rb index 4149dd6dfa..5f80dd1090 100644 --- a/test/plugin/test_formatter_json.rb +++ b/test/plugin/test_formatter_json.rb @@ -27,34 +27,4 @@ def test_format(data) assert_equal("#{Yajl.dump(record)}\n", formatted) end - - data('oj' => 'oj', 'yajl' => 'yajl') - def test_format_with_include_tag(data) - d = create_driver('include_tag_key' => 'true', 'tag_key' => 'foo', 'json_parser' => data) - formatted = d.instance.format(tag, @time, record.dup) - - r = record - r['foo'] = tag - assert_equal("#{Yajl.dump(r)}\n", formatted) - end - - data('oj' => 'oj', 'yajl' => 'yajl') - def test_format_with_include_time(data) - d = create_driver('include_time_key' => 'true', 'localtime' => '', 'json_parser' => data) - formatted = d.instance.format(tag, @time, record.dup) - - r = record - r['time'] = time2str(@time, localtime: true) - assert_equal("#{Yajl.dump(r)}\n", formatted) - end - - data('oj' => 'oj', 'yajl' => 'yajl') - def test_format_with_include_time_as_number(data) - d = create_driver('include_time_key' => 'true', 'time_as_epoch' => 'true', 'time_key' => 'epoch', 'json_parser' => data) - formatted = d.instance.format(tag, @time, record.dup) - - r = record - r['epoch'] = @time - assert_equal("#{Yajl.dump(r)}\n", formatted) - end end diff --git a/test/plugin/test_formatter_ltsv.rb b/test/plugin/test_formatter_ltsv.rb index f594cd7b9d..65371662ee 100644 --- a/test/plugin/test_formatter_ltsv.rb +++ b/test/plugin/test_formatter_ltsv.rb @@ -16,7 +16,7 @@ def tag end def record - {'message' => 'awesome'} + {'message' => 'awesome', 'greeting' => 'hello'} end def test_config_params @@ -37,32 +37,17 @@ def test_format d = create_driver({}) formatted = d.instance.format(tag, @time, record) - assert_equal("message:awesome\n", formatted) - end - - def test_format_with_tag - d = create_driver('include_tag_key' => 'true') - formatted = d.instance.format(tag, @time, record) - - assert_equal("message:awesome\ttag:tag\n", formatted) - end - - def test_format_with_time - d = create_driver('include_time_key' => 'true', 'time_format' => '%Y') - formatted = d.instance.format(tag, @time, record) - - assert_equal("message:awesome\ttime:#{Time.now.year}\n", formatted) + assert_equal("message:awesome\tgreeting:hello\n", formatted) end def test_format_with_customized_delimiters d = create_driver( - 'include_tag_key' => 'true', 'delimiter' => ',', 'label_delimiter' => '=', ) formatted = d.instance.format(tag, @time, record) - assert_equal("message=awesome,tag=tag\n", formatted) + assert_equal("message=awesome,greeting=hello\n", formatted) end sub_test_case "time config" do diff --git a/test/plugin/test_formatter_msgpack.rb b/test/plugin/test_formatter_msgpack.rb index 6ceb109f2d..7bf32e51f7 100644 --- a/test/plugin/test_formatter_msgpack.rb +++ b/test/plugin/test_formatter_msgpack.rb @@ -25,31 +25,4 @@ def test_format assert_equal(record.to_msgpack, formatted) end - - def test_format_with_include_tag - d = create_driver('include_tag_key' => 'true', 'tag_key' => 'foo') - formatted = d.instance.format(tag, @time, record.dup) - - r = record - r['foo'] = tag - assert_equal(r.to_msgpack, formatted) - end - - def test_format_with_include_time - d = create_driver('include_time_key' => 'true', 'localtime' => '') - formatted = d.instance.format(tag, @time, record.dup) - - r = record - r['time'] = time2str(@time, localtime: true) - assert_equal(r.to_msgpack, formatted) - end - - def test_format_with_include_time_as_number - d = create_driver('include_time_key' => 'true', 'time_as_epoch' => 'true', 'time_key' => 'epoch') - formatted = d.instance.format(tag, @time, record.dup) - - r = record - r['epoch'] = @time - assert_equal(r.to_msgpack, formatted) - end end diff --git a/test/test_formatter.rb b/test/test_formatter.rb index 09e008cd2d..a856517ee4 100644 --- a/test/test_formatter.rb +++ b/test/test_formatter.rb @@ -10,7 +10,7 @@ def tag end def record - {'message' => 'awesome'} + {'message' => 'awesome', 'greeting' => 'hello'} end def with_timezone(tz) @@ -121,36 +121,6 @@ def test_format(data) assert_equal("#{Yajl.dump(record)}\n", formatted) end - - data('oj' => 'oj', 'yajl' => 'yajl') - def test_format_with_include_tag(data) - @formatter.configure('include_tag_key' => 'true', 'tag_key' => 'foo', 'json_parser' => data) - formatted = @formatter.format(tag, @time, record.dup) - - r = record - r['foo'] = tag - assert_equal("#{Yajl.dump(r)}\n", formatted) - end - - data('oj' => 'oj', 'yajl' => 'yajl') - def test_format_with_include_time(data) - @formatter.configure('include_time_key' => 'true', 'localtime' => '', 'json_parser' => data) - formatted = @formatter.format(tag, @time, record.dup) - - r = record - r['time'] = time2str(@time, localtime: true) - assert_equal("#{Yajl.dump(r)}\n", formatted) - end - - data('oj' => 'oj', 'yajl' => 'yajl') - def test_format_with_include_time_as_number(data) - @formatter.configure('include_time_key' => 'true', 'time_as_epoch' => 'true', 'time_key' => 'epoch', 'json_parser' => data) - formatted = @formatter.format(tag, @time, record.dup) - - r = record - r['epoch'] = @time - assert_equal("#{Yajl.dump(r)}\n", formatted) - end end class MessagePackFormatterTest < ::Test::Unit::TestCase @@ -167,33 +137,6 @@ def test_format assert_equal(record.to_msgpack, formatted) end - - def test_format_with_include_tag - @formatter.configure('include_tag_key' => 'true', 'tag_key' => 'foo') - formatted = @formatter.format(tag, @time, record.dup) - - r = record - r['foo'] = tag - assert_equal(r.to_msgpack, formatted) - end - - def test_format_with_include_time - @formatter.configure('include_time_key' => 'true', 'localtime' => '') - formatted = @formatter.format(tag, @time, record.dup) - - r = record - r['time'] = time2str(@time, localtime: true) - assert_equal(r.to_msgpack, formatted) - end - - def test_format_with_include_time_as_number - @formatter.configure('include_time_key' => 'true', 'time_as_epoch' => 'true', 'time_key' => 'epoch') - formatted = @formatter.format(tag, @time, record.dup) - - r = record - r['epoch'] = @time - assert_equal(r.to_msgpack, formatted) - end end class LabeledTSVFormatterTest < ::Test::Unit::TestCase @@ -221,32 +164,17 @@ def test_format @formatter.configure({}) formatted = @formatter.format(tag, @time, record) - assert_equal("message:awesome\n", formatted) - end - - def test_format_with_tag - @formatter.configure('include_tag_key' => 'true') - formatted = @formatter.format(tag, @time, record) - - assert_equal("message:awesome\ttag:tag\n", formatted) - end - - def test_format_with_time - @formatter.configure('include_time_key' => 'true', 'time_format' => '%Y') - formatted = @formatter.format(tag, @time, record) - - assert_equal("message:awesome\ttime:#{Time.now.year}\n", formatted) + assert_equal("message:awesome\tgreeting:hello\n", formatted) end def test_format_with_customized_delimiters @formatter.configure( - 'include_tag_key' => 'true', 'delimiter' => ',', 'label_delimiter' => '=', ) formatted = @formatter.format(tag, @time, record) - assert_equal("message=awesome,tag=tag\n", formatted) + assert_equal("message=awesome,greeting=hello\n", formatted) end end @@ -257,11 +185,11 @@ def setup @formatter = TextFormatter::CsvFormatter.new @time = Engine.now end - + def test_config_params assert_equal ',', @formatter.delimiter assert_equal true, @formatter.force_quotes - assert_equal [], @formatter.fields + assert_nil @formatter.fields end data( @@ -284,32 +212,6 @@ def test_format assert_equal("\"awesome\",\"awesome2\"\n", formatted) end - def test_format_with_tag - @formatter.configure( - 'fields' => 'tag,message,message2', - 'include_tag_key' => 'true' - ) - formatted = @formatter.format(tag, @time, { - 'message' => 'awesome', - 'message2' => 'awesome2' - }) - assert_equal("\"tag\",\"awesome\",\"awesome2\"\n", formatted) - end - - def test_format_with_time - @formatter.configure( - 'fields' => 'time,message,message2', - 'include_time_key' => 'true', - 'time_format' => '%Y' - ) - formatted = @formatter.format(tag, @time, { - 'message' => 'awesome', - 'message2' => 'awesome2' - }) - assert_equal("\"#{Time.now.year}\",\"awesome\",\"awesome2\"\n", - formatted) - end - def test_format_with_customized_delimiters @formatter.configure( 'fields' => 'message,message2', From 7a61e217c765eb0cf76c21e5edb1b923fdef4c9d Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 16:12:43 +0900 Subject: [PATCH 18/23] add parameter "utc" to turn "localtime" false to specify UTC explicitly --- lib/fluent/plugin_helper/inject.rb | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/fluent/plugin_helper/inject.rb b/lib/fluent/plugin_helper/inject.rb index bbe0fc88f3..fa22341428 100644 --- a/lib/fluent/plugin_helper/inject.rb +++ b/lib/fluent/plugin_helper/inject.rb @@ -69,6 +69,7 @@ module InjectParams config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :float config_param :time_format, :string, default: nil config_param :localtime, :bool, default: true # if localtime is false and timezone is nil, then utc + config_param :utc, :bool, default: false # placeholder to turn localtime to false config_param :timezone, :string, default: nil end end @@ -88,6 +89,12 @@ def initialize end def configure(conf) + conf.elements('inject').each do |e| + if e.has_key?('utc') && Fluent::Config.bool_value(e['utc']) + e['localtime'] = 'false' + end + end + super if @inject_config From fc380f8a1534ba36333efa1ab9983362940bbe96 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 21 Jul 2016 16:13:15 +0900 Subject: [PATCH 19/23] remove timezone related tests from formatter, and add it to tests of inject plugin helper --- test/plugin/test_formatter_ltsv.rb | 48 ------------------ test/plugin_helper/test_inject.rb | 81 ++++++++++++++++++++++++++++++ test/test_formatter.rb | 44 ---------------- 3 files changed, 81 insertions(+), 92 deletions(-) diff --git a/test/plugin/test_formatter_ltsv.rb b/test/plugin/test_formatter_ltsv.rb index 65371662ee..87008b294c 100644 --- a/test/plugin/test_formatter_ltsv.rb +++ b/test/plugin/test_formatter_ltsv.rb @@ -49,52 +49,4 @@ def test_format_with_customized_delimiters assert_equal("message=awesome,greeting=hello\n", formatted) end - - sub_test_case "time config" do - def setup - @time = event_time("2014-09-27 00:00:00 +00:00").to_i - end - - def with_timezone(tz) - oldtz, ENV['TZ'] = ENV['TZ'], tz - yield - ensure - ENV['TZ'] = oldtz - end - - def format(conf) - d = create_driver({'include_time_key' => true}.merge(conf)) - formatted = d.instance.format("tag", @time, {}) - # Drop the leading "time:" and the trailing "\n". - formatted[5..-2] - end - - def test_none - with_timezone("UTC-01") do - # 'localtime' is true by default. - assert_equal("2014-09-27T01:00:00+01:00", format({})) - end - end - - def test_utc - with_timezone("UTC-01") do - # 'utc' takes precedence over 'localtime'. - assert_equal("2014-09-27T00:00:00Z", format("utc" => true)) - end - end - - def test_timezone - with_timezone("UTC-01") do - # 'timezone' takes precedence over 'localtime'. - assert_equal("2014-09-27T02:00:00+02:00", format("timezone" => "+02")) - end - end - - def test_utc_timezone - with_timezone("UTC-01") do - # 'timezone' takes precedence over 'utc'. - assert_equal("2014-09-27T09:00:00+09:00", format("utc" => true, "timezone" => "Asia/Tokyo")) - end - end - end end diff --git a/test/plugin_helper/test_inject.rb b/test/plugin_helper/test_inject.rb index 6eb8fb39a1..fc896994f8 100644 --- a/test/plugin_helper/test_inject.rb +++ b/test/plugin_helper/test_inject.rb @@ -222,6 +222,7 @@ def config_inject_section(hash = {}) assert_equal record.merge(injected), @d.inject_values_to_record('tag', time, record) end end + sub_test_case 'using inject_values_to_event_stream' do local_timezone = Time.now.strftime('%z') time_in_unix = Time.parse("2016-06-21 08:10:11 #{local_timezone}").to_i @@ -403,4 +404,84 @@ def config_inject_section(hash = {}) assert_equal expected_es, @d.inject_values_to_event_stream('tag', data) end end + + sub_test_case 'time formatting with modified timezone' do + setup do + @time = event_time("2014-09-27 00:00:00 +00:00").to_i + end + + def with_timezone(tz) + oldtz, ENV['TZ'] = ENV['TZ'], tz + yield + ensure + ENV['TZ'] = oldtz + end + + def format(conf) + @d.configure(config_inject_section( + "hostname_key" => "hostnamedata", + "hostname" => "myname.local", + "tag_key" => "tagdata", + "time_key" => "timedata", + "time_type" => "string", + "time_format" => "%Y_%m_%d %H:%M:%S.%N %z", + "timezone" => "+0000", + )) + @d.start + + record = {"key1" => "value1", "key2" => 2} + injected = {"hostnamedata" => "myname.local", "tagdata" => "tag", "timedata" => "2016_06_20 23:10:11.320101224 +0000"} + assert_equal record.merge(injected), @d.inject_values_to_record('tag', time, record) + + + d = create_driver({'include_time_key' => true}.merge(conf)) + formatted = d.instance.format("tag", @time, {}) + # Drop the leading "time:" and the trailing "\n". + formatted[5..-2] + end + + def test_nothing_specified_about_time_formatting + with_timezone("UTC-01") do + # 'localtime' is true by default. + @d.configure(config_inject_section("time_key" => "t", "time_type" => "string")) + @d.start + record = @d.inject_values_to_record('tag', @time, {"message" => "yay"}) + + assert_equal("2014-09-27T01:00:00+01:00", record['t']) + end + end + + def test_utc + with_timezone("UTC-01") do + # 'utc' takes precedence over 'localtime'. + @d.configure(config_inject_section("time_key" => "t", "time_type" => "string", "utc" => "true")) + @d.start + record = @d.inject_values_to_record('tag', @time, {"message" => "yay"}) + + assert_equal("2014-09-27T00:00:00Z", record['t']) + end + end + + def test_timezone + with_timezone("UTC-01") do + # 'timezone' takes precedence over 'localtime'. + @d.configure(config_inject_section("time_key" => "t", "time_type" => "string", "timezone" => "+02")) + @d.start + record = @d.inject_values_to_record('tag', @time, {"message" => "yay"}) + + assert_equal("2014-09-27T02:00:00+02:00", record['t']) + end + end + + def test_utc_timezone + with_timezone("UTC-01") do + # 'timezone' takes precedence over 'utc'. + @d.configure(config_inject_section("time_key" => "t", "time_type" => "string", "timezone" => "Asia/Tokyo", "utc" => "true")) + @d.start + record = @d.inject_values_to_record('tag', @time, {"message" => "yay"}) + + assert_equal("2014-09-27T09:00:00+09:00", record['t']) + end + end + end end diff --git a/test/test_formatter.rb b/test/test_formatter.rb index a856517ee4..d65d3eb74a 100644 --- a/test/test_formatter.rb +++ b/test/test_formatter.rb @@ -316,48 +316,4 @@ def test_find_formatter(data) $LOAD_PATH.shift end end - - class TimeConfigTest < ::Test::Unit::TestCase - include FormatterTest - - def setup - @formatter = TextFormatter::LabeledTSVFormatter.new - @time = Time.new(2014, 9, 27, 0, 0, 0, 0).to_i - end - - def format(conf) - @formatter.configure({'include_time_key' => true}.merge(conf)) - formatted = @formatter.format("tag", @time, {}) - # Drop the leading "time:" and the trailing "\n". - formatted[5..-2] - end - - def test_none - with_timezone("UTC-01") do - # 'localtime' is true by default. - assert_equal("2014-09-27T01:00:00+01:00", format({})) - end - end - - def test_utc - with_timezone("UTC-01") do - # 'utc' takes precedence over 'localtime'. - assert_equal("2014-09-27T00:00:00Z", format("utc" => true)) - end - end - - def test_timezone - with_timezone("UTC-01") do - # 'timezone' takes precedence over 'localtime'. - assert_equal("2014-09-27T02:00:00+02:00", format("timezone" => "+02")) - end - end - - def test_utc_timezone - with_timezone("UTC-01") do - # 'timezone' takes precedence over 'utc'. - assert_equal("2014-09-27T09:00:00+09:00", format("utc" => true, "timezone" => "Asia/Tokyo")) - end - end - end end From 80498929e8be11d0a343fdaa17e8f46b95d84355 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 27 Jul 2016 13:25:16 +0900 Subject: [PATCH 20/23] raise error for both "utc" and "localtime": these cannot be used at the same time --- lib/fluent/compat/formatter_utils.rb | 12 +++++----- lib/fluent/plugin/formatter_out_file.rb | 12 +++++----- lib/fluent/plugin_helper/compat_parameters.rb | 12 +++++----- test/plugin/test_formatter_out_file.rb | 22 +++++++++++++++++++ 4 files changed, 40 insertions(+), 18 deletions(-) diff --git a/lib/fluent/compat/formatter_utils.rb b/lib/fluent/compat/formatter_utils.rb index 588d39b5f1..b67f6723f2 100644 --- a/lib/fluent/compat/formatter_utils.rb +++ b/lib/fluent/compat/formatter_utils.rb @@ -48,16 +48,16 @@ def self.convert_formatter_conf(conf) inject_params['time_type'] = 'unixtime' end if conf.has_key?('localtime') || conf.has_key?('utc') - if conf.has_key?('localtime') && Fluent::Config.bool_value(conf['localtime']) - inject_params['localtime'] = true - elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc']) - inject_params['localtime'] = false + if conf.has_key?('localtime') && conf.has_key?('utc') + raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them" + elsif conf.has_key?('localtime') + inject_params['localtime'] = Fluent::Config.bool_value(conf['localtime']) + elsif conf.has_key?('utc') + inject_params['localtime'] = !(Fluent::Config.bool_value(conf['utc'])) # Specifying "localtime false" means using UTC in TimeFormatter # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). # There are difference between "Z" and "+0000" in timezone formatting. # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way - else - log.warn "both of localtime and utc are specified as false" end end diff --git a/lib/fluent/plugin/formatter_out_file.rb b/lib/fluent/plugin/formatter_out_file.rb index 1e2173bde7..4c446b7968 100644 --- a/lib/fluent/plugin/formatter_out_file.rb +++ b/lib/fluent/plugin/formatter_out_file.rb @@ -44,16 +44,16 @@ def configure(conf) conf['time_type'] = 'unixtime' end if conf.has_key?('localtime') || conf.has_key?('utc') - if conf.has_key?('localtime') && Fluent::Config.bool_value(conf['localtime']) - conf['localtime'] = true - elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc']) - conf['localtime'] = false + if conf.has_key?('localtime') && conf.has_key?('utc') + raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them" + elsif conf.has_key?('localtime') + conf['localtime'] = Fluent::Config.bool_value(conf['localtime']) + elsif conf.has_key?('utc') + conf['localtime'] = !(Fluent::Config.bool_value(conf['utc'])) # Specifying "localtime false" means using UTC in TimeFormatter # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). # There are difference between "Z" and "+0000" in timezone formatting. # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way - else - log.warn "both of localtime and utc are specified as false" end end diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index a608067359..1962922ace 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -158,16 +158,16 @@ def compat_parameters_inject(conf) attr['time_type'] = 'unixtime' end if conf.has_key?('localtime') || conf.has_key?('utc') - if conf.has_key?('localtime') && Fluent::Config.bool_value(conf['localtime']) - attr['localtime'] = true - elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc']) - attr['localtime'] = false + if conf.has_key?('localtime') && conf.has_key?('utc') + raise Fluent::ConfigError, "both of utc and localtime are specified, use only one of them" + elsif conf.has_key?('localtime') + attr['localtime'] = Fluent::Config.bool_value(conf['localtime']) + elsif conf.has_key?('utc') + attr['localtime'] = !(Fluent::Config.bool_value(conf['utc'])) # Specifying "localtime false" means using UTC in TimeFormatter # And specifying "utc" is different from specifying "timezone +0000"(it's not always UTC). # There are difference between "Z" and "+0000" in timezone formatting. # TODO: add kwargs to TimeFormatter to specify "using localtime", "using UTC" or "using specified timezone" in more explicit way - else - log.warn "both of localtime and utc are specified as false" end end diff --git a/test/plugin/test_formatter_out_file.rb b/test/plugin/test_formatter_out_file.rb index 2aff4a5c0f..4433e7f9d9 100644 --- a/test/plugin/test_formatter_out_file.rb +++ b/test/plugin/test_formatter_out_file.rb @@ -10,6 +10,8 @@ def setup def create_driver(conf = {}) d = Fluent::Test::Driver::Formatter.new(Fluent::Plugin::OutFileFormatter) case conf + when Fluent::Config::Element + d.configure(conf) when Hash d.configure({'utc' => true}.merge(conf)) else @@ -25,6 +27,26 @@ def record {'message' => 'awesome'} end + data('both true' => 'true', 'both false' => 'false') + def test_configured_with_both_of_utc_and_localtime(value) + assert_raise(Fluent::ConfigError.new("both of utc and localtime are specified, use only one of them")) do + create_driver({'utc' => value, 'localtime' => value}) + end + end + + data( + 'configured for localtime by localtime' => ['localtime', 'true', Time.parse("2016-07-26 21:08:30 -0700"), "2016-07-26T21:08:30-07:00"], + 'configured for localtime by utc' => ['utc', 'false', Time.parse("2016-07-26 21:08:30 -0700"), "2016-07-26T21:08:30-07:00"], + 'configured for utc by localtime' => ['localtime', 'false', Time.parse("2016-07-26 21:08:30 -0700"), "2016-07-27T04:08:30Z"], + 'configured for utc by utc' => ['utc', 'true', Time.parse("2016-07-26 21:08:30 -0700"), "2016-07-27T04:08:30Z"], + ) + def test_configured_with_utc_or_localtime(data) + key, value, time, expected = data + d = create_driver(config_element('ROOT', '', {key => value})) + tag = 'test' + assert_equal "#{expected}\t#{tag}\t#{Yajl.dump(record)}\n", d.instance.format(tag, time, record) + end + def test_format d = create_driver({}) formatted = d.instance.format(tag, @time, record) From 1123b56625893d74f9c167b938e7b4c01cb97e9e Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 27 Jul 2016 13:25:48 +0900 Subject: [PATCH 21/23] remove block because it is same with default behavior --- lib/fluent/plugin_helper/compat_parameters.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb index 1962922ace..9251742c4d 100644 --- a/lib/fluent/plugin_helper/compat_parameters.rb +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -146,9 +146,7 @@ def compat_parameters_inject(conf) return if INJECT_PARAMS.keys.all?{|k| !conf.has_key?(k) } # TODO: warn obsolete parameters if these are deprecated - attr = compat_parameters_copy_to_subsection_attributes(conf, INJECT_PARAMS) do |compat_key, value| - value - end + attr = compat_parameters_copy_to_subsection_attributes(conf, INJECT_PARAMS) if conf.has_key?('include_time_key') && Fluent::Config.bool_value(conf['include_time_key']) attr['time_key'] ||= 'time' From dd3b02e8d73ca169d4846e44a13fcbacd007b472 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 27 Jul 2016 19:42:02 +0900 Subject: [PATCH 22/23] specify timezone to get correct result --- test/plugin/test_formatter_out_file.rb | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/plugin/test_formatter_out_file.rb b/test/plugin/test_formatter_out_file.rb index 4433e7f9d9..4fa8225439 100644 --- a/test/plugin/test_formatter_out_file.rb +++ b/test/plugin/test_formatter_out_file.rb @@ -42,9 +42,14 @@ def test_configured_with_both_of_utc_and_localtime(value) ) def test_configured_with_utc_or_localtime(data) key, value, time, expected = data - d = create_driver(config_element('ROOT', '', {key => value})) - tag = 'test' - assert_equal "#{expected}\t#{tag}\t#{Yajl.dump(record)}\n", d.instance.format(tag, time, record) + begin + oldtz, ENV['TZ'] = ENV['TZ'], "UTC-07" + d = create_driver(config_element('ROOT', '', {key => value})) + tag = 'test' + assert_equal "#{expected}\t#{tag}\t#{Yajl.dump(record)}\n", d.instance.format(tag, time, record) + ensure + ENV['TZ'] = oldtz + end end def test_format From d3ea47b5dcbdbffd3334be5f3bfb84ce7d7d4363 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 27 Jul 2016 20:26:06 +0900 Subject: [PATCH 23/23] fix tests independent from machine timezone --- test/plugin/test_formatter_out_file.rb | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/test/plugin/test_formatter_out_file.rb b/test/plugin/test_formatter_out_file.rb index 4fa8225439..c8006c2a35 100644 --- a/test/plugin/test_formatter_out_file.rb +++ b/test/plugin/test_formatter_out_file.rb @@ -34,16 +34,18 @@ def test_configured_with_both_of_utc_and_localtime(value) end end + time_i = Time.parse("2016-07-26 21:08:30 -0700").to_i data( - 'configured for localtime by localtime' => ['localtime', 'true', Time.parse("2016-07-26 21:08:30 -0700"), "2016-07-26T21:08:30-07:00"], - 'configured for localtime by utc' => ['utc', 'false', Time.parse("2016-07-26 21:08:30 -0700"), "2016-07-26T21:08:30-07:00"], - 'configured for utc by localtime' => ['localtime', 'false', Time.parse("2016-07-26 21:08:30 -0700"), "2016-07-27T04:08:30Z"], - 'configured for utc by utc' => ['utc', 'true', Time.parse("2016-07-26 21:08:30 -0700"), "2016-07-27T04:08:30Z"], + 'configured for localtime by localtime' => ['localtime', 'true', time_i, "2016-07-26T21:08:30-07:00"], + 'configured for localtime by utc' => ['utc', 'false', time_i, "2016-07-26T21:08:30-07:00"], + 'configured for utc by localtime' => ['localtime', 'false', time_i, "2016-07-27T04:08:30Z"], + 'configured for utc by utc' => ['utc', 'true', time_i, "2016-07-27T04:08:30Z"], ) def test_configured_with_utc_or_localtime(data) - key, value, time, expected = data + key, value, time_i, expected = data + time = Time.at(time_i) begin - oldtz, ENV['TZ'] = ENV['TZ'], "UTC-07" + oldtz, ENV['TZ'] = ENV['TZ'], "UTC+07" d = create_driver(config_element('ROOT', '', {key => value})) tag = 'test' assert_equal "#{expected}\t#{tag}\t#{Yajl.dump(record)}\n", d.instance.format(tag, time, record)