Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Shutdown immediately & safely even if reading huge files #3380

Merged
merged 1 commit into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def initialize
@pf_file = nil
@pf = nil
@ignore_list = []
@shutdown_start_time = nil
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
Expand Down Expand Up @@ -256,6 +257,7 @@ def stop
end

def shutdown
@shutdown_start_time = Fluent::Clock.now
# during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
stop_watchers(existence_path, immediate: true, remove_watcher: false)
@pf_file.close if @pf_file
Expand Down Expand Up @@ -385,8 +387,6 @@ def setup_watcher(target_info, pe)
tw.register_watcher(tt)
end

tw.on_notify

tw.watchers.each do |watcher|
event_loop_attach(watcher)
end
Expand All @@ -398,7 +398,7 @@ def setup_watcher(target_info, pe)
event_loop_detach(watcher)
end

tw.detach
tw.detach(@shutdown_start_time)
tw.close
end
raise e
Expand Down Expand Up @@ -427,6 +427,7 @@ def construct_watcher(target_info)
begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails[target_info] = tw
tw.on_notify
rescue Errno::ENOENT, Errno::EACCES => e
$log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now."
# explicitly detach and unwatch watcher `tw`.
Expand All @@ -438,6 +439,7 @@ def construct_watcher(target_info)
def start_watchers(targets_info)
targets_info.each_value {|target_info|
construct_watcher(target_info)
break if before_shutdown?
}
end

Expand Down Expand Up @@ -490,9 +492,11 @@ def update_watcher(target_info, pe)

if new_position_entry.read_inode == 0
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
@tails[new_target_info].on_notify
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructed watchers should be registered ASAP, before running on_notify.
Otherwise shutting down process can't recognize the watcher so that it won't be killed.
This is the why the error I mentioned at #3323 (comment) is occurred.

end
else
@tails[new_target_info] = setup_watcher(new_target_info, pe)
@tails[new_target_info].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end
Expand All @@ -505,7 +509,7 @@ def detach_watcher(tw, ino, close_io = true)
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end
tw.detach
tw.detach(@shutdown_start_time)

tw.close if close_io

Expand Down Expand Up @@ -717,8 +721,11 @@ def register_watcher(watcher)
@watchers << watcher
end

def detach
@io_handler.on_notify if @io_handler
def detach(shutdown_start_time = nil)
if @io_handler
@io_handler.ready_to_shutdown(shutdown_start_time)
@io_handler.on_notify
end
@line_buffer_timer_flusher&.close(self)
end

Expand Down Expand Up @@ -894,6 +901,9 @@ def bytesize

class IOHandler
BYTES_TO_READ = 8192
SHUTDOWN_TIMEOUT = 5

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
@watcher = watcher
Expand All @@ -910,6 +920,9 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@log = log
@start_reading_time = nil
@number_bytes_read = 0
@shutdown_start_time = nil
@shutdown_timeout = SHUTDOWN_TIMEOUT
@shutdown_mutex = Mutex.new

@log.info "following tail of #{@path}"
end
Expand All @@ -918,6 +931,13 @@ def on_notify
@notify_mutex.synchronize { handle_notify }
end

def ready_to_shutdown(shutdown_start_time = nil)
@shutdown_mutex.synchronize {
@shutdown_start_time =
shutdown_start_time || Fluent::Clock.now
}
end

def close
if @io && !@io.closed?
@io.close
Expand Down Expand Up @@ -948,6 +968,15 @@ def limit_bytes_per_second_reached?
end
end

def should_shutdown_now?
# Ensure to read all remaining lines, but abort immediately if it
# seems to take too long time.
@shutdown_mutex.synchronize {
return false if @shutdown_start_time.nil?
return Fluent::Clock.now - @shutdown_start_time > @shutdown_timeout
}
end

def handle_notify
return if limit_bytes_per_second_reached?

Expand All @@ -965,7 +994,7 @@ def handle_notify
@fifo.read_lines(@lines)

@log.debug("reading file: #{@path}")
if limit_bytes_per_second_reached?
if limit_bytes_per_second_reached? || should_shutdown_now?
# Just get out from tailing loop.
read_more = false
break
Expand Down
29 changes: 29 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2130,4 +2130,33 @@ def test_EACCES
d.instance_shutdown
assert($log.out.logs.any?{|log| log.include?("expand_paths: stat() for #{path} failed with Errno::EACCES. Skip file.\n") })
end

def test_shutdown_timeout
path = "#{TMP_DIR}/tail.txt"
File.open("#{TMP_DIR}/tail.txt", "wb") do |f|
(1024 * 1024 * 5).times do
f.puts "{\"test\":\"fizzbuzz\"}"
end
end

config =
CONFIG_READ_FROM_HEAD +
config_element('', '', {
'format' => 'json',
'skip_refresh_on_startup' => true,
})
d = create_driver(config)
mock.proxy(d.instance).io_handler(anything, anything) do |io_handler|
io_handler.shutdown_timeout = 0.5
io_handler
end

start_time = Fluent::Clock.now
assert_nothing_raised do
d.run(expect_emits: 1)
end

elapsed = Fluent::Clock.now - start_time
assert_true(elapsed > 0.5 && elapsed < 2.5)
end
end