From 78d3002f82dbcb9f9993b17b148f16e3e7743cd6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 23 Jun 2021 17:30:47 +0900 Subject: [PATCH 1/3] Implement metrics plugin mechanism Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin.rb | 11 +- lib/fluent/plugin/metrics.rb | 119 +++++++++++ lib/fluent/plugin/metrics_local.rb | 96 +++++++++ lib/fluent/plugin_helper.rb | 1 + lib/fluent/plugin_helper/metrics.rb | 70 +++++++ lib/fluent/system_config.rb | 8 +- test/plugin/test_metrics.rb | 294 ++++++++++++++++++++++++++++ test/plugin/test_metrics_local.rb | 96 +++++++++ 8 files changed, 693 insertions(+), 2 deletions(-) create mode 100644 lib/fluent/plugin/metrics.rb create mode 100644 lib/fluent/plugin/metrics_local.rb create mode 100644 lib/fluent/plugin_helper/metrics.rb create mode 100644 test/plugin/test_metrics.rb create mode 100644 test/plugin/test_metrics_local.rb diff --git a/lib/fluent/plugin.rb b/lib/fluent/plugin.rb index 852f085950..a5c4592482 100644 --- a/lib/fluent/plugin.rb +++ b/lib/fluent/plugin.rb @@ -36,8 +36,9 @@ module Plugin FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_', dir_search_prefix: 'formatter_') STORAGE_REGISTRY = Registry.new(:storage, 'fluent/plugin/storage_', dir_search_prefix: 'storage_') SD_REGISTRY = Registry.new(:sd, 'fluent/plugin/sd_', dir_search_prefix: 'sd_') + METRICS_REGISTRY = Registry.new(:metrics, 'fluent/plugin/metrics_', dir_search_prefix: 'metrics_') - REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY, SD_REGISTRY] + REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY, SD_REGISTRY, METRICS_REGISTRY] def self.register_input(type, klass) register_impl('input', INPUT_REGISTRY, type, klass) @@ -59,6 +60,10 @@ def self.register_sd(type, klass) register_impl('sd', SD_REGISTRY, type, klass) end + def self.register_metrics(type, klass) + register_impl('metrics', METRICS_REGISTRY, type, klass) + end + def self.register_parser(type, klass_or_proc) if klass_or_proc.is_a?(Regexp) # This usage is not recommended for new API @@ -121,6 +126,10 @@ def self.new_sd(type, parent: nil) new_impl('sd', SD_REGISTRY, type, parent) end + def self.new_metrics(type, parent: nil) + new_impl('metrics', METRICS_REGISTRY, type, parent) + end + class << self # This should be defined for fluent-plugin-config-formatter type arguments. alias_method :new_service_discovery, :new_sd diff --git a/lib/fluent/plugin/metrics.rb b/lib/fluent/plugin/metrics.rb new file mode 100644 index 0000000000..3efb7077e7 --- /dev/null +++ b/lib/fluent/plugin/metrics.rb @@ -0,0 +1,119 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'socket' + +require 'fluent/plugin/base' + +require 'fluent/log' +require 'fluent/unique_id' +require 'fluent/plugin_id' + +module Fluent + module Plugin + class Metrics < Base + include PluginId + include PluginLoggerMixin + include UniqueId::Mixin + + DEFAULT_TYPE = 'local' + + configured_in :metrics + + config_param :default_labels, :hash, default: {agent: "Fluentd", hostname: "#{Socket.gethostname}"} + config_param :labels, :hash, default: {} + + attr_reader :use_gauge_metric + attr_reader :has_methods_for_gauge, :has_methods_for_counter + + def initialize + super + + @has_methods_for_counter = false + @has_methods_for_gauge = false + @use_gauge_metric = false + end + + def configure(conf) + super + + if use_gauge_metric + @has_methods_for_gauge = has_methods_for_gauge? + else + @has_methods_for_counter = has_methods_for_counter? + end + end + + # Some metrics should be counted by gauge. + # ref: https://prometheus.io/docs/concepts/metric_types/#gauge + def use_gauge_metric=(use_gauge_metric=false) + @use_gauge_metric = use_gauge_metric + end + + def create(namespace:, subsystem:,name:,help_text:,labels: {}) + # This API is for cmetrics type. + end + + def get(key) + raise NotImplementedError, "Implement this method in child class" + end + + def inc(key) + raise NotImplementedError, "Implement this method in child class" + end + + def dec(key) + raise NotImplementedError, "Implement this method in child class" + end + + def add(key, value) + raise NotImplementedError, "Implement this method in child class" + end + + def sub(key, value) + raise NotImplementedError, "Implement this method in child class" + end + + def set(key, value) + raise NotImplementedError, "Implement this method in child class" + end + + private + + def has_methods_for_counter? + implemented_methods = self.class.instance_methods(false) + + if [:get, :inc, :add].all? {|e| implemented_methods.include?(e)} && + [:set].all?{|e| self.class.method_defined?(e)} + true + else + raise "BUG: metrics plugin on counter mode MUST implement `get`, `inc`, `add` methods. And aliased `set` methods should be aliased from another method" + end + end + + def has_methods_for_gauge? + implemented_methods = self.class.instance_methods(false) + + if [:get, :inc, :add].all? {|e| implemented_methods.include?(e)} && + [:set, :dec, :sub].all?{|e| self.class.method_defined?(e)} + true + else + raise "BUG: metrics plugin on gauge mode MUST implement `get`, `inc`, and `add` methods. And `dec`, `sub`, and `set` methods should be aliased from other methods" + end + end + end + end +end diff --git a/lib/fluent/plugin/metrics_local.rb b/lib/fluent/plugin/metrics_local.rb new file mode 100644 index 0000000000..78f19a04fd --- /dev/null +++ b/lib/fluent/plugin/metrics_local.rb @@ -0,0 +1,96 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin' +require 'fluent/plugin/metrics' + +module Fluent + module Plugin + class LocalMetrics < Metrics + Fluent::Plugin.register_metrics('local', self) + + def initialize + super + @store = Hash.new(0) + @monitor = Monitor.new + end + + def configure(conf) + super + + if use_gauge_metric + class << self + alias_method :dec, :dec_gauge + alias_method :set, :set_gauge + alias_method :sub, :sub_gauge + end + else + class << self + alias_method :set, :set_counter + end + end + end + + def multi_workers_ready? + true + end + + def get(key) + @monitor.synchronize do + @store[key.to_s] + end + end + + def inc(key) + @monitor.synchronize do + @store[key.to_s] += 1 + end + end + + def dec_gauge(key) + @monitor.synchronize do + @store[key.to_s] -= 1 + end + end + + def add(key, value) + @monitor.synchronize do + @store[key.to_s] += value + end + end + + def sub_gauge(key, value) + @monitor.synchronize do + @store[key.to_s] -= value + end + end + + def set_counter(key, value) + return if @store[key.to_s] > value + + @monitor.synchronize do + @store[key.to_s] = value + end + end + + def set_gauge(key, value) + @monitor.synchronize do + @store[key.to_s] = value + end + end + end + end +end diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index 1611db582d..1876ad9d2b 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -32,6 +32,7 @@ require 'fluent/plugin_helper/record_accessor' require 'fluent/plugin_helper/compat_parameters' require 'fluent/plugin_helper/service_discovery' +require 'fluent/plugin_helper/metrics' module Fluent module PluginHelper diff --git a/lib/fluent/plugin_helper/metrics.rb b/lib/fluent/plugin_helper/metrics.rb new file mode 100644 index 0000000000..07af4cc490 --- /dev/null +++ b/lib/fluent/plugin_helper/metrics.rb @@ -0,0 +1,70 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'forwardable' + +require 'fluent/plugin' +require 'fluent/plugin/metrics' +require 'fluent/plugin_helper/timer' +require 'fluent/config/element' +require 'fluent/configurable' +require 'fluent/system_config' + +module Fluent + module PluginHelper + module Metrics + include Fluent::SystemConfig::Mixin + + def initialize + super + @_metrics = {} # usage => metrics_state + end + + def configure(conf) + super + end + + def metrics_create(namespace: "fluentd", subsystem: "metrics", name:, help_text:, labels: {}, prefer_gauge: false) + metrics = if system_config.metrics + Fluent::Plugin.new_metrics(system_config.metrics[:@type], parent: self) + else + Fluent::Plugin.new_metrics(Fluent::Plugin::Metrics::DEFAULT_TYPE, parent: self) + end + config = if system_config.metrics + system_config.metrics.corresponding_config_element + else + Fluent::Config::Element.new('metrics', '', {'@type' => Fluent::Plugin::Metrics::DEFAULT_TYPE}, []) + end + metrics.use_gauge_metric = prefer_gauge + metrics.configure(config) + # For multi workers environment, cmetrics should be distinguish with static labels. + if Fluent::Engine.system_config.workers > 1 + labels.merge!(worker_id: fluentd_worker_id.to_s) + end + metrics.create(namespace: namespace, subsystem: subsystem, name: name, help_text: help_text, labels: labels) + + @_metrics["#{self.plugin_id}_#{namespace}_#{subsystem}_#{name}"] = metrics + + metrics + end + + def terminate + @_metrics = {} + super + end + end + end +end diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index b322b4bb60..1f61676dac 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -27,7 +27,8 @@ class SystemConfig :log_event_verbose, :ignore_repeated_log_interval, :ignore_same_log_interval, :without_source, :rpc_endpoint, :enable_get_dump, :process_name, :file_permission, :dir_permission, :counter_server, :counter_client, - :strict_config_value, :enable_msgpack_time_support, :disable_shared_socket + :strict_config_value, :enable_msgpack_time_support, :disable_shared_socket, + :metrics ] config_param :workers, :integer, default: 1 @@ -93,6 +94,11 @@ class SystemConfig config_param :timeout, :time, default: nil end + config_section :metrics, multi: false do + config_param :@type, :string, default: "local" + config_param :labels, :hash, default: {} + end + def self.create(conf, strict_config_value=false) systems = conf.elements(name: 'system') return SystemConfig.new if systems.empty? diff --git a/test/plugin/test_metrics.rb b/test/plugin/test_metrics.rb new file mode 100644 index 0000000000..c0ef281e91 --- /dev/null +++ b/test/plugin/test_metrics.rb @@ -0,0 +1,294 @@ +require_relative '../helper' +require 'fluent/plugin/metrics' +require 'fluent/plugin/base' +require 'fluent/system_config' + +class BareMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('bare', self) + + private + + # Just override for tests. + def has_methods_for_counter? + false + end +end + +class BasicCounterMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('example', self) + + attr_reader :data + + def initialize + super + @data = Hash.new(0) + end + def get(key) + @data[key.to_s] + end + def inc(key) + @data[key.to_s] +=1 + end + def add(key, value) + @data[key.to_s] += value + end + def set(key, value) + @data[key.to_s] = value + end + def close + @data = {} + super + end +end + +class AliasedCounterMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('example', self) + + attr_reader :data + + def initialize + super + @data = Hash.new(0) + end + def configure(conf) + super + class << self + alias_method :set, :set_counter + end + end + def get(key) + @data[key.to_s] + end + def inc(key) + @data[key.to_s] +=1 + end + def add(key, value) + @data[key.to_s] += value + end + def set_counter(key, value) + @data[key.to_s] = value + end + def close + @data = {} + super + end +end + +class BasicGaugeMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('example', self) + + attr_reader :data + + def initialize + super + @data = Hash.new(0) + end + def get(key) + @data[key.to_s] + end + def inc(key) + @data[key.to_s] +=1 + end + def dec(key) + @data[key.to_s] -=1 + end + def add(key, value) + @data[key.to_s] += value + end + def sub(key, value) + @data[key.to_s] -= value + end + def set(key, value) + @data[key.to_s] = value + end + def close + @data = {} + super + end +end + +class AliasedGaugeMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('example', self) + + attr_reader :data + + def initialize + super + @data = Hash.new(0) + end + def configure(conf) + super + class << self + alias_method :dec, :dec_gauge + alias_method :set, :set_gauge + alias_method :sub, :sub_gauge + end + end + def get(key) + @data[key.to_s] + end + def inc(key) + @data[key.to_s] +=1 + end + def dec_gauge(key) + @data[key.to_s] -=1 + end + def add(key, value) + @data[key.to_s] += value + end + def sub_gauge(key, value) + @data[key.to_s] -= value + end + def set_gauge(key, value) + @data[key.to_s] = value + end + def close + @data = {} + super + end +end + +class StorageTest < Test::Unit::TestCase + sub_test_case 'BareMetrics' do + setup do + @m = BareMetrics.new + @m.configure(config_element()) + end + + test 'is configured with plugin information and system config' do + m = BareMetrics.new + m.configure(config_element('metrics', '', {})) + + assert_false m.use_gauge_metric + assert_false m.has_methods_for_counter + assert_false m.has_methods_for_gauge + end + + test 'all bare operations are not defined yet' do + assert_raise NotImplementedError do + @m.get('key') + end + assert_raise NotImplementedError do + @m.inc('key') + end + assert_raise NotImplementedError do + @m.dec('key') + end + assert_raise NotImplementedError do + @m.add('key', 10) + end + assert_raise NotImplementedError do + @m.sub('key', 11) + end + assert_raise NotImplementedError do + @m.set('key', 123) + end + end + end + + sub_test_case 'BasicCounterMetric' do + setup do + @m = BasicCounterMetrics.new + @m.configure(config_element('metrics', '', {'@id' => '1'})) + end + + test 'all basic counter operations work well' do + assert_true @m.has_methods_for_counter + assert_false @m.has_methods_for_gauge + + assert_equal 0, @m.get('key') + assert_equal 1, @m.inc('key') + + @m.add('key', 20) + assert_equal 21, @m.get('key') + assert_raise NotImplementedError do + @m.dec('key') + end + + @m.set('key', 100) + assert_equal 100, @m.get('key') + assert_raise NotImplementedError do + @m.sub('key', 11) + end + end + end + + sub_test_case 'AliasedCounterMetric' do + setup do + @m = AliasedCounterMetrics.new + @m.configure(config_element('metrics', '', {})) + end + + test 'all aliased counter operations work well' do + assert_true @m.has_methods_for_counter + assert_false @m.has_methods_for_gauge + + assert_equal 0, @m.get('key') + assert_equal 1, @m.inc('key') + + @m.add('key', 20) + assert_equal 21, @m.get('key') + assert_raise NotImplementedError do + @m.dec('key') + end + + @m.set('key', 100) + assert_equal 100, @m.get('key') + assert_raise NotImplementedError do + @m.sub('key', 11) + end + end + end + + sub_test_case 'BasicGaugeMetric' do + setup do + @m = BasicGaugeMetrics.new + @m.use_gauge_metric = true + @m.configure(config_element('metrics', '', {})) + end + + test 'all basic gauge operations work well' do + assert_false @m.has_methods_for_counter + assert_true @m.has_methods_for_gauge + + assert_equal 0, @m.get('key') + assert_equal 1, @m.inc('key') + + @m.add('key', 20) + assert_equal 21, @m.get('key') + @m.dec('key') + assert_equal 20, @m.get('key') + + @m.set('key', 100) + assert_equal 100, @m.get('key') + @m.sub('key', 11) + assert_equal 89, @m.get('key') + end + end + + sub_test_case 'AliasedGaugeMetric' do + setup do + @m = AliasedGaugeMetrics.new + @m.use_gauge_metric = true + @m.configure(config_element('metrics', '', {})) + end + + test 'all aliased gauge operations work well' do + assert_false @m.has_methods_for_counter + assert_true @m.has_methods_for_gauge + + assert_equal 0, @m.get('key') + assert_equal 1, @m.inc('key') + + @m.add('key', 20) + assert_equal 21, @m.get('key') + @m.dec('key') + assert_equal 20, @m.get('key') + + @m.set('key', 100) + assert_equal 100, @m.get('key') + @m.sub('key', 11) + assert_equal 89, @m.get('key') + end + end +end diff --git a/test/plugin/test_metrics_local.rb b/test/plugin/test_metrics_local.rb new file mode 100644 index 0000000000..6a8e1bc8d1 --- /dev/null +++ b/test/plugin/test_metrics_local.rb @@ -0,0 +1,96 @@ +require_relative '../helper' +require 'fluent/plugin/metrics_local' +require 'fluent/system_config' + +class LocalMetricsTest < ::Test::Unit::TestCase + sub_test_case 'configure' do + test "configured for counter mode" do + m = Fluent::Plugin::LocalMetrics.new + m.configure(config_element('metrics', '', {"labels" => {test: "test-unit", language: "Ruby"}})) + + assert_false m.use_gauge_metric + assert_equal({agent: "Fluentd", hostname: "#{Socket.gethostname}"}, m.default_labels) + assert_equal({test: "test-unit", language: "Ruby"}, m.labels) + assert_true m.has_methods_for_counter + assert_false m.has_methods_for_gauge + end + + test "configured for gauge mode" do + m = Fluent::Plugin::LocalMetrics.new + m.use_gauge_metric = true + m.configure(config_element('metrics', '', {"labels" => {test: "test-unit", language: "Ruby"}})) + + assert_true m.use_gauge_metric + assert_equal({agent: "Fluentd", hostname: "#{Socket.gethostname}"}, m.default_labels) + assert_equal({test: "test-unit", language: "Ruby"}, m.labels) + assert_false m.has_methods_for_counter + assert_true m.has_methods_for_gauge + end + end + + sub_test_case 'LocalMetric' do + sub_test_case "counter" do + setup do + @m = Fluent::Plugin::LocalMetrics.new + @m.configure(config_element('metrics', '', {})) + end + + test '#configure' do + assert_true @m.has_methods_for_counter + assert_false @m.has_methods_for_gauge + end + + test 'all local counter operations work well' do + assert_equal 0, @m.get('key') + assert_equal 1, @m.inc('key') + + @m.add('key', 20) + assert_equal 21, @m.get('key') + assert_raise NotImplementedError do + @m.dec('key') + end + + @m.set('key', 100) + assert_equal 100, @m.get('key') + + @m.set('key', 10) + assert_equal 100, @m.get('key') # On counter, value should be overwritten bigger than stored one. + assert_raise NotImplementedError do + @m.sub('key', 11) + end + end + end + + sub_test_case "gauge" do + setup do + @m = Fluent::Plugin::LocalMetrics.new + @m.use_gauge_metric = true + @m.configure(config_element('metrics', '', {})) + end + + test '#configure' do + assert_false @m.has_methods_for_counter + assert_true @m.has_methods_for_gauge + end + + test 'all local gauge operations work well' do + assert_equal 0, @m.get('key') + assert_equal 1, @m.inc('key') + + @m.add('key', 20) + assert_equal 21, @m.get('key') + @m.dec('key') + assert_equal 20, @m.get('key') + + @m.set('key', 100) + assert_equal 100, @m.get('key') + + @m.sub('key', 11) + assert_equal 89, @m.get('key') + + @m.set('key', 10) + assert_equal 10, @m.get('key') # On gauge, value always should be overwritten. + end + end + end +end From 7595a9018d5488123880e1b44fd51b1ab1e78524 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 8 Jul 2021 19:02:50 +0900 Subject: [PATCH 2/3] plugin_helper: metrics: Add actual plugin name prefix for unspecified `@id` plugins Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin_helper/metrics.rb | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/metrics.rb b/lib/fluent/plugin_helper/metrics.rb index 07af4cc490..2b31795126 100644 --- a/lib/fluent/plugin_helper/metrics.rb +++ b/lib/fluent/plugin_helper/metrics.rb @@ -35,6 +35,12 @@ def initialize def configure(conf) super + + @plugin_type_or_id = if self.plugin_id_configured? + self.plugin_id + else + "#{conf["@type"] || conf["type"]}.#{self.plugin_id}" + end end def metrics_create(namespace: "fluentd", subsystem: "metrics", name:, help_text:, labels: {}, prefer_gauge: false) @@ -54,9 +60,10 @@ def metrics_create(namespace: "fluentd", subsystem: "metrics", name:, help_text: if Fluent::Engine.system_config.workers > 1 labels.merge!(worker_id: fluentd_worker_id.to_s) end + labels.merge!(plugin: @plugin_type_or_id) metrics.create(namespace: namespace, subsystem: subsystem, name: name, help_text: help_text, labels: labels) - @_metrics["#{self.plugin_id}_#{namespace}_#{subsystem}_#{name}"] = metrics + @_metrics["#{@plugin_type_or_id}_#{namespace}_#{subsystem}_#{name}"] = metrics metrics end From f1895c362c78189c4973cee0dc7c2e2f6e9dfdd8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 9 Jul 2021 11:01:39 +0900 Subject: [PATCH 3/3] plugin_helper: metrics: Manage plugin lifecycles via owner Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin_helper/metrics.rb | 48 +++++++++++ test/plugin_helper/test_metrics.rb | 128 ++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+) create mode 100644 test/plugin_helper/test_metrics.rb diff --git a/lib/fluent/plugin_helper/metrics.rb b/lib/fluent/plugin_helper/metrics.rb index 2b31795126..f3fdb8afe4 100644 --- a/lib/fluent/plugin_helper/metrics.rb +++ b/lib/fluent/plugin_helper/metrics.rb @@ -28,8 +28,11 @@ module PluginHelper module Metrics include Fluent::SystemConfig::Mixin + attr_reader :_metrics # For tests. + def initialize super + @_metrics_started = false @_metrics = {} # usage => metrics_state end @@ -68,7 +71,52 @@ def metrics_create(namespace: "fluentd", subsystem: "metrics", name:, help_text: metrics end + def metrics_operate(method_name, &block) + @_metrics.each_pair do |key, m| + begin + block.call(s) if block_given? + m.__send__(method_name) + rescue => e + log.error "unexpected error while #{method_name}", key: key, metrics: m, error: e + end + end + end + + def start + super + + metrics_operate(:start) + @_metrics_started = true + end + + def stop + super + # timer stops automatically in super + metrics_operate(:stop) + end + + def before_shutdown + metrics_operate(:before_shutdown) + super + end + + def shutdown + metrics_operate(:shutdown) + super + end + + def after_shutdown + metrics_operate(:after_shutdown) + super + end + + def close + metrics_operate(:close) + super + end + def terminate + metrics_operate(:terminate) @_metrics = {} super end diff --git a/test/plugin_helper/test_metrics.rb b/test/plugin_helper/test_metrics.rb new file mode 100644 index 0000000000..77ee50c24b --- /dev/null +++ b/test/plugin_helper/test_metrics.rb @@ -0,0 +1,128 @@ +require_relative '../helper' +require 'fluent/plugin_helper/metrics' +require 'fluent/plugin/base' + +class MetricsTest < Test::Unit::TestCase + class Dummy < Fluent::Plugin::TestBase + helpers :metrics + def configure(conf) + super + end + end + + setup do + @d = nil + end + + teardown do + if @d + @d.stop unless @d.stopped? + @d.shutdown unless @d.shutdown? + @d.close unless @d.closed? + @d.terminate unless @d.terminated? + end + end + + test 'can be initialized without any metrics at first' do + d = Dummy.new + assert_equal 0, d._metrics.size + end + + test 'can be configured' do + d1 = Dummy.new + assert_nothing_raised do + d1.configure(config_element()) + end + assert d1.plugin_id + assert d1.log + end + + test 'creates metrics instances' do + d = Dummy.new + d.configure(config_element()) + i = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics1", help_text: "metrics testing") + assert{ i.is_a?(Fluent::Plugin::LocalMetrics) } + assert_true i.has_methods_for_counter + assert_false i.has_methods_for_gauge + + d = Dummy.new + d.configure(config_element()) + i = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics2", help_text: "metrics testing", prefer_gauge: true) + assert{ i.is_a?(Fluent::Plugin::LocalMetrics) } + assert_false i.has_methods_for_counter + assert_true i.has_methods_for_gauge + end + + test 'calls lifecycle methods for all plugin instances via owner plugin' do + @d = d = Dummy.new + d.configure(config_element()) + i1 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics1", help_text: "metrics testing") + i2 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics2", help_text: "metrics testing", prefer_gauge: true) + i3 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics3", help_text: "metrics testing") + d.start + + assert i1.started? + assert i2.started? + assert i3.started? + + assert !i1.stopped? + assert !i2.stopped? + assert !i3.stopped? + + d.stop + + assert i1.stopped? + assert i2.stopped? + assert i3.stopped? + + assert !i1.before_shutdown? + assert !i2.before_shutdown? + assert !i3.before_shutdown? + + d.before_shutdown + + assert i1.before_shutdown? + assert i2.before_shutdown? + assert i3.before_shutdown? + + assert !i1.shutdown? + assert !i2.shutdown? + assert !i3.shutdown? + + d.shutdown + + assert i1.shutdown? + assert i2.shutdown? + assert i3.shutdown? + + assert !i1.after_shutdown? + assert !i2.after_shutdown? + assert !i3.after_shutdown? + + d.after_shutdown + + assert i1.after_shutdown? + assert i2.after_shutdown? + assert i3.after_shutdown? + + assert !i1.closed? + assert !i2.closed? + assert !i3.closed? + + d.close + + assert i1.closed? + assert i2.closed? + assert i3.closed? + + assert !i1.terminated? + assert !i2.terminated? + assert !i3.terminated? + + d.terminate + + assert i1.terminated? + assert i2.terminated? + assert i3.terminated? + end +end