diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3b7142daa6..f2acff587d 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -385,8 +385,6 @@ def setup_watcher(target_info, pe) tw.register_watcher(tt) end - tw.on_notify - tw.watchers.each do |watcher| event_loop_attach(watcher) end @@ -427,6 +425,7 @@ def construct_watcher(target_info) begin target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) @tails[target_info] = tw + tw.on_notify rescue Errno::ENOENT, Errno::EACCES => e $log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now." # explicitly detach and unwatch watcher `tw`. @@ -438,6 +437,7 @@ def construct_watcher(target_info) def start_watchers(targets_info) targets_info.each_value {|target_info| construct_watcher(target_info) + break if before_shutdown? } end @@ -490,9 +490,11 @@ def update_watcher(target_info, pe) if new_position_entry.read_inode == 0 @tails[new_target_info] = setup_watcher(new_target_info, new_position_entry) + @tails[new_target_info].on_notify end else @tails[new_target_info] = setup_watcher(new_target_info, pe) + @tails[new_target_info].on_notify end detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw end @@ -718,6 +720,7 @@ def register_watcher(watcher) end def detach + @io_handler.ready_to_shutdown if @io_handler @io_handler.on_notify if @io_handler @line_buffer_timer_flusher&.close(self) end @@ -910,6 +913,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @log = log @start_reading_time = nil @number_bytes_read = 0 + @ready_to_shutdown = false + @ready_to_shutdown_mutex = Mutex.new @log.info "following tail of #{@path}" end @@ -918,6 +923,12 @@ def on_notify @notify_mutex.synchronize { handle_notify } end + def ready_to_shutdown + @ready_to_shutdown_mutex.synchronize { + @ready_to_shutdown = true + } + end + def close if @io && !@io.closed? @io.close @@ -972,7 +983,9 @@ def handle_notify end if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large - read_more = true + @ready_to_shutdown_mutex.synchronize { + read_more = !@ready_to_shutdown + } break end end