Skip to content

Commit

Permalink
Merge pull request #1644 from fluent/system-log-directive
Browse files Browse the repository at this point in the history
Add <log> directive to <system>
  • Loading branch information
repeatedly authored Jul 28, 2017
2 parents 8287dc1 + 283d55c commit 6e89d3f
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 18 deletions.
103 changes: 88 additions & 15 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ def initialize(logger, opts={})
@level = logger.level + 1
@debug_mode = false
@log_event_enabled = false
@time_format = '%Y-%m-%d %H:%M:%S %z '
@depth_offset = 1
@format = nil
@time_format = nil
@formatter = nil

self.format = :text
enable_color out.tty?
# TODO: This variable name is unclear so we should change to better name.
@threads_exclude_events = []
Expand Down Expand Up @@ -136,12 +140,14 @@ def dup
dl_opts[:log_level] = @level - 1
logger = ServerEngine::DaemonLogger.new(@out, dl_opts)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace, process_type: @process_type, worker_id: @worker_id)
clone.format = @format
clone.time_format = @time_format
clone.log_event_enabled = @log_event_enabled
# optional headers/attrs are not copied, because new PluginLogger should have another one of it
clone
end

attr_reader :format
attr_accessor :log_event_enabled
attr_accessor :out
attr_accessor :level
Expand All @@ -154,6 +160,37 @@ def logdev=(logdev)
nil
end

def format=(fmt)
return if @format == fmt

case fmt
when :text
@format = :text
@time_format = '%Y-%m-%d %H:%M:%S %z'
@formatter = Proc.new { |type, time, level, msg|
r = caller_line(type, time, @depth_offset, level)
r << msg
r
}
when :json
@format = :json
@time_format = '%Y-%m-%d %H:%M:%S %z'
@formatter = Proc.new { |type, time, level, msg|
r = {
'time' => time.strftime(@time_format),
'level' => LEVEL_TEXT[level],
'message' => msg
}
if wid = get_worker_id(type)
r['worker_id'] = wid
end
Yajl.dump(r)
}
end

nil
end

def reopen!
# do nothing in @logger.reopen! because it's already reopened in Supervisor.load_config
@logger.reopen! if @logger
Expand Down Expand Up @@ -235,7 +272,7 @@ def trace(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:trace, args)
puts [@color_trace, caller_line(type, time, @depth_offset, LEVEL_TRACE), msg, @color_reset].join
puts [@color_trace, @formatter.call(type, time, LEVEL_TRACE, msg), @color_reset].join
rescue
# logger should not raise an exception. This rescue prevents unexpected behaviour.
end
Expand All @@ -256,7 +293,7 @@ def debug(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:debug, args)
puts [@color_debug, caller_line(type, time, @depth_offset, LEVEL_DEBUG), msg, @color_reset].join
puts [@color_debug, @formatter.call(type, time, LEVEL_DEBUG, msg), @color_reset].join
rescue
end
alias DEBUG debug
Expand All @@ -276,7 +313,7 @@ def info(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:info, args)
puts [@color_info, caller_line(type, time, @depth_offset, LEVEL_INFO), msg, @color_reset].join
puts [@color_info, @formatter.call(type, time, LEVEL_INFO, msg), @color_reset].join
rescue
end
alias INFO info
Expand All @@ -296,7 +333,7 @@ def warn(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:warn, args)
puts [@color_warn, caller_line(type, time, @depth_offset, LEVEL_WARN), msg, @color_reset].join
puts [@color_warn, @formatter.call(type, time, LEVEL_WARN, msg), @color_reset].join
rescue
end
alias WARN warn
Expand All @@ -316,7 +353,7 @@ def error(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:error, args)
puts [@color_error, caller_line(type, time, @depth_offset, LEVEL_ERROR), msg, @color_reset].join
puts [@color_error, @formatter.call(type, time, LEVEL_ERROR, msg), @color_reset].join
rescue
end
alias ERROR error
Expand All @@ -336,7 +373,7 @@ def fatal(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:fatal, args)
puts [@color_fatal, caller_line(type, time, @depth_offset, LEVEL_FATAL), msg, @color_reset].join
puts [@color_fatal, @formatter.call(type, time, LEVEL_FATAL, msg), @color_reset].join
rescue
end
alias FATAL fatal
Expand Down Expand Up @@ -373,19 +410,47 @@ def dump_stacktrace(type, backtrace, level)
return if @level > level

time = Time.now
line = caller_line(type, time, 5, level)
if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace)
puts [" ", line, 'suppressed same stacktrace'].join

if @format == :text
line = caller_line(type, time, 5, level)
if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace)
puts [" ", line, 'suppressed same stacktrace'].join
else
backtrace.each { |msg|
puts [" ", line, msg].join
}
Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace
end
else
backtrace.each { |msg|
puts [" ", line, msg].join
r = {
'time' => time.strftime(@time_format),
'level' => LEVEL_TEXT[level],
}
Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace
if wid = get_worker_id(type)
r['worker_id'] = wid
end

if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace)
r['message'] = 'suppressed same stacktrace'
else
r['message'] = backtrace.join("\n")
Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace
end

puts Yajl.dump(r)
end

nil
end

def get_worker_id(type)
if type == :default && (@process_type == :worker0 || @process_type == :workers)
@worker_id
else
nil
end
end

def event(level, args)
time = Time.now
message = @optional_header ? @optional_header.dup : ''
Expand Down Expand Up @@ -426,7 +491,7 @@ def caller_line(type, time, depth, level)
else
"".freeze
end
log_msg = "#{time.strftime(@time_format)}[#{LEVEL_TEXT[level]}]: #{worker_id_part}"
log_msg = "#{time.strftime(@time_format)} [#{LEVEL_TEXT[level]}]: #{worker_id_part}"
if @debug_mode
line = caller(depth+1)[0]
if match = /^(.+?):(\d+)(?::in `(.*)')?/.match(line)
Expand All @@ -450,27 +515,35 @@ class PluginLogger < Log
def initialize(logger)
@logger = logger
@level = @logger.level
@format = nil
@depth_offset = 2
if logger.instance_variable_defined?(:@suppress_repeated_stacktrace)
@suppress_repeated_stacktrace = logger.instance_variable_get(:@suppress_repeated_stacktrace)
end

self.format = @logger.format
enable_color @logger.enable_color?
end

def level=(log_level_str)
@level = Log.str_to_level(log_level_str)
end

alias orig_format= format=
alias orig_enable_color enable_color

def format=(fmt)
self.orig_format = fmt
@logger.format = fmt
end

def enable_color(b = true)
orig_enable_color b
@logger.enable_color b
end

extend Forwardable
def_delegators '@logger', :enable_color?, :enable_debug, :enable_event,
def_delegators '@logger', :get_worker_id, :enable_color?, :enable_debug, :enable_event,
:disable_events, :log_event_enabled, :log_event_enamed=, :time_format, :time_format=,
:event, :caller_line, :puts, :write, :<<, :flush, :reset, :out, :out=,
:optional_header, :optional_header=, :optional_attrs, :optional_attrs=
Expand Down
14 changes: 11 additions & 3 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ def reopen!
self
end

def apply_options(opts)
$log.format = opts[:format] if opts[:format]
$log.time_format = opts[:time_format] if opts[:time_format]
end

def level=(level)
@level = level
$log.level = level
Expand Down Expand Up @@ -443,6 +448,9 @@ def run_supervisor
show_plugin_config if @show_plugin_config
read_config
set_system_config
@log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format)

$log.info :supervisor, "parsing config file is succeeded", path: @config_path

if @workers < 1
raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@workers}"
Expand Down Expand Up @@ -489,11 +497,12 @@ def run_worker
else :workers
end
@log.init(process_type, worker_id)
Process.setproctitle("worker:#{@process_name}") if @process_name

show_plugin_config if @show_plugin_config
read_config
set_system_config
@log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format)

Process.setproctitle("worker:#{@process_name}") if @process_name

if @standalone_worker && @workers != 1
raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@workers}"
Expand Down Expand Up @@ -719,7 +728,6 @@ def main_process(&block)
end

def read_config
$log.info :supervisor, "reading config file", path: @config_path
@config_fname = File.basename(@config_path)
@config_basedir = File.dirname(@config_path)
@config_data = File.open(@config_path, "r:utf-8:utf-8") {|f| f.read }
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class SystemConfig
config_param :dir_permission, default: nil do |v|
v.to_i(8)
end
config_section :log, required: false, init: true, multi: false do
config_param :format, :enum, list: [:text, :json], default: :text
config_param :time_format, :string, default: '%Y-%m-%d %H:%M:%S %z'
end

def self.create(conf)
systems = conf.elements(name: 'system')
Expand Down
18 changes: 18 additions & 0 deletions test/config/test_system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def parse_text(text)
assert_nil(sc.emit_error_log_interval)
assert_nil(sc.suppress_config_dump)
assert_nil(sc.without_source)
assert_equal(:text, sc.log.format)
assert_equal('%Y-%m-%d %H:%M:%S %z', sc.log.time_format)
assert_equal(1, s.instance_variable_get(:@workers))
assert_nil(s.instance_variable_get(:@root_dir))
assert_equal(Fluent::Log::LEVEL_INFO, s.instance_variable_get(:@log_level))
Expand Down Expand Up @@ -91,6 +93,22 @@ def parse_text(text)
assert_not_nil(s.instance_variable_get("@#{key}"))
end

test "log parameters" do
conf = parse_text(<<-EOS)
<system>
<log>
format json
time_format %Y
</log>
</system>
EOS
s = FakeSupervisor.new
sc = Fluent::SystemConfig.new(conf)
sc.apply(s)
assert_equal(:json, sc.log.format)
assert_equal('%Y', sc.log.time_format)
end

data(
'foo' => ['foo', 'bar'],
'hoge' => ['hoge', 'fuga'],
Expand Down
50 changes: 50 additions & 0 deletions test/test_log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,56 @@ def test_dup
assert_equal(Fluent::Log::LEVEL_TRACE, log2.level)
end

def test_format_json
logdev = @log_device
logger = ServerEngine::DaemonLogger.new(logdev)
log = Fluent::Log.new(logger)
log.format = :json
log.level = Fluent::Log::LEVEL_TRACE
log.trace "trace log"
log.debug "debug log"
log.info "info log"
log.warn "warn log"
log.error "error log"
log.fatal "fatal log"
expected = [
"#{@timestamp_str} [trace]: trace log\n",
"#{@timestamp_str} [debug]: debug log\n",
"#{@timestamp_str} [info]: info log\n",
"#{@timestamp_str} [warn]: warn log\n",
"#{@timestamp_str} [error]: error log\n",
"#{@timestamp_str} [fatal]: fatal log\n"
]
assert_equal(expected, log.out.logs.map { |l|
r = JSON.parse(l)
"#{r['time']} [#{r['level']}]: #{r['message']}\n"
})
end

def test_time_format
logdev = @log_device
logger = ServerEngine::DaemonLogger.new(logdev)
log = Fluent::Log.new(logger)
log.time_format = "%Y"
log.level = Fluent::Log::LEVEL_TRACE
log.trace "trace log"
log.debug "debug log"
log.info "info log"
log.warn "warn log"
log.error "error log"
log.fatal "fatal log"
timestamp_str = @timestamp.strftime("%Y")
expected = [
"#{timestamp_str} [trace]: trace log\n",
"#{timestamp_str} [debug]: debug log\n",
"#{timestamp_str} [info]: info log\n",
"#{timestamp_str} [warn]: warn log\n",
"#{timestamp_str} [error]: error log\n",
"#{timestamp_str} [fatal]: fatal log\n"
]
assert_equal(expected, log.out.logs)
end

def test_disable_events
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
Expand Down
6 changes: 6 additions & 0 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ def test_system_config
process_name "process_name"
log_level info
root_dir #{TMP_ROOT_DIR}
<log>
format json
time_format %Y
</log>
</system>
EOC
conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
Expand All @@ -145,6 +149,8 @@ def test_system_config
assert_equal "process_name", sys_conf.process_name
assert_equal 2, sys_conf.log_level
assert_equal TMP_ROOT_DIR, sys_conf.root_dir
assert_equal :json, sys_conf.log.format
assert_equal '%Y', sys_conf.log.time_format
end

def test_main_process_signal_handlers
Expand Down

0 comments on commit 6e89d3f

Please sign in to comment.