diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 968b352d86..fe6e0ba847 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -163,7 +163,8 @@ def start end def shutdown - stop_watchers(@tails.keys, true) + # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close. + stop_watchers(@tails.keys, immediate: true, remove_watcher: false) @pf_file.close if @pf_file super @@ -211,7 +212,7 @@ def refresh_watchers unwatched = existence_paths - target_paths added = target_paths - existence_paths - stop_watchers(unwatched, false, true) unless unwatched.empty? + stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty? start_watchers(added) unless added.empty? end @@ -243,9 +244,9 @@ def start_watchers(paths) } end - def stop_watchers(paths, immediate = false, unwatched = false) + def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true) paths.each { |path| - tw = @tails[path] + tw = remove_watcher ? @tails.delete(path) : @tails[path] if tw tw.unwatched = unwatched if immediate diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 98dbf12278..dd6e2b7360 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -12,14 +12,7 @@ class TailInputTest < Test::Unit::TestCase def setup Fluent::Test.setup - FileUtils.rm_rf(TMP_DIR, secure: true) - if File.exist?(TMP_DIR) - # ensure files are closed for Windows, on which deleted files - # are still visible from filesystem - GC.start(full_mark: true, immediate_mark: true, immediate_sweep: true) - FileUtils.remove_entry_secure(TMP_DIR, true) - end - FileUtils.mkdir_p(TMP_DIR) + cleanup_directory(TMP_DIR) end def teardown @@ -27,6 +20,17 @@ def teardown Fluent::Engine.stop end + def cleanup_directory(path) + FileUtils.rm_rf(path, secure: true) + if File.exist?(path) + # ensure files are closed for Windows, on which deleted files + # are still visible from filesystem + GC.start(full_mark: true, immediate_mark: true, immediate_sweep: true) + FileUtils.remove_entry_secure(path, true) + end + FileUtils.mkdir_p(path) + end + TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}" CONFIG = config_element("ROOT", "", { @@ -866,6 +870,48 @@ def test_log_file_without_extension plugin = create_driver(config, false).instance assert_equal expected_files, plugin.expand_paths.sort end + + # For https://github.com/fluent/fluentd/issues/1455 + # This test is fragile because test content depends on internal implementaion. + # So if you modify in_tail internal, this test may break. + def test_unwatched_files_should_be_removed + config = config_element("", "", { + "tag" => "tail", + "path" => "#{TMP_DIR}/*.txt", + "format" => "none", + "pos_file" => "#{TMP_DIR}/tail.pos", + "read_from_head" => true, + "refresh_interval" => 1, + }) + d = create_driver(config, false) + d.run(expect_emits: 1, shutdown: false) do + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "test3\n" } + end + + assert_equal 1, d.instance.instance_variable_get(:@tails).keys.size + cleanup_directory(TMP_DIR) + waiting(20) { sleep 0.1 until Dir.glob("#{TMP_DIR}/*.txt").size == 0 } # Ensure file is deleted on Windows + waiting(5) { sleep 0.1 until d.instance.instance_variable_get(:@tails).keys.size == 0 } + + # Previous implementaion has an infinite watcher creation bug. + # Following code checks such unexpected bug by couting actual object allocation. + base_num = count_timer_object + 2.times { + sleep 1 + num = count_timer_object + assert_equal base_num, num + } + + d.instance_shutdown + end + + def count_timer_object + num = 0 + ObjectSpace.each_object(Fluent::PluginHelper::Timer::TimerWatcher) { |obj| + num += 1 + } + num + end end def test_z_refresh_watchers