Skip to content

Commit

Permalink
Update seek pos during compaction
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
  • Loading branch information
ganmacs committed Mar 31, 2020
1 parent a750674 commit 529af8a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
14 changes: 12 additions & 2 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def try_compact
@file.pos = 0
@file.truncate(0)
@file.write(entries.values.map(&:to_entry_fmt).join)

entries.each do |path, val|
if (m = @map[path])
m.seek = val.seek
end
end
else
# skip
end
Expand All @@ -115,6 +121,7 @@ def fetch_compacted_entries
entries = {}

@file.pos = 0
file_pos = 0
@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
if m.nil?
Expand All @@ -132,15 +139,16 @@ def fetch_compacted_entries
@logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger
end

entries[path] = Entry.new(path, pos, ino)
entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1)
file_pos += line.size
end
end

entries
end
end

Entry = Struct.new(:path, :pos, :ino) do
Entry = Struct.new(:path, :pos, :ino, :seek) do
POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze

def to_entry_fmt
Expand All @@ -165,6 +173,8 @@ def initialize(file, file_mutex, seek, pos, inode)
@inode = inode
end

attr_writer :seek

def update(ino, pos)
@file_mutex.synchronize {
@file.pos = @seek
Expand Down
24 changes: 24 additions & 0 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ def write_data(f, content)
lines = @file.readlines
assert_equal 5, lines.size
end

test 'update seek postion of remained position entry' do
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log)
pf['path1']
pf['path2']
pf['path3']
pf.unwatch('path1')

pf.try_compact

@file.seek(0)
lines = @file.readlines
assert_equal "path2\t0000000000000000\t0000000000000000\n", lines[0]
assert_equal "path3\t0000000000000000\t0000000000000000\n", lines[1]
assert_equal 2, lines.size

pf.unwatch('path2')
pf.unwatch('path3')
@file.seek(0)
lines = @file.readlines
assert_equal "path2\t#{UNWATCHED_STR}\t0000000000000000\n", lines[0]
assert_equal "path3\t#{UNWATCHED_STR}\t0000000000000000\n", lines[1]
assert_equal 2, lines.size
end
end

sub_test_case '#load' do
Expand Down

0 comments on commit 529af8a

Please sign in to comment.