diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index faf27e95ee..d00ecd3186 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -22,19 +22,18 @@ class PositionFile UNWATCHED_POSITION = 0xffffffffffffffff POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze - def self.load(file, follow_inodes, existing_paths, logger:) - pf = new(file, follow_inodes, existing_paths, logger: logger) - pf.load + def self.load(file, follow_inodes, existing_targets, logger:) + pf = new(file, follow_inodes, logger: logger) + pf.load(existing_targets) pf end - def initialize(file, follow_inodes, existing_paths, logger: nil) + def initialize(file, follow_inodes, logger: nil) @file = file @logger = logger @file_mutex = Mutex.new @map = {} @follow_inodes = follow_inodes - @existing_paths = existing_paths end def [](target_info) @@ -60,8 +59,8 @@ def unwatch(target_info) end end - def load - compact + def load(existing_targets = nil) + compact(existing_targets) map = {} @file_mutex.synchronize do @@ -118,9 +117,9 @@ def try_compact private - def compact + def compact(existing_targets = nil) @file_mutex.synchronize do - entries = fetch_compacted_entries.values.map(&:to_entry_fmt) + entries = fetch_compacted_entries(existing_targets).values.map(&:to_entry_fmt) @file.pos = 0 @file.truncate(0) @@ -128,7 +127,7 @@ def compact end end - def fetch_compacted_entries + def fetch_compacted_entries(existing_targets = nil) entries = {} @file.pos = 0 @@ -151,23 +150,26 @@ def fetch_compacted_entries end if @follow_inodes - entries[ino] = Entry.new(path, pos, ino, file_pos + path.size + 1) + entries[ino] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1) else - entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1) + entries[path] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1) end file_pos += line.size end end - entries = remove_deleted_files_entries(entries, @existing_paths) if @follow_inodes + entries = remove_deleted_files_entries(entries, existing_targets) if @follow_inodes entries end - def remove_deleted_files_entries(existent_entries, existing_paths) - filtered_entries = existent_entries.select {|file_entry| - existing_paths.key?(file_entry) - } - filtered_entries + def remove_deleted_files_entries(existent_entries, existing_targets) + if existing_targets + existent_entries.select { |path_or_ino| + existing_targets.key?(path_or_ino) + } + else + existent_entries + end end end diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index 0715ab2996..dba669e1eb 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -48,7 +48,7 @@ def follow_inodes_block sub_test_case '#try_compact' do test 'compact invalid and convert 32 bit inode value' do write_data(@file, TEST_CONTENT) - Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).try_compact + Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).try_compact @file.seek(0) lines = @file.readlines @@ -62,7 +62,7 @@ def follow_inodes_block valid_path\t0000000000000002\t0000000000000001 valid_path\t0000000000000003\t0000000000000004 EOF - Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).try_compact + Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).try_compact @file.seek(0) lines = @file.readlines @@ -71,7 +71,7 @@ def follow_inodes_block test 'does not change when the file is changed' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}) + pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}) mock.proxy(pf).fetch_compacted_entries do |r| @file.write("unwatched\t#{UNWATCHED_STR}\t0000000000000000\n") @@ -86,7 +86,7 @@ def follow_inodes_block end test 'update seek position of remained position entry' do - pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}) + pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}) target_info1 = Fluent::Plugin::TailInput::TargetInfo.new('path1', -1) target_info2 = Fluent::Plugin::TailInput::TargetInfo.new('path2', -1) target_info3 = Fluent::Plugin::TailInput::TargetInfo.new('path3', -1) @@ -115,6 +115,24 @@ def follow_inodes_block assert_equal "path3\t#{UNWATCHED_STR}\t0000000000000000\n", lines[1] assert_equal 2, lines.size end + + test 'should ignore initial existing files on follow_inode' do + write_data(@file, TEST_CONTENT) + pos_file = Fluent::Plugin::TailInput::PositionFile.load(@file, true, {}, **{logger: $log}) + @file.seek(0) + assert_equal([], @file.readlines) + + @file.seek(0) + write_data(@file, TEST_CONTENT) + pos_file.try_compact + + @file.seek(0) + assert_equal([ + "valid_path\t0000000000000002\t0000000000000001\n", + "inode23bit\t0000000000000000\t0000000000000000\n", + ], + @file.readlines) + end end sub_test_case '#load' do @@ -134,7 +152,7 @@ def follow_inodes_block valid_path\t0000000000000002\t0000000000000001 valid_path\t0000000000000003\t0000000000000004 EOF - Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).load + Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).load @file.seek(0) lines = @file.readlines