diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index d00ecd3186..340de6cd88 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -158,7 +158,7 @@ def fetch_compacted_entries(existing_targets = nil) end end - entries = remove_deleted_files_entries(entries, existing_targets) if @follow_inodes + entries = remove_deleted_files_entries(entries, existing_targets) entries end diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index dba669e1eb..1acbd67b4b 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -22,6 +22,10 @@ class IntailPositionFileTest < Test::Unit::TestCase invalidpath100000000000000000000000000000000 unwatched\t#{UNWATCHED_STR}\t0000000000000000 EOF + TEST_CONTENT_PATHS = { + "valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1), + "inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0), + } def write_data(f, content) f.write(content) @@ -36,7 +40,11 @@ def follow_inodes_block test '.load' do write_data(@file, TEST_CONTENT) - Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log}) + paths = { + "valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1), + "inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 2), + } + Fluent::Plugin::TailInput::PositionFile.load(@file, false, TEST_CONTENT_PATHS, **{logger: $log}) @file.seek(0) lines = @file.readlines @@ -118,7 +126,7 @@ def follow_inodes_block test 'should ignore initial existing files on follow_inode' do write_data(@file, TEST_CONTENT) - pos_file = Fluent::Plugin::TailInput::PositionFile.load(@file, true, {}, **{logger: $log}) + pos_file = Fluent::Plugin::TailInput::PositionFile.load(@file, true, TEST_CONTENT_PATHS, **{logger: $log}) @file.seek(0) assert_equal([], @file.readlines) @@ -138,7 +146,12 @@ def follow_inodes_block sub_test_case '#load' do test 'compact invalid and convert 32 bit inode value' do write_data(@file, TEST_CONTENT) - Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log}) + invalid_path = "invalidpath100000000000000000000000000000000" + paths = TEST_CONTENT_PATHS.merge({ + invalid_path => Fluent::Plugin::TailInput::TargetInfo.new(invalid_path, 0), + "unwatched" => Fluent::Plugin::TailInput::TargetInfo.new("unwatched", 0), + }) + Fluent::Plugin::TailInput::PositionFile.load(@file, false, TEST_CONTENT_PATHS, **{logger: $log}) @file.seek(0) lines = @file.readlines @@ -147,6 +160,15 @@ def follow_inodes_block assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1] end + test 'compact deleted paths' do + write_data(@file, TEST_CONTENT) + Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log}) + + @file.seek(0) + lines = @file.readlines + assert_equal [], lines + end + test 'compact data if duplicated line' do write_data(@file, <<~EOF) valid_path\t0000000000000002\t0000000000000001 @@ -163,7 +185,7 @@ def follow_inodes_block sub_test_case '#[]' do test 'return entry' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log}) + pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, TEST_CONTENT_PATHS, **{logger: $log}) valid_target_info = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', File.stat(@file).ino) f = pf[valid_target_info] diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 953ccf0779..4532557059 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1739,7 +1739,7 @@ def test_should_write_latest_offset_after_rotate_wait d.instance_shutdown end - def test_should_keep_and_update_existing_file_pos_entry_for_deleted_file_when_new_file_with_same_name_created + def test_should_remove_deleted_file config = config_element("", "", {"format" => "none"}) path = "#{TMP_DIR}/tail.txt" @@ -1750,30 +1750,11 @@ def test_should_keep_and_update_existing_file_pos_entry_for_deleted_file_when_ne } d = create_driver(config) - d.run(shutdown: false) - - pos_file = File.open("#{TMP_DIR}/tail.pos", "r") - pos_file.pos = 0 - - path_pos_ino = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(pos_file.readline) - assert_equal(path, path_pos_ino[1]) - assert_equal(pos, path_pos_ino[2].to_i(16)) - assert_equal(ino, path_pos_ino[3].to_i(16)) - - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| - f.puts "test1" - f.puts "test2" - } - Timecop.travel(Time.now + 10) do - sleep 5 + d.run do + pos_file = File.open("#{TMP_DIR}/tail.pos", "r") pos_file.pos = 0 - tuple = create_target_info("#{TMP_DIR}/tail.txt") - path_pos_ino = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(pos_file.readline) - assert_equal(tuple.path, path_pos_ino[1]) - assert_equal(12, path_pos_ino[2].to_i(16)) - assert_equal(tuple.ino, path_pos_ino[3].to_i(16)) + assert_equal([], pos_file.readlines) end - d.instance_shutdown end def test_should_mark_file_unwatched_after_limit_recently_modified_and_rotate_wait