Skip to content

Commit

Permalink
Fix a bug that BufferChunkOverflowError cause a whole data loss
Browse files Browse the repository at this point in the history
In the previous version, if BufferChunkOverflowError occurs,
whole data is not sent because of raised exception.

For example, when a record in the incoming data exceeds chunk_limit_size,
"message a" and "message b" are also lost.

  message a\n
  longer.....message data exceeds chunk_limit_size\n
  message b\n

This is not appropriate because the problem exists only in a part of the
split data.

In this commit, when a BufferChunkOverflowError is detected in
write_step_by_step, the errors are collected and propagated up to the
write method's scope.
In this way, the valid partial splits are still processed and not lost.

Here is the micro benchmark result:

Before:

  Warming up --------------------------------------
  no BufferChunkOverflowError
                          11.389k i/100ms
  raise BufferChunkOverflowError in every 100 calls
                           3.976k i/100ms
  Calculating -------------------------------------
  no BufferChunkOverflowError
                          114.973k (± 2.6%) i/s -      1.150M in  10.012362s
  raise BufferChunkOverflowError in every 100 calls
                           40.086k (± 3.8%) i/s -    401.576k in  10.031710s

  Comparison:
  no BufferChunkOverflowError:   114972.7 i/s
  raise BufferChunkOverflowError in every 100 calls:    40086.1 i/s - 2.87x  (± 0.00) slower

After:

  Warming up --------------------------------------
  no BufferChunkOverflowError
                          11.491k i/100ms
  raise BufferChunkOverflowError in every 100 calls
                           4.690k i/100ms
  Calculating -------------------------------------
  no BufferChunkOverflowError
                          114.295k (± 1.6%) i/s -      1.149M in  10.056319s
  raise BufferChunkOverflowError in every 100 calls
                           48.076k (± 9.0%) i/s -    478.380k in  10.030903s

  Comparison:
  no BufferChunkOverflowError:   114294.8 i/s
  raise BufferChunkOverflowError in every 100 calls:    48076.2 i/s - 2.38x  (± 0.00) slower

There is a slight performance drawback in the test case above, but
comparing the "Calculating" sections of the Before and After runs shows
the results are almost the same.

Likewise, for each case — with and without BufferChunkOverflowError —
the micro benchmark results are almost the same before and after.

Here is the micro benchmark test case:

  test '#write BufferChunkOverflowError micro benchmark' do
    require 'benchmark/ips'
    es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x"}] ])
    large = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}] ])
    Benchmark.ips do |x|
      x.config(:time => 10, :warmup => 2)
      x.report("no BufferChunkOverflowError") do
        @p.write({@dm0 => es}, format: @Format)
      end
      x.report("raise BufferChunkOverflowError in every 100 calls") do |n|
        n.times do |i|
          if i % 100 == 0
            assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do
              @p.write({@dm0 => large}, format: @Format)
            end
          else
            @p.write({@dm0 => es}, format: @Format)
          end
        end
      end
      x.compare!
    end
  end

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
  • Loading branch information
kenhys committed Nov 17, 2021
1 parent f03bb92 commit 46dad90
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 9 deletions.
31 changes: 23 additions & 8 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,14 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}
# track internal BufferChunkOverflowError in write_step_by_step
buffer_chunk_overflow_errors = []

begin
# sort metadata to get lock of chunks in same order with other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize, error|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
Expand All @@ -352,6 +354,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end
if error and not error.empty?
buffer_chunk_overflow_errors << error
end
end
end

Expand Down Expand Up @@ -444,6 +449,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
unless buffer_chunk_overflow_errors.empty?
# Notify delayed BufferChunkOverflowError here
raise BufferChunkOverflowError, buffer_chunk_overflow_errors.join(", ")
end
end
end

Expand Down Expand Up @@ -716,6 +725,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)

def write_step_by_step(metadata, data, format, splits_count, &block)
splits = []
errors = []
if splits_count > data.size
splits_count = data.size
end
Expand Down Expand Up @@ -761,20 +771,25 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
formatted_split = format ? format.call(split) : split.first
if split.size == 1 && original_bytesize == 0
big_record_size = formatted_split.bytesize
if chunk.bytesize + big_record_size > @chunk_limit_size
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
end
end

if format
chunk.concat(format.call(split), split.size)
chunk.concat(formatted_split, split.size)
else
chunk.append(split, compress: @compress)
end

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
chunk.rollback

if split.size == 1 && original_bytesize == 0
big_record_size = format ? format.call(split).bytesize : split.first.bytesize
raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
end

if chunk_size_full?(chunk) || split.size == 1
enqueue_chunk_before_retry = true
else
Expand All @@ -795,7 +810,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
raise
end

block.call(chunk, chunk.bytesize - original_bytesize)
block.call(chunk, chunk.bytesize - original_bytesize, errors)
end
end
rescue ShouldRetry
Expand Down
48 changes: 47 additions & 1 deletion test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class DummyOutputPlugin < Fluent::Plugin::Base
end
class DummyMemoryChunkError < StandardError; end
class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk
attr_reader :append_count, :rollbacked, :closed, :purged
attr_reader :append_count, :rollbacked, :closed, :purged, :chunk
attr_accessor :failing
def initialize(metadata, compress: :text)
super
Expand Down Expand Up @@ -944,6 +944,52 @@ def create_chunk_es(metadata, es)
@p.write({@dm0 => es}, format: @format)
end
end

# Data-driven cases: one oversized record ("x" * 1_280_000, which exceeds
# chunk_limit_size once formatted) placed at different positions among the
# normal-sized records "a" and "b".
data(
first_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
intermediate_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
last_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]]),
multiple_chunks: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]])
)
# Verifies the fix in this commit: an oversized record makes #write raise
# BufferChunkOverflowError, but the other (valid) records in the same call
# are still staged instead of the whole write being lost.
test '#write exceeds chunk_limit_size, raise BufferChunkOverflowError, but not lost whole messages' do |(es)|
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

assert_equal 1_280_000, @p.chunk_limit_size

# Collect the indexes (nth) of every oversized record in this event stream.
nth = []
es.entries.each_with_index do |entry, index|
if entry.last["message"].size == @p.chunk_limit_size
nth << index
end
end
# Build the expected error message for each oversized record.
# NOTE(review): 1280025 bytes presumably is the 1_280_000-byte payload plus
# serialization overhead after formatting — confirm against the format used.
messages = []
nth.each do |n|
messages << "a 1280025 bytes record (nth: #{n}) is larger than buffer chunk limit size"
end

# The collected per-record errors are joined with ", " and raised as a
# single BufferChunkOverflowError after the valid records are written.
assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages.join(", ")) do
@p.write({@dm0 => es}, format: @format)
end
# message a and b are concatenated and staged
staged_messages = Fluent::MessagePackFactory.msgpack_unpacker.feed_each(@p.stage[@dm0].chunk).collect do |record|
record.last
end
assert_equal([2, [{"message" => "a"}, {"message" => "b"}]],
[@p.stage[@dm0].size, staged_messages])
# only es0 message is queued
# NOTE(review): the queued 5000-event chunk appears to come from the
# surrounding test setup (es0), not from this write — verify in setup.
assert_equal [@dm0], @p.queue.map(&:metadata)
assert_equal [5000], @p.queue.map(&:size)
end
end

sub_test_case 'custom format with configuration for test with lower chunk limit size' do
Expand Down

0 comments on commit 46dad90

Please sign in to comment.