From 46dad90aac2d463bf3d5b0d1be372ac97c0d0724 Mon Sep 17 00:00:00 2001
From: Kentaro Hayashi
Date: Tue, 16 Nov 2021 16:03:40 +0900
Subject: [PATCH] Fix a bug that BufferChunkOverflowError causes whole data
 loss
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, when a BufferChunkOverflowError occurred, none of the data
was sent because of the raised exception. For example, when one split of
the incoming data exceeds chunk_limit_size, "message a" and "message b"
are also lost:

  message a\n
  longer.....message data exceeds chunk_limit_size\n
  message b\n

This is not appropriate because the problem exists in only a part of the
split data.

With this commit, when a BufferChunkOverflowError is detected in
write_step_by_step, the errors are collected and propagated up to the
scope of the write method. This way, the valid parts of the split data
are still processed and not lost.

Here is the micro benchmark result:

Before:

Warming up --------------------------------------
no BufferChunkOverflowError
                        11.389k i/100ms
raise BufferChunkOverflowError in every 100 calls
                         3.976k i/100ms
Calculating -------------------------------------
no BufferChunkOverflowError
                        114.973k (± 2.6%) i/s -      1.150M in  10.012362s
raise BufferChunkOverflowError in every 100 calls
                         40.086k (± 3.8%) i/s -    401.576k in  10.031710s

Comparison:
no BufferChunkOverflowError: 114972.7 i/s
raise BufferChunkOverflowError in every 100 calls: 40086.1 i/s - 2.87x  (± 0.00) slower

After:

Warming up --------------------------------------
no BufferChunkOverflowError
                        11.491k i/100ms
raise BufferChunkOverflowError in every 100 calls
                         4.690k i/100ms
Calculating -------------------------------------
no BufferChunkOverflowError
                        114.295k (± 1.6%) i/s -      1.149M in  10.056319s
raise BufferChunkOverflowError in every 100 calls
                         48.076k (± 9.0%) i/s -    478.380k in  10.030903s

Comparison:
no BufferChunkOverflowError: 114294.8 i/s
raise BufferChunkOverflowError in every 100 calls: 48076.2 i/s - 2.38x  (± 0.00) slower

Raising BufferChunkOverflowError still has a performance cost in the
test case above, but comparing the Before and After Calculating
sections, the results are almost the same: neither the benchmark with
BufferChunkOverflowError nor the one without it shows a regression.

Here is the micro benchmark test case:

    test '#write BufferChunkOverflowError micro benchmark' do
      require 'benchmark/ips'
      es = Fluent::ArrayEventStream.new([
        [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x"}]
      ])
      large = Fluent::ArrayEventStream.new([
        [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]
      ])
      Benchmark.ips do |x|
        x.config(:time => 10, :warmup => 2)
        x.report("no BufferChunkOverflowError") do
          @p.write({@dm0 => es}, format: @format)
        end
        x.report("raise BufferChunkOverflowError in every 100 calls") do |n|
          n.times do |i|
            if i % 100 == 0
              assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do
                @p.write({@dm0 => large}, format: @format)
              end
            else
              @p.write({@dm0 => es}, format: @format)
            end
          end
        end
        x.compare!
      end
    end

Signed-off-by: Kentaro Hayashi
---
 lib/fluent/plugin/buffer.rb | 31 +++++++++++++++++-------
 test/plugin/test_buffer.rb  | 48 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 70 insertions(+), 9 deletions(-)
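The core idea of the fix, as a minimal standalone sketch (the names
ChunkOverflowError, write_all and sink are hypothetical stand-ins, not
fluentd API): keep writing the records that fit, remember one error
message per oversized record, and raise a single aggregated error only
after the loop has finished.

    class ChunkOverflowError < StandardError; end

    # Writes every record that fits into `sink`; collects an error message
    # for each oversized record instead of raising immediately.
    def write_all(records, limit, sink)
      errors = []
      records.each_with_index do |record, nth|
        if record.bytesize > limit
          errors << "a #{record.bytesize} bytes record (nth: #{nth}) is larger than limit"
          next
        end
        sink << record
      end
      # notify the caller once, after all valid records are stored
      raise ChunkOverflowError, errors.join(", ") unless errors.empty?
    end

    sink = []
    begin
      write_all(["message a", "x" * 100, "message b"], 10, sink)
    rescue ChunkOverflowError => e
      puts e.message # reports only the oversized record
    end
    p sink # => ["message a", "message b"] -- valid records survive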
diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 8177969edc..1fa1177fb0 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -332,12 +332,14 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
       unstaged_chunks = {} # metadata => [chunk, chunk, ...]
       chunks_to_enqueue = []
       staged_bytesizes_by_chunk = {}
+      # track internal BufferChunkOverflowError in write_step_by_step
+      buffer_chunk_overflow_errors = []
 
       begin
         # sort metadata to get lock of chunks in same order with other threads
         metadata_and_data.keys.sort.each do |metadata|
           data = metadata_and_data[metadata]
-          write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
+          write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize, error|
             chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
             operated_chunks << chunk
             if chunk.staged?
@@ -352,6 +354,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
               unstaged_chunks[metadata] ||= []
               unstaged_chunks[metadata] << chunk
             end
+            if error and not error.empty?
+              buffer_chunk_overflow_errors << error
+            end
           end
         end
 
@@ -444,6 +449,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
           end
           chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
         end
+        unless buffer_chunk_overflow_errors.empty?
+          # Notify delayed BufferChunkOverflowError here
+          raise BufferChunkOverflowError, buffer_chunk_overflow_errors.join(", ")
+        end
       end
     end
 
@@ -716,6 +725,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)
 
     def write_step_by_step(metadata, data, format, splits_count, &block)
       splits = []
+      errors = []
       if splits_count > data.size
         splits_count = data.size
       end
@@ -761,8 +771,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
           begin
             while writing_splits_index < splits.size
               split = splits[writing_splits_index]
+              formatted_split = format ? format.call(split) : split.first
+              if split.size == 1 && original_bytesize == 0
+                big_record_size = formatted_split.bytesize
+                if chunk.bytesize + big_record_size > @chunk_limit_size
+                  errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
+                  writing_splits_index += 1
+                  next
+                end
+              end
+
               if format
-                chunk.concat(format.call(split), split.size)
+                chunk.concat(formatted_split, split.size)
               else
                 chunk.append(split, compress: @compress)
               end
@@ -770,11 +790,6 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
               if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
                 chunk.rollback
 
-                if split.size == 1 && original_bytesize == 0
-                  big_record_size = format ? format.call(split).bytesize : split.first.bytesize
-                  raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
-                end
-
                 if chunk_size_full?(chunk) || split.size == 1
                   enqueue_chunk_before_retry = true
                 else
@@ -795,7 +810,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
             raise
           end
 
-          block.call(chunk, chunk.bytesize - original_bytesize)
+          block.call(chunk, chunk.bytesize - original_bytesize, errors)
         end
       end
     rescue ShouldRetry
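To illustrate the new pre-check in write_step_by_step above, here is a
simplified standalone model (a plain String stands in for the chunk and
append_splits is a hypothetical helper; the real chunk API is richer):
a record that on its own exceeds the chunk limit can never fit, so it
is skipped and its error recorded, instead of being concatenated,
rolled back and raised from inside the loop.

    CHUNK_LIMIT_SIZE = 10

    def append_splits(splits, chunk, errors)
      splits.each_with_index do |split, nth|
        if split.bytesize > CHUNK_LIMIT_SIZE
          # skip the oversized split but keep its error message for later
          errors << "a #{split.bytesize} bytes record (nth: #{nth}) is larger than buffer chunk limit size"
          next
        end
        chunk << split
      end
    end

    errors = []
    chunk = +""
    append_splits(["aaa", "x" * 32, "bbb"], chunk, errors)
    p chunk  # => "aaabbb" -- "aaa" and "bbb" are still written
    p errors # => ["a 32 bytes record (nth: 1) is larger than buffer chunk limit size"]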
diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb
index af9eab54f5..01670876cb 100644
--- a/test/plugin/test_buffer.rb
+++ b/test/plugin/test_buffer.rb
@@ -18,7 +18,7 @@ class DummyOutputPlugin < Fluent::Plugin::Base
 end
 class DummyMemoryChunkError < StandardError; end
 class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk
-  attr_reader :append_count, :rollbacked, :closed, :purged
+  attr_reader :append_count, :rollbacked, :closed, :purged, :chunk
   attr_accessor :failing
   def initialize(metadata, compress: :text)
     super
@@ -944,6 +944,52 @@ def create_chunk_es(metadata, es)
         @p.write({@dm0 => es}, format: @format)
       end
     end
+
+    data(
+      first_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
+                                                 [event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
+                                                 [event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
+      intermediate_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
+                                                        [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
+                                                        [event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
+      last_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
+                                                [event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
+                                                [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]]),
+      multiple_chunks: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
+                                                     [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
+                                                     [event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
+                                                     [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]])
+    )
+    test '#write exceeds chunk_limit_size, raise BufferChunkOverflowError, but not lost whole messages' do |(es)|
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+
+      assert_equal 1_280_000, @p.chunk_limit_size
+
+      nth = []
+      es.entries.each_with_index do |entry, index|
+        if entry.last["message"].size == @p.chunk_limit_size
+          nth << index
+        end
+      end
+      messages = []
+      nth.each do |n|
+        messages << "a 1280025 bytes record (nth: #{n}) is larger than buffer chunk limit size"
+      end
+
+      assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages.join(", ")) do
+        @p.write({@dm0 => es}, format: @format)
+      end
+      # message a and b are concatenated and staged
+      staged_messages = Fluent::MessagePackFactory.msgpack_unpacker.feed_each(@p.stage[@dm0].chunk).collect do |record|
+        record.last
+      end
+      assert_equal([2, [{"message" => "a"}, {"message" => "b"}]],
+                   [@p.stage[@dm0].size, staged_messages])
+      # only es0 message is queued
+      assert_equal [@dm0], @p.queue.map(&:metadata)
+      assert_equal [5000], @p.queue.map(&:size)
+    end
   end
 
   sub_test_case 'custom format with configuration for test with lower chunk limit size' do
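As the multiple_chunks case in the test above shows, when several splits
overflow within a single #write call, one message per oversized record
is collected and joined with ", " into a single BufferChunkOverflowError.
A minimal sketch of that aggregation (the 1280025-byte size and nth
indexes are taken from the test data):

    errors = [
      "a 1280025 bytes record (nth: 1) is larger than buffer chunk limit size",
      "a 1280025 bytes record (nth: 3) is larger than buffer chunk limit size",
    ]
    # this is the message carried by the single raised exception
    puts errors.join(", ")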