diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a07902a715..de7a328993 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -908,6 +908,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @io = nil @notify_mutex = Mutex.new @log = log + @start_reading_time = nil + @number_bytes_read = 0 @log.info "following tail of #{@path}" end @@ -929,37 +931,36 @@ def opened? private - def read_bytes_limits_reached?(number_bytes_read) - number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0 - end - - def limit_bytes_per_second_reached?(start_reading, number_bytes_read) - return false unless read_bytes_limits_reached?(number_bytes_read) + def limit_bytes_per_second_reached? + return false if @read_bytes_limit_per_second < 0 # not enabled by conf + return false if @number_bytes_read < @read_bytes_limit_per_second - # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion - time_spent_reading = Fluent::Clock.now - start_reading + @start_reading_time ||= Fluent::Clock.now + time_spent_reading = Fluent::Clock.now - @start_reading_time @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") + if time_spent_reading < 1 - needed_sleeping_time = 1 - time_spent_reading - @log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.") - return true + true + else + @start_reading_time = nil + @number_bytes_read = 0 + false end - - false end def handle_notify + return if limit_bytes_per_second_reached? + with_io do |io| begin - number_bytes_read = 0 - start_reading = Fluent::Clock.now read_more = false if !io.nil? && @lines.empty? begin while true + @start_reading_time ||= Fluent::Clock.now data = io.readpartial(BYTES_TO_READ, @iobuf) - number_bytes_read += data.bytesize + @number_bytes_read += data.bytesize @fifo << data @fifo.read_lines(@lines) @@ -969,7 +970,7 @@ def handle_notify read_more = true break end - if limit_bytes_per_second_reached?(start_reading, number_bytes_read) + if limit_bytes_per_second_reached? # Just get out from tailing loop. read_more = false break diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index a651144bd3..0c698f215a 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -345,53 +345,114 @@ def test_emit_with_read_lines_limit(data) cleanup_file("#{TMP_DIR}/tail.txt") end - data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], - "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2], - "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], - "flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20], - "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], - "parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8], - "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], - "parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20], - "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], - "flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2], - "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], - "flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20], - "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], - "parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8], - "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20], - "parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20]) - def test_emit_with_read_bytes_limit_per_second(data) - config_style, limit, limit_bytes, num_events = data - case config_style - when :flat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) - when :parse - config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG - when :flat_without_stat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) - when :parse_without_stat - config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + sub_test_case "reads_bytes_per_second w/o throttled" do + data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2, false], + "flat 8192 bytes, 2 events already read limit reached" => [:flat, 100, 8192, 2, true], + "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2, false], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20], + "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], + "parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20], + "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], + "flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2], + "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20], + "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], + "parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8], + "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20], + "parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20]) + def test_emit_with_read_bytes_limit_per_second(data) + config_style, limit, limit_bytes, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + when :flat_without_stat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse_without_stat + config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + + start_time = Fluent::Clock.now + + # We should not do shutdown here due to hard timeout. + d.run(expect_emits: 2, shutdown: false) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + 100.times do + f.puts msg + end + } + end + + assert_true(Fluent::Clock.now - start_time > 1) + assert_equal(num_events.times.map { {"message" => msg} }, + d.events.collect { |event| event[2] }) + + # Teardown in_tail plugin instance here. + d.instance.shutdown end - d = create_driver(config) - msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + end + + sub_test_case "reads_bytes_per_second w/ throttled already" do + data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], + "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], + "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20]) + def test_emit_with_read_bytes_limit_per_second(data) + config_style, limit, limit_bytes, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + + mock.proxy(d.instance).io_handler(anything, anything) do |io_handler| + require 'fluent/config/types' + limit_bytes_value = Fluent::Config.size_value(limit_bytes) + io_handler.instance_variable_set(:@number_bytes_read, limit_bytes_value) + if Fluent.linux? + mock.proxy(io_handler).handle_notify.at_least(5) + else + mock.proxy(io_handler).handle_notify.twice + end + io_handler + end - # We should not do shutdown here due to hard timeout. - d.run(expect_emits: 2, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - for _x in 0..30 + File.open("#{TMP_DIR}/tail.txt", "ab") do |f| + 100.times do f.puts msg end - } - end + end - events = d.events - assert_true(events.length <= num_events) - assert_equal({"message" => msg}, events[0][2]) - assert_equal({"message" => msg}, events[1][2]) + # We should not do shutdown here due to hard timeout. + d.run(shutdown: false) do + start_time = Fluent::Clock.now + while Fluent::Clock.now - start_time < 0.8 do + File.open("#{TMP_DIR}/tail.txt", "ab") do |f| + f.puts msg + f.flush + end + sleep 0.05 + end + end - # Teardown in_tail plugin instance here. - d.instance.shutdown + assert_equal([], d.events) + + # Teardown in_tail plugin instance here. + d.instance.shutdown + end end end