From 1e400c11dbe5ace38f8d487c13d9c120ba2d795f Mon Sep 17 00:00:00 2001
From: Kentaro Hayashi
Date: Wed, 17 Nov 2021 18:59:21 +0900
Subject: [PATCH] buffer: minimize data lost by overflow exception for
 compressed data

The previous commit assumed that a format is specified and the data is
not compressed. This commit supports compressed data, too.

NOTE: In the test case, the boundary value which causes the overflow is
calculated in advance because the value can vary from machine to
machine.

See https://github.com/fluent/fluentd/pull/3553#issuecomment-972552443

Signed-off-by: Kentaro Hayashi
---
 lib/fluent/plugin/buffer.rb | 31 +++++++++++++++++++++++++------
 test/plugin/test_buffer.rb  | 25 +++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 1fa1177fb0..39616a6d11 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -354,7 +354,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
             unstaged_chunks[metadata] ||= []
             unstaged_chunks[metadata] << chunk
           end
-          if error and not error.empty?
+          if error && !error.empty?
             buffer_chunk_overflow_errors << error
           end
         end
@@ -773,11 +773,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
           split = splits[writing_splits_index]
           formatted_split = format ? format.call(split) : split.first
           if split.size == 1 && original_bytesize == 0
-            big_record_size = formatted_split.bytesize
-            if chunk.bytesize + big_record_size > @chunk_limit_size
-              errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
-              writing_splits_index += 1
-              next
+            if format == nil && @compress != :text
+              # The actual size of the chunk is not determined until after chunk.append,
+              # so keep the already processed 'split' content by committing it here.
+              # (this allows a slight performance regression)
+              chunk.commit
+            else
+              big_record_size = formatted_split.bytesize
+              if chunk.bytesize + big_record_size > @chunk_limit_size
+                errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
+                writing_splits_index += 1
+                next
+              end
             end
           end

@@ -788,8 +795,19 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
           end

           if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
+            adding_bytes = chunk.instance_eval { @adding_bytes } || "N/A" # 3rd party might not have 'adding_bytes'
             chunk.rollback

+            if split.size == 1 && original_bytesize == 0
+              # Obviously, BufferChunkOverflowError should be raised here, but
+              # if it were raised now, the already processed 'split' or the
+              # subsequent 'split' would be lost completely,
+              # so delaying the exception is a last resort.
+              errors << "a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
+              writing_splits_index += 1
+              next
+            end
+
             if chunk_size_full?(chunk) || split.size == 1
               enqueue_chunk_before_retry = true
             else
@@ -811,6 +829,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
           end

           block.call(chunk, chunk.bytesize - original_bytesize, errors)
+          errors = []
         end
       end
     rescue ShouldRetry
diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb
index 01670876cb..acf48b7d7f 100644
--- a/test/plugin/test_buffer.rb
+++ b/test/plugin/test_buffer.rb
@@ -1247,6 +1247,7 @@ def create_chunk_es(metadata, es)
   sub_test_case 'when compress is gzip' do
     setup do
       @p = create_buffer({'compress' => 'gzip'})
+      @dm0 = create_metadata(Time.parse('2016-04-11 16:00:00 +0000').to_i, nil, nil)
     end

     test '#compress returns :gzip' do
@@ -1257,6 +1258,30 @@ def create_chunk_es(metadata, es)
       chunk = @p.generate_chunk(create_metadata)
       assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable)
     end
+
+    test '#write compressed data which exceeds chunk_limit_size, it raises BufferChunkOverflowError' do
+      @p = create_buffer({'compress' => 'gzip', 'chunk_limit_size' => 70})
+      timestamp = event_time('2016-04-11 16:00:02 +0000')
+      es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}], # overflow
+                                         [timestamp, {"message" => "aaa"}],
+                                         [timestamp, {"message" => "bbb"}]])
+      assert_equal [], @p.queue.map(&:metadata)
+      assert_equal 70, @p.chunk_limit_size
+
+      # Calculate the actual boundary value in advance; it varies from machine to machine.
+      c = @p.generate_chunk(create_metadata)
+      c.append(Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}]]), compress: :gzip)
+      overflow_bytes = c.bytesize
+
+      messages = "a #{overflow_bytes} bytes record (nth: 0) is larger than buffer chunk limit size"
+      assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages) do
+        # test format == nil && compress == :gzip
+        @p.write({@dm0 => es})
+      end
+      # "aaa" and "bbb" each occupy a full chunk, so both messages are queued (no staged chunk)
+      assert_equal([2, [@dm0, @dm0], [1, 1], nil],
+                   [@p.queue.size, @p.queue.map(&:metadata), @p.queue.map(&:size), @p.stage[@dm0]])
+    end
   end

   sub_test_case '#statistics' do
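
A minimal caller-side sketch of the delayed-overflow behavior this patch introduces. It assumes `buffer` is an already configured Fluent::Plugin::Buffer with 'compress gzip' and a small 'chunk_limit_size', that `metadata` and `es` (a Fluent::ArrayEventStream holding one oversized record plus normal records) are prepared as in the test above, and that `log` is a placeholder logger:

    begin
      # With format == nil and compress == :gzip, a record's size is only known
      # after chunk.append, so the overflow is detected per split and reported
      # at the end instead of aborting the whole write.
      buffer.write({metadata => es})
    rescue Fluent::Plugin::Buffer::BufferChunkOverflowError => e
      # Only the oversized record is dropped and reported; the other records in
      # `es` have already been committed to chunks and are not lost.
      log.warn("dropped oversized record: #{e.message}")
    end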