Skip to content

Commit

Permalink
Merge pull request #2450 from ganmacs/add-more-metrics
Browse files Browse the repository at this point in the history
Add more metrics to in_monitor_agent
  • Loading branch information
repeatedly authored Jul 1, 2019
2 parents a0240f6 + 05d842e commit 07fee9b
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 16 deletions.
22 changes: 22 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,28 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
enqueue_chunk(metadata) if enqueue_chunk_before_retry
retry
end

def statistics
buffer_space = 1.0 - ((@stage_size + @queue_size * 1.0) / @total_limit_size).round
stats = {
'stage_length' => @stage.size,
'stage_byte_size' => @stage_size,
'queue_length' => @queue.size,
'queue_byte_size' => @queue_size,
'available_buffer_space_ratios' => buffer_space * 100,
'total_queued_size' => @stage_size + @queue_size,
}

if (m = timekeys.min)
stats['oldest_timekey'] = m
end

if (m = timekeys.max)
stats['newest_timekey'] = m
end

{ 'buffer' => stats }
end
end
end
end
5 changes: 5 additions & 0 deletions lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def start
end
end

# They are deprecated but remain for compatibility
MONITOR_INFO = {
'output_plugin' => ->(){ is_a?(::Fluent::Plugin::Output) },
'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size },
Expand Down Expand Up @@ -331,6 +332,10 @@ def get_monitor_info(pe, opts={})
end
}

if pe.respond_to?(:statistics)
obj.merge!(pe.statistics['output'] || {})
end

obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] and pe.instance_variable_defined?(:@retry)

# include all instance variables if :with_debug_info is set
Expand Down
27 changes: 27 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def initialize
@emit_records = 0
@write_count = 0
@rollback_count = 0
@flush_time_count = 0
@slow_flush_count = 0

# How to process events is decided here at once, but it will be decided in delayed way on #configure & #start
if implement?(:synchronous)
Expand Down Expand Up @@ -1205,7 +1207,10 @@ def backup_chunk(chunk, using_secondary, delayed_commit)

def check_slow_flush(start)
elapsed_time = Fluent::Clock.now - start
elapsed_millsec = (elapsed_time * 1000).to_i
@counters_monitor.synchronize { @flush_time_count += elapsed_millsec }
if elapsed_time > @slow_flush_log_threshold
@counters_monitor.synchronize { @slow_flush_count += 1 }
log.warn "buffer flush took longer time than slow_flush_log_threshold:",
elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id
end
Expand Down Expand Up @@ -1460,6 +1465,28 @@ def flush_thread_run(state)
state.mutex.unlock
end
end

def statistics
stats = {
'emit_records' => @emit_records,
# Respect original name
# https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284
'retry_count' => @num_errors,
'emit_count' => @emit_count,
'write_count' => @write_count,
'rollback_count' => @rollback_count,
'slow_flush_count' => @slow_flush_count,
'flush_time_count' => @flush_time_count,
}

if @buffer && @buffer.respond_to?(:statistics)
(@buffer.statistics['buffer'] || {}).each do |k, v|
stats["buffer_#{k}"] = v
end
end

{ 'output' => stats }
end
end
end
end
1 change: 1 addition & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def to_masked_element
require 'fluent/msgpack_factory'
require 'fluent/time'
require 'serverengine'
require 'helpers/fuzzy_assert'

module Fluent
module Plugin
Expand Down
89 changes: 89 additions & 0 deletions test/helpers/fuzzy_assert.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
require 'test/unit'

class FuzzyIncludeAssertion
include Test::Unit::Assertions

def self.assert(expected, actual, message = nil)
new(expected, actual, message).assert
end

def initialize(expected, actual, message)
@expected = expected
@actual = actual
@message = message
end

def assert
if collection?
assert_same_collection
else
assert_same_value
end
end

private

def assert_same_value
m = "expected(#{@expected}) !== actual(#{@actual.inspect})"
if @message
m = "#{@message}: #{m}"
end
assert_true(@expected === @actual, m)
end

def assert_same_class
if @expected.class != @actual.class
if (@expected.class.ancestors | @actual.class.ancestors).empty?
assert_equal(@expected.class, @actual.class, @message)
end
end
end

def assert_same_collection
assert_same_class
assert_same_values
end

def assert_same_values
if @expected.is_a?(Array)
@expected.each_with_index do |val, i|
self.class.assert(val, @actual[i], @message)
end
else
@expected.each do |key, val|
self.class.assert(val, @actual[key], "#{key}: ")
end
end
end

def collection?
@actual.is_a?(Array) || @actual.is_a?(Hash)
end
end

class FuzzyAssertion < FuzzyIncludeAssertion
private

def assert_same_collection
super
assert_same_keys
end

def assert_same_keys
if @expected.is_a?(Array)
assert_equal(@expected.size, @actual.size, "expected.size(#{@expected}) != actual.size(#{@expected})")
else
assert_equal(@expected.keys.sort, @actual.keys.sort)
end
end
end

module FuzzyAssert
def assert_fuzzy_include(left, right, message = nil)
FuzzyIncludeAssertion.new(left, right, message).assert
end

def assert_fuzzy_equal(left, right, message = nil)
FuzzyAssertion.new(left, right, message).assert
end
end
105 changes: 89 additions & 16 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
require_relative '../test_plugin_classes'

class MonitorAgentInputTest < Test::Unit::TestCase
include FuzzyAssert

def setup
Fluent::Test.setup
end
Expand Down Expand Up @@ -118,7 +120,13 @@ def test_configure
"plugin_category" => "output",
"plugin_id" => "test_out",
"retry_count" => 0,
"type" => "test_out"
"type" => "test_out",
"emit_count" => Integer,
"emit_records" => Integer,
"write_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
}
output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config
error_label_info = {
Expand All @@ -129,14 +137,24 @@ def test_configure
"plugin_category" => "output",
"plugin_id" => "null",
"retry_count" => 0,
"type" => "null"
"type" => "null",
"buffer_available_buffer_space_ratios" => Float,
"buffer_queue_byte_size" => Integer,
"buffer_stage_byte_size" => Integer,
"buffer_stage_length" => Integer,
"emit_count" => Integer,
"emit_records" => Integer,
"write_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
}
error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config
opts = {with_config: with_config}
assert_equal(input_info, d.instance.get_monitor_info(@ra.inputs.first, opts))
assert_equal(filter_info, d.instance.get_monitor_info(@ra.filters.first, opts))
assert_equal(output_info, d.instance.get_monitor_info(test_label.outputs.first, opts))
assert_equal(error_label_info, d.instance.get_monitor_info(error_label.outputs.first, opts))
assert_fuzzy_equal(filter_info, d.instance.get_monitor_info(@ra.filters.first, opts))
assert_fuzzy_equal(output_info, d.instance.get_monitor_info(test_label.outputs.first, opts))
assert_fuzzy_equal(error_label_info, d.instance.get_monitor_info(error_label.outputs.first, opts))
end

test "fluentd opts" do
Expand Down Expand Up @@ -186,16 +204,29 @@ def test_configure
"plugin_category" => "output",
"type" => "relabel",
"output_plugin" => true,
"retry_count" => 0}
"retry_count" => 0,
"emit_count" => Integer,
"emit_records" => Integer,
"write_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
}
expect_test_out_record = {
"plugin_id" => "test_out",
"plugin_category" => "output",
"type" => "test_out",
"output_plugin" => true,
"retry_count" => 0
"retry_count" => 0,
"emit_count" => Integer,
"emit_records" => Integer,
"write_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
}
assert_equal(expect_relabel_record, d.events[1][2])
assert_equal(expect_test_out_record, d.events[3][2])
assert_fuzzy_equal(expect_relabel_record, d.events[1][2])
assert_fuzzy_equal(expect_test_out_record, d.events[3][2])
end
end

Expand Down Expand Up @@ -303,15 +334,25 @@ def get(uri, header = {})
"plugin_category" => "output",
"plugin_id" => "null",
"retry_count" => 0,
"type" => "null"
"type" => "null",
"buffer_available_buffer_space_ratios" => Float,
"buffer_queue_byte_size" => Integer,
"buffer_stage_byte_size" => Integer,
"buffer_stage_length" => Integer,
"emit_count" => Integer,
"emit_records" => Integer,
"write_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json").body)
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
assert_equal(expected_test_in_response, test_in_response)
assert_equal(expected_null_response, null_response)
assert_fuzzy_equal(expected_null_response, null_response)
end

test "/api/plugins.json/not_found" do
Expand Down Expand Up @@ -355,15 +396,25 @@ def get(uri, header = {})
"plugin_category" => "output",
"plugin_id" => "null",
"retry_count" => 0,
"type" => "null"
"type" => "null",
"buffer_available_buffer_space_ratios" => Float,
"buffer_queue_byte_size" => Integer,
"buffer_stage_byte_size" => Integer,
"buffer_stage_length" => Integer,
"emit_count" => Integer,
"emit_records" => Integer,
"write_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json#{query_param}").body)
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
assert_equal(expected_test_in_response, test_in_response)
assert_equal(expected_null_response, null_response)
assert_fuzzy_include(expected_null_response, null_response)
end

test "/api/plugins.json with 'with_ivars'. response contains specified instance variables of each plugin" do
Expand Down Expand Up @@ -391,13 +442,23 @@ def get(uri, header = {})
"plugin_id" => "null",
"retry_count" => 0,
"type" => "null",
"instance_variables" => {"id" => "null", "num_errors" => 0}
"instance_variables" => {"id" => "null", "num_errors" => 0},
"buffer_available_buffer_space_ratios" => Float,
"buffer_queue_byte_size" => Integer,
"buffer_stage_byte_size" => Integer,
"buffer_stage_length" => Integer,
"emit_count" => Integer,
"emit_records" => Integer,
"write_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
}
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body)
test_in_response = response["plugins"][0]
null_response = response["plugins"][5]
assert_equal(expected_test_in_response, test_in_response)
assert_equal(expected_null_response, null_response)
assert_fuzzy_equal(expected_null_response, null_response)
end

test "/api/config" do
Expand Down Expand Up @@ -510,6 +571,18 @@ def write(chunk)
"plugin_category" => "output",
"plugin_id" => "test_out_fail_write",
"type" => "test_out_fail_write",
"buffer_newest_timekey" => output.calculate_timekey(event_time),
"buffer_oldest_timekey" => output.calculate_timekey(event_time),
"buffer_available_buffer_space_ratios" => Float,
"buffer_queue_byte_size" => Integer,
"buffer_stage_byte_size" => Integer,
"buffer_stage_length" => Integer,
"emit_count" => Integer,
"emit_records" => Integer,
"write_count" => Integer,
"rollback_count" => Integer,
'slow_flush_count' => Integer,
'flush_time_count' => Integer,
}
output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
# flush few times to check steps
Expand All @@ -525,7 +598,7 @@ def write(chunk)
# remove dynamic keys
response_retry_count = test_out_fail_write_response.delete("retry_count")
response_retry = test_out_fail_write_response.delete("retry")
assert_equal(expected_test_out_fail_write_response, test_out_fail_write_response)
assert_fuzzy_equal(expected_test_out_fail_write_response, test_out_fail_write_response)
assert{ response_retry.has_key?("steps") }
# it's very hard to check exact retry count (because retries are called by output flush thread scheduling)
assert{ response_retry_count >= 1 && response_retry["steps"] >= 0 }
Expand Down

0 comments on commit 07fee9b

Please sign in to comment.