From aad2feb19d125b257d0e962ad6e82e28cda1e77c Mon Sep 17 00:00:00 2001
From: Kentaro Hayashi <hayashi@clear-code.com>
Date: Wed, 24 Nov 2021 13:02:53 +0900
Subject: [PATCH] Don't raise exception when each message size is small
 enough

Follow up #3553

In previous versions, even when each record was smaller than the chunk
limit size, BufferChunkOverflowError was raised unexpectedly if the
whole message size exceeded the chunk limit size.

For example, with a chunk limit size of 1_280_000, processing an event
stream of 3 records (1_000_000 bytes each) raises an exception like this:

  Fluent::Plugin::Buffer::BufferChunkOverflowError(<a 1000025 bytes
  record (nth: 1) is larger than buffer chunk limit size, a 1000025
  bytes record (nth: 2) is larger than buffer chunk limit size>)

Now no exception is raised as long as each record size is small enough
compared to the chunk limit size. Instead, each message is stored into
a separate chunk.

The idea is that when the added byte size is smaller than the chunk
limit size, the chunk should be unstaged and pushed into the queue;
otherwise, the record should be skipped as in #3553.
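
The following is a minimal standalone sketch of that decision, using a
hypothetical pack_records helper rather than the actual
Buffer#write_step_by_step API: a record larger than the limit is skipped
with an error, while a record that merely overflows the current chunk
closes it and starts a new one.

  def pack_records(records, limit)
    chunks = [[]]   # the last entry plays the role of the staged chunk
    errors = []
    records.each_with_index do |record, nth|
      if record.bytesize > limit
        # the record alone can never fit into a chunk: skip it, like #3553
        errors << "a #{record.bytesize} bytes record (nth: #{nth}) is larger than limit (#{limit})"
        next
      end
      if chunks.last.sum(&:bytesize) + record.bytesize > limit
        # the record itself is small enough, so "queue" the current chunk
        # and write the record into a fresh one instead of raising
        chunks << []
      end
      chunks.last << record
    end
    [chunks, errors]
  end

  chunks, errors = pack_records(["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000],
                                1_280_000)
  chunks.map { |c| c.map { |r| r[0] } }  # => [["a"], ["b"], ["c"]]
  errors                                 # => []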

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
---
 lib/fluent/plugin/buffer.rb | 38 ++++++++++++------
 test/plugin/test_buffer.rb  | 77 +++++++++++++++++++++++++++++++++++++
 2 files changed, 104 insertions(+), 11 deletions(-)

diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index cc18f76a1e..7275d70c55 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -767,7 +767,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
             raise ShouldRetry unless chunk.writable?
             staged_chunk_used = true if chunk.staged?
 
-            original_bytesize = chunk.bytesize
+            original_bytesize = committed_bytesize = chunk.bytesize
             begin
               while writing_splits_index < splits.size
                 split = splits[writing_splits_index]
@@ -778,12 +778,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
                     # so, keep already processed 'split' content here.
                     # (allow performance regression a bit)
                     chunk.commit
+                    committed_bytesize = chunk.bytesize
                   else
                     big_record_size = formatted_split.bytesize
-                    if chunk.bytesize + big_record_size > @chunk_limit_size
+                    if big_record_size > @chunk_limit_size
+                      # Just skip to next split (current split is ignored)
                       errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
                       writing_splits_index += 1
                       next
+                    elsif chunk.bytesize + big_record_size > @chunk_limit_size
+                      # No doubt that the split is expected to cause size over, keep 'split' content here.
+                      chunk.commit
+                      committed_bytesize = chunk.bytesize
                     end
                   end
                 end
@@ -793,19 +799,29 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
                 else
                   chunk.append(split, compress: @compress)
                 end
+                adding_bytes = chunk.bytesize - committed_bytesize
 
                 if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
-                  adding_bytes = chunk.instance_eval { @adding_bytes } || "N/A" # 3rd party might not have 'adding_bytes'
                   chunk.rollback
-
+                  committed_bytesize = chunk.bytesize
                   if split.size == 1 && original_bytesize == 0
-                    # It is obviously case that BufferChunkOverflowError should be raised here,
-                    # but if it raises here, already processed 'split' or
-                    # the proceeding 'split' will be lost completely.
-                    # so it is a last resort to delay raising such a exception
-                    errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                    writing_splits_index += 1
-                    next
+                    if adding_bytes < @chunk_limit_size
+                      # As already processed content is kept after rollback, then unstaged chunk should be queued.
+                      # After that, re-process current split again.
+                      # New chunk should be allocated, to do it, modify @stage and so on.
+                      synchronize { @stage.delete(modified_metadata) }
+                      staged_chunk_used = false
+                      chunk.unstaged!
+                      break
+                    else
+                      # It is obviously case that BufferChunkOverflowError should be raised here,
+                      # but if it raises here, already processed 'split' or
+                      # the proceeding 'split' will be lost completely.
+                      # so it is a last resort to delay raising such a exception
+                      errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                      writing_splits_index += 1
+                      next
+                    end
                   end
 
                   if chunk_size_full?(chunk) || split.size == 1
diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb
index cfbcd89380..d35d2d6ce7 100644
--- a/test/plugin/test_buffer.rb
+++ b/test/plugin/test_buffer.rb
@@ -990,6 +990,51 @@ def create_chunk_es(metadata, es)
       assert_equal [@dm0], @p.queue.map(&:metadata)
       assert_equal [5000], @p.queue.map(&:size)
     end
+
+    test "confirm that every message which is smaller than chunk threshold does not raise BufferChunkOverflowError" do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+      timestamp = event_time('2016-04-11 16:00:02 +0000')
+      es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "a" * 1_000_000}],
+                                         [timestamp, {"message" => "b" * 1_000_000}],
+                                         [timestamp, {"message" => "c" * 1_000_000}]])
+
+      # https://github.com/fluent/fluentd/issues/1849
+      # Even though 1_000_000 < 1_280_000 (chunk_limit_size), it raised BufferChunkOverflowError before.
+      # It should not be raised and message a,b,c should be stored into 3 chunks.
+      assert_nothing_raised do
+        @p.write({@dm0 => es}, format: @format)
+      end
+      messages = []
+      # pick up first letter to check whether chunk is queued in expected order
+      3.times do |index|
+        chunk = @p.queue[index]
+        es = Fluent::MessagePackEventStream.new(chunk.chunk)
+        es.ensure_unpacked!
+        records = es.instance_eval{ @unpacked_records }
+        records.each do |record|
+          messages << record["message"][0]
+        end
+      end
+      es = Fluent::MessagePackEventStream.new(@p.stage[@dm0].chunk)
+      es.ensure_unpacked!
+      staged_message = es.instance_eval{ @unpacked_records }.first["message"]
+      # message a and b are queued, message c is staged
+      assert_equal([
+                     [@dm0],
+                     "c" * 1_000_000,
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     [["x"] * 5000, "a", "b"].flatten
+                   ],
+                   [
+                     @p.stage.keys,
+                     staged_message,
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     messages
+                   ])
+    end
   end
 
   sub_test_case 'custom format with configuration for test with lower chunk limit size' do
@@ -1078,6 +1123,38 @@ def create_chunk_es(metadata, es)
         @p.write({@dm0 => es})
       end
     end
+
+    test 'confirm that every array message which is smaller than chunk threshold does not raise BufferChunkOverflowError' do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+
+      assert_equal 1_280_000, @p.chunk_limit_size
+
+      es = ["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000]
+      assert_nothing_raised do
+        @p.write({@dm0 => es})
+      end
+      queue_messages = @p.queue.collect do |chunk|
+        # collect first character of each message
+        chunk.chunk[0]
+      end
+      assert_equal([
+                     [@dm0],
+                     1,
+                     "c",
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     ["x", "a", "b"]
+                   ],
+                   [
+                     @p.stage.keys,
+                     @p.stage[@dm0].size,
+                     @p.stage[@dm0].chunk[0],
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     queue_messages
+                   ])
+    end
   end
 
   sub_test_case 'with configuration for test with lower limits' do