From 8d37c8585bfd285eab55999f93c55632ffcc64dd Mon Sep 17 00:00:00 2001 From: Steven McDonald Date: Fri, 22 Mar 2019 14:03:25 +0100 Subject: [PATCH] Expose the current list of timekeys as a buffer metric This is useful for monitoring things like: - How old is the current oldest log message still in the buffer? - Equivalently, what is the maximum latency of the logging system as a whole? This might be interesting to track over time. - How new is the *newest* log message in the buffer? This might indicate a delay upstream of fluentd. Signed-off-by: Steven McDonald --- lib/fluent/plugin/buffer.rb | 30 +++++++++++++++++++++++++++ lib/fluent/plugin/in_monitor_agent.rb | 1 + test/plugin/test_buffer.rb | 18 +++++++++++++--- test/plugin/test_in_monitor_agent.rb | 14 +++++++++---- 4 files changed, 56 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index c39c17c109..690290a198 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -154,6 +154,7 @@ def initialize @dequeued_num = {} # metadata => int (number of dequeued chunks) @stage_size = @queue_size = 0 + @timekeys = Hash.new(0) @metadata_list = [] # keys of @stage end @@ -176,12 +177,14 @@ def start @stage.each_pair do |metadata, chunk| @metadata_list << metadata unless @metadata_list.include?(metadata) @stage_size += chunk.bytesize + add_timekey(metadata) end @queue.each do |chunk| @metadata_list << chunk.metadata unless @metadata_list.include?(chunk.metadata) @queued_num[chunk.metadata] ||= 0 @queued_num[chunk.metadata] += 1 @queue_size += chunk.bytesize + add_timekey(chunk.metadata) end log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size end @@ -206,6 +209,7 @@ def terminate super @dequeued = @stage = @queue = @queued_num = @metadata_list = nil @stage_size = @queue_size = 0 + @timekeys.clear end def storable? @@ -251,6 +255,7 @@ def add_metadata(metadata) @metadata_list[i] else @metadata_list << metadata + add_timekey(metadata) metadata end end @@ -261,6 +266,30 @@ def metadata(timekey: nil, tag: nil, variables: nil) add_metadata(meta) end + def add_timekey(metadata) + if t = metadata.timekey + @timekeys[t] += 1 + end + nil + end + private :add_timekey + + def del_timekey(metadata) + if t = metadata.timekey + if @timekeys[t] <= 1 + @timekeys.delete(t) + else + @timekeys[t] -= 1 + end + end + nil + end + private :del_timekey + + def timekeys + @timekeys.keys + end + # metadata MUST have consistent object_id for each variation # data MUST be Array of serialized events, or EventStream # metadata_and_data MUST be a hash of { metadata => data } @@ -506,6 +535,7 @@ def purge_chunk(chunk_id) @metadata_list.delete(metadata) @queued_num.delete(metadata) @dequeued_num.delete(metadata) + del_timekey(metadata) end log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata end diff --git a/lib/fluent/plugin/in_monitor_agent.rb b/lib/fluent/plugin/in_monitor_agent.rb index cb919ac05a..a5edc5f5a7 100644 --- a/lib/fluent/plugin/in_monitor_agent.rb +++ b/lib/fluent/plugin/in_monitor_agent.rb @@ -285,6 +285,7 @@ def shutdown MONITOR_INFO = { 'output_plugin' => ->(){ is_a?(::Fluent::Plugin::Output) }, 'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size }, + 'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys }, 'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size }, 'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil }, } diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 3614321ef4..51c241238d 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -189,6 +189,7 @@ def create_chunk_es(metadata, es) assert_equal 0, plugin.stage_size assert_equal 0, plugin.queue_size + assert_equal [], plugin.timekeys # @p is started plugin @@ -242,6 +243,7 @@ def create_chunk_es(metadata, es) assert_nil @p.instance_eval{ @metadata_list } # #metadata_list does #dup for @metadata_list assert_equal 0, @p.stage_size assert_equal 0, @p.queue_size + assert_equal [], @p.timekeys end test '#metadata_list returns list of metadata on stage or in queue' do @@ -569,9 +571,12 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal [@dm2,@dm3], @p.stage.keys - prev_stage_size = @p.stage_size + timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i + assert !@p.timekeys.include?(timekey) - m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) + prev_stage_size = @p.stage_size + + m = @p.metadata(timekey: timekey) @p.write({m => ["x" * 256, "y" * 256, "z" * 256]}) @@ -581,6 +586,8 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal [@dm2,@dm3,m], @p.stage.keys + + assert @p.timekeys.include?(timekey) end test '#write tries to enqueue and store data into a new chunk if existing chunk is full' do @@ -688,8 +695,11 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal [@dm2,@dm3], @p.stage.keys + + timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i + assert !@p.timekeys.include?(timekey) - m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) + m = @p.metadata(timekey: timekey) es = Fluent::ArrayEventStream.new( [ @@ -708,6 +718,8 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal [@dm2,@dm3,m], @p.stage.keys assert_equal 1, @p.stage[m].append_count + + assert @p.timekeys.include?(timekey) end test '#write w/ format tries to enqueue and store data into a new chunk if existing chunk does not have enough space' do diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index 4fba1a73f0..0907d4dd9a 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -123,6 +123,7 @@ def test_configure output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { "buffer_queue_length" => 0, + "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", @@ -296,6 +297,7 @@ def get(uri, header = {}) expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config expected_null_response = { "buffer_queue_length" => 0, + "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", @@ -333,6 +335,7 @@ def get(uri, header = {}) expected_test_in_response.merge!("config" => {"@id" => "test_in", "@type" => "test_in"}) if with_config expected_null_response = { "buffer_queue_length" => 0, + "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", @@ -367,6 +370,7 @@ def get(uri, header = {}) } expected_null_response = { "buffer_queue_length" => 0, + "buffer_timekeys" => [], "buffer_total_queued_size" => 0, "output_plugin" => true, "plugin_category" => "output", @@ -434,7 +438,8 @@ def write(chunk) @type test_out_fail_write @id test_out_fail_write - + + timekey 1m flush_mode immediate @@ -453,17 +458,18 @@ def write(chunk) include_config no ") d.instance.start + output = @ra.outputs[0] + output.start + output.after_start expected_test_out_fail_write_response = { "buffer_queue_length" => 1, + "buffer_timekeys" => [output.calculate_timekey(event_time)], "buffer_total_queued_size" => 40, "output_plugin" => true, "plugin_category" => "output", "plugin_id" => "test_out_fail_write", "type" => "test_out_fail_write", } - output = @ra.outputs[0] - output.start - output.after_start output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]])) # flush few times to check steps 2.times do