From 06de13dff527e42508a7ac6fc9bcd73b4f470b91 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 11 Jun 2019 10:40:17 +0900 Subject: [PATCH 1/6] Add stats to output and buffer plugins they are used for monitoring Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buffer.rb | 22 +++++++++ lib/fluent/plugin/in_monitor_agent.rb | 5 ++ lib/fluent/plugin/output.rb | 20 ++++++++ test/plugin/test_in_monitor_agent.rb | 69 ++++++++++++++++++++++++--- 4 files changed, 109 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 690290a198..12b5db6e46 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -740,6 +740,28 @@ def write_step_by_step(metadata, data, format, splits_count, &block) enqueue_chunk(metadata) if enqueue_chunk_before_retry retry end + + def statistics + buffer_space = 1.0 - ((@stage_size + @queue_size * 1.0) / @total_limit_size).round + stats = { + 'stage_length' => @stage.size, + 'stage_byte_size' => @stage_size, + 'queue_length' => @queue.size, + 'queue_byte_size' => @queue_size, + 'available_buffer_space_ratios' => buffer_space * 100, + 'total_queued_size' => @stage_size + @queue_size, + } + + if (m = timekeys.min) + stats['oldest_timekey'] = m + end + + if (m = timekeys.max) + stats['newest_timekey'] = m + end + + { 'buffer' => stats } + end end end end diff --git a/lib/fluent/plugin/in_monitor_agent.rb b/lib/fluent/plugin/in_monitor_agent.rb index fffa0b4d9f..fbdcee5dea 100644 --- a/lib/fluent/plugin/in_monitor_agent.rb +++ b/lib/fluent/plugin/in_monitor_agent.rb @@ -230,6 +230,7 @@ def start end end + # They are deprecated but remain for compatibility MONITOR_INFO = { 'output_plugin' => ->(){ is_a?(::Fluent::Plugin::Output) }, 'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size }, @@ -331,6 +332,10 @@ def get_monitor_info(pe, opts={}) end } + if pe.respond_to?(:statistics) + obj.merge!(pe.statistics['output'] || {}) + end + obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] and pe.instance_variable_defined?(:@retry) # include all instance variables if :with_debug_info is set diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 8ea11d66f1..62c275a3ab 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1457,6 +1457,26 @@ def flush_thread_run(state) state.mutex.unlock end end + + def statistics + stats = { + 'emit_records' => @emit_records, + # Respect original name + # https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284 + 'retry_count' => @num_errors, + 'emit_count' => @emit_count, + 'write_count' => @write_count, + 'rollback_count' => @rollback_count, + } + + if @buffer && @buffer.respond_to?(:statistics) + (@buffer.statistics && @buffer.statistics['buffer'] || {}).each do |k, v| + stats["buffer_#{k}"] = v + end + end + + { 'output' => stats } + end end end end diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index 9a1a9dc76e..b139e243f5 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -118,7 +118,11 @@ def test_configure "plugin_category" => "output", "plugin_id" => "test_out", "retry_count" => 0, - "type" => "test_out" + "type" => "test_out", + "emit_count" => 0, + "emit_records" => 0, + "write_count" => 0, + "rollback_count" => 0, } output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { @@ -129,7 +133,15 @@ def test_configure "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, - "type" => "null" + "type" => "null", + "buffer_available_buffer_space_ratios" => 100.0, + "buffer_queue_byte_size" => 0, + "buffer_stage_byte_size" => 0, + "buffer_stage_length" => 0, + "emit_count" => 0, + "emit_records" => 0, + "write_count" => 0, + "rollback_count" => 0, } error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config opts = {with_config: with_config} @@ -186,13 +198,22 @@ def test_configure "plugin_category" => "output", "type" => "relabel", "output_plugin" => true, - "retry_count" => 0} + "retry_count" => 0, + "emit_count" => 0, + "emit_records" => 0, + "write_count" => 0, + "rollback_count" => 0, + } expect_test_out_record = { "plugin_id" => "test_out", "plugin_category" => "output", "type" => "test_out", "output_plugin" => true, - "retry_count" => 0 + "retry_count" => 0, + "emit_count" => 0, + "emit_records" => 0, + "write_count" => 0, + "rollback_count" => 0, } assert_equal(expect_relabel_record, d.events[1][2]) assert_equal(expect_test_out_record, d.events[3][2]) @@ -303,7 +324,15 @@ def get(uri, header = {}) "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, - "type" => "null" + "type" => "null", + "buffer_available_buffer_space_ratios" => 100.0, + "buffer_queue_byte_size" => 0, + "buffer_stage_byte_size" => 0, + "buffer_stage_length" => 0, + "emit_count" => 0, + "emit_records" => 0, + "write_count" => 0, + "rollback_count" => 0, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry @@ -355,7 +384,15 @@ def get(uri, header = {}) "plugin_category" => "output", "plugin_id" => "null", "retry_count" => 0, - "type" => "null" + "type" => "null", + "buffer_available_buffer_space_ratios" => 100.0, + "buffer_queue_byte_size" => 0, + "buffer_stage_byte_size" => 0, + "buffer_stage_length" => 0, + "emit_count" => 0, + "emit_records" => 0, + "write_count" => 0, + "rollback_count" => 0, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry @@ -391,7 +428,15 @@ def get(uri, header = {}) "plugin_id" => "null", "retry_count" => 0, "type" => "null", - "instance_variables" => {"id" => "null", "num_errors" => 0} + "instance_variables" => {"id" => "null", "num_errors" => 0}, + "buffer_available_buffer_space_ratios" => 100.0, + "buffer_queue_byte_size" => 0, + "buffer_stage_byte_size" => 0, + "buffer_stage_length" => 0, + "emit_count" => 0, + "emit_records" => 0, + "write_count" => 0, + "rollback_count" => 0, } response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body) test_in_response = response["plugins"][0] @@ -510,6 +555,16 @@ def write(chunk) "plugin_category" => "output", "plugin_id" => "test_out_fail_write", "type" => "test_out_fail_write", + "buffer_newest_timekey" => output.calculate_timekey(event_time), + "buffer_oldest_timekey" => output.calculate_timekey(event_time), + "buffer_available_buffer_space_ratios" => 100.0, + "buffer_queue_byte_size" => 40, + "buffer_stage_byte_size" => 0, + "buffer_stage_length" => 0, + "emit_count" => 1, + "emit_records" => 1, + "write_count" => 2, + "rollback_count" => 0, } output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]])) # flush few times to check steps From 6736701312a6c7c7c920686afdd0bfb501b8baac Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 11 Jun 2019 18:01:44 +0900 Subject: [PATCH 2/6] Remove unused nil check Signed-off-by: Yuta Iwama --- lib/fluent/plugin/output.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 62c275a3ab..3fbf41cbfa 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1470,7 +1470,7 @@ def statistics } if @buffer && @buffer.respond_to?(:statistics) - (@buffer.statistics && @buffer.statistics['buffer'] || {}).each do |k, v| + (@buffer.statistics['buffer'] || {}).each do |k, v| stats["buffer_#{k}"] = v end end From 63c006e31a8dd301e42c3f076e069d010eccca84 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 18 Jun 2019 14:31:12 +0900 Subject: [PATCH 3/6] Add slow_flush_count and flush_time_count Signed-off-by: Yuta Iwama --- lib/fluent/plugin/output.rb | 6 ++++++ test/plugin/test_in_monitor_agent.rb | 16 ++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 3fbf41cbfa..2b8e16bfa4 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -184,6 +184,8 @@ def initialize @emit_records = 0 @write_count = 0 @rollback_count = 0 + @flush_time_count = 0 + @slow_flush_count = 0 # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start if implement?(:synchronous) @@ -1202,7 +1204,9 @@ def backup_chunk(chunk, using_secondary, delayed_commit) def check_slow_flush(start) elapsed_time = Fluent::Clock.now - start + @counters_monitor.synchronize { @flush_time_count += elapsed_time } if elapsed_time > @slow_flush_log_threshold + @counters_monitor.synchronize { @slow_flush_count += 1 } log.warn "buffer flush took longer time than slow_flush_log_threshold:", elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id end @@ -1467,6 +1471,8 @@ def statistics 'emit_count' => @emit_count, 'write_count' => @write_count, 'rollback_count' => @rollback_count, + 'slow_flush_count' => @slow_flush_count, + 'flush_time_count' => @flush_time_count, } if @buffer && @buffer.respond_to?(:statistics) diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index b139e243f5..b90adb8be1 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -123,6 +123,8 @@ def test_configure "emit_records" => 0, "write_count" => 0, "rollback_count" => 0, + "slow_flush_count" => 0, + "flush_time_count" => 0, } output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { @@ -142,6 +144,8 @@ def test_configure "emit_records" => 0, "write_count" => 0, "rollback_count" => 0, + "slow_flush_count" => 0, + "flush_time_count" => 0, } error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config opts = {with_config: with_config} @@ -203,6 +207,8 @@ def test_configure "emit_records" => 0, "write_count" => 0, "rollback_count" => 0, + "slow_flush_count" => 0, + "flush_time_count" => 0, } expect_test_out_record = { "plugin_id" => "test_out", @@ -214,6 +220,8 @@ def test_configure "emit_records" => 0, "write_count" => 0, "rollback_count" => 0, + "slow_flush_count" => 0, + "flush_time_count" => 0, } assert_equal(expect_relabel_record, d.events[1][2]) assert_equal(expect_test_out_record, d.events[3][2]) @@ -333,6 +341,8 @@ def get(uri, header = {}) "emit_records" => 0, "write_count" => 0, "rollback_count" => 0, + "slow_flush_count" => 0, + "flush_time_count" => 0, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry @@ -393,6 +403,8 @@ def get(uri, header = {}) "emit_records" => 0, "write_count" => 0, "rollback_count" => 0, + "slow_flush_count" => 0, + "flush_time_count" => 0, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry @@ -437,6 +449,8 @@ def get(uri, header = {}) "emit_records" => 0, "write_count" => 0, "rollback_count" => 0, + "slow_flush_count" => 0, + "flush_time_count" => 0, } response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body) test_in_response = response["plugins"][0] @@ -565,6 +579,8 @@ def write(chunk) "emit_records" => 1, "write_count" => 2, "rollback_count" => 0, + 'slow_flush_count' => 0, + 'flush_time_count' => 0, } output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]])) # flush few times to check steps From 5edaca8704ea7b6d645f32dd18a3ea167e4ba9bb Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 19 Jun 2019 10:51:07 +0900 Subject: [PATCH 4/6] Call to_i to avoid a overflow Signed-off-by: Yuta Iwama --- lib/fluent/plugin/output.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 2b8e16bfa4..73db0a50f7 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1204,7 +1204,8 @@ def backup_chunk(chunk, using_secondary, delayed_commit) def check_slow_flush(start) elapsed_time = Fluent::Clock.now - start - @counters_monitor.synchronize { @flush_time_count += elapsed_time } + # millsec precision + @counters_monitor.synchronize { @flush_time_count += (elapsed_time * 1000).to_i } if elapsed_time > @slow_flush_log_threshold @counters_monitor.synchronize { @slow_flush_count += 1 } log.warn "buffer flush took longer time than slow_flush_log_threshold:", From 11105cf27965e63ead928682faf9a8d60b108394 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Fri, 21 Jun 2019 16:59:18 +0900 Subject: [PATCH 5/6] impl fuzzy_assert since we didn't interested in the value of metrics Signed-off-by: Yuta Iwama --- test/helper.rb | 1 + test/helpers/fuzzy_assert.rb | 89 +++++++++++++++ test/plugin/test_in_monitor_agent.rb | 156 ++++++++++++++------------- 3 files changed, 169 insertions(+), 77 deletions(-) create mode 100644 test/helpers/fuzzy_assert.rb diff --git a/test/helper.rb b/test/helper.rb index 592bc423a6..a373ede5db 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -49,6 +49,7 @@ def to_masked_element require 'fluent/msgpack_factory' require 'fluent/time' require 'serverengine' +require 'helpers/fuzzy_assert' module Fluent module Plugin diff --git a/test/helpers/fuzzy_assert.rb b/test/helpers/fuzzy_assert.rb new file mode 100644 index 0000000000..77e3a85154 --- /dev/null +++ b/test/helpers/fuzzy_assert.rb @@ -0,0 +1,89 @@ +require 'test/unit' + +class FuzzyIncludeAssertion + include Test::Unit::Assertions + + def self.assert(expected, actual, message = nil) + new(expected, actual, message).assert + end + + def initialize(expected, actual, message) + @expected = expected + @actual = actual + @message = message + end + + def assert + if collection? + assert_same_collection + else + assert_same_value + end + end + + private + + def assert_same_value + m = "expected(#{@expected}) !== actual(#{@actual.inspect})" + if @message + m = "#{@message}: #{m}" + end + assert_true(@expected === @actual, m) + end + + def assert_same_class + if @expected.class != @actual.class + if (@expected.class.ancestors | @actual.class.ancestors).empty? + assert_equal(@expected.class, @actual.class, @message) + end + end + end + + def assert_same_collection + assert_same_class + assert_same_values + end + + def assert_same_values + if @expected.is_a?(Array) + @expected.each_with_index do |val, i| + self.class.assert(val, @actual[i], @message) + end + else + @expected.each do |key, val| + self.class.assert(val, @actual[key], "#{key}: ") + end + end + end + + def collection? + @actual.is_a?(Array) || @actual.is_a?(Hash) + end +end + +class FuzzyAssertion < FuzzyIncludeAssertion + private + + def assert_same_collection + super + assert_same_keys + end + + def assert_same_keys + if @expected.is_a?(Array) + assert_equal(@expected.size, @actual.size, "expected.size(#{@expected}) != actual.size(#{@expected})") + else + assert_equal(@expected.keys.sort, @actual.keys.sort) + end + end +end + +module FuzzyAssert + def assert_fuzzy_include(left, right, message = nil) + FuzzyIncludeAssertion.new(left, right, message).assert + end + + def assert_fuzzy_equal(left, right, message = nil) + FuzzyAssertion.new(left, right, message).assert + end +end diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index b90adb8be1..fdb20bd328 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -10,6 +10,8 @@ require_relative '../test_plugin_classes' class MonitorAgentInputTest < Test::Unit::TestCase + include FuzzyAssert + def setup Fluent::Test.setup end @@ -119,12 +121,12 @@ def test_configure "plugin_id" => "test_out", "retry_count" => 0, "type" => "test_out", - "emit_count" => 0, - "emit_records" => 0, - "write_count" => 0, - "rollback_count" => 0, - "slow_flush_count" => 0, - "flush_time_count" => 0, + "emit_count" => Integer, + "emit_records" => Integer, + "write_count" => Integer, + "rollback_count" => Integer, + "slow_flush_count" => Integer, + "flush_time_count" => Integer, } output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { @@ -136,23 +138,23 @@ def test_configure "plugin_id" => "null", "retry_count" => 0, "type" => "null", - "buffer_available_buffer_space_ratios" => 100.0, - "buffer_queue_byte_size" => 0, - "buffer_stage_byte_size" => 0, - "buffer_stage_length" => 0, - "emit_count" => 0, - "emit_records" => 0, - "write_count" => 0, - "rollback_count" => 0, - "slow_flush_count" => 0, - "flush_time_count" => 0, + "buffer_available_buffer_space_ratios" => Float, + "buffer_queue_byte_size" => Integer, + "buffer_stage_byte_size" => Integer, + "buffer_stage_length" => Integer, + "emit_count" => Integer, + "emit_records" => Integer, + "write_count" => Integer, + "rollback_count" => Integer, + "slow_flush_count" => Integer, + "flush_time_count" => Integer, } error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config opts = {with_config: with_config} assert_equal(input_info, d.instance.get_monitor_info(@ra.inputs.first, opts)) - assert_equal(filter_info, d.instance.get_monitor_info(@ra.filters.first, opts)) - assert_equal(output_info, d.instance.get_monitor_info(test_label.outputs.first, opts)) - assert_equal(error_label_info, d.instance.get_monitor_info(error_label.outputs.first, opts)) + assert_fuzzy_equal(filter_info, d.instance.get_monitor_info(@ra.filters.first, opts)) + assert_fuzzy_equal(output_info, d.instance.get_monitor_info(test_label.outputs.first, opts)) + assert_fuzzy_equal(error_label_info, d.instance.get_monitor_info(error_label.outputs.first, opts)) end test "fluentd opts" do @@ -203,12 +205,12 @@ def test_configure "type" => "relabel", "output_plugin" => true, "retry_count" => 0, - "emit_count" => 0, - "emit_records" => 0, - "write_count" => 0, - "rollback_count" => 0, - "slow_flush_count" => 0, - "flush_time_count" => 0, + "emit_count" => Integer, + "emit_records" => Integer, + "write_count" => Integer, + "rollback_count" => Integer, + "slow_flush_count" => Integer, + "flush_time_count" => Integer, } expect_test_out_record = { "plugin_id" => "test_out", @@ -216,15 +218,15 @@ def test_configure "type" => "test_out", "output_plugin" => true, "retry_count" => 0, - "emit_count" => 0, - "emit_records" => 0, - "write_count" => 0, - "rollback_count" => 0, - "slow_flush_count" => 0, - "flush_time_count" => 0, + "emit_count" => Integer, + "emit_records" => Integer, + "write_count" => Integer, + "rollback_count" => Integer, + "slow_flush_count" => Integer, + "flush_time_count" => Integer, } - assert_equal(expect_relabel_record, d.events[1][2]) - assert_equal(expect_test_out_record, d.events[3][2]) + assert_fuzzy_equal(expect_relabel_record, d.events[1][2]) + assert_fuzzy_equal(expect_test_out_record, d.events[3][2]) end end @@ -333,16 +335,16 @@ def get(uri, header = {}) "plugin_id" => "null", "retry_count" => 0, "type" => "null", - "buffer_available_buffer_space_ratios" => 100.0, - "buffer_queue_byte_size" => 0, - "buffer_stage_byte_size" => 0, - "buffer_stage_length" => 0, - "emit_count" => 0, - "emit_records" => 0, - "write_count" => 0, - "rollback_count" => 0, - "slow_flush_count" => 0, - "flush_time_count" => 0, + "buffer_available_buffer_space_ratios" => Float, + "buffer_queue_byte_size" => Integer, + "buffer_stage_byte_size" => Integer, + "buffer_stage_length" => Integer, + "emit_count" => Integer, + "emit_records" => Integer, + "write_count" => Integer, + "rollback_count" => Integer, + "slow_flush_count" => Integer, + "flush_time_count" => Integer, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry @@ -350,7 +352,7 @@ def get(uri, header = {}) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) - assert_equal(expected_null_response, null_response) + assert_fuzzy_equal(expected_null_response, null_response) end test "/api/plugins.json/not_found" do @@ -395,16 +397,16 @@ def get(uri, header = {}) "plugin_id" => "null", "retry_count" => 0, "type" => "null", - "buffer_available_buffer_space_ratios" => 100.0, - "buffer_queue_byte_size" => 0, - "buffer_stage_byte_size" => 0, - "buffer_stage_length" => 0, - "emit_count" => 0, - "emit_records" => 0, - "write_count" => 0, - "rollback_count" => 0, - "slow_flush_count" => 0, - "flush_time_count" => 0, + "buffer_available_buffer_space_ratios" => Float, + "buffer_queue_byte_size" => Integer, + "buffer_stage_byte_size" => Integer, + "buffer_stage_length" => Integer, + "emit_count" => Integer, + "emit_records" => Integer, + "write_count" => Integer, + "rollback_count" => Integer, + "slow_flush_count" => Integer, + "flush_time_count" => Integer, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry @@ -412,7 +414,7 @@ def get(uri, header = {}) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) - assert_equal(expected_null_response, null_response) + assert_fuzzy_include(expected_null_response, null_response) end test "/api/plugins.json with 'with_ivars'. response contains specified instance variables of each plugin" do @@ -441,22 +443,22 @@ def get(uri, header = {}) "retry_count" => 0, "type" => "null", "instance_variables" => {"id" => "null", "num_errors" => 0}, - "buffer_available_buffer_space_ratios" => 100.0, - "buffer_queue_byte_size" => 0, - "buffer_stage_byte_size" => 0, - "buffer_stage_length" => 0, - "emit_count" => 0, - "emit_records" => 0, - "write_count" => 0, - "rollback_count" => 0, - "slow_flush_count" => 0, - "flush_time_count" => 0, + "buffer_available_buffer_space_ratios" => Float, + "buffer_queue_byte_size" => Integer, + "buffer_stage_byte_size" => Integer, + "buffer_stage_length" => Integer, + "emit_count" => Integer, + "emit_records" => Integer, + "write_count" => Integer, + "rollback_count" => Integer, + "slow_flush_count" => Integer, + "flush_time_count" => Integer, } response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body) test_in_response = response["plugins"][0] null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) - assert_equal(expected_null_response, null_response) + assert_fuzzy_equal(expected_null_response, null_response) end test "/api/config" do @@ -571,16 +573,16 @@ def write(chunk) "type" => "test_out_fail_write", "buffer_newest_timekey" => output.calculate_timekey(event_time), "buffer_oldest_timekey" => output.calculate_timekey(event_time), - "buffer_available_buffer_space_ratios" => 100.0, - "buffer_queue_byte_size" => 40, - "buffer_stage_byte_size" => 0, - "buffer_stage_length" => 0, - "emit_count" => 1, - "emit_records" => 1, - "write_count" => 2, - "rollback_count" => 0, - 'slow_flush_count' => 0, - 'flush_time_count' => 0, + "buffer_available_buffer_space_ratios" => Float, + "buffer_queue_byte_size" => Integer, + "buffer_stage_byte_size" => Integer, + "buffer_stage_length" => Integer, + "emit_count" => Integer, + "emit_records" => Integer, + "write_count" => Integer, + "rollback_count" => Integer, + 'slow_flush_count' => Integer, + 'flush_time_count' => Integer, } output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]])) # flush few times to check steps @@ -596,7 +598,7 @@ def write(chunk) # remove dynamic keys response_retry_count = test_out_fail_write_response.delete("retry_count") response_retry = test_out_fail_write_response.delete("retry") - assert_equal(expected_test_out_fail_write_response, test_out_fail_write_response) + assert_fuzzy_equal(expected_test_out_fail_write_response, test_out_fail_write_response) assert{ response_retry.has_key?("steps") } # it's very hard to check exact retry count (because retries are called by output flush thread scheduling) assert{ response_retry_count >= 1 && response_retry["steps"] >= 0 } From 05d842e944f43d9f7610db4d7ed62ea3399273ff Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 25 Jun 2019 13:22:22 +0800 Subject: [PATCH 6/6] Calculate eplapsed time outside of lock Signed-off-by: Yuta Iwama --- lib/fluent/plugin/output.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 73db0a50f7..4b9a4288ed 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1204,8 +1204,8 @@ def backup_chunk(chunk, using_secondary, delayed_commit) def check_slow_flush(start) elapsed_time = Fluent::Clock.now - start - # millsec precision - @counters_monitor.synchronize { @flush_time_count += (elapsed_time * 1000).to_i } + elapsed_millsec = (elapsed_time * 1000).to_i + @counters_monitor.synchronize { @flush_time_count += elapsed_millsec } if elapsed_time > @slow_flush_log_threshold @counters_monitor.synchronize { @slow_flush_count += 1 } log.warn "buffer flush took longer time than slow_flush_log_threshold:",