diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index d2e2a5dcfb..eb9f17ec18 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -71,6 +71,8 @@ def initialize config_param :rotate_wait, :time, default: 5 desc 'Fluentd will record the position it last read into this file.' config_param :pos_file, :string, default: nil + desc 'The cleanup interval of pos file' + config_param :pos_file_compaction_interval, :integer, default: nil desc 'Start to read the logs from the head of file, not bottom.' config_param :read_from_head, :bool, default: false # When the program deletes log file and re-creates log file with same filename after passed refresh_interval, @@ -211,7 +213,14 @@ def start FileUtils.mkdir_p(pos_file_dir) unless Dir.exist?(pos_file_dir) @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm) @pf_file.sync = true - @pf = PositionFile.parse(@pf_file) + @pf = PositionFile.load(@pf_file, logger: log) + + if @pos_file_compaction_interval + timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do + log.info('Clean up the pos file') + @pf.try_compact + end + end end refresh_watchers unless @skip_refresh_on_startup diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index 02bcf738e0..34c0e3adcb 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -14,7 +14,7 @@ # limitations under the License. 
# -require 'fluent/plugin/in_tail' +require 'fluent/plugin/input' module Fluent::Plugin class TailInput < Fluent::Plugin::Input @@ -23,10 +23,17 @@ class PositionFile POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze - def initialize(file, file_mutex, map) + def self.load(file, logger:) + pf = new(file, logger: logger) + pf.load + pf + end + + def initialize(file, logger: nil) @file = file - @file_mutex = file_mutex - @map = map + @logger = logger + @file_mutex = Mutex.new + @map = {} end def [](path) @@ -48,56 +55,89 @@ def unwatch(path) end end - def self.parse(file) - compact(file) + def load + compact - file_mutex = Mutex.new map = {} - file.pos = 0 - file.each_line {|line| - m = POSITION_FILE_ENTRY_REGEX.match(line) - unless m - $log.warn "Unparsable line in pos_file: #{line}" - next + @file_mutex.synchronize do + @file.pos = 0 + + @file.each_line do |line| + m = POSITION_FILE_ENTRY_REGEX.match(line) + next if m.nil? 
+ + path = m[1] + pos = m[2].to_i(16) + ino = m[3].to_i(16) + seek = @file.pos - line.bytesize + path.bytesize + 1 + map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino) end - path = m[1] - pos = m[2].to_i(16) - ino = m[3].to_i(16) - seek = file.pos - line.bytesize + path.bytesize + 1 - map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino) - } - new(file, file_mutex, map) + end + + @map = map + end + + # This method is similar to #compact but it tries to hold the lock for less time to avoid lock contention + def try_compact + last_modified = nil + size = nil + + @file_mutex.synchronize do + last_modified = @file.mtime + size = @file.size + end + + entries = fetch_compacted_entries + + @file_mutex.synchronize do + if last_modified == @file.mtime && size == @file.size + @file.pos = 0 + @file.truncate(0) + @file.write(entries.join) + else + # skip + end + end + end + + private + + def compact + @file_mutex.synchronize do + entries = fetch_compacted_entries + + @file.pos = 0 + @file.truncate(0) + @file.write(entries.join) + end end - # Clean up unwatched file entries - def self.compact(file) - existent_entries = {} - file.pos = 0 - file.each_line do |line| + def fetch_compacted_entries + entries = {} + + @file.pos = 0 + @file.each_line do |line| m = POSITION_FILE_ENTRY_REGEX.match(line) - unless m - $log.warn "Unparsable line in pos_file: #{line}" + if m.nil? + @logger.warn "Unparsable line in pos_file: #{line}" if @logger next end + path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16) - if pos == UNWATCHED_POSITION - next - end + @logger.debug "Remove unwatched line from pos_file: #{line}" if @logger + else + if entries.include?(path) + @logger.warn("#{path} already exists.
use latest one: deleted #{existent_entries[path]}") + entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino]) end - - # 32bit inode converted to 64bit at this phase - existent_entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino]) end - file.pos = 0 - file.truncate(0) - file.write(existent_entries.values.join) + entries.values end end diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index df31d74f3c..6e31fa816b 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -1,9 +1,17 @@ require_relative '../../helper' require 'fluent/plugin/in_tail/position_file' +require 'fileutils' +require 'tempfile' + class IntailPositionFileTest < Test::Unit::TestCase setup do - @file = StringIO.new(+'') + @file = Tempfile.new('intail_position_file_test') + end + + teardown do + @file.close rescue nil + @file.unlink rescue nil end UNWATCHED_STR = '%016x' % Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION @@ -19,10 +27,21 @@ def write_data(f, content) f.seek(0) end - sub_test_case '.compact' do + test '.load' do + write_data(@file, TEST_CONTENT) + Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) + + @file.seek(0) + lines = @file.readlines + assert_equal 2, lines.size + assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0] + assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1] + end + + sub_test_case '#try_compact' do test 'compact invalid and convert 32 bit inode value' do write_data(@file, TEST_CONTENT) - Fluent::Plugin::TailInput::PositionFile.compact(@file) + Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).try_compact @file.seek(0) lines = @file.readlines @@ -36,29 +55,59 @@ def write_data(f, content) valid_path\t0000000000000002\t0000000000000001 valid_path\t0000000000000003\t0000000000000004 EOF - Fluent::Plugin::TailInput::PositionFile.compact(@file) + 
Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).try_compact @file.seek(0) lines = @file.readlines assert_equal "valid_path\t0000000000000003\t0000000000000004\n", lines[0] end + + test 'does not change when the file is changed' do + write_data(@file, TEST_CONTENT) + pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log) + + mock.proxy(pf).fetch_compacted_entries do |r| + @file.write("unwatched\t#{UNWATCHED_STR}\t0000000000000000\n") + r + end + + pf.try_compact + + @file.seek(0) + lines = @file.readlines + assert_equal 5, lines.size + end end - test '.parse' do - write_data(@file, TEST_CONTENT) - Fluent::Plugin::TailInput::PositionFile.parse(@file) + sub_test_case '#load' do + test 'compact invalid and convert 32 bit inode value' do + write_data(@file, TEST_CONTENT) + Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) - @file.seek(0) - lines = @file.readlines - assert_equal 2, lines.size - assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0] - assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1] + @file.seek(0) + lines = @file.readlines + assert_equal 2, lines.size + assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0] + assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1] + end + + test 'compact data if duplicated line' do + write_data(@file, <<~EOF) + valid_path\t0000000000000002\t0000000000000001 + valid_path\t0000000000000003\t0000000000000004 + EOF + Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).load + + @file.seek(0) + lines = @file.readlines + assert_equal "valid_path\t0000000000000003\t0000000000000004\n", lines[0] + end end sub_test_case '#[]' do test 'return entry' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.parse(@file) + pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) f = pf['valid_path'] assert_equal Fluent::Plugin::TailInput::FilePositionEntry, 
f.class @@ -82,7 +131,7 @@ def write_data(f, content) test 'does not change other value position if other entry try to write' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.parse(@file) + pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) f = pf['nonexist_path'] assert_equal 0, f.read_inode @@ -103,7 +152,7 @@ def write_data(f, content) sub_test_case '#unwatch' do test 'deletes entry by path' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.parse(@file) + pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) p1 = pf['valid_path'] assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p1.class diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 891665b7d7..2a9e3b5883 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1088,7 +1088,7 @@ def test_z_refresh_watchers plugin = create_driver(EX_CONFIG, false).instance sio = StringIO.new plugin.instance_eval do - @pf = Fluent::Plugin::TailInput::PositionFile.parse(sio) + @pf = Fluent::Plugin::TailInput::PositionFile.load(sio, logger: $log) @loop = Coolio::Loop.new end