From d6296d352fa6e97af8cdb60f5e77963a9170b81b Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 27 Jul 2017 07:30:36 +0900 Subject: [PATCH 1/2] Add to and format parameter --- lib/fluent/log.rb | 103 +++++++++++++++++++++++++----- lib/fluent/supervisor.rb | 13 +++- lib/fluent/system_config.rb | 3 + test/config/test_system_config.rb | 15 +++++ test/test_log.rb | 26 ++++++++ test/test_supervisor.rb | 4 ++ 6 files changed, 146 insertions(+), 18 deletions(-) diff --git a/lib/fluent/log.rb b/lib/fluent/log.rb index 310b15982f..a83b99c46f 100644 --- a/lib/fluent/log.rb +++ b/lib/fluent/log.rb @@ -96,8 +96,12 @@ def initialize(logger, opts={}) @level = logger.level + 1 @debug_mode = false @log_event_enabled = false - @time_format = '%Y-%m-%d %H:%M:%S %z ' @depth_offset = 1 + @format = nil + @time_format = nil + @formatter = nil + + self.format = :text enable_color out.tty? # TODO: This variable name is unclear so we should change to better name. @threads_exclude_events = [] @@ -136,12 +140,14 @@ def dup dl_opts[:log_level] = @level - 1 logger = ServerEngine::DaemonLogger.new(@out, dl_opts) clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace, process_type: @process_type, worker_id: @worker_id) + clone.format = @format clone.time_format = @time_format clone.log_event_enabled = @log_event_enabled # optional headers/attrs are not copied, because new PluginLogger should have another one of it clone end + attr_reader :format attr_accessor :log_event_enabled attr_accessor :out attr_accessor :level @@ -154,6 +160,37 @@ def logdev=(logdev) nil end + def format=(fmt) + return if @format == fmt + + case fmt + when :text + @format = :text + @time_format = '%Y-%m-%d %H:%M:%S %z' + @formatter = Proc.new { |type, time, level, msg| + r = caller_line(type, time, @depth_offset, level) + r << msg + r + } + when :json + @format = :json + @time_format = '%Y-%m-%d %H:%M:%S %z' + @formatter = Proc.new { |type, time, level, msg| + r = { + 'time' => time.strftime(@time_format), + 'level' => LEVEL_TEXT[level], + 'message' => msg + } + if wid = get_worker_id(type) + r['worker_id'] = wid + end + Yajl.dump(r) + } + end + + nil + end + def reopen! # do nothing in @logger.reopen! because it's already reopened in Supervisor.load_config @logger.reopen! if @logger @@ -235,7 +272,7 @@ def trace(*args, &block) return if skipped_type?(type) args << block.call if block time, msg = event(:trace, args) - puts [@color_trace, caller_line(type, time, @depth_offset, LEVEL_TRACE), msg, @color_reset].join + puts [@color_trace, @formatter.call(type, time, LEVEL_TRACE, msg), @color_reset].join rescue # logger should not raise an exception. This rescue prevents unexpected behaviour. end @@ -256,7 +293,7 @@ def debug(*args, &block) return if skipped_type?(type) args << block.call if block time, msg = event(:debug, args) - puts [@color_debug, caller_line(type, time, @depth_offset, LEVEL_DEBUG), msg, @color_reset].join + puts [@color_debug, @formatter.call(type, time, LEVEL_DEBUG, msg), @color_reset].join rescue end alias DEBUG debug @@ -276,7 +313,7 @@ def info(*args, &block) return if skipped_type?(type) args << block.call if block time, msg = event(:info, args) - puts [@color_info, caller_line(type, time, @depth_offset, LEVEL_INFO), msg, @color_reset].join + puts [@color_info, @formatter.call(type, time, LEVEL_INFO, msg), @color_reset].join rescue end alias INFO info @@ -296,7 +333,7 @@ def warn(*args, &block) return if skipped_type?(type) args << block.call if block time, msg = event(:warn, args) - puts [@color_warn, caller_line(type, time, @depth_offset, LEVEL_WARN), msg, @color_reset].join + puts [@color_warn, @formatter.call(type, time, LEVEL_WARN, msg), @color_reset].join rescue end alias WARN warn @@ -316,7 +353,7 @@ def error(*args, &block) return if skipped_type?(type) args << block.call if block time, msg = event(:error, args) - puts [@color_error, caller_line(type, time, @depth_offset, LEVEL_ERROR), msg, @color_reset].join + puts [@color_error, @formatter.call(type, time, LEVEL_ERROR, msg), @color_reset].join rescue end alias ERROR error @@ -336,7 +373,7 @@ def fatal(*args, &block) return if skipped_type?(type) args << block.call if block time, msg = event(:fatal, args) - puts [@color_fatal, caller_line(type, time, @depth_offset, LEVEL_FATAL), msg, @color_reset].join + puts [@color_fatal, @formatter.call(type, time, LEVEL_FATAL, msg), @color_reset].join rescue end alias FATAL fatal @@ -373,19 +410,47 @@ def dump_stacktrace(type, backtrace, level) return if @level > level time = Time.now - line = caller_line(type, time, 5, level) - if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace) - puts [" ", line, 'suppressed same stacktrace'].join + + if @format == :text + line = caller_line(type, time, 5, level) + if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace) + puts [" ", line, 'suppressed same stacktrace'].join + else + backtrace.each { |msg| + puts [" ", line, msg].join + } + Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace + end else - backtrace.each { |msg| - puts [" ", line, msg].join + r = { + 'time' => time.strftime(@time_format), + 'level' => LEVEL_TEXT[level], } - Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace + if wid = get_worker_id(type) + r['worker_id'] = wid + end + + if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace) + r['message'] = 'suppressed same stacktrace' + else + r['message'] = backtrace.join("\n") + Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace + end + + puts Yajl.dump(r) end nil end + def get_worker_id(type) + if type == :default && (@process_type == :worker0 || @process_type == :workers) + @worker_id + else + nil + end + end + def event(level, args) time = Time.now message = @optional_header ? @optional_header.dup : '' @@ -426,7 +491,7 @@ def caller_line(type, time, depth, level) else "".freeze end - log_msg = "#{time.strftime(@time_format)}[#{LEVEL_TEXT[level]}]: #{worker_id_part}" + log_msg = "#{time.strftime(@time_format)} [#{LEVEL_TEXT[level]}]: #{worker_id_part}" if @debug_mode line = caller(depth+1)[0] if match = /^(.+?):(\d+)(?::in `(.*)')?/.match(line) @@ -450,11 +515,13 @@ class PluginLogger < Log def initialize(logger) @logger = logger @level = @logger.level + @format = nil @depth_offset = 2 if logger.instance_variable_defined?(:@suppress_repeated_stacktrace) @suppress_repeated_stacktrace = logger.instance_variable_get(:@suppress_repeated_stacktrace) end + self.format = @logger.format enable_color @logger.enable_color? end @@ -462,15 +529,21 @@ def level=(log_level_str) @level = Log.str_to_level(log_level_str) end + alias orig_format= format= alias orig_enable_color enable_color + def format=(fmt) + self.orig_format = fmt + @logger.format = fmt + end + def enable_color(b = true) orig_enable_color b @logger.enable_color b end extend Forwardable - def_delegators '@logger', :enable_color?, :enable_debug, :enable_event, + def_delegators '@logger', :get_worker_id, :enable_color?, :enable_debug, :enable_event, :disable_events, :log_event_enabled, :log_event_enamed=, :time_format, :time_format=, :event, :caller_line, :puts, :write, :<<, :flush, :reset, :out, :out=, :optional_header, :optional_header=, :optional_attrs, :optional_attrs= diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index c4bdcaef0e..1b456e5b7f 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -360,6 +360,10 @@ def reopen! self end + def apply_options(opts) + $log.format = opts[:format] if opts[:format] + end + def level=(level) @level = level $log.level = level @@ -439,6 +443,9 @@ def run_supervisor show_plugin_config if @show_plugin_config read_config set_system_config + @log.apply_options(format: @system_config.log.format) + + $log.info :supervisor, "parsing config file is succeeded", path: @config_path if @workers < 1 raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@workers}" @@ -485,11 +492,12 @@ def run_worker else :workers end @log.init(process_type, worker_id) - Process.setproctitle("worker:#{@process_name}") if @process_name - show_plugin_config if @show_plugin_config read_config set_system_config + @log.apply_options(format: @system_config.log.format) + + Process.setproctitle("worker:#{@process_name}") if @process_name if @standalone_worker && @workers != 1 raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@workers}" @@ -715,7 +723,6 @@ def main_process(&block) end def read_config - $log.info :supervisor, "reading config file", path: @config_path @config_fname = File.basename(@config_path) @config_basedir = File.dirname(@config_path) @config_data = File.open(@config_path, "r:utf-8:utf-8") {|f| f.read } diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 30e1cafdf1..9301392e02 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -46,6 +46,9 @@ class SystemConfig config_param :dir_permission, default: nil do |v| v.to_i(8) end + config_section :log, required: false, init: true, multi: false do + config_param :format, :enum, list: [:text, :json], default: :text + end def self.create(conf) systems = conf.elements(name: 'system') diff --git a/test/config/test_system_config.rb b/test/config/test_system_config.rb index 3fa31db320..f873915e8c 100644 --- a/test/config/test_system_config.rb +++ b/test/config/test_system_config.rb @@ -55,6 +55,7 @@ def parse_text(text) assert_nil(sc.emit_error_log_interval) assert_nil(sc.suppress_config_dump) assert_nil(sc.without_source) + assert_equal(:text, sc.log.format) assert_equal(1, s.instance_variable_get(:@workers)) assert_nil(s.instance_variable_get(:@root_dir)) assert_equal(Fluent::Log::LEVEL_INFO, s.instance_variable_get(:@log_level)) @@ -91,6 +92,20 @@ def parse_text(text) assert_not_nil(s.instance_variable_get("@#{key}")) end + test "log parameters" do + conf = parse_text(<<-EOS) + + + format json + + + EOS + s = FakeSupervisor.new + sc = Fluent::SystemConfig.new(conf) + sc.apply(s) + assert_equal(:json, sc.log.format) + end + data( 'foo' => ['foo', 'bar'], 'hoge' => ['hoge', 'fuga'], diff --git a/test/test_log.rb b/test/test_log.rb index b0688735ba..a767f69790 100644 --- a/test/test_log.rb +++ b/test/test_log.rb @@ -379,6 +379,32 @@ def test_dup assert_equal(Fluent::Log::LEVEL_TRACE, log2.level) end + def test_format_json + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev) + log = Fluent::Log.new(logger) + log.format = :json + log.level = Fluent::Log::LEVEL_TRACE + log.trace "trace log" + log.debug "debug log" + log.info "info log" + log.warn "warn log" + log.error "error log" + log.fatal "fatal log" + expected = [ + "#{@timestamp_str} [trace]: trace log\n", + "#{@timestamp_str} [debug]: debug log\n", + "#{@timestamp_str} [info]: info log\n", + "#{@timestamp_str} [warn]: warn log\n", + "#{@timestamp_str} [error]: error log\n", + "#{@timestamp_str} [fatal]: fatal log\n" + ] + assert_equal(expected, log.out.logs.map { |l| + r = JSON.parse(l) + "#{r['time']} [#{r['level']}]: #{r['message']}\n" + }) + end + def test_disable_events dl_opts = {} dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 76787f9575..592dec1853 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -130,6 +130,9 @@ def test_system_config process_name "process_name" log_level info root_dir #{TMP_ROOT_DIR} + + format json + EOC conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true) @@ -145,6 +148,7 @@ def test_system_config assert_equal "process_name", sys_conf.process_name assert_equal 2, sys_conf.log_level assert_equal TMP_ROOT_DIR, sys_conf.root_dir + assert_equal :json, sys_conf.log.format end def test_main_process_signal_handlers From 283d55c6a3b870e1bc6a0e9a120c3688df5168a1 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 27 Jul 2017 07:33:35 +0900 Subject: [PATCH 2/2] Add time_format parameter to . fix #1640 --- lib/fluent/supervisor.rb | 5 +++-- lib/fluent/system_config.rb | 1 + test/config/test_system_config.rb | 3 +++ test/test_log.rb | 24 ++++++++++++++++++++++++ test/test_supervisor.rb | 2 ++ 5 files changed, 33 insertions(+), 2 deletions(-) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 1b456e5b7f..3f0c74dd6e 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -362,6 +362,7 @@ def reopen! def apply_options(opts) $log.format = opts[:format] if opts[:format] + $log.time_format = opts[:time_format] if opts[:time_format] end def level=(level) @@ -443,7 +444,7 @@ def run_supervisor show_plugin_config if @show_plugin_config read_config set_system_config - @log.apply_options(format: @system_config.log.format) + @log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format) $log.info :supervisor, "parsing config file is succeeded", path: @config_path @@ -495,7 +496,7 @@ def run_worker show_plugin_config if @show_plugin_config read_config set_system_config - @log.apply_options(format: @system_config.log.format) + @log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format) Process.setproctitle("worker:#{@process_name}") if @process_name diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 9301392e02..0dbd6731e5 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -48,6 +48,7 @@ class SystemConfig end config_section :log, required: false, init: true, multi: false do config_param :format, :enum, list: [:text, :json], default: :text + config_param :time_format, :string, default: '%Y-%m-%d %H:%M:%S %z' end def self.create(conf) diff --git a/test/config/test_system_config.rb b/test/config/test_system_config.rb index f873915e8c..fc681f9f3b 100644 --- a/test/config/test_system_config.rb +++ b/test/config/test_system_config.rb @@ -56,6 +56,7 @@ def parse_text(text) assert_nil(sc.suppress_config_dump) assert_nil(sc.without_source) assert_equal(:text, sc.log.format) + assert_equal('%Y-%m-%d %H:%M:%S %z', sc.log.time_format) assert_equal(1, s.instance_variable_get(:@workers)) assert_nil(s.instance_variable_get(:@root_dir)) assert_equal(Fluent::Log::LEVEL_INFO, s.instance_variable_get(:@log_level)) @@ -97,6 +98,7 @@ def parse_text(text) format json + time_format %Y EOS @@ -104,6 +106,7 @@ def parse_text(text) sc = Fluent::SystemConfig.new(conf) sc.apply(s) assert_equal(:json, sc.log.format) + assert_equal('%Y', sc.log.time_format) end data( diff --git a/test/test_log.rb b/test/test_log.rb index a767f69790..685524a361 100644 --- a/test/test_log.rb +++ b/test/test_log.rb @@ -405,6 +405,30 @@ def test_format_json }) end + def test_time_format + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev) + log = Fluent::Log.new(logger) + log.time_format = "%Y" + log.level = Fluent::Log::LEVEL_TRACE + log.trace "trace log" + log.debug "debug log" + log.info "info log" + log.warn "warn log" + log.error "error log" + log.fatal "fatal log" + timestamp_str = @timestamp.strftime("%Y") + expected = [ + "#{timestamp_str} [trace]: trace log\n", + "#{timestamp_str} [debug]: debug log\n", + "#{timestamp_str} [info]: info log\n", + "#{timestamp_str} [warn]: warn log\n", + "#{timestamp_str} [error]: error log\n", + "#{timestamp_str} [fatal]: fatal log\n" + ] + assert_equal(expected, log.out.logs) + end + def test_disable_events dl_opts = {} dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 592dec1853..1bc3649455 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -132,6 +132,7 @@ def test_system_config root_dir #{TMP_ROOT_DIR} format json + time_format %Y EOC @@ -149,6 +150,7 @@ def test_system_config assert_equal 2, sys_conf.log_level assert_equal TMP_ROOT_DIR, sys_conf.root_dir assert_equal :json, sys_conf.log.format + assert_equal '%Y', sys_conf.log.time_format end def test_main_process_signal_handlers