From 147ee5a143d36f4500201229234f741e2d3069dd Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 15 Jul 2021 12:33:59 +0900 Subject: [PATCH 1/3] position_file: Fix invalid seek value with a multi-bytes path in Entry Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail/position_file.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index faf27e95ee..5a6ea425bc 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -151,9 +151,9 @@ def fetch_compacted_entries end if @follow_inodes - entries[ino] = Entry.new(path, pos, ino, file_pos + path.size + 1) + entries[ino] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1) else - entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1) + entries[path] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1) end file_pos += line.size end From 83121f9029945c9c8f6c00a9f925564bdd8a7908 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 16 Jul 2021 17:01:39 +0900 Subject: [PATCH 2/3] position_file: Add a test for try_compact on follow_inode There is a bug that entries for existing files are removed when follow_inode is specified. This test demonstrates it. Failure: test: should ignore initial existing files on follow_inode(IntailPositionFileTest::#try_compact) /home/aho/Projects/Fluentd/fluentd/test/plugin/in_tail/test_position_file.rb:130:in `block (2 levels) in ' 127: pos_file.try_compact 128: 129: @file.seek(0) => 130: assert_equal([ 131: "valid_path\t0000000000000002\t0000000000000001\n", 132: "inode23bit\t0000000000000000\t0000000000000000\n", 133: ], <["valid_path\t0000000000000002\t0000000000000001\n", "inode23bit\t0000000000000000\t0000000000000000\n"]> expected but was <[]> diff: ? ["valid_path\t0000000000000002\t0000000000000001\n", ? ] ? ??????????????????????????????????????????????????? - "inode23bit\t0000000000000000\t0000000000000000\n"] Signed-off-by: Takuro Ashie --- test/plugin/in_tail/test_position_file.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index 0715ab2996..e5ecd9e334 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -115,6 +115,24 @@ def follow_inodes_block assert_equal "path3\t#{UNWATCHED_STR}\t0000000000000000\n", lines[1] assert_equal 2, lines.size end + + test 'should ignore initial existing files on follow_inode' do + write_data(@file, TEST_CONTENT) + pos_file = Fluent::Plugin::TailInput::PositionFile.load(@file, true, {}, **{logger: $log}) + @file.seek(0) + assert_equal([], @file.readlines) + + @file.seek(0) + write_data(@file, TEST_CONTENT) + pos_file.try_compact + + @file.seek(0) + assert_equal([ + "valid_path\t0000000000000002\t0000000000000001\n", + "inode23bit\t0000000000000000\t0000000000000000\n", + ], + @file.readlines) + end end sub_test_case '#load' do From b95e4a717eea5c3feaa40314ac424eea61afd0e7 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 15 Jul 2021 12:17:39 +0900 Subject: [PATCH 3/3] position_file: Don't remove existing entries on try_compact When both follow_inodes and pos_file_compaction_interval are specified, file entries that didn't exist on start up will be deleted unexpectedly when compaction is triggered. This patch fixes this issue. Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail/position_file.rb | 34 ++++++++++++---------- test/plugin/in_tail/test_position_file.rb | 10 +++---- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index 5a6ea425bc..d00ecd3186 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -22,19 +22,18 @@ class PositionFile UNWATCHED_POSITION = 0xffffffffffffffff POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze - def self.load(file, follow_inodes, existing_paths, logger:) - pf = new(file, follow_inodes, existing_paths, logger: logger) - pf.load + def self.load(file, follow_inodes, existing_targets, logger:) + pf = new(file, follow_inodes, logger: logger) + pf.load(existing_targets) pf end - def initialize(file, follow_inodes, existing_paths, logger: nil) + def initialize(file, follow_inodes, logger: nil) @file = file @logger = logger @file_mutex = Mutex.new @map = {} @follow_inodes = follow_inodes - @existing_paths = existing_paths end def [](target_info) @@ -60,8 +59,8 @@ def unwatch(target_info) end end - def load - compact + def load(existing_targets = nil) + compact(existing_targets) map = {} @file_mutex.synchronize do @@ -118,9 +117,9 @@ def try_compact private - def compact + def compact(existing_targets = nil) @file_mutex.synchronize do - entries = fetch_compacted_entries.values.map(&:to_entry_fmt) + entries = fetch_compacted_entries(existing_targets).values.map(&:to_entry_fmt) @file.pos = 0 @file.truncate(0) @@ -128,7 +127,7 @@ def compact end end - def fetch_compacted_entries + def fetch_compacted_entries(existing_targets = nil) entries = {} @file.pos = 0 @@ -159,15 +158,18 @@ def fetch_compacted_entries end end - entries = remove_deleted_files_entries(entries, @existing_paths) if @follow_inodes + entries = remove_deleted_files_entries(entries, existing_targets) if @follow_inodes entries end - def remove_deleted_files_entries(existent_entries, existing_paths) - filtered_entries = existent_entries.select {|file_entry| - existing_paths.key?(file_entry) - } - filtered_entries + def remove_deleted_files_entries(existent_entries, existing_targets) + if existing_targets + existent_entries.select { |path_or_ino| + existing_targets.key?(path_or_ino) + } + else + existent_entries + end end end diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index e5ecd9e334..dba669e1eb 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -48,7 +48,7 @@ def follow_inodes_block sub_test_case '#try_compact' do test 'compact invalid and convert 32 bit inode value' do write_data(@file, TEST_CONTENT) - Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).try_compact + Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).try_compact @file.seek(0) lines = @file.readlines @@ -62,7 +62,7 @@ def follow_inodes_block valid_path\t0000000000000002\t0000000000000001 valid_path\t0000000000000003\t0000000000000004 EOF - Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).try_compact + Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).try_compact @file.seek(0) lines = @file.readlines @@ -71,7 +71,7 @@ def follow_inodes_block test 'does not change when the file is changed' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}) + pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}) mock.proxy(pf).fetch_compacted_entries do |r| @file.write("unwatched\t#{UNWATCHED_STR}\t0000000000000000\n") @@ -86,7 +86,7 @@ def follow_inodes_block end test 'update seek position of remained position entry' do - pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}) + pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}) target_info1 = Fluent::Plugin::TailInput::TargetInfo.new('path1', -1) target_info2 = Fluent::Plugin::TailInput::TargetInfo.new('path2', -1) target_info3 = Fluent::Plugin::TailInput::TargetInfo.new('path3', -1) @@ -152,7 +152,7 @@ def follow_inodes_block valid_path\t0000000000000002\t0000000000000001 valid_path\t0000000000000003\t0000000000000004 EOF - Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).load + Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).load @file.seek(0) lines = @file.readlines