Skip to content

Commit

Permalink
in_tail: Shutdown immediately & safely even if reading huge files
Browse files Browse the repository at this point in the history
This patch takes effect only when skip_refresh_on_startup is specified.

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed May 19, 2021
1 parent ffe6e58 commit 2e1dd5b
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,6 @@ def setup_watcher(target_info, pe)
tw.register_watcher(tt)
end

tw.on_notify

tw.watchers.each do |watcher|
event_loop_attach(watcher)
end
Expand Down Expand Up @@ -427,6 +425,7 @@ def construct_watcher(target_info)
begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails[target_info] = tw
tw.on_notify
rescue Errno::ENOENT, Errno::EACCES => e
$log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now."
# explicitly detach and unwatch watcher `tw`.
Expand All @@ -438,6 +437,7 @@ def construct_watcher(target_info)
def start_watchers(targets_info)
targets_info.each_value {|target_info|
construct_watcher(target_info)
break if before_shutdown?
}
end

Expand Down Expand Up @@ -490,9 +490,11 @@ def update_watcher(target_info, pe)

if new_position_entry.read_inode == 0
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
@tails[new_target_info].on_notify
end
else
@tails[new_target_info] = setup_watcher(new_target_info, pe)
@tails[new_target_info].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end
Expand Down Expand Up @@ -718,6 +720,7 @@ def register_watcher(watcher)
end

def detach
@io_handler.ready_to_shutdown if @io_handler
@io_handler.on_notify if @io_handler
@line_buffer_timer_flusher&.close(self)
end
Expand Down Expand Up @@ -910,6 +913,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@log = log
@start_reading_time = nil
@number_bytes_read = 0
@ready_to_shutdown = false
@ready_to_shutdown_mutex = Mutex.new

@log.info "following tail of #{@path}"
end
Expand All @@ -918,6 +923,12 @@ def on_notify
@notify_mutex.synchronize { handle_notify }
end

def ready_to_shutdown
@ready_to_shutdown_mutex.synchronize {
@ready_to_shutdown = true
}
end

def close
if @io && !@io.closed?
@io.close
Expand Down Expand Up @@ -972,7 +983,9 @@ def handle_notify
end
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
@ready_to_shutdown_mutex.synchronize {
read_more = !@ready_to_shutdown
}
break
end
end
Expand Down

0 comments on commit 2e1dd5b

Please sign in to comment.