Skip to content

Commit

Permalink
plugin: metrics: Simplify metrics code
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed Aug 1, 2021
1 parent daf62f8 commit aa79a4c
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 88 deletions.
30 changes: 6 additions & 24 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -352,14 +352,8 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
return [meta]
end

Expand All @@ -370,14 +364,8 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
return [meta]
end

Expand All @@ -387,14 +375,8 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, enqueue: enqueue)
end
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
[meta]
end

Expand Down
10 changes: 2 additions & 8 deletions lib/fluent/plugin/bare_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,8 @@ def emit_sync(tag, es)
@emit_count_metrics.inc
begin
process(tag, es)
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
rescue
@num_errors_metrics.inc
raise
Expand Down
10 changes: 2 additions & 8 deletions lib/fluent/plugin/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,8 @@ def statistics
end

def measure_metrics(es)
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end

def filter(tag, time, record)
Expand Down
10 changes: 2 additions & 8 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,8 @@ def statistics
end

def metric_callback(es)
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end

def multi_workers_ready?
Expand Down
10 changes: 2 additions & 8 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,8 @@ def emit_sync(tag, es)
@emit_count_metrics.inc
begin
process(tag, es)
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
rescue
@num_errors_metrics.inc
raise
Expand Down
40 changes: 8 additions & 32 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -836,14 +836,8 @@ def emit_sync(tag, es)
@emit_count_metrics.inc
begin
process(tag, es)
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
rescue
@num_errors_metrics.inc
raise
Expand Down Expand Up @@ -1009,14 +1003,8 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
write_guard do
@buffer.write(meta_and_data, enqueue: enqueue)
end
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
true
end

Expand All @@ -1033,14 +1021,8 @@ def handle_stream_with_standard_format(tag, es, enqueue: false)
write_guard do
@buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
end
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
true
end

Expand All @@ -1065,14 +1047,8 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
end
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
true
end

Expand Down

0 comments on commit aa79a4c

Please sign in to comment.