From 04f7821d81212b2ad787e627f23634e632f4d4f6 Mon Sep 17 00:00:00 2001
From: Takuro Ashie <ashie@clear-code.com>
Date: Tue, 12 Dec 2023 13:17:56 +0900
Subject: [PATCH] Add a test for issue #3089

https://github.com/fluent/fluentd/issues/3089

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
---
 test/plugin/test_buffer.rb | 49 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb
index d35d2d6ce7..8fd67cf974 100644
--- a/test/plugin/test_buffer.rb
+++ b/test/plugin/test_buffer.rb
@@ -850,6 +850,55 @@ def create_chunk_es(metadata, es)
     test '#compress returns :text' do
       assert_equal :text, @p.compress
     end
+
+    # https://github.com/fluent/fluentd/issues/3089
+    test "closed chunk should not be committed" do
+      assert_equal 8 * 1024 * 1024, @p.chunk_limit_size
+      assert_equal 0.95, @p.chunk_full_threshold
+
+      purge_count = 0
+
+      stub.proxy(@p).generate_chunk(anything) do |chunk|
+        stub.proxy(chunk).purge do |result|
+          purge_count += 1
+          result
+        end
+        stub.proxy(chunk).commit do |result|
+          assert_false(chunk.closed?)
+          result
+        end
+        stub.proxy(chunk).rollback do |result|
+          assert_false(chunk.closed?)
+          result
+        end
+        chunk
+      end
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+      small_row = "x" * 1024 * 400
+      row = "x" * 1024 * 1024 * 8 # just chunk_size_limit
+      @p.write({m => [small_row] * 41 + [row]}) # 42 events in 1 event stream
+
+      # The above event stream will be split twice by `Buffer#write_step_by_step`
+      #
+      # 1. `write_once`: 42 [events] * 1 [stream]
+      # 2. `write_step_by_step`: 4 [events] * 10 [streams] + 2 [events] * 1 [stream]
+      # 3. `write_step_by_step` (by ShouldRetry): 1 [event] * 42 [streams]
+      #
+      # The problematic data is built in the 2nd stage.
+      # In the 2nd stage, 5 streams are packed in a chunk.
+      # ((1024 * 400) [bytes] * 4 [events] * 5 [streams] = 8192000 [bytes] < `chunk_limit_size` (8MB)).
+      # So 3 chunks are used to store all data.
+      # The 1st chunk is already staged by `write_once`.
+      # The 2nd & 3rd chunks are newly created as unstaged.
+      # The 2nd chunk is purged before `ShouldRetry`, which is not a problem:
+      #   https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L850
+      # The 3rd chunk is purged in `rescue ShouldRetry`:
+      #   https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L862
+      # The last one causes the issue described in https://github.com/fluent/fluentd/issues/3089#issuecomment-1811839198
+
+      assert_equal 2, purge_count
+    end
   end
 
   sub_test_case 'standard format with configuration for test with lower chunk limit size' do
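
The chunk math in the in-test comment above can be double-checked with a small standalone sketch. This is not Fluentd code: it only reproduces the 2nd-stage arithmetic described in the comment (4-event streams of 409,600-byte rows, compared against the raw 8 MiB chunk_limit_size as in the comment rather than the 0.95 chunk_full_threshold), to show why 11 streams end up spread across 3 chunks. The oversized 8 MiB row and the ShouldRetry retry path are not modeled here.

# Standalone arithmetic sketch (not Fluentd code); it reproduces the
# 2nd-stage numbers from the comment in the test above.
chunk_limit_size = 8 * 1024 * 1024     # 8,388,608 bytes
event_size       = 1024 * 400          # one small_row, 409,600 bytes

events_per_stream = 4                  # 42 events -> 10 streams of 4 + 1 stream of 2
stream_count      = (42 / events_per_stream.to_f).ceil            # => 11

bytes_per_stream  = event_size * events_per_stream                 # => 1,638,400
streams_per_chunk = chunk_limit_size / bytes_per_stream            # => 5 (8,192,000 bytes < 8 MiB)
chunks_needed     = (stream_count / streams_per_chunk.to_f).ceil   # => 3

puts "streams=#{stream_count} streams_per_chunk=#{streams_per_chunk} chunks=#{chunks_needed}"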