From 27bb8057f4bfbfa1b2f93ff812f40db3d75fddc4 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 11 May 2021 15:22:29 +0900 Subject: [PATCH 1/7] Handle both of stat and timer watchers are enabled and steady growing files case In this case, timer watcher tells file contents every 1 seconds and file contents changes is also notified from stats watcher. In such circumstances, if some tailing files are steadily growing, previous implementation does not prevent log ingestion for written contents which is notified by stat watcher. This commit also prevents log ingestion in such cases. Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a07902a715..8a9a9d8a51 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -908,6 +908,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @io = nil @notify_mutex = Mutex.new @log = log + @last_read_time = 0 + @number_bytes_read = 0 @log.info "following tail of #{@path}" end @@ -948,10 +950,17 @@ def limit_bytes_per_second_reached?(start_reading, number_bytes_read) false end + def refresh_bytes_read_counter + if Fluent::Clock.now - @last_read_time > 1 + @number_bytes_read = 0 + end + + yield + end + def handle_notify with_io do |io| begin - number_bytes_read = 0 start_reading = Fluent::Clock.now read_more = false @@ -959,7 +968,9 @@ def handle_notify begin while true data = io.readpartial(BYTES_TO_READ, @iobuf) - number_bytes_read += data.bytesize + refresh_bytes_read_counter do + @number_bytes_read += data.bytesize + end @fifo << data @fifo.read_lines(@lines) @@ -969,11 +980,12 @@ def handle_notify read_more = true break end - if limit_bytes_per_second_reached?(start_reading, number_bytes_read) + if limit_bytes_per_second_reached?(start_reading, @number_bytes_read) # Just get out from tailing loop. read_more = false break end + @last_read_time = Fluent::Clock.now end rescue EOFError end From 34fcda1d69f9692473a8ad0ac9b2c8cbffc96357 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 11 May 2021 15:55:31 +0900 Subject: [PATCH 2/7] Handle bytes limits more elastically Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 8a9a9d8a51..d7b003c0ad 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -908,6 +908,7 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @io = nil @notify_mutex = Mutex.new @log = log + @start_reading = 0 @last_read_time = 0 @number_bytes_read = 0 @@ -931,19 +932,19 @@ def opened? private - def read_bytes_limits_reached?(number_bytes_read) - number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0 + def read_bytes_limits_reached? + @number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0 end - def limit_bytes_per_second_reached?(start_reading, number_bytes_read) - return false unless read_bytes_limits_reached?(number_bytes_read) + def limit_bytes_per_second_reached? + return false unless read_bytes_limits_reached? # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion - time_spent_reading = Fluent::Clock.now - start_reading + time_spent_reading = Fluent::Clock.now - @start_reading @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") if time_spent_reading < 1 - needed_sleeping_time = 1 - time_spent_reading - @log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.") + estimated_waiting_ingestion_time = 1 - time_spent_reading + @log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again after #{estimated_waiting_ingestion_time} second(s) or later.") return true end @@ -953,6 +954,7 @@ def limit_bytes_per_second_reached?(start_reading, number_bytes_read) def refresh_bytes_read_counter if Fluent::Clock.now - @last_read_time > 1 @number_bytes_read = 0 + @start_reading = Fluent::Clock.now end yield @@ -960,8 +962,10 @@ def refresh_bytes_read_counter def handle_notify with_io do |io| + return if limit_bytes_per_second_reached? + begin - start_reading = Fluent::Clock.now + @start_reading = Fluent::Clock.now read_more = false if !io.nil? && @lines.empty? @@ -980,7 +984,7 @@ def handle_notify read_more = true break end - if limit_bytes_per_second_reached?(start_reading, @number_bytes_read) + if limit_bytes_per_second_reached? # Just get out from tailing loop. read_more = false break From 4f921a3edb76697d44adafc6567f6d2df0b9b68e Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 11 May 2021 17:00:51 +0900 Subject: [PATCH 3/7] in_tail: Simplify checking read_bytes_limit_per_second Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 43 ++++++++++++------------------------ 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index d7b003c0ad..de7a328993 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -908,8 +908,7 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @io = nil @notify_mutex = Mutex.new @log = log - @start_reading = 0 - @last_read_time = 0 + @start_reading_time = nil @number_bytes_read = 0 @log.info "following tail of #{@path}" @@ -932,49 +931,36 @@ def opened? private - def read_bytes_limits_reached? - @number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0 - end - def limit_bytes_per_second_reached? - return false unless read_bytes_limits_reached? + return false if @read_bytes_limit_per_second < 0 # not enabled by conf + return false if @number_bytes_read < @read_bytes_limit_per_second - # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion - time_spent_reading = Fluent::Clock.now - @start_reading + @start_reading_time ||= Fluent::Clock.now + time_spent_reading = Fluent::Clock.now - @start_reading_time @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") - if time_spent_reading < 1 - estimated_waiting_ingestion_time = 1 - time_spent_reading - @log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again after #{estimated_waiting_ingestion_time} second(s) or later.") - return true - end - - false - end - def refresh_bytes_read_counter - if Fluent::Clock.now - @last_read_time > 1 + if time_spent_reading < 1 + true + else + @start_reading_time = nil @number_bytes_read = 0 - @start_reading = Fluent::Clock.now + false end - - yield end def handle_notify - with_io do |io| - return if limit_bytes_per_second_reached? + return if limit_bytes_per_second_reached? + with_io do |io| begin - @start_reading = Fluent::Clock.now read_more = false if !io.nil? && @lines.empty? begin while true + @start_reading_time ||= Fluent::Clock.now data = io.readpartial(BYTES_TO_READ, @iobuf) - refresh_bytes_read_counter do - @number_bytes_read += data.bytesize - end + @number_bytes_read += data.bytesize @fifo << data @fifo.read_lines(@lines) @@ -989,7 +975,6 @@ def handle_notify read_more = false break end - @last_read_time = Fluent::Clock.now end rescue EOFError end From bc11e6c477744185974e2a38ebbdfc7ef2fff0f1 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 13 May 2021 12:49:58 +0900 Subject: [PATCH 4/7] in_tail: test: Add testcases for already reached bytes limits Signed-off-by: Hiroshi Hatake --- test/plugin/test_in_tail.rb | 137 ++++++++++++++++++++++++------------ 1 file changed, 93 insertions(+), 44 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index a651144bd3..c995e65501 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -345,53 +345,102 @@ def test_emit_with_read_lines_limit(data) cleanup_file("#{TMP_DIR}/tail.txt") end - data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], - "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2], - "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], - "flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20], - "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], - "parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8], - "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], - "parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20], - "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], - "flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2], - "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], - "flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20], - "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], - "parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8], - "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20], - "parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20]) - def test_emit_with_read_bytes_limit_per_second(data) - config_style, limit, limit_bytes, num_events = data - case config_style - when :flat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) - when :parse - config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG - when :flat_without_stat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) - when :parse_without_stat - config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG - end - d = create_driver(config) - msg = 'test' * 2000 # in_tail reads 8192 bytes at once. - - # We should not do shutdown here due to hard timeout. - d.run(expect_emits: 2, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - for _x in 0..30 - f.puts msg - end - } + sub_test_case "reads_bytes_per_second w/o throttled" do + data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2, false], + "flat 8192 bytes, 2 events already read limit reached" => [:flat, 100, 8192, 2, true], + "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2, false], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20], + "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], + "parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20], + "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], + "flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2], + "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20], + "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], + "parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8], + "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20], + "parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20]) + def test_emit_with_read_bytes_limit_per_second(data) + config_style, limit, limit_bytes, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + when :flat_without_stat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse_without_stat + config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + + # We should not do shutdown here due to hard timeout. + d.run(expect_emits: 2, shutdown: false) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + for x in 0..30 + f.puts msg + end + } + end + + events = d.events + assert_true(events.length <= num_events) + assert_equal({"message" => msg}, events[0][2]) + assert_equal({"message" => msg}, events[1][2]) + + # Teardown in_tail plugin instance here. + d.instance.shutdown end + end - events = d.events - assert_true(events.length <= num_events) - assert_equal({"message" => msg}, events[0][2]) - assert_equal({"message" => msg}, events[1][2]) + sub_test_case "reads_bytes_per_second w/ throttled already" do + data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], + "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], + "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20]) + def test_emit_with_read_bytes_limit_per_second(data) + config_style, limit, limit_bytes, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + + mock.proxy(d.instance).io_handler(anything, anything) do |io_handler| + require 'fluent/config/types' + limit_bytes_value = Fluent::Config.size_value(limit_bytes) + io_handler.instance_variable_set(:@number_bytes_read, limit_bytes_value) + io_handler + end - # Teardown in_tail plugin instance here. - d.instance.shutdown + # We should not do shutdown here due to hard timeout. + d.run(expect_emits: 2, shutdown: false) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + for _x in 0..30 + f.puts msg + end + } + end + + events = d.events + assert_true(events.length <= num_events) + assert_equal({"message" => msg}, events[0][2]) + assert_equal({"message" => msg}, events[1][2]) + + # Teardown in_tail plugin instance here. + d.instance.shutdown + end end end From bee91259b98398e6ae00c00b52a71bb08851e4db Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 13 May 2021 16:19:02 +0900 Subject: [PATCH 5/7] in_tail: test: Add writing files before checking bytes limits case Signed-off-by: Hiroshi Hatake --- test/plugin/test_in_tail.rb | 59 +++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index c995e65501..d8ca1e4745 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -442,6 +442,65 @@ def test_emit_with_read_bytes_limit_per_second(data) d.instance.shutdown end end + + sub_test_case "reads_bytes_per_second with several writing points" do + class Fluent::Plugin::TailInput::TailWatcher::IOHandler + alias_method :orig_limit_bytes_per_second_reached?, :limit_bytes_per_second_reached? + def limit_bytes_per_second_reached?(&block) + yield if block_given? + + orig_limit_bytes_per_second_reached? + end + end + + data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], + "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], + "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20]) + def test_emit_with_read_bytes_limit_per_second(data) + config_style, limit, limit_bytes, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + + mock.proxy(d.instance).io_handler(anything, anything) do |io_handler| + io_handler.limit_bytes_per_second_reached? do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + for _x in 0..5 + f.puts msg + end + } + end + io_handler + end + + # We should not do shutdown here due to hard timeout. + d.run(expect_emits: 2, shutdown: false) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + for _x in 0..30 + f.puts msg + end + } + end + + events = d.events + assert_true(events.length <= num_events) + assert_equal({"message" => msg}, events[0][2]) + assert_equal({"message" => msg}, events[1][2]) + + # Teardown in_tail plugin instance here. + d.instance.shutdown + end + end end data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, From 0d0e116074389659d083e05d0ceb420384894903 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 13 May 2021 17:59:58 +0900 Subject: [PATCH 6/7] test_in_tail: Fix some inappropriate tests for throttling feature * Check elapsed time * Check exact number of emitted events * Check inotify is surely emitted * Remove a needless test. Signed-off-by: Takuro Ashie --- test/plugin/test_in_tail.rb | 87 ++++++++----------------------------- 1 file changed, 18 insertions(+), 69 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index d8ca1e4745..9003a98cd7 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -378,19 +378,20 @@ def test_emit_with_read_bytes_limit_per_second(data) d = create_driver(config) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + start_time = Fluent::Clock.now + # We should not do shutdown here due to hard timeout. d.run(expect_emits: 2, shutdown: false) do File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - for x in 0..30 + 100.times do f.puts msg end } end - events = d.events - assert_true(events.length <= num_events) - assert_equal({"message" => msg}, events[0][2]) - assert_equal({"message" => msg}, events[1][2]) + assert_true(Fluent::Clock.now - start_time > 1) + assert_equal(num_events.times.map { {"message" => msg} }, + d.events.collect { |event| event[2] }) # Teardown in_tail plugin instance here. d.instance.shutdown @@ -421,81 +422,29 @@ def test_emit_with_read_bytes_limit_per_second(data) require 'fluent/config/types' limit_bytes_value = Fluent::Config.size_value(limit_bytes) io_handler.instance_variable_set(:@number_bytes_read, limit_bytes_value) + mock.proxy(io_handler).handle_notify.at_least(5) io_handler end - # We should not do shutdown here due to hard timeout. - d.run(expect_emits: 2, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - for _x in 0..30 - f.puts msg - end - } - end - - events = d.events - assert_true(events.length <= num_events) - assert_equal({"message" => msg}, events[0][2]) - assert_equal({"message" => msg}, events[1][2]) - - # Teardown in_tail plugin instance here. - d.instance.shutdown - end - end - - sub_test_case "reads_bytes_per_second with several writing points" do - class Fluent::Plugin::TailInput::TailWatcher::IOHandler - alias_method :orig_limit_bytes_per_second_reached?, :limit_bytes_per_second_reached? - def limit_bytes_per_second_reached?(&block) - yield if block_given? - - orig_limit_bytes_per_second_reached? - end - end - - data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], - "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], - "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], - "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], - "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], - "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], - "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], - "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20]) - def test_emit_with_read_bytes_limit_per_second(data) - config_style, limit, limit_bytes, num_events = data - case config_style - when :flat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) - when :parse - config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG - end - d = create_driver(config) - msg = 'test' * 2000 # in_tail reads 8192 bytes at once. - - mock.proxy(d.instance).io_handler(anything, anything) do |io_handler| - io_handler.limit_bytes_per_second_reached? do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - for _x in 0..5 - f.puts msg - end - } + File.open("#{TMP_DIR}/tail.txt", "ab") do |f| + 100.times do + f.puts msg end - io_handler end # We should not do shutdown here due to hard timeout. - d.run(expect_emits: 2, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - for _x in 0..30 + d.run(shutdown: false) do + start_time = Fluent::Clock.now + while Fluent::Clock.now - start_time < 0.8 do + File.open("#{TMP_DIR}/tail.txt", "ab") do |f| f.puts msg + f.flush end - } + sleep 0.05 + end end - events = d.events - assert_true(events.length <= num_events) - assert_equal({"message" => msg}, events[0][2]) - assert_equal({"message" => msg}, events[1][2]) + assert_equal([], d.events) # Teardown in_tail plugin instance here. d.instance.shutdown From d4d01180301624913fcf9fe1a3a564854026f47f Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 13 May 2021 20:36:14 +0900 Subject: [PATCH 7/7] test_in_tail: Fix a failed test on windows & mac Because they doesn't support inotify. Signed-off-by: Takuro Ashie --- test/plugin/test_in_tail.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 9003a98cd7..0c698f215a 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -422,7 +422,11 @@ def test_emit_with_read_bytes_limit_per_second(data) require 'fluent/config/types' limit_bytes_value = Fluent::Config.size_value(limit_bytes) io_handler.instance_variable_set(:@number_bytes_read, limit_bytes_value) - mock.proxy(io_handler).handle_notify.at_least(5) + if Fluent.linux? + mock.proxy(io_handler).handle_notify.at_least(5) + else + mock.proxy(io_handler).handle_notify.twice + end io_handler end