From 1a37ef0706e1d9351f49110e6308e02e43e29b7a Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 19 May 2021 00:39:36 +0900 Subject: [PATCH] in_tail: Shutdown immediately & safely even if reading huge files in_tail takes very long time to shutdown when it's reading huge files. This patch ensure to shutdown ASAP just after receiving a shutdown command. * Constructed tail watchers should be registered ASAP to be recognized by the shutdown process. * Break TailWatcher#on_notify ASAP Wait 5 secdonds before break it to ensure to read remaining lines. It takes effect only when `skip_refresh_on_startup` is enabled, because reading files is occurred on main thread without it. It should be enabled by default in the future. Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 40 +++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3b7142daa6..614615f2fb 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -56,6 +56,7 @@ def initialize @pf_file = nil @pf = nil @ignore_list = [] + @shutdown_start_time = nil end desc 'The paths to read. Multiple paths can be specified, separated by comma.' @@ -256,6 +257,7 @@ def stop end def shutdown + @shutdown_start_time = Fluent::Clock.now # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close. stop_watchers(existence_path, immediate: true, remove_watcher: false) @pf_file.close if @pf_file @@ -385,8 +387,6 @@ def setup_watcher(target_info, pe) tw.register_watcher(tt) end - tw.on_notify - tw.watchers.each do |watcher| event_loop_attach(watcher) end @@ -398,7 +398,7 @@ def setup_watcher(target_info, pe) event_loop_detach(watcher) end - tw.detach + tw.detach(@shutdown_start_time) tw.close end raise e @@ -427,6 +427,7 @@ def construct_watcher(target_info) begin target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) @tails[target_info] = tw + tw.on_notify rescue Errno::ENOENT, Errno::EACCES => e $log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now." # explicitly detach and unwatch watcher `tw`. @@ -438,6 +439,7 @@ def construct_watcher(target_info) def start_watchers(targets_info) targets_info.each_value {|target_info| construct_watcher(target_info) + break if before_shutdown? } end @@ -490,9 +492,11 @@ def update_watcher(target_info, pe) if new_position_entry.read_inode == 0 @tails[new_target_info] = setup_watcher(new_target_info, new_position_entry) + @tails[new_target_info].on_notify end else @tails[new_target_info] = setup_watcher(new_target_info, pe) + @tails[new_target_info].on_notify end detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw end @@ -505,7 +509,7 @@ def detach_watcher(tw, ino, close_io = true) tw.watchers.each do |watcher| event_loop_detach(watcher) end - tw.detach + tw.detach(@shutdown_start_time) tw.close if close_io @@ -717,8 +721,11 @@ def register_watcher(watcher) @watchers << watcher end - def detach - @io_handler.on_notify if @io_handler + def detach(shutdown_start_time = nil) + if @io_handler + @io_handler.ready_to_shutdown(shutdown_start_time) + @io_handler.on_notify + end @line_buffer_timer_flusher&.close(self) end @@ -894,6 +901,7 @@ def bytesize class IOHandler BYTES_TO_READ = 8192 + TIMEOUT_TO_SHUTDOWN = 5 def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) @watcher = watcher @@ -910,6 +918,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @log = log @start_reading_time = nil @number_bytes_read = 0 + @shutdown_start_time = nil + @shutdown_mutex = Mutex.new @log.info "following tail of #{@path}" end @@ -918,6 +928,13 @@ def on_notify @notify_mutex.synchronize { handle_notify } end + def ready_to_shutdown(shutdown_start_time = nil) + @shutdown_mutex.synchronize { + @shutdown_start_time = + shutdown_start_time || Fluent::Clock.now + } + end + def close if @io && !@io.closed? @io.close @@ -948,6 +965,15 @@ def limit_bytes_per_second_reached? end end + def should_shutdown_now? + # Ensure to read all remaining lines, but abort immediately if it + # seems to take too long time. + @shutdown_mutex.synchronize { + return false if @shutdown_start_time.nil? + return Fluent::Clock.now - @shutdown_start_time > TIMEOUT_TO_SHUTDOWN + } + end + def handle_notify return if limit_bytes_per_second_reached? @@ -965,7 +991,7 @@ def handle_notify @fifo.read_lines(@lines) @log.debug("reading file: #{@path}") - if limit_bytes_per_second_reached? + if limit_bytes_per_second_reached? || should_shutdown_now? # Just get out from tailing loop. read_more = false break