Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some pos_file bugs #3459

Merged
merged 3 commits into from
Jul 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze

def self.load(file, follow_inodes, existing_paths, logger:)
pf = new(file, follow_inodes, existing_paths, logger: logger)
pf.load
def self.load(file, follow_inodes, existing_targets, logger:)
pf = new(file, follow_inodes, logger: logger)
pf.load(existing_targets)
pf
end

def initialize(file, follow_inodes, existing_paths, logger: nil)
def initialize(file, follow_inodes, logger: nil)
@file = file
@logger = logger
@file_mutex = Mutex.new
@map = {}
@follow_inodes = follow_inodes
@existing_paths = existing_paths
end

def [](target_info)
Expand All @@ -60,8 +59,8 @@ def unwatch(target_info)
end
end

def load
compact
def load(existing_targets = nil)
compact(existing_targets)

map = {}
@file_mutex.synchronize do
Expand Down Expand Up @@ -118,17 +117,17 @@ def try_compact

private

def compact
def compact(existing_targets = nil)
@file_mutex.synchronize do
entries = fetch_compacted_entries.values.map(&:to_entry_fmt)
entries = fetch_compacted_entries(existing_targets).values.map(&:to_entry_fmt)

@file.pos = 0
@file.truncate(0)
@file.write(entries.join)
end
end

def fetch_compacted_entries
def fetch_compacted_entries(existing_targets = nil)
entries = {}

@file.pos = 0
Expand All @@ -151,23 +150,26 @@ def fetch_compacted_entries
end

if @follow_inodes
entries[ino] = Entry.new(path, pos, ino, file_pos + path.size + 1)
entries[ino] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1)
else
entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1)
entries[path] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1)
end
file_pos += line.size
end
end

entries = remove_deleted_files_entries(entries, @existing_paths) if @follow_inodes
entries = remove_deleted_files_entries(entries, existing_targets) if @follow_inodes
entries
end

def remove_deleted_files_entries(existent_entries, existing_paths)
filtered_entries = existent_entries.select {|file_entry|
existing_paths.key?(file_entry)
}
filtered_entries
def remove_deleted_files_entries(existent_entries, existing_targets)
if existing_targets
existent_entries.select { |path_or_ino|
existing_targets.key?(path_or_ino)
}
else
existent_entries
end
end
end

Expand Down
28 changes: 23 additions & 5 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def follow_inodes_block
sub_test_case '#try_compact' do
test 'compact invalid and convert 32 bit inode value' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).try_compact
Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).try_compact

@file.seek(0)
lines = @file.readlines
Expand All @@ -62,7 +62,7 @@ def follow_inodes_block
valid_path\t0000000000000002\t0000000000000001
valid_path\t0000000000000003\t0000000000000004
EOF
Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).try_compact
Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).try_compact

@file.seek(0)
lines = @file.readlines
Expand All @@ -71,7 +71,7 @@ def follow_inodes_block

test 'does not change when the file is changed' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log})
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log})

mock.proxy(pf).fetch_compacted_entries do |r|
@file.write("unwatched\t#{UNWATCHED_STR}\t0000000000000000\n")
Expand All @@ -86,7 +86,7 @@ def follow_inodes_block
end

test 'update seek position of remained position entry' do
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log})
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log})
target_info1 = Fluent::Plugin::TailInput::TargetInfo.new('path1', -1)
target_info2 = Fluent::Plugin::TailInput::TargetInfo.new('path2', -1)
target_info3 = Fluent::Plugin::TailInput::TargetInfo.new('path3', -1)
Expand Down Expand Up @@ -115,6 +115,24 @@ def follow_inodes_block
assert_equal "path3\t#{UNWATCHED_STR}\t0000000000000000\n", lines[1]
assert_equal 2, lines.size
end

test 'should ignore initial existing files on follow_inode' do
write_data(@file, TEST_CONTENT)
pos_file = Fluent::Plugin::TailInput::PositionFile.load(@file, true, {}, **{logger: $log})
@file.seek(0)
assert_equal([], @file.readlines)

@file.seek(0)
write_data(@file, TEST_CONTENT)
pos_file.try_compact

@file.seek(0)
assert_equal([
"valid_path\t0000000000000002\t0000000000000001\n",
"inode23bit\t0000000000000000\t0000000000000000\n",
],
@file.readlines)
end
end

sub_test_case '#load' do
Expand All @@ -134,7 +152,7 @@ def follow_inodes_block
valid_path\t0000000000000002\t0000000000000001
valid_path\t0000000000000003\t0000000000000004
EOF
Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log}).load
Fluent::Plugin::TailInput::PositionFile.new(@file, false, **{logger: $log}).load

@file.seek(0)
lines = @file.readlines
Expand Down