Skip to content

Commit

Permalink
in_tail: Shutdown immediately & safely even if reading huge files
Browse files Browse the repository at this point in the history
in_tail takes very long time to shutdown when it's reading huge files.
This patch ensure to shutdown ASAP just after receiving a shutdown
command.

* Constructed tail watchers should be registered ASAP to be recognized
  by the shutdown process.
* Break TailWatcher#on_notify ASAP
  Wait 5 secdonds before break it to ensure to read remaining lines.

It takes effect only when `skip_refresh_on_startup` is enabled, because
reading files is occurred on main thread without it. It should be
enabled by default in the future.

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed May 19, 2021
1 parent b6ba1c2 commit 1a37ef0
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def initialize
@pf_file = nil
@pf = nil
@ignore_list = []
@shutdown_start_time = nil
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
Expand Down Expand Up @@ -256,6 +257,7 @@ def stop
end

def shutdown
@shutdown_start_time = Fluent::Clock.now
# during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
stop_watchers(existence_path, immediate: true, remove_watcher: false)
@pf_file.close if @pf_file
Expand Down Expand Up @@ -385,8 +387,6 @@ def setup_watcher(target_info, pe)
tw.register_watcher(tt)
end

tw.on_notify

tw.watchers.each do |watcher|
event_loop_attach(watcher)
end
Expand All @@ -398,7 +398,7 @@ def setup_watcher(target_info, pe)
event_loop_detach(watcher)
end

tw.detach
tw.detach(@shutdown_start_time)
tw.close
end
raise e
Expand Down Expand Up @@ -427,6 +427,7 @@ def construct_watcher(target_info)
begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails[target_info] = tw
tw.on_notify
rescue Errno::ENOENT, Errno::EACCES => e
$log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now."
# explicitly detach and unwatch watcher `tw`.
Expand All @@ -438,6 +439,7 @@ def construct_watcher(target_info)
def start_watchers(targets_info)
targets_info.each_value {|target_info|
construct_watcher(target_info)
break if before_shutdown?
}
end

Expand Down Expand Up @@ -490,9 +492,11 @@ def update_watcher(target_info, pe)

if new_position_entry.read_inode == 0
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
@tails[new_target_info].on_notify
end
else
@tails[new_target_info] = setup_watcher(new_target_info, pe)
@tails[new_target_info].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end
Expand All @@ -505,7 +509,7 @@ def detach_watcher(tw, ino, close_io = true)
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end
tw.detach
tw.detach(@shutdown_start_time)

tw.close if close_io

Expand Down Expand Up @@ -717,8 +721,11 @@ def register_watcher(watcher)
@watchers << watcher
end

def detach
@io_handler.on_notify if @io_handler
def detach(shutdown_start_time = nil)
if @io_handler
@io_handler.ready_to_shutdown(shutdown_start_time)
@io_handler.on_notify
end
@line_buffer_timer_flusher&.close(self)
end

Expand Down Expand Up @@ -894,6 +901,7 @@ def bytesize

class IOHandler
BYTES_TO_READ = 8192
TIMEOUT_TO_SHUTDOWN = 5

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
@watcher = watcher
Expand All @@ -910,6 +918,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@log = log
@start_reading_time = nil
@number_bytes_read = 0
@shutdown_start_time = nil
@shutdown_mutex = Mutex.new

@log.info "following tail of #{@path}"
end
Expand All @@ -918,6 +928,13 @@ def on_notify
@notify_mutex.synchronize { handle_notify }
end

def ready_to_shutdown(shutdown_start_time = nil)
@shutdown_mutex.synchronize {
@shutdown_start_time =
shutdown_start_time || Fluent::Clock.now
}
end

def close
if @io && !@io.closed?
@io.close
Expand Down Expand Up @@ -948,6 +965,15 @@ def limit_bytes_per_second_reached?
end
end

def should_shutdown_now?
# Ensure to read all remaining lines, but abort immediately if it
# seems to take too long time.
@shutdown_mutex.synchronize {
return false if @shutdown_start_time.nil?
return Fluent::Clock.now - @shutdown_start_time > TIMEOUT_TO_SHUTDOWN
}
end

def handle_notify
return if limit_bytes_per_second_reached?

Expand All @@ -965,7 +991,7 @@ def handle_notify
@fifo.read_lines(@lines)

@log.debug("reading file: #{@path}")
if limit_bytes_per_second_reached?
if limit_bytes_per_second_reached? || should_shutdown_now?
# Just get out from tailing loop.
read_more = false
break
Expand Down

0 comments on commit 1a37ef0

Please sign in to comment.