From b1c376158ac9964eafe38257a0799f9cbd0182a8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 28 Jun 2021 18:55:15 +0900 Subject: [PATCH 01/18] input: metrics: Use metrics plugin mechanism Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/input.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index ef55c8ba0e..c686121863 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -27,12 +27,12 @@ class Input < Base include PluginLoggerMixin include PluginHelper::Mixin - helpers_internal :event_emitter + helpers_internal :event_emitter, :metrics def initialize super - @emit_records = 0 - @emit_size = 0 + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "input", name: "emit_records", help_text: "Number of count emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "input", name: "emit_size", help_text: "Total size of emit events") @counter_mutex = Mutex.new @enable_size_metrics = false end @@ -45,8 +45,8 @@ def configure(conf) def statistics stats = { - 'emit_records' => @emit_records, - 'emit_size' => @emit_size, + 'emit_records' => @emit_records_metrics.get, + 'emit_size' => @emit_size_metrics.get, } { 'input' => stats } @@ -54,8 +54,8 @@ def statistics def metric_callback(es) @counter_mutex.synchronize do - @emit_records += es.size - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end end From 59dbc161ae0f20ac06d1713bfe1ffef89f9a59c8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 29 Jun 2021 11:35:09 +0900 Subject: [PATCH 02/18] filter: Use metrics plugin mechanism Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/filter.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/filter.rb b/lib/fluent/plugin/filter.rb index 71310762e4..367d876d72 100644 --- a/lib/fluent/plugin/filter.rb +++ b/lib/fluent/plugin/filter.rb @@ -28,15 +28,15 @@ class Filter < Base include PluginLoggerMixin include PluginHelper::Mixin - helpers_internal :event_emitter + helpers_internal :event_emitter, :metrics attr_reader :has_filter_with_time def initialize super @has_filter_with_time = has_filter_with_time? - @emit_records = 0 - @emit_size = 0 + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events") @counter_mutex = Mutex.new @enable_size_metrics = false end @@ -49,8 +49,8 @@ def configure(conf) def statistics stats = { - 'emit_records' => @emit_records, - 'emit_size' => @emit_size, + 'emit_records' => @emit_records_metrics.get, + 'emit_size' => @emit_size_metrics.get, } { 'filter' => stats } @@ -58,8 +58,8 @@ def statistics def measure_metrics(es) @counter_mutex.synchronize do - @emit_records += es.size - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end end From d5f8cc4504f7dd55cb68f9a33b486a54b72e5f63 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 29 Jun 2021 17:47:27 +0900 Subject: [PATCH 03/18] multi_output: Implement metrics plugin mechanism Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/multi_output.rb | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 966f89f7e1..13e9fd05b8 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -27,6 +27,7 @@ class MultiOutput < Base include PluginHelper::Mixin # for event_emitter helpers :event_emitter # to get router from agent, which will be supplied to child plugins + helpers_internal :metrics config_section :store, param_name: :stores, multi: true, required: true do config_argument :arg, :string, default: '' @@ -44,15 +45,24 @@ def initialize @outputs = [] @outputs_statically_created = false - @counter_mutex = Mutex.new # TODO: well organized counters - @num_errors = 0 - @emit_count = 0 - @emit_records = 0 + @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records") # @write_count = 0 # @rollback_count = 0 end + def statistics + stats = { + 'num_errors' => @num_errors_metrics.get, + 'emit_records' => @emit_records_metrics.get, + 'emit_count' => @emit_count_metrics.get, + } + + { 'multi_output' => stats } + end + def multi_output? true end @@ -143,12 +153,12 @@ def terminate end def emit_sync(tag, es) - @counter_mutex.synchronize{ @emit_count += 1 } + @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize{ @emit_records += es.size } + @emit_records_metrics.add(es.size) rescue - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc raise end end From 6dfe686d64c33b16d9daadf9ef1291dff295d7ab Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 29 Jun 2021 18:21:24 +0900 Subject: [PATCH 04/18] multi_output: Add emit_size metrics Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/multi_output.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 13e9fd05b8..565e251707 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -45,12 +45,15 @@ def initialize @outputs = [] @outputs_statically_created = false + @counter_mutex = Mutex.new # TODO: well organized counters @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors") @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits") @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events") # @write_count = 0 # @rollback_count = 0 + @enable_size_metrics = false end def statistics @@ -58,6 +61,7 @@ def statistics 'num_errors' => @num_errors_metrics.get, 'emit_records' => @emit_records_metrics.get, 'emit_count' => @emit_count_metrics.get, + 'emit_size' => @emit_size_metrics.get, } { 'multi_output' => stats } @@ -70,6 +74,7 @@ def multi_output? def configure(conf) super + @enable_size_metrics = !!system_config.enable_size_metrics @stores.each do |store| store_conf = store.corresponding_config_element type = store_conf['@type'] @@ -156,7 +161,10 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - @emit_records_metrics.add(es.size) + @counter_mutex.synchronize do + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + @emit_records_metrics.add(es.size) + end rescue @num_errors_metrics.inc raise From 4a0335a43a131f5f8a428f7deff98c3942e6cf4e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 29 Jun 2021 18:26:25 +0900 Subject: [PATCH 05/18] bare_output: Implement metrics plugin mechanism Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/bare_output.rb | 38 +++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index 56b1034e61..d42fec2121 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -28,32 +28,54 @@ class BareOutput < Base # which cannot be implemented on MultiOutput. # E.g,: forest, config-expander + helpers_internal :metrics + include PluginId include PluginLoggerMixin include PluginHelper::Mixin - attr_reader :num_errors, :emit_count, :emit_records - def process(tag, es) raise NotImplementedError, "BUG: output plugins MUST implement this method" end + def num_errors + @num_errors_metrics.get + end + + def emit_count + @emit_count_metrics.get + end + + def emit_records + @emit_records_metrics.get + end + def initialize super @counter_mutex = Mutex.new # TODO: well organized counters - @num_errors = 0 - @emit_count = 0 - @emit_records = 0 + @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of emit records") + end + + def statistics + stats = { + 'num_errors' => @num_errors_metrics.get, + 'emit_records' => @emit_records_metrics.get, + 'emit_count' => @emit_count_metrics.get, + } + + { 'bare_output' => stats } end def emit_sync(tag, es) - @counter_mutex.synchronize{ @emit_count += 1 } + @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize{ @emit_records += es.size } + @emit_records_metrics.add(es.size) rescue - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc raise end end From 18a83066a5f4ff1284658ab0409db4c0ed196f2a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 29 Jun 2021 18:28:34 +0900 Subject: [PATCH 06/18] bare_output: Add emit_size metrics Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/bare_output.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index d42fec2121..e38edf4e5d 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -23,6 +23,8 @@ module Fluent module Plugin class BareOutput < Base + include PluginHelper::Mixin # for metrics + # DO NOT USE THIS plugin for normal output plugin. Use Output instead. # This output plugin base class is only for meta-output plugins # which cannot be implemented on MultiOutput. @@ -57,6 +59,8 @@ def initialize @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "num_errors", help_text: "Number of count num errors") @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of count emits") @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_size", help_text: "Total size of emit events") + @enable_size_metrics = !!system_config.enable_size_metrics end def statistics @@ -64,6 +68,7 @@ def statistics 'num_errors' => @num_errors_metrics.get, 'emit_records' => @emit_records_metrics.get, 'emit_count' => @emit_count_metrics.get, + 'emit_size' => @emit_size_metrics.get, } { 'bare_output' => stats } @@ -73,7 +78,10 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - @emit_records_metrics.add(es.size) + @counter_mutex.synchronize do + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + @emit_records_metrics.add(es.size) + end rescue @num_errors_metrics.inc raise From a910c36e4f5d67e3c690542400cd4d2bb5dbeba8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 30 Jun 2021 19:24:48 +0900 Subject: [PATCH 07/18] output: Implement metrics plugin mechanism Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/output.rb | 97 ++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 39 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index f89c97f610..5d26224077 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -37,7 +37,7 @@ class Output < Base include PluginHelper::Mixin include UniqueId::Mixin - helpers_internal :thread, :retry_state + helpers_internal :thread, :retry_state, :metrics CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/ CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{([-_.@$a-zA-Z0-9]+)\}/ @@ -164,7 +164,6 @@ def expired? end attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout, :timekey_zone - attr_reader :num_errors, :emit_count, :emit_records, :write_count, :rollback_count # for tests attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_accessors, :chunk_key_time, :chunk_key_tag @@ -172,6 +171,26 @@ def expired? # output_enqueue_thread_waiting: for test of output.rb itself attr_accessor :retry_for_error_chunk # if true, error flush will be retried even if under_plugin_development is true + def num_errors + @num_errors_metrics.get + end + + def emit_count + @emit_count_metrics.get + end + + def emit_records + @emit_records_metrics.get + end + + def write_count + @write_count_metrics.get + end + + def rollback_count + @rollback_count_metrics.get + end + def initialize super @counter_mutex = Mutex.new @@ -181,14 +200,14 @@ def initialize @primary_instance = nil # TODO: well organized counters - @num_errors = 0 - @emit_count = 0 - @emit_records = 0 - @emit_size = 0 - @write_count = 0 - @rollback_count = 0 - @flush_time_count = 0 - @slow_flush_count = 0 + @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") + @write_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") + @rollback_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") + @flush_time_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") + @slow_flush_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") @enable_size_metrics = false # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start @@ -801,21 +820,21 @@ def emit_events(tag, es) end def emit_sync(tag, es) - @counter_mutex.synchronize{ @emit_count += 1 } + @emit_count_metrics.inc begin process(tag, es) @counter_mutex.synchronize do - @emit_records += es.size - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end rescue - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc raise end end def emit_buffered(tag, es) - @counter_mutex.synchronize{ @emit_count += 1 } + @emit_count_metrics.inc begin execute_chunking(tag, es, enqueue: (@flush_mode == :immediate)) if !@retry && @buffer.queued?(nil, optimistic: true) @@ -823,7 +842,7 @@ def emit_buffered(tag, es) end rescue # TODO: separate number of errors into emit errors and write/flush errors - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc raise end end @@ -974,8 +993,8 @@ def handle_stream_with_custom_format(tag, es, enqueue: false) @buffer.write(meta_and_data, enqueue: enqueue) end @counter_mutex.synchronize do - @emit_records += records - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end @@ -994,8 +1013,8 @@ def handle_stream_with_standard_format(tag, es, enqueue: false) @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue) end @counter_mutex.synchronize do - @emit_records += records - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end @@ -1022,8 +1041,8 @@ def handle_stream_simple(tag, es, enqueue: false) @buffer.write({meta => data}, format: format_proc, enqueue: enqueue) end @counter_mutex.synchronize do - @emit_records += records - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end @@ -1062,7 +1081,7 @@ def rollback_write(chunk_id, update_retry: true) # false if chunk was already flushed and couldn't be rollbacked unexpectedly # in many cases, false can be just ignored if @buffer.takeback_chunk(chunk_id) - @counter_mutex.synchronize{ @rollback_count += 1 } + @rollback_count_metrics.inc if update_retry primary = @as_secondary ? @primary_instance : self primary.update_retry_state(chunk_id, @as_secondary) @@ -1078,7 +1097,7 @@ def try_rollback_write while @dequeued_chunks.first && @dequeued_chunks.first.expired? info = @dequeued_chunks.shift if @buffer.takeback_chunk(info.chunk_id) - @counter_mutex.synchronize{ @rollback_count += 1 } + @rollback_count_metrics.inc log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time primary = @as_secondary ? @primary_instance : self primary.update_retry_state(info.chunk_id, @as_secondary) @@ -1093,7 +1112,7 @@ def try_rollback_all until @dequeued_chunks.empty? info = @dequeued_chunks.shift if @buffer.takeback_chunk(info.chunk_id) - @counter_mutex.synchronize{ @rollback_count += 1 } + @rollback_count_metrics.inc log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id) primary = @as_secondary ? @primary_instance : self primary.update_retry_state(info.chunk_id, @as_secondary) @@ -1136,7 +1155,7 @@ def try_flush if output.delayed_commit log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id) - @counter_mutex.synchronize{ @write_count += 1 } + @write_count_metrics.inc @dequeued_chunks_mutex.synchronize do # delayed_commit_timeout for secondary is configured in of primary ( don't get ) @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout) @@ -1148,7 +1167,7 @@ def try_flush chunk_id = chunk.unique_id dump_chunk_id = dump_unique_id_hex(chunk_id) log.trace "adding write count", instance: self.object_id - @counter_mutex.synchronize{ @write_count += 1 } + @write_count_metrics.inc log.trace "executing sync write", chunk: dump_chunk_id output.write(chunk) @@ -1204,7 +1223,7 @@ def try_flush end if @buffer.takeback_chunk(chunk.unique_id) - @counter_mutex.synchronize { @rollback_count += 1 } + @rollback_count_metrics.inc end update_retry_state(chunk.unique_id, using_secondary, e) @@ -1235,9 +1254,9 @@ def backup_chunk(chunk, using_secondary, delayed_commit) def check_slow_flush(start) elapsed_time = Fluent::Clock.now - start elapsed_millsec = (elapsed_time * 1000).to_i - @counter_mutex.synchronize { @flush_time_count += elapsed_millsec } + @flush_time_count_metrics.add(elapsed_millsec) if elapsed_time > @slow_flush_log_threshold - @counter_mutex.synchronize { @slow_flush_count += 1 } + @slow_flush_count_metrics.inc log.warn "buffer flush took longer time than slow_flush_log_threshold:", elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id end @@ -1245,7 +1264,7 @@ def check_slow_flush(start) def update_retry_state(chunk_id, using_secondary, error = nil) @retry_mutex.synchronize do - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc chunk_id_hex = dump_unique_id_hex(chunk_id) unless @retry @@ -1506,16 +1525,16 @@ def flush_thread_run(state) def statistics stats = { - 'emit_records' => @emit_records, - 'emit_size' => @emit_size, + 'emit_records' => @emit_records_metrics.get, + 'emit_size' => @emit_size_metrics.get, # Respect original name # https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284 - 'retry_count' => @num_errors, - 'emit_count' => @emit_count, - 'write_count' => @write_count, - 'rollback_count' => @rollback_count, - 'slow_flush_count' => @slow_flush_count, - 'flush_time_count' => @flush_time_count, + 'retry_count' => @num_errors_metrics.get, + 'emit_count' => @emit_count_metrics.get, + 'write_count' => @write_count_metrics.get, + 'rollback_count' => @rollback_count_metrics.get, + 'slow_flush_count' => @slow_flush_count_metrics.get, + 'flush_time_count' => @flush_time_count_metrics.get, } if @buffer && @buffer.respond_to?(:statistics) From 0427dd4d9dc18e9b1cb593068304934a07287cd7 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 30 Jun 2021 19:33:12 +0900 Subject: [PATCH 08/18] compat: output: Implement metrics plugin mechanism Signed-off-by: Hiroshi Hatake --- lib/fluent/compat/output.rb | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 20e7d4cc71..d08b4d405e 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -310,7 +310,7 @@ def configure(conf) # original implementation of v0.12 BufferedOutput def emit(tag, es, chain, key="") # this method will not be used except for the case that plugin calls super - @emit_count += 1 + @emit_count_metrics.inc data = format_stream(tag, es) if @buffer.emit(key, data, chain) submit_flush @@ -337,14 +337,14 @@ def format_stream(tag, es) # because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it def handle_stream_simple(tag, es, enqueue: false) if @overrides_emit - current_emit_count = @emit_count + current_emit_count = @emit_count_metrics.get size = es.size key = data = nil begin emit(tag, es, NULL_OUTPUT_CHAIN) key, data = self.last_emit_via_buffer ensure - @emit_count = current_emit_count + @emit_count_metrics.set(current_emit_count) self.last_emit_via_buffer = nil end # on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically @@ -352,7 +352,10 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end - @counter_mutex.synchronize{ @emit_records += size } + @counter_mutex.synchronize do + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + @emit_records_metrics.add(size) + end return [meta] end @@ -363,7 +366,10 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end - @counter_mutex.synchronize{ @emit_records += size } + @counter_mutex.synchronize do + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + @emit_records_metrics.add(size) + end return [meta] end @@ -373,7 +379,10 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, enqueue: enqueue) end - @counter_mutex.synchronize{ @emit_records += size } + @counter_mutex.synchronize do + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + @emit_records_metrics.add(size) + end [meta] end From 282f15843ef5d421cb5f5a01d3f09124c419bbe8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 1 Jul 2021 15:48:52 +0900 Subject: [PATCH 09/18] in_monitor: Follow num_errors not to be instance variable on output Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_monitor_agent.rb | 2 +- test/plugin/test_in_monitor_agent.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_monitor_agent.rb b/lib/fluent/plugin/in_monitor_agent.rb index 7b5364957c..f41c1a0bce 100644 --- a/lib/fluent/plugin/in_monitor_agent.rb +++ b/lib/fluent/plugin/in_monitor_agent.rb @@ -238,7 +238,7 @@ def start 'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size }, 'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys }, 'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size }, - 'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil }, + 'retry_count' => ->(){ respond_to?(:num_errors) ? num_errors : nil }, } def all_plugins diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index 739cb66a6a..92344b6e15 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -673,7 +673,7 @@ def get(uri, header = {}) "plugin_id" => "null", "retry_count" => 0, "type" => "null", - "instance_variables" => {"id" => "null", "num_errors" => 0}, + "instance_variables" => {"id" => "null"}, "buffer_available_buffer_space_ratios" => Float, "buffer_queue_byte_size" => Integer, "buffer_stage_byte_size" => Integer, From 85b23cd5cb295755b22b92c285459d02719a8fe6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 8 Jul 2021 15:06:52 +0900 Subject: [PATCH 10/18] monitoring: Add metric fallback methods Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/bare_output.rb | 4 ++++ lib/fluent/plugin/filter.rb | 8 ++++++++ lib/fluent/plugin/input.rb | 8 ++++++++ lib/fluent/plugin/multi_output.rb | 16 ++++++++++++++++ lib/fluent/plugin/output.rb | 4 ++++ test/plugin/test_bare_output.rb | 9 +++++++++ test/plugin/test_filter.rb | 7 +++++++ test/plugin/test_input.rb | 7 +++++++ test/plugin/test_multi_output.rb | 17 +++++++++++++++++ test/plugin/test_output.rb | 11 +++++++++++ 10 files changed, 91 insertions(+) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index e38edf4e5d..a2c968ef5c 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -48,6 +48,10 @@ def emit_count @emit_count_metrics.get end + def emit_size + @emit_size_metrics.get + end + def emit_records @emit_records_metrics.get end diff --git a/lib/fluent/plugin/filter.rb b/lib/fluent/plugin/filter.rb index 367d876d72..b440cf161a 100644 --- a/lib/fluent/plugin/filter.rb +++ b/lib/fluent/plugin/filter.rb @@ -47,6 +47,14 @@ def configure(conf) @enable_size_metrics = !!system_config.enable_size_metrics end + def emit_records + @emit_records_metrics.get + end + + def emit_size + @emit_size_metrics.get + end + def statistics stats = { 'emit_records' => @emit_records_metrics.get, diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index c686121863..20125e8a68 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -43,6 +43,14 @@ def configure(conf) @enable_size_metrics = !!system_config.enable_size_metrics end + def emit_records + @emit_records_metrics.get + end + + def emit_size + @emit_size_metrics.get + end + def statistics stats = { 'emit_records' => @emit_records_metrics.get, diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 565e251707..a1f0fd8c9e 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -56,6 +56,22 @@ def initialize @enable_size_metrics = false end + def num_errors + @num_errors_metrics.get + end + + def emit_count + @emit_count_metrics.get + end + + def emit_size + @emit_size_metrics.get + end + + def emit_records + @emit_records_metrics.get + end + def statistics stats = { 'num_errors' => @num_errors_metrics.get, diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 5d26224077..74c22cff00 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -179,6 +179,10 @@ def emit_count @emit_count_metrics.get end + def emit_size + @emit_size_metrics.get + end + def emit_records @emit_records_metrics.get end diff --git a/test/plugin/test_bare_output.rb b/test/plugin/test_bare_output.rb index 3dab2b66c8..deb52ca5c9 100644 --- a/test/plugin/test_bare_output.rb +++ b/test/plugin/test_bare_output.rb @@ -95,6 +95,15 @@ class FluentPluginBareOutputTest::DummyPlugin2 < Fluent::Plugin::BareOutput end end + test 'can use metrics plugins and fallback methods' do + @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + + assert_equal 0, @p.num_errors + assert_equal 0, @p.emit_count + assert_equal 0, @p.emit_size + assert_equal 0, @p.emit_records + end + test 'can get input event stream to write' do @p.configure(config_element('ROOT')) @p.start diff --git a/test/plugin/test_filter.rb b/test/plugin/test_filter.rb index d2e6791a43..f117c43664 100644 --- a/test/plugin/test_filter.rb +++ b/test/plugin/test_filter.rb @@ -165,6 +165,13 @@ class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter end end + test 'can use metrics plugins and fallback methods' do + @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + + assert_equal 0, @p.emit_size + assert_equal 0, @p.emit_records + end + test 'are available with multi worker configuration in default' do assert @p.multi_workers_ready? end diff --git a/test/plugin/test_input.rb b/test/plugin/test_input.rb index 4bd415d818..ea1e151f2d 100644 --- a/test/plugin/test_input.rb +++ b/test/plugin/test_input.rb @@ -85,6 +85,13 @@ class FluentPluginInputTest::DummyPlugin2 < Fluent::Plugin::Input end end + test 'can use metrics plugins and fallback methods' do + @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + + assert_equal 0, @p.emit_size + assert_equal 0, @p.emit_records + end + test 'are not available with multi workers configuration in default' do assert_false @p.multi_workers_ready? end diff --git a/test/plugin/test_multi_output.rb b/test/plugin/test_multi_output.rb index e07d6353b7..ac841bebe8 100644 --- a/test/plugin/test_multi_output.rb +++ b/test/plugin/test_multi_output.rb @@ -176,5 +176,22 @@ def create_output(type=:multi) assert_equal 2, @i.events.size end + + test 'can use metrics plugins and fallback methods' do + conf = config_element('ROOT', '', { '@type' => 'dummy_test_multi_output' }, + [ + config_element('store', '', { 'type' => 'dummy_test_multi_output_1' }), + config_element('store', '', { 'type' => 'dummy_test_multi_output_2' }), + config_element('store', '', { 'type' => 'dummy_test_multi_output_3' }), + config_element('store', '', { 'type' => 'dummy_test_multi_output_4' }), + ] + ) + @i.configure(conf) + + assert_equal 0, @i.num_errors + assert_equal 0, @i.emit_count + assert_equal 0, @i.emit_size + assert_equal 0, @i.emit_records + end end end diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index 4ffba4a4f9..37c929d181 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -223,6 +223,17 @@ def waiting(seconds) assert @i.terminated? end + test 'can use metrics plugins and fallback methods' do + @i.configure(config_element()) + + assert_equal 0, @i.num_errors + assert_equal 0, @i.emit_count + assert_equal 0, @i.emit_size + assert_equal 0, @i.emit_records + assert_equal 0, @i.write_count + assert_equal 0, @i.rollback_count + end + data(:new_api => :chunk, :old_api => :metadata) test '#extract_placeholders does nothing if chunk key is not specified' do |api| From 4eb8ec0eb57fc847bd68f77e9b409bc4fa397a9c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 8 Jul 2021 17:25:59 +0900 Subject: [PATCH 11/18] metrics: monitoring: Create metrics instances after super class' configuring is done Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/bare_output.rb | 9 +++ lib/fluent/plugin/filter.rb | 18 +++--- lib/fluent/plugin/input.rb | 18 +++--- lib/fluent/plugin/multi_output.rb | 13 ++-- lib/fluent/plugin/output.rb | 25 +++++--- test/test_plugin_classes.rb | 102 ++++++++++++++++++++++++++++++ 6 files changed, 157 insertions(+), 28 deletions(-) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index a2c968ef5c..0b39b17d22 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -60,6 +60,15 @@ def initialize super @counter_mutex = Mutex.new # TODO: well organized counters + @num_errors_metrics = nil + @emit_count_metrics = nil + @emit_records_metrics = nil + @emit_size_metrics = nil + end + + def configure(conf) + super + @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "num_errors", help_text: "Number of count num errors") @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of count emits") @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of emit records") diff --git a/lib/fluent/plugin/filter.rb b/lib/fluent/plugin/filter.rb index b440cf161a..816af78288 100644 --- a/lib/fluent/plugin/filter.rb +++ b/lib/fluent/plugin/filter.rb @@ -35,18 +35,12 @@ class Filter < Base def initialize super @has_filter_with_time = has_filter_with_time? - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events") + @emit_records_metrics = nil + @emit_size_metrics = nil @counter_mutex = Mutex.new @enable_size_metrics = false end - def configure(conf) - super - - @enable_size_metrics = !!system_config.enable_size_metrics - end - def emit_records @emit_records_metrics.get end @@ -55,6 +49,14 @@ def emit_size @emit_size_metrics.get end + def configure(conf) + super + + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events") + @enable_size_metrics = !!system_config.enable_size_metrics + end + def statistics stats = { 'emit_records' => @emit_records_metrics.get, diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 20125e8a68..97f06853a7 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -31,18 +31,12 @@ class Input < Base def initialize super - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "input", name: "emit_records", help_text: "Number of count emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "input", name: "emit_size", help_text: "Total size of emit events") + @emit_records_metrics = nil + @emit_size_metrics = nil @counter_mutex = Mutex.new @enable_size_metrics = false end - def configure(conf) - super - - @enable_size_metrics = !!system_config.enable_size_metrics - end - def emit_records @emit_records_metrics.get end @@ -51,6 +45,14 @@ def emit_size @emit_size_metrics.get end + def configure(conf) + super + + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "input", name: "emit_records", help_text: "Number of count emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "input", name: "emit_size", help_text: "Total size of emit events") + @enable_size_metrics = !!system_config.enable_size_metrics + end + def statistics stats = { 'emit_records' => @emit_records_metrics.get, diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index a1f0fd8c9e..df6ed1f0a4 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -47,10 +47,10 @@ def initialize @counter_mutex = Mutex.new # TODO: well organized counters - @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors") - @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits") - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events") + @num_errors_metrics = nil + @emit_count_metrics = nil + @emit_records_metrics = nil + @emit_size_metrics = nil # @write_count = 0 # @rollback_count = 0 @enable_size_metrics = false @@ -90,7 +90,12 @@ def multi_output? def configure(conf) super + @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics + @stores.each do |store| store_conf = store.corresponding_config_element type = store_conf['@type'] diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 74c22cff00..716c06e8e6 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -204,14 +204,14 @@ def initialize @primary_instance = nil # TODO: well organized counters - @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors") - @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_records", help_text: "Number of count emits") - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") - @write_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") - @rollback_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") - @flush_time_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") - @slow_flush_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") + @num_errors_metrics = nil + @emit_count_metrics = nil + @emit_records_metrics = nil + @emit_size_metrics = nil + @write_count_metrics = nil + @rollback_count_metrics = nil + @flush_time_count_metrics = nil + @slow_flush_count_metrics = nil @enable_size_metrics = false # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start @@ -271,6 +271,15 @@ def configure(conf) super + @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") + @write_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") + @rollback_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") + @flush_time_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") + @slow_flush_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") + if has_buffer_section unless implement?(:buffered) || implement?(:delayed_commit) raise Fluent::ConfigError, " section is configured, but plugin '#{self.class}' doesn't support buffering" diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb index 12879881c8..8af95aa20c 100644 --- a/test/test_plugin_classes.rb +++ b/test/test_plugin_classes.rb @@ -5,11 +5,78 @@ require 'fluent/plugin/filter' module FluentTest + class FluentTestCounterMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('test_counter', self) + + attr_reader :data + + def initialize + super + @data = 0 + end + def get + @data + end + def inc + @data +=1 + end + def add(value) + @data += value + end + def set(value) + @data = value + end + def close + @data = 0 + super + end + end + + class FluentTestGaugeMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('test_gauge', self) + + attr_reader :data + + def initialize + super + @data = 0 + end + def get + @data + end + def inc + @data += 1 + end + def dec + @data -=1 + end + def add(value) + @data += value + end + def sub(value) + @data -= value + end + def set(value) + @data = value + end + def close + @data = 0 + super + end + end + class FluentTestInput < ::Fluent::Plugin::Input ::Fluent::Plugin.register_input('test_in', self) attr_reader :started + def initialize + super + # stub metrics instances + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new + end + def start super @started = true @@ -28,6 +95,13 @@ class FluentTestGenInput < ::Fluent::Plugin::Input config_param :num, :integer, default: 10000 + def initialize + super + # stub metrics instances + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new + end + def start super @started = true @@ -49,6 +123,15 @@ class FluentTestOutput < ::Fluent::Plugin::Output def initialize super @events = Hash.new { |h, k| h[k] = [] } + # stub metrics instances + @num_errors_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_count_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new + @write_count_metrics = FluentTest::FluentTestCounterMetrics.new + @rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new + @flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new + @slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new end attr_reader :events @@ -168,6 +251,19 @@ def write(chunk) class FluentTestErrorOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out_error', self) + def initialize + super + # stub metrics instances + @num_errors_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_count_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new + @write_count_metrics = FluentTest::FluentTestCounterMetrics.new + @rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new + @flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new + @slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new + end + def format(tag, time, record) raise "emit error!" end @@ -184,6 +280,9 @@ def initialize(field = '__test__') super() @num = 0 @field = field + # stub metrics instances + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new end attr_reader :num @@ -213,6 +312,9 @@ def initialize(field = '__test__') super() @num = 0 @field = field + # stub metrics instances + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new end attr_reader :num From 6e300a8f85752ab5195c3b96d98d70ca8525964c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 9 Jul 2021 10:03:43 +0900 Subject: [PATCH 12/18] metrics: Use "fluentd" instead of "Fluentd" for namespace Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/bare_output.rb | 8 ++++---- lib/fluent/plugin/filter.rb | 4 ++-- lib/fluent/plugin/input.rb | 4 ++-- lib/fluent/plugin/multi_output.rb | 8 ++++---- lib/fluent/plugin/output.rb | 16 ++++++++-------- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index 0b39b17d22..527d3e10a8 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -69,10 +69,10 @@ def initialize def configure(conf) super - @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "num_errors", help_text: "Number of count num errors") - @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of count emits") - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "bare_output", name: "emit_size", help_text: "Total size of emit events") + @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics end diff --git a/lib/fluent/plugin/filter.rb b/lib/fluent/plugin/filter.rb index 816af78288..a2580c03d0 100644 --- a/lib/fluent/plugin/filter.rb +++ b/lib/fluent/plugin/filter.rb @@ -52,8 +52,8 @@ def emit_size def configure(conf) super - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events") + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics end diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 97f06853a7..3442387cdb 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -48,8 +48,8 @@ def emit_size def configure(conf) super - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "input", name: "emit_records", help_text: "Number of count emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "input", name: "emit_size", help_text: "Total size of emit events") + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "emit_records", help_text: "Number of count emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics end diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index df6ed1f0a4..81c5cc800f 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -90,10 +90,10 @@ def multi_output? def configure(conf) super - @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors") - @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits") - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events") + @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics @stores.each do |store| diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 716c06e8e6..00ec6624ac 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -271,14 +271,14 @@ def configure(conf) super - @num_errors_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors") - @emit_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_records", help_text: "Number of count emits") - @emit_records_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") - @emit_size_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") - @write_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") - @rollback_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") - @flush_time_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") - @slow_flush_count_metrics = metrics_create(namespace: "Fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") + @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") + @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") + @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") + @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") + @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") if has_buffer_section unless implement?(:buffered) || implement?(:delayed_commit) From 072be19acf92dbe5d6d826eafefd7abc824e74b8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 9 Jul 2021 11:24:28 +0900 Subject: [PATCH 13/18] multi_output: test: Fix failing test for warnings counts Signed-off-by: Hiroshi Hatake --- test/plugin/test_multi_output.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/plugin/test_multi_output.rb b/test/plugin/test_multi_output.rb index ac841bebe8..ac82e65138 100644 --- a/test/plugin/test_multi_output.rb +++ b/test/plugin/test_multi_output.rb @@ -146,8 +146,11 @@ def create_output(type=:multi) @i.configure(conf) assert_equal 4, @i.outputs.size + log_size_for_multi_output_itself = 4 + log_size_for_metrics_plugin_helper = 4 + expected_warn_log_size = log_size_for_multi_output_itself + log_size_for_metrics_plugin_helper logs = @i.log.out.logs - assert{ logs.select{|log| log.include?('[warn]') && log.include?("'type' is deprecated parameter name. use '@type' instead.") }.size == 4 } + assert{ logs.select{|log| log.include?('[warn]') && log.include?("'type' is deprecated parameter name. use '@type' instead.") }.size == expected_warn_log_size } end test '#emit_events calls #process always' do From 8829124aeaa5e63f0c7ce724c89b3c3e743de2ac Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 30 Jul 2021 18:28:17 +0900 Subject: [PATCH 14/18] metrics: Add metrics plugin instance existences checking assertions Signed-off-by: Hiroshi Hatake --- test/plugin/test_bare_output.rb | 4 ++++ test/plugin/test_filter.rb | 4 ++++ test/plugin/test_input.rb | 4 ++++ test/plugin/test_multi_output.rb | 4 ++++ test/plugin/test_output.rb | 5 +++++ 5 files changed, 21 insertions(+) diff --git a/test/plugin/test_bare_output.rb b/test/plugin/test_bare_output.rb index deb52ca5c9..a8d901d056 100644 --- a/test/plugin/test_bare_output.rb +++ b/test/plugin/test_bare_output.rb @@ -98,6 +98,10 @@ class FluentPluginBareOutputTest::DummyPlugin2 < Fluent::Plugin::BareOutput test 'can use metrics plugins and fallback methods' do @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics].each do |metric_name| + assert_true @p.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + assert_equal 0, @p.num_errors assert_equal 0, @p.emit_count assert_equal 0, @p.emit_size diff --git a/test/plugin/test_filter.rb b/test/plugin/test_filter.rb index f117c43664..052d68b7a7 100644 --- a/test/plugin/test_filter.rb +++ b/test/plugin/test_filter.rb @@ -168,6 +168,10 @@ class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter test 'can use metrics plugins and fallback methods' do @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + %w[emit_size_metrics emit_records_metrics].each do |metric_name| + assert_true @p.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + assert_equal 0, @p.emit_size assert_equal 0, @p.emit_records end diff --git a/test/plugin/test_input.rb b/test/plugin/test_input.rb index ea1e151f2d..971bdaa103 100644 --- a/test/plugin/test_input.rb +++ b/test/plugin/test_input.rb @@ -88,6 +88,10 @@ class FluentPluginInputTest::DummyPlugin2 < Fluent::Plugin::Input test 'can use metrics plugins and fallback methods' do @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + %w[emit_size_metrics emit_records_metrics].each do |metric_name| + assert_true @p.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + assert_equal 0, @p.emit_size assert_equal 0, @p.emit_records end diff --git a/test/plugin/test_multi_output.rb b/test/plugin/test_multi_output.rb index ac82e65138..2351d1385f 100644 --- a/test/plugin/test_multi_output.rb +++ b/test/plugin/test_multi_output.rb @@ -191,6 +191,10 @@ def create_output(type=:multi) ) @i.configure(conf) + %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics].each do |metric_name| + assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + assert_equal 0, @i.num_errors assert_equal 0, @i.emit_count assert_equal 0, @i.emit_size diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index 37c929d181..c25c7b1afb 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -226,6 +226,11 @@ def waiting(seconds) test 'can use metrics plugins and fallback methods' do @i.configure(config_element()) + %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics + write_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name| + assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + assert_equal 0, @i.num_errors assert_equal 0, @i.emit_count assert_equal 0, @i.emit_size From 544ded843468d2fd17cf256d946672c6e806b534 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 30 Jul 2021 19:02:17 +0900 Subject: [PATCH 15/18] input: filter: bare_output: multi_output: output: Don't use monitor lock if needn't Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/bare_output.rb | 8 ++++++-- lib/fluent/plugin/filter.rb | 8 ++++++-- lib/fluent/plugin/input.rb | 8 ++++++-- lib/fluent/plugin/multi_output.rb | 8 ++++++-- lib/fluent/plugin/output.rb | 32 +++++++++++++++++++++++-------- 5 files changed, 48 insertions(+), 16 deletions(-) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index 527d3e10a8..e5858bbfb3 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -91,8 +91,12 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize do - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) end rescue diff --git a/lib/fluent/plugin/filter.rb b/lib/fluent/plugin/filter.rb index a2580c03d0..a03b5b6c85 100644 --- a/lib/fluent/plugin/filter.rb +++ b/lib/fluent/plugin/filter.rb @@ -67,9 +67,13 @@ def statistics end def measure_metrics(es) - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end end diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 3442387cdb..0768fa8005 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -63,9 +63,13 @@ def statistics end def metric_callback(es) - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end end diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 81c5cc800f..21547ef79c 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -182,8 +182,12 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize do - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) end rescue diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 00ec6624ac..f61d23c1b7 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -836,9 +836,13 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end rescue @num_errors_metrics.inc @@ -1005,9 +1009,13 @@ def handle_stream_with_custom_format(tag, es, enqueue: false) write_guard do @buffer.write(meta_and_data, enqueue: enqueue) end - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end @@ -1025,9 +1033,13 @@ def handle_stream_with_standard_format(tag, es, enqueue: false) write_guard do @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue) end - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end @@ -1053,9 +1065,13 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, format: format_proc, enqueue: enqueue) end - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end From e9b3d279cc6cdad371e04b7ce7904892d2b40795 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 30 Jul 2021 19:10:08 +0900 Subject: [PATCH 16/18] compat: output: Avoid to use monitor lock if needn't Signed-off-by: Hiroshi Hatake --- lib/fluent/compat/output.rb | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index d08b4d405e..3d37aebcba 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -352,9 +352,13 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end - @counter_mutex.synchronize do - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics - @emit_records_metrics.add(size) + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else + @emit_records_metrics.add(es.size) end return [meta] end @@ -366,9 +370,13 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end - @counter_mutex.synchronize do - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics - @emit_records_metrics.add(size) + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else + @emit_records_metrics.add(es.size) end return [meta] end @@ -379,9 +387,13 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, enqueue: enqueue) end - @counter_mutex.synchronize do - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics - @emit_records_metrics.add(size) + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else + @emit_records_metrics.add(es.size) end [meta] end From daf62f86cbd790800663330883c643f5c2aa8cd3 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Sat, 31 Jul 2021 22:39:04 +0900 Subject: [PATCH 17/18] test_in_tail: Make sure to capture $log output Signed-off-by: Takuro Ashie --- test/plugin/test_in_tail.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 4532557059..97b900ec6a 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2179,8 +2179,9 @@ def test_ENOENT_error_after_setup_watcher assert_nothing_raised do d.run(shutdown: false) {} end - d.instance_shutdown assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed with Errno::ENOENT. Drop tail watcher for now.\n") }) + ensure + d.instance_shutdown if d && d.instance end def test_EACCES_error_after_setup_watcher @@ -2203,10 +2204,10 @@ def test_EACCES_error_after_setup_watcher assert_nothing_raised do d.run(shutdown: false) {} end - d.instance_shutdown assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed with Errno::EACCES. Drop tail watcher for now.\n") }) end ensure + d.instance_shutdown if d && d.instance if File.exist?("#{TMP_DIR}/noaccess") FileUtils.chmod(0755, "#{TMP_DIR}/noaccess") FileUtils.rm_rf("#{TMP_DIR}/noaccess") @@ -2226,8 +2227,9 @@ def test_EACCES assert_nothing_raised do d.run(shutdown: false) {} end - d.instance_shutdown assert($log.out.logs.any?{|log| log.include?("expand_paths: stat() for #{path} failed with Errno::EACCES. Skip file.\n") }) + ensure + d.instance_shutdown if d && d.instance end def test_shutdown_timeout From aa79a4cd16bdab3190dd8fe8f5e2dc92a69c72ea Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Sun, 1 Aug 2021 14:55:35 +0900 Subject: [PATCH 18/18] plugin: metrics: Simplify metrics code Signed-off-by: Hiroshi Hatake --- lib/fluent/compat/output.rb | 30 +++++------------------ lib/fluent/plugin/bare_output.rb | 10 ++------ lib/fluent/plugin/filter.rb | 10 ++------ lib/fluent/plugin/input.rb | 10 ++------ lib/fluent/plugin/multi_output.rb | 10 ++------ lib/fluent/plugin/output.rb | 40 +++++++------------------------ 6 files changed, 22 insertions(+), 88 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 3d37aebcba..f10105b7ea 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -352,14 +352,8 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics return [meta] end @@ -370,14 +364,8 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics return [meta] end @@ -387,14 +375,8 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, enqueue: enqueue) end - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics [meta] end diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index e5858bbfb3..57de0c9741 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -91,14 +91,8 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics rescue @num_errors_metrics.inc raise diff --git a/lib/fluent/plugin/filter.rb b/lib/fluent/plugin/filter.rb index a03b5b6c85..c2956a98b3 100644 --- a/lib/fluent/plugin/filter.rb +++ b/lib/fluent/plugin/filter.rb @@ -67,14 +67,8 @@ def statistics end def measure_metrics(es) - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end def filter(tag, time, record) diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 0768fa8005..7a6909f7a9 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -63,14 +63,8 @@ def statistics end def metric_callback(es) - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end def multi_workers_ready? diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 21547ef79c..ce900fa5cc 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -182,14 +182,8 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics rescue @num_errors_metrics.inc raise diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index f61d23c1b7..23fd29ab7e 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -836,14 +836,8 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics rescue @num_errors_metrics.inc raise @@ -1009,14 +1003,8 @@ def handle_stream_with_custom_format(tag, es, enqueue: false) write_guard do @buffer.write(meta_and_data, enqueue: enqueue) end - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics true end @@ -1033,14 +1021,8 @@ def handle_stream_with_standard_format(tag, es, enqueue: false) write_guard do @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue) end - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics true end @@ -1065,14 +1047,8 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, format: format_proc, enqueue: enqueue) end - if @enable_size_metrics - @counter_mutex.synchronize do - @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) - end - else - @emit_records_metrics.add(es.size) - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics true end