From 0f018a4a0a617b62bcd288501272438076a7f42f Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 19 May 2021 00:39:36 +0900 Subject: [PATCH] in_tail: Shutdown immediately & safely even if reading huge files in_tail takes very long time to shutdown when it's reading huge files. This patch ensure to shutdown ASAP just after receiving a shutdown command. * Constructed tail watchers should be registered ASAP to be recognized by the shutdown process. * Break TailWatcher#on_notify ASAP Wait 5 secdonds before breaking it to ensure reading remaining lines. It takes effect only when `skip_refresh_on_startup` is enabled, because reading files is occurred on main thread without it. It should be enabled by default in the future. Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 43 ++++++++++++++++++++++++++++++------ test/plugin/test_in_tail.rb | 29 ++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3b7142daa6..60e269cf1f 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -56,6 +56,7 @@ def initialize @pf_file = nil @pf = nil @ignore_list = [] + @shutdown_start_time = nil end desc 'The paths to read. Multiple paths can be specified, separated by comma.' @@ -256,6 +257,7 @@ def stop end def shutdown + @shutdown_start_time = Fluent::Clock.now # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close. stop_watchers(existence_path, immediate: true, remove_watcher: false) @pf_file.close if @pf_file @@ -385,8 +387,6 @@ def setup_watcher(target_info, pe) tw.register_watcher(tt) end - tw.on_notify - tw.watchers.each do |watcher| event_loop_attach(watcher) end @@ -398,7 +398,7 @@ def setup_watcher(target_info, pe) event_loop_detach(watcher) end - tw.detach + tw.detach(@shutdown_start_time) tw.close end raise e @@ -427,6 +427,7 @@ def construct_watcher(target_info) begin target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) @tails[target_info] = tw + tw.on_notify rescue Errno::ENOENT, Errno::EACCES => e $log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now." # explicitly detach and unwatch watcher `tw`. @@ -438,6 +439,7 @@ def construct_watcher(target_info) def start_watchers(targets_info) targets_info.each_value {|target_info| construct_watcher(target_info) + break if before_shutdown? } end @@ -490,9 +492,11 @@ def update_watcher(target_info, pe) if new_position_entry.read_inode == 0 @tails[new_target_info] = setup_watcher(new_target_info, new_position_entry) + @tails[new_target_info].on_notify end else @tails[new_target_info] = setup_watcher(new_target_info, pe) + @tails[new_target_info].on_notify end detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw end @@ -505,7 +509,7 @@ def detach_watcher(tw, ino, close_io = true) tw.watchers.each do |watcher| event_loop_detach(watcher) end - tw.detach + tw.detach(@shutdown_start_time) tw.close if close_io @@ -717,8 +721,11 @@ def register_watcher(watcher) @watchers << watcher end - def detach - @io_handler.on_notify if @io_handler + def detach(shutdown_start_time = nil) + if @io_handler + @io_handler.ready_to_shutdown(shutdown_start_time) + @io_handler.on_notify + end @line_buffer_timer_flusher&.close(self) end @@ -894,6 +901,9 @@ def bytesize class IOHandler BYTES_TO_READ = 8192 + SHUTDOWN_TIMEOUT = 5 + + attr_accessor :shutdown_timeout def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) @watcher = watcher @@ -910,6 +920,9 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @log = log @start_reading_time = nil @number_bytes_read = 0 + @shutdown_start_time = nil + @shutdown_timeout = SHUTDOWN_TIMEOUT + @shutdown_mutex = Mutex.new @log.info "following tail of #{@path}" end @@ -918,6 +931,13 @@ def on_notify @notify_mutex.synchronize { handle_notify } end + def ready_to_shutdown(shutdown_start_time = nil) + @shutdown_mutex.synchronize { + @shutdown_start_time = + shutdown_start_time || Fluent::Clock.now + } + end + def close if @io && !@io.closed? @io.close @@ -948,6 +968,15 @@ def limit_bytes_per_second_reached? end end + def should_shutdown_now? + # Ensure to read all remaining lines, but abort immediately if it + # seems to take too long time. + @shutdown_mutex.synchronize { + return false if @shutdown_start_time.nil? + return Fluent::Clock.now - @shutdown_start_time > @shutdown_timeout + } + end + def handle_notify return if limit_bytes_per_second_reached? @@ -965,7 +994,7 @@ def handle_notify @fifo.read_lines(@lines) @log.debug("reading file: #{@path}") - if limit_bytes_per_second_reached? + if limit_bytes_per_second_reached? || should_shutdown_now? # Just get out from tailing loop. read_more = false break diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 9560aa7117..4705d2f914 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2130,4 +2130,33 @@ def test_EACCES d.instance_shutdown assert($log.out.logs.any?{|log| log.include?("expand_paths: stat() for #{path} failed with Errno::EACCES. Skip file.\n") }) end + + def test_shutdown_timeout + path = "#{TMP_DIR}/tail.txt" + File.open("#{TMP_DIR}/tail.txt", "wb") do |f| + (1024 * 1024 * 5).times do + f.puts "{\"test\":\"fizzbuzz\"}" + end + end + + config = + CONFIG_READ_FROM_HEAD + + config_element('', '', { + 'format' => 'json', + 'skip_refresh_on_startup' => true, + }) + d = create_driver(config) + mock.proxy(d.instance).io_handler(anything, anything) do |io_handler| + io_handler.shutdown_timeout = 0.5 + io_handler + end + + start_time = Fluent::Clock.now + assert_nothing_raised do + d.run(expect_emits: 1) + end + + elapsed = Fluent::Clock.now - start_time + assert_true(elapsed > 0.5 && elapsed < 2.5) + end end