From 76283d8a7c77b97953c7ac8fae29a0e04fe55c6a Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 29 Jan 2020 18:12:09 +0900 Subject: [PATCH 01/10] Replace impl Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail/position_file.rb | 123 +++++++++++++++------ test/plugin/in_tail/test_position_file.rb | 9 +- 2 files changed, 97 insertions(+), 35 deletions(-) diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index 02bcf738e0..0f122dbe6a 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -23,10 +23,11 @@ class PositionFile POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze - def initialize(file, file_mutex, map) + def initialize(file, file_mutex, map, logger: nil) @file = file @file_mutex = file_mutex @map = map + @logger = logger end def [](path) @@ -49,55 +50,109 @@ def unwatch(path) end def self.parse(file) - compact(file) + load(file, logger: $log) + end + + # Clean up unwatched file entries + def self.compact(file) + pf = new2(file, logger: $log) + pf.try_compact + pf + end + + def self.load(file, logger:) + pf = new2(file, logger: logger) + pf.load + pf + end + + def self.new2(file, logger:) + new(file, Mutex.new, {}, logger: logger) + end + + def load + compact - file_mutex = Mutex.new map = {} - file.pos = 0 - file.each_line {|line| - m = POSITION_FILE_ENTRY_REGEX.match(line) - unless m - $log.warn "Unparsable line in pos_file: #{line}" - next + @file_mutex.synchronize do + @file.pos = 0 + + @file.each_line do |line| + m = POSITION_FILE_ENTRY_REGEX.match(line) + next if m.nil? + + path = m[1] + pos = m[2].to_i(16) + ino = m[3].to_i(16) + seek = @file.pos - line.bytesize + path.bytesize + 1 + map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino) end - path = m[1] - pos = m[2].to_i(16) - ino = m[3].to_i(16) - seek = file.pos - line.bytesize + path.bytesize + 1 - map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino) - } - new(file, file_mutex, map) + end + + @map = map end - # Clean up unwatched file entries - def self.compact(file) - existent_entries = {} - file.pos = 0 - file.each_line do |line| + # This method is similer to #compact but it tries to get less lock to avoid a lock contention + def try_compact + last_modified = nil + size = nil + + @file_mutex.synchronize do + last_modified = @file.mtime + size = @file.size + end + + entries = fetch_compacted_entries + + @file_mutex.synchronize do + if last_modified == @file.mtime && size == @file.size + @file.pos = 0 + @file.truncate(0) + @file.write(entries.join) + else + # skip + end + end + end + + private + + def compact + @file_mutex.synchronize do + entries = fetch_compacted_entries + + @file.pos = 0 + @file.truncate(0) + @file.write(entries.join) + end + end + + def fetch_compacted_entries + entries = {} + + @file.pos = 0 + @file.each_line do |line| m = POSITION_FILE_ENTRY_REGEX.match(line) - unless m - $log.warn "Unparsable line in pos_file: #{line}" + if m.nil? + @logger.warn "Unparsable line in pos_file: #{line}" if @logger next end + path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16) - if pos == UNWATCHED_POSITION - next - end + @logger.debug "Remove unwatched line from pos_file: #{line}" if @logger + else + if entries.include?(path) + @logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger + end - if existent_entries.include?(path) - $log.warn("#{path} already exists. use latest one: deleted #{existent_entries[path]}") + entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino]) end - - # 32bit inode converted to 64bit at this phase - existent_entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino]) end - file.pos = 0 - file.truncate(0) - file.write(existent_entries.values.join) + entries.values end end diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index df31d74f3c..0855fe5a27 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -1,9 +1,16 @@ require_relative '../../helper' require 'fluent/plugin/in_tail/position_file' +require 'tempfile' + class IntailPositionFileTest < Test::Unit::TestCase setup do - @file = StringIO.new(+'') + @file = Tempfile.new('intail_position_file_test') + end + + teardown do + @file.close rescue nil + @file.unlink rescue nil end UNWATCHED_STR = '%016x' % Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION From ad20e9fd5c194a5e59bde287fe646f982dd878ea Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 29 Jan 2020 18:14:23 +0900 Subject: [PATCH 02/10] Change interface of initialize Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail.rb | 2 +- lib/fluent/plugin/in_tail/position_file.rb | 20 ++++++++------------ 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index d2e2a5dcfb..09726e4291 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -211,7 +211,7 @@ def start FileUtils.mkdir_p(pos_file_dir) unless Dir.exist?(pos_file_dir) @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm) @pf_file.sync = true - @pf = PositionFile.parse(@pf_file) + @pf = PositionFile.parse(@pf_file, logger: $log) end refresh_watchers unless @skip_refresh_on_startup diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index 0f122dbe6a..a9beae123b 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -23,11 +23,11 @@ class PositionFile POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze - def initialize(file, file_mutex, map, logger: nil) + def initialize(file, logger: nil) @file = file - @file_mutex = file_mutex - @map = map @logger = logger + @file_mutex = Mutex.new + @map = {} end def [](path) @@ -49,27 +49,23 @@ def unwatch(path) end end - def self.parse(file) - load(file, logger: $log) + def self.parse(file, logger: $log) + load(file, logger: logger) end # Clean up unwatched file entries - def self.compact(file) - pf = new2(file, logger: $log) + def self.compact(file, logger: $log) + pf = new(file, logger: logger) pf.try_compact pf end def self.load(file, logger:) - pf = new2(file, logger: logger) + pf = new(file, logger: logger) pf.load pf end - def self.new2(file, logger:) - new(file, Mutex.new, {}, logger: logger) - end - def load compact From 71fef9e2b44911bd72684f74172ca66765ceeed0 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 29 Jan 2020 18:16:54 +0900 Subject: [PATCH 03/10] Use PositionFile.load Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail.rb | 2 +- lib/fluent/plugin/in_tail/position_file.rb | 4 -- test/plugin/in_tail/test_position_file.rb | 47 +++++++++++++++++----- test/plugin/test_in_tail.rb | 2 +- 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 09726e4291..52cd1fa955 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -211,7 +211,7 @@ def start FileUtils.mkdir_p(pos_file_dir) unless Dir.exist?(pos_file_dir) @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm) @pf_file.sync = true - @pf = PositionFile.parse(@pf_file, logger: $log) + @pf = PositionFile.load(@pf_file, logger: $log) end refresh_watchers unless @skip_refresh_on_startup diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index a9beae123b..a4ca9132e0 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -49,10 +49,6 @@ def unwatch(path) end end - def self.parse(file, logger: $log) - load(file, logger: logger) - end - # Clean up unwatched file entries def self.compact(file, logger: $log) pf = new(file, logger: logger) diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index 0855fe5a27..312de3d05e 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -26,6 +26,17 @@ def write_data(f, content) f.seek(0) end + test '.load' do + write_data(@file, TEST_CONTENT) + Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) + + @file.seek(0) + lines = @file.readlines + assert_equal 2, lines.size + assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0] + assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1] + end + sub_test_case '.compact' do test 'compact invalid and convert 32 bit inode value' do write_data(@file, TEST_CONTENT) @@ -51,21 +62,35 @@ def write_data(f, content) end end - test '.parse' do - write_data(@file, TEST_CONTENT) - Fluent::Plugin::TailInput::PositionFile.parse(@file) + sub_test_case '#load' do + test 'compact invalid and convert 32 bit inode value' do + write_data(@file, TEST_CONTENT) + Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) - @file.seek(0) - lines = @file.readlines - assert_equal 2, lines.size - assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0] - assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1] + @file.seek(0) + lines = @file.readlines + assert_equal 2, lines.size + assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0] + assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1] + end + + test 'compact data if duplicated line' do + write_data(@file, <<~EOF) + valid_path\t0000000000000002\t0000000000000001 + valid_path\t0000000000000003\t0000000000000004 + EOF + Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).load + + @file.seek(0) + lines = @file.readlines + assert_equal "valid_path\t0000000000000003\t0000000000000004\n", lines[0] + end end sub_test_case '#[]' do test 'return entry' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.parse(@file) + pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) f = pf['valid_path'] assert_equal Fluent::Plugin::TailInput::FilePositionEntry, f.class @@ -89,7 +114,7 @@ def write_data(f, content) test 'does not change other value position if other entry try to write' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.parse(@file) + pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) f = pf['nonexist_path'] assert_equal 0, f.read_inode @@ -110,7 +135,7 @@ def write_data(f, content) sub_test_case '#unwatch' do test 'deletes entry by path' do write_data(@file, TEST_CONTENT) - pf = Fluent::Plugin::TailInput::PositionFile.parse(@file) + pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log) p1 = pf['valid_path'] assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p1.class diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 891665b7d7..2a9e3b5883 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1088,7 +1088,7 @@ def test_z_refresh_watchers plugin = create_driver(EX_CONFIG, false).instance sio = StringIO.new plugin.instance_eval do - @pf = Fluent::Plugin::TailInput::PositionFile.parse(sio) + @pf = Fluent::Plugin::TailInput::PositionFile.load(sio, logger: $log) @loop = Coolio::Loop.new end From 7ee81da649497ba19e1f9686fc835643c2b8c399 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Thu, 30 Jan 2020 11:56:07 +0900 Subject: [PATCH 04/10] test: Add try_compact test Signed-off-by: Yuta Iwama --- test/plugin/in_tail/test_position_file.rb | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index 312de3d05e..8c9232f878 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -1,6 +1,7 @@ require_relative '../../helper' require 'fluent/plugin/in_tail/position_file' +require 'fileutils' require 'tempfile' class IntailPositionFileTest < Test::Unit::TestCase @@ -37,10 +38,10 @@ def write_data(f, content) assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1] end - sub_test_case '.compact' do + sub_test_case '#try_compact' do test 'compact invalid and convert 32 bit inode value' do write_data(@file, TEST_CONTENT) - Fluent::Plugin::TailInput::PositionFile.compact(@file) + Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).try_compact @file.seek(0) lines = @file.readlines @@ -54,12 +55,27 @@ def write_data(f, content) valid_path\t0000000000000002\t0000000000000001 valid_path\t0000000000000003\t0000000000000004 EOF - Fluent::Plugin::TailInput::PositionFile.compact(@file) + Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).try_compact @file.seek(0) lines = @file.readlines assert_equal "valid_path\t0000000000000003\t0000000000000004\n", lines[0] end + + test 'does not change when the file is changed' do + write_data(@file, TEST_CONTENT) + pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log) + + mock.proxy(pf).fetch_compacted_entries do + FileUtils.touch(@file.path) # change mtime + end + + pf.try_compact + + @file.seek(0) + lines = @file.readlines + assert_equal 4, lines.size + end end sub_test_case '#load' do From ff8fa6f608aa31058ac9daa81e93d02b203188cf Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Thu, 30 Jan 2020 11:57:23 +0900 Subject: [PATCH 05/10] Delete compact class method Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail/position_file.rb | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index a4ca9132e0..8555c33dcf 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -23,6 +23,12 @@ class PositionFile POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze + def self.load(file, logger:) + pf = new(file, logger: logger) + pf.load + pf + end + def initialize(file, logger: nil) @file = file @logger = logger @@ -49,19 +55,6 @@ def unwatch(path) end end - # Clean up unwatched file entries - def self.compact(file, logger: $log) - pf = new(file, logger: logger) - pf.try_compact - pf - end - - def self.load(file, logger:) - pf = new(file, logger: logger) - pf.load - pf - end - def load compact From 69901c4475d8273babece2ffec5c884422b4025b Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Thu, 30 Jan 2020 12:13:42 +0900 Subject: [PATCH 06/10] Add pos_file_compaction_interval Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail.rb | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 52cd1fa955..12272cec34 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -71,6 +71,8 @@ def initialize config_param :rotate_wait, :time, default: 5 desc 'Fluentd will record the position it last read into this file.' config_param :pos_file, :string, default: nil + desc 'The cleanup interval of pos file' + config_param :pos_file_compaction_interval, :integer, default: nil desc 'Start to read the logs from the head of file, not bottom.' config_param :read_from_head, :bool, default: false # When the program deletes log file and re-creates log file with same filename after passed refresh_interval, @@ -212,6 +214,13 @@ def start @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm) @pf_file.sync = true @pf = PositionFile.load(@pf_file, logger: $log) + + if @pos_file_compaction_interval + timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do + $log.info('Clean up the pos file') + @pf.try_compact + end + end end refresh_watchers unless @skip_refresh_on_startup From 0ffdd845d39d6f6d1fe28638edc474fa7177768b Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Mon, 3 Feb 2020 10:42:40 +0900 Subject: [PATCH 07/10] change `$log` to `log` In order to be able to use different log level in per plugin Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 12272cec34..eb9f17ec18 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -213,11 +213,11 @@ def start FileUtils.mkdir_p(pos_file_dir) unless Dir.exist?(pos_file_dir) @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm) @pf_file.sync = true - @pf = PositionFile.load(@pf_file, logger: $log) + @pf = PositionFile.load(@pf_file, logger: log) if @pos_file_compaction_interval timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do - $log.info('Clean up the pos file') + log.info('Clean up the pos file') @pf.try_compact end end From 448a1bf1bf98ed52d6455267ed6c85f6a75b53cd Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Mon, 3 Feb 2020 11:09:41 +0900 Subject: [PATCH 08/10] Return returned value it returned path name of file. Signed-off-by: Yuta Iwama --- test/plugin/in_tail/test_position_file.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index 8c9232f878..af8110ced3 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -66,8 +66,9 @@ def write_data(f, content) write_data(@file, TEST_CONTENT) pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log) - mock.proxy(pf).fetch_compacted_entries do + mock.proxy(pf).fetch_compacted_entries do |r| FileUtils.touch(@file.path) # change mtime + r end pf.try_compact From bff27eaef04ef7a2318fae2bffa744738363dee9 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Mon, 3 Feb 2020 11:23:14 +0900 Subject: [PATCH 09/10] stop circular require Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail/position_file.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index 8555c33dcf..34c0e3adcb 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -14,7 +14,7 @@ # limitations under the License. # -require 'fluent/plugin/in_tail' +require 'fluent/plugin/input' module Fluent::Plugin class TailInput < Fluent::Plugin::Input From 8946e536e48c8bda125f7fd075f7a0c388b8850a Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Mon, 3 Feb 2020 11:24:52 +0900 Subject: [PATCH 10/10] change content value Testing with Fileutils.touch needs to call `sleep` to change the value of mtime Signed-off-by: Yuta Iwama --- test/plugin/in_tail/test_position_file.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index af8110ced3..6e31fa816b 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -67,7 +67,7 @@ def write_data(f, content) pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log) mock.proxy(pf).fetch_compacted_entries do |r| - FileUtils.touch(@file.path) # change mtime + @file.write("unwatched\t#{UNWATCHED_STR}\t0000000000000000\n") r end @@ -75,7 +75,7 @@ def write_data(f, content) @file.seek(0) lines = @file.readlines - assert_equal 4, lines.size + assert_equal 5, lines.size end end