Skip to content

Commit

Permalink
Merge pull request #3468 from cosmo0920/input-output-metrics
Browse files Browse the repository at this point in the history
Measure emitted record counts and their sizes for input and output metrics, and use them in in_monitor_agent (revised)
  • Loading branch information
cosmo0920 authored Jul 27, 2021
2 parents ecb6a86 + 672ffb2 commit e2291d9
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 12 deletions.
8 changes: 8 additions & 0 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@
opts[:strict_config_value] = b
}

op.on('--enable-input-metrics', "Enable input plugin metrics on fluentd", TrueClass) {|b|
opts[:enable_input_metrics] = b
}

op.on('--enable-size-metrics', "Enable plugin record size metrics on fluentd", TrueClass) {|b|
opts[:enable_size_metrics] = b
}

op.on('-v', '--verbose', "increase verbose level (-v: debug, -vv: trace)", TrueClass) {|b|
if b
opts[:log_level] = [opts[:log_level] - 1, Fluent::Log::LEVEL_TRACE].max
Expand Down
21 changes: 21 additions & 0 deletions lib/fluent/event_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def initialize(default_collector, emit_error_handler)
@match_cache = MatchCache.new
@default_collector = default_collector
@emit_error_handler = emit_error_handler
@metric_callbacks = {}
@caller_plugin_id = nil
end

attr_accessor :default_collector
Expand Down Expand Up @@ -83,6 +85,22 @@ def add_rule(pattern, collector)
@match_rules << Rule.new(pattern, collector)
end

# Registers a metric callback for a plugin.
#
# caller_plugin_id - key under which the callback is stored; a later
#                    registration with the same id replaces the earlier one.
# callback         - object responding to #call(es); emit_stream invokes it
#                    with the emitted event stream.
#
# Returns the callback that was stored.
def add_metric_callbacks(caller_plugin_id, callback)
  @metric_callbacks.store(caller_plugin_id, callback)
end

# Remembers the id of the plugin that is about to emit through this router.
# find_callback uses this id to select the metric callback to run.
#
# NOTE(review): this is a single slot on the router, so when several plugins
# share one router the most recent writer wins — confirm this cannot
# mis-attribute metrics when plugins emit from different threads.
def caller_plugin_id=(caller_plugin_id)
@caller_plugin_id = caller_plugin_id
end

# Looks up the metric callback registered for the current caller plugin id.
#
# Returns the stored callback, or nil when no caller id has been set or
# nothing is registered under that id.
def find_callback
  return nil unless @caller_plugin_id

  @metric_callbacks[@caller_plugin_id]
end

def emit(tag, time, record)
unless record.nil?
emit_stream(tag, OneEventStream.new(time, record))
Expand All @@ -95,6 +113,9 @@ def emit_array(tag, array)

# Emits an event stream to the collector matched for +tag+, then passes the
# same stream to the caller's metric callback when one is found.
#
# The callback runs only after emit_events has returned. A StandardError
# raised by matching, emitting, or the callback is handed to
# @emit_error_handler.handle_emits_error(tag, es, error).
def emit_stream(tag, es)
  match(tag).emit_events(tag, es)
  callback = find_callback
  callback.call(es) if callback
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ def get_monitor_info(pe, opts={})
}

if pe.respond_to?(:statistics)
obj.merge!(pe.statistics['output'] || {})
obj.merge!(pe.statistics.dig('output') || {})
obj.merge!(pe.statistics.dig('input') || {})
end

obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] && pe.instance_variable_defined?(:@retry)
Expand Down
30 changes: 30 additions & 0 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,36 @@ class Input < Base

helpers_internal :event_emitter

# Prepares the state used for input metrics.
#
# Both emit counters start at zero and are updated by metric_callback under
# @counter_mutex. Size metrics start disabled; configure sets the flag from
# the system configuration.
def initialize
  super
  @counter_mutex = Mutex.new
  @emit_records = @emit_size = 0
  @enable_size_metrics = false
end

# Runs the parent configuration, then records whether record-size metrics
# are enabled in the system configuration.
#
# The setting is coerced to a strict boolean, so an unset (nil) value
# becomes false.
def configure(conf)
  super

  size_metrics = system_config.enable_size_metrics
  @enable_size_metrics = size_metrics ? true : false
end

# Returns the current input metrics as a nested hash:
#   { 'input' => { 'emit_records' => Integer, 'emit_size' => Integer } }
#
# The values are the counters as they stand at the time of the call.
def statistics
  {
    'input' => {
      'emit_records' => @emit_records,
      'emit_size' => @emit_size,
    },
  }
end

# Adds one emitted event stream to the input metrics.
#
# es - event stream; must respond to #size and, when size metrics are
#      enabled, to #to_msgpack_stream.
#
# The record count and the serialized byte size are measured before taking
# @counter_mutex, so serialization does not happen while the lock is held and
# the critical section contains only the two counter additions. If measuring
# raises, neither counter is changed.
#
# Returns the updated @emit_size when size metrics are enabled, otherwise nil.
def metric_callback(es)
  records = es.size
  # Serialization is done only when size metrics are enabled.
  bytes = es.to_msgpack_stream.bytesize if @enable_size_metrics

  @counter_mutex.synchronize do
    @emit_records += records
    @emit_size += bytes if bytes
  end
end

# Always returns false.
# NOTE(review): presumably the default answer for input plugins, meant to be
# overridden by plugins that can run under multiple workers — confirm against
# the base class and callers.
def multi_workers_ready?
false
end
Expand Down
25 changes: 21 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,12 @@ def initialize
@num_errors = 0
@emit_count = 0
@emit_records = 0
@emit_size = 0
@write_count = 0
@rollback_count = 0
@flush_time_count = 0
@slow_flush_count = 0
@enable_size_metrics = false

# How to process events is decided here at once, but it will be decided in delayed way on #configure & #start
if implement?(:synchronous)
Expand Down Expand Up @@ -271,6 +273,8 @@ def configure(conf)
@buffering = true
end
end
# Whether to update record size metrics
@enable_size_metrics = !!system_config.enable_size_metrics

if @as_secondary
if !@buffering && !@buffering.nil?
Expand Down Expand Up @@ -800,7 +804,10 @@ def emit_sync(tag, es)
@counter_mutex.synchronize{ @emit_count += 1 }
begin
process(tag, es)
@counter_mutex.synchronize{ @emit_records += es.size }
@counter_mutex.synchronize do
@emit_records += es.size
@emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics
end
rescue
@counter_mutex.synchronize{ @num_errors += 1 }
raise
Expand Down Expand Up @@ -966,7 +973,10 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
write_guard do
@buffer.write(meta_and_data, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += records }
@counter_mutex.synchronize do
@emit_records += records
@emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics
end
true
end

Expand All @@ -983,7 +993,10 @@ def handle_stream_with_standard_format(tag, es, enqueue: false)
write_guard do
@buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += records }
@counter_mutex.synchronize do
@emit_records += records
@emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics
end
true
end

Expand All @@ -1008,7 +1021,10 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += records }
@counter_mutex.synchronize do
@emit_records += records
@emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics
end
true
end

Expand Down Expand Up @@ -1491,6 +1507,7 @@ def flush_thread_run(state)
def statistics
stats = {
'emit_records' => @emit_records,
'emit_size' => @emit_size,
# Respect original name
# https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284
'retry_count' => @num_errors,
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def router
if @_event_emitter_lazy_init
@router = @primary_instance.router
end
if @router.respond_to?(:caller_plugin_id=)
@router.caller_plugin_id = self.plugin_id
end
@router
end

Expand Down
5 changes: 5 additions & 0 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ def initialize(log:, system_config: SystemConfig.new)
@suppress_emit_error_log_interval = 0
@next_emit_error_log_time = nil
@without_source = false
@enable_input_metrics = false

suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
@enable_input_metrics = !!system_config.enable_input_metrics
end

attr_reader :inputs
Expand Down Expand Up @@ -315,6 +317,9 @@ def add_source(type, conf)
# See also 'fluentd/plugin/input.rb'
input.context_router = @event_router
input.configure(conf)
if @enable_input_metrics
@event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
end
@inputs << input

input
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ def self.default_options
suppress_repeated_stacktrace: true,
ignore_repeated_log_interval: nil,
without_source: nil,
enable_input_metrics: nil,
enable_size_metrics: nil,
use_v1_config: true,
strict_config_value: nil,
supervise: true,
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SystemConfig
:without_source, :rpc_endpoint, :enable_get_dump, :process_name,
:file_permission, :dir_permission, :counter_server, :counter_client,
:strict_config_value, :enable_msgpack_time_support, :disable_shared_socket,
:metrics
:metrics, :enable_input_metrics, :enable_size_metrics
]

config_param :workers, :integer, default: 1
Expand All @@ -47,6 +47,8 @@ class SystemConfig
config_param :strict_config_value, :bool, default: nil
config_param :enable_msgpack_time_support, :bool, default: nil
config_param :disable_shared_socket, :bool, default: nil
config_param :enable_input_metrics, :bool, default: nil
config_param :enable_size_metrics, :bool, default: nil
config_param :file_permission, default: nil do |v|
v.to_i(8)
end
Expand Down
6 changes: 6 additions & 0 deletions test/config/test_system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def initialize(**opt)
log_event_label: nil,
log_event_verbose: nil,
without_source: nil,
enable_input_metrics: nil,
enable_size_metrics: nil,
emit_error_log_interval: nil,
file_permission: nil,
dir_permission: nil,
Expand Down Expand Up @@ -77,6 +79,8 @@ def parse_text(text)
assert_nil(sc.emit_error_log_interval)
assert_nil(sc.suppress_config_dump)
assert_nil(sc.without_source)
assert_nil(sc.enable_input_metrics)
assert_nil(sc.enable_size_metrics)
assert_nil(sc.enable_msgpack_time_support)
assert_equal(:text, sc.log.format)
assert_equal('%Y-%m-%d %H:%M:%S %z', sc.log.time_format)
Expand All @@ -93,6 +97,8 @@ def parse_text(text)
'without_source' => ['without_source', true],
'strict_config_value' => ['strict_config_value', true],
'enable_msgpack_time_support' => ['enable_msgpack_time_support', true],
'enable_input_metrics' => ['enable_input_metrics', true],
'enable_size_metrics' => ['enable_size_metrics', true],
)
test "accepts parameters" do |(k, v)|
conf = parse_text(<<-EOS)
Expand Down
Loading

0 comments on commit e2291d9

Please sign in to comment.