Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Untracked files should be removed from watching list. fix #1455 #1467

Merged
merged 4 commits into from
Feb 16, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def start
end

def shutdown
stop_watchers(@tails.keys, true)
# during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
stop_watchers(@tails.keys, immediate: true, remove_watcher: false)
@pf_file.close if @pf_file

super
Expand Down Expand Up @@ -211,7 +212,7 @@ def refresh_watchers
unwatched = existence_paths - target_paths
added = target_paths - existence_paths

stop_watchers(unwatched, false, true) unless unwatched.empty?
stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty?
start_watchers(added) unless added.empty?
end

Expand Down Expand Up @@ -243,9 +244,9 @@ def start_watchers(paths)
}
end

def stop_watchers(paths, immediate = false, unwatched = false)
def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)
paths.each { |path|
tw = @tails[path]
tw = remove_watcher ? @tails.delete(path) : @tails[path]
if tw
tw.unwatched = unwatched
if immediate
Expand Down
40 changes: 40 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,46 @@ def test_log_file_without_extension
plugin = create_driver(config, false).instance
assert_equal expected_files, plugin.expand_paths.sort
end

# For https://github.com/fluent/fluentd/issues/1455
# This test is fragile because test content depends on internal implementaion.
# So if you modify in_tail internal, this test may break.
def test_unwatched_files_should_be_removed
config = config_element("", "", {
"tag" => "tail",
"path" => "#{TMP_DIR}/*.txt",
"format" => "none",
"pos_file" => "#{TMP_DIR}/tail.pos",
"read_from_head" => true,
"refresh_interval" => 1,
})
d = create_driver(config, false)
d.run(expect_emits: 1, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "test3\n" }
end

assert_equal 1, d.instance.instance_variable_get(:@tails).keys.size
File.unlink("#{TMP_DIR}/tail.txt")
sleep 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be waiting(5){ sleep 0.1 until the_size_of_tails == 0 } instead of just sleep 2 for stability.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good. Will fix.

assert_equal 0, d.instance.instance_variable_get(:@tails).keys.size

base_num = count_timer_object
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to confirm that the number of timer watcher does not increase anymore, right?
Please add the comment what you are checking if so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why comment is needed? Hard to understand from the code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The name of test case doesn't explain what to do here.

2.times {
sleep 1
num = count_timer_object
assert_equal base_num, num
}

d.instance_shutdown
end

def count_timer_object
num = 0
ObjectSpace.each_object(Fluent::PluginHelper::Timer::TimerWatcher) { |obj|
num += 1
}
num
end
end

def test_z_refresh_watchers
Expand Down