From f112cddf3e8ce896c3b501a88408f879db4f6a4c Mon Sep 17 00:00:00 2001 From: ganmacs Date: Wed, 10 Aug 2016 13:28:16 +0900 Subject: [PATCH 01/26] Add #nil? method to Metadata --- lib/fluent/plugin/buffer.rb | 6 +++++- lib/fluent/plugin/output.rb | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 1eff6de52f..b463870249 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -55,7 +55,11 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than # if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD - Metadata = Struct.new(:timekey, :tag, :variables) + Metadata = Struct.new(:timekey, :tag, :variables) do + def nil? + timekey.nil? && tag.nil? && variables.nil? + end + end # for tests attr_accessor :stage_size, :queue_size diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 32bc6ef3d0..60bb3299f2 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -472,7 +472,7 @@ def implement?(feature) # TODO: optimize this code def extract_placeholders(str, metadata) - if metadata.timekey.nil? && metadata.tag.nil? && metadata.variables.nil? + if metadata.nil? str else rvalue = str From e8b318b0956bf638ed9f7d58068a19b313a4c13f Mon Sep 17 00:00:00 2001 From: ganmacs Date: Wed, 10 Aug 2016 17:34:00 +0900 Subject: [PATCH 02/26] Create `out_seconary_file` plugins This is new plugin only for section. --- lib/fluent/plugin/out_secondary_file.rb | 160 ++++++++++++ lib/fluent/plugin/output.rb | 4 +- lib/fluent/test/base.rb | 2 +- test/plugin/test_out_secondary_file.rb | 316 ++++++++++++++++++++++++ 4 files changed, 480 insertions(+), 2 deletions(-) create mode 100644 lib/fluent/plugin/out_secondary_file.rb create mode 100644 test/plugin/test_out_secondary_file.rb diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb new file mode 100644 index 0000000000..a6b4186ca6 --- /dev/null +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -0,0 +1,160 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require "fileutils" +require 'fluent/plugin/output' +require "fluent/config/error" +require "fluent/system_config" + + +module Fluent::Plugin + class SecondaryFileOutput < Output + include Fluent::SystemConfig::Mixin + + Fluent::Plugin.register_output("secondary_file", self) + + SUPPORTED_COMPRESS = { + 'gz' => :gz, + 'gzip' => :gz, + } + + FILE_PERMISSION = 0644 + DIR_PERMISSION = 0755 + + desc "The Path of the file." + config_param :path, :string + desc "The flushed chunk is appended to existence file or not." 
+ config_param :append, :bool, default: false + config_param :compress, default: nil do |val| + SUPPORTED_COMPRESS[val].tap do |e| + raise ConfigError, "Unsupported compression algorithm '#{val}'" unless e + end + end + + def initialize + require "time" + require "fluent/plugin/file_util" + super + end + + def configure(conf) + unless @as_secondary + raise Fluent::ConfigError, "This plugin can only be used in the section" + end + + unless @path = conf["path"] + raise Fluent::ConfigError, "'path' parameter is required on secondary file output" + end + + if pos = @path.index('*') + @path_prefix = @path[0,pos] + @path_suffix = @path[pos+1..-1] + else + @path_prefix = @path+"." + @path_suffix = ".log" + end + + test_path = generate_path(Time.now.strftime('%Y%m%d')) + unless Fluent::FileUtil.writable_p?(test_path) + raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable" + end + + @dir_perm = system_config.dir_permission || DIR_PERMISSION + @file_perm = system_config.file_permission || FILE_PERMISSION + + # @tempalte = genereate_tempalte fix + super + end + + def write(chunk) + id = extract_placeholders('', chunk) + path = generate_path(id) + + log.info path + FileUtils.mkdir_p File.dirname(path), mode: @dir_perm + + case @compress + when nil + File.open(path, "ab", @file_perm) {|f| + f.flock(File::LOCK_EX) + chunk.write_to(f) + } + when :gz + File.open(path, "ab", @file_perm) {|f| + f.flock(File::LOCK_EX) + gz = Zlib::GzipWriter.new(f) + chunk.write_to(gz) + gz.close + } + end + + path + end + + def extract_placeholders(str, chunk) + if chunk.metadata.nil? + chunk.unique_id + else + super(str, chunk.metadata) + end + end + + private + + def genereate_tempalte + rvalue = '' + if @chunk_key_tag + rvalue += "{tag}_" + end + + if @chunk_key_time + rvalue += "%Y%m%d_" + end + + @chunk_keys.each do |v| + rvalue += "${#{v}}_" + end + + if rvalue.end_with?("_") + rvalue = rvalue[0..-2] + end + + rvalue + end + + def suffix + case @compress + when nil + '' + when :gz + ".gz" + end + end + + def generate_path(id) + if @append + "#{@path_prefix}#{id}#{@path_suffix}#{suffix}" + else + i = 0 + loop do + path = "#{@path_prefix}#{id}_#{i}#{@path_suffix}#{suffix}" + return path unless File.exist?(path) + i += 1; + end + end + end + end +end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 60bb3299f2..9e0bf2a5bf 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -191,7 +191,9 @@ def acts_as_secondary(primary) @as_secondary = true @primary_instance = primary (class << self; self; end).module_eval do - define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) } + if require_override + define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) } + end define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) } define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) } end diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb index ac17cf4561..3bb420191b 100644 --- a/lib/fluent/test/base.rb +++ b/lib/fluent/test/base.rb @@ -37,7 +37,7 @@ def initialize(klass, &block) else @instance = klass end - @instance.router = Engine.root_agent.event_router + @instance.router = Engine.root_agent.event_router if @instance.respond_to?(:router=) @instance.log = TestLogger.new Engine.root_agent.instance_variable_set(:@log, @instance.log) diff --git 
a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb new file mode 100644 index 0000000000..fad493ae6b --- /dev/null +++ b/test/plugin/test_out_secondary_file.rb @@ -0,0 +1,316 @@ +require_relative '../helper' +require 'time' +require 'fileutils' +require 'fluent/test' +require 'fluent/event' +require 'fluent/plugin/buffer' +require 'fluent/plugin/out_secondary_file' +require 'fluent/plugin/buffer/memory_chunk' + +class FileOutputTest < Test::Unit::TestCase + def setup + Fluent::Test.setup + FileUtils.rm_rf(TMP_DIR) + FileUtils.mkdir_p(TMP_DIR) + end + + TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_file#{ENV['TEST_ENV_NUMBER']}") + + CONFIG = %[ + path #{TMP_DIR}/out_file_test + compress gz + ] + + def create_driver(conf = CONFIG) + primary = Class.new + c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) + c.instance.acts_as_secondary(primary) + c.configure(conf) + end + + def test_configure + d = create_driver %[ + path test_path + compress gz + ] + assert_equal 'test_path', d.instance.path + assert_equal :gz, d.instance.compress + end + + def test_asts_as_secondary + c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) + assert_raise do + c.configure(conf) + end + end + + def test_recieved_path + assert_raise(Fluent::ConfigError) do + d = create_driver %[ + compress gz + ] + end + end + + def test_path_writable + assert_nothing_raised do + create_driver %[path #{TMP_DIR}/test_path] + end + + assert_nothing_raised do + FileUtils.mkdir_p("#{TMP_DIR}/test_dir") + File.chmod(0777, "#{TMP_DIR}/test_dir") + create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + end + + assert_raise(Fluent::ConfigError) do + FileUtils.mkdir_p("#{TMP_DIR}/test_dir") + File.chmod(0555, "#{TMP_DIR}/test_dir") + create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + end + end + + + def check_gzipped_result(path, expect) + # Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790 + # Following code from https://www.ruby-forum.com/topic/971591#979520 + result = '' + File.open(path, "rb") { |io| + loop do + gzr = Zlib::GzipReader.new(io) + result << gzr.read + unused = gzr.unused + gzr.finish + break if unused.nil? + io.pos -= unused.length + end + } + + assert_equal expect, result + end + + class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk; end + + def create_metadata(timekey=nil, tag=nil, variables=nil) + Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables) + end + + def create_es_chunk(metadata, es) + DummyMemoryChunk.new(metadata).tap do |c| + c.concat(es.to_msgpack_stream, es.size) # standard_format + c.commit + end + end + + RECORD_VALUE = { "key" => "vlaue"} + MESSAGE_PACKED_RECORD_VALUE = "\x92\xCEM z'\x81\xA3key\xA5vlaue".force_encoding('ASCII-8BIT') + + def test_write + d = create_driver + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + es = Fluent::OneEventStream.new(time, RECORD_VALUE) + c = create_es_chunk(create_metadata, es) + path = d.instance.write(c) + + assert_equal "#{TMP_DIR}/out_file_test.#{c.unique_id}_0.log.gz", path + check_gzipped_result(path, MESSAGE_PACKED_RECORD_VALUE) + end + + # TODO + # write temaplte test if needed + + # class TestWithSystem < self + # TMP_DIR_WITH_SYSTEM = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_file_system#{ENV['TEST_ENV_NUMBER']}") + # # 0750 interprets as "488". "488".to_i(8) # => 4. So, it makes wrong permission. Umm.... 
+ # OVERRIDE_DIR_PERMISSION = 750 + # OVERRIDE_FILE_PERMISSION = 0620 + # CONFIG_WITH_SYSTEM = %[ + # path #{TMP_DIR_WITH_SYSTEM}/out_file_test + # compress gz + # utc + # + # file_permission #{OVERRIDE_FILE_PERMISSION} + # dir_permission #{OVERRIDE_DIR_PERMISSION} + # + # ] + + # def setup + # omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? + # FileUtils.rm_rf(TMP_DIR_WITH_SYSTEM) + # end + + # def parse_system(text) + # basepath = File.expand_path(File.dirname(__FILE__) + '/../../') + # Fluent::Config.parse(text, '(test)', basepath, true).elements.find { |e| e.name == 'system' } + # end + + # def test_write_with_system + # system_conf = parse_system(CONFIG_WITH_SYSTEM) + # sc = Fluent::SystemConfig.new(system_conf) + # Fluent::Engine.init(sc) + # d = create_driver CONFIG_WITH_SYSTEM + + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # d.emit({"a"=>2}, time) + + # # FileOutput#write returns path + # paths = d.run + # expect_paths = ["#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz"] + # assert_equal expect_paths, paths + + # check_gzipped_result(paths[0], %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) + # dir_mode = "%o" % File::stat(TMP_DIR_WITH_SYSTEM).mode + # assert_equal(OVERRIDE_DIR_PERMISSION, dir_mode[-3, 3].to_i) + # file_mode = "%o" % File::stat(paths[0]).mode + # assert_equal(OVERRIDE_FILE_PERMISSION, file_mode[-3, 3].to_i) + # end + # end + + # def test_write_with_format_json + # d = create_driver [CONFIG, 'format json', 'include_time_key true', 'time_as_epoch'].join("\n") + + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # d.emit({"a"=>2}, time) + + # # FileOutput#write returns path + # paths = d.run + # check_gzipped_result(paths[0], %[#{Yajl.dump({"a" => 1, 'time' => time})}\n] + %[#{Yajl.dump({"a" => 2, 'time' => time})}\n]) + # end + + # def test_write_with_format_ltsv + # d = create_driver [CONFIG, 'format ltsv', 'include_time_key true'].join("\n") + + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # d.emit({"a"=>2}, time) + + # # FileOutput#write returns path + # paths = d.run + # check_gzipped_result(paths[0], %[a:1\ttime:2011-01-02T13:14:15Z\n] + %[a:2\ttime:2011-01-02T13:14:15Z\n]) + # end + + # def test_write_with_format_single_value + # d = create_driver [CONFIG, 'format single_value', 'message_key a'].join("\n") + + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # d.emit({"a"=>2}, time) + + # # FileOutput#write returns path + # paths = d.run + # check_gzipped_result(paths[0], %[1\n] + %[2\n]) + # end + + # def test_write_path_increment + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + + # write_once = ->(){ + # d = create_driver + # d.emit({"a"=>1}, time) + # d.emit({"a"=>2}, time) + # d.run + # } + + # assert !File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") + + # # FileOutput#write returns path + # paths = write_once.call + # assert_equal ["#{TMP_DIR}/out_file_test.20110102_0.log.gz"], paths + # check_gzipped_result(paths[0], formatted_lines) + # assert_equal 1, Dir.glob("#{TMP_DIR}/out_file_test.*").size + + # paths = write_once.call + # assert_equal ["#{TMP_DIR}/out_file_test.20110102_1.log.gz"], paths + # check_gzipped_result(paths[0], formatted_lines) + # assert_equal 2, Dir.glob("#{TMP_DIR}/out_file_test.*").size + + # paths = 
write_once.call + # assert_equal ["#{TMP_DIR}/out_file_test.20110102_2.log.gz"], paths + # check_gzipped_result(paths[0], formatted_lines) + # assert_equal 3, Dir.glob("#{TMP_DIR}/out_file_test.*").size + # end + + # def test_write_with_append + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + + # write_once = ->(){ + # d = create_driver %[ + # path #{TMP_DIR}/out_file_test + # compress gz + # utc + # append true + # ] + # d.emit({"a"=>1}, time) + # d.emit({"a"=>2}, time) + # d.run + # } + + # # FileOutput#write returns path + # paths = write_once.call + # assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths + # check_gzipped_result(paths[0], formatted_lines) + # paths = write_once.call + # assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths + # check_gzipped_result(paths[0], formatted_lines * 2) + # paths = write_once.call + # assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths + # check_gzipped_result(paths[0], formatted_lines * 3) + # end + + # sub_test_case 'path' do + # test 'normal' do + # d = create_driver(%[ + # path #{TMP_DIR}/out_file_test + # time_slice_format %Y-%m-%d-%H + # utc true + # ]) + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # # FileOutput#write returns path + # paths = d.run + # assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13_0.log"], paths + # end + + # test 'normal with append' do + # d = create_driver(%[ + # path #{TMP_DIR}/out_file_test + # time_slice_format %Y-%m-%d-%H + # utc true + # append true + # ]) + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # paths = d.run + # assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13.log"], paths + # end + + # test '*' do + # d = create_driver(%[ + # path #{TMP_DIR}/out_file_test.*.txt + # time_slice_format %Y-%m-%d-%H + # utc true + # ]) + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # paths = d.run + # assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13_0.txt"], paths + # end + + # test '* with append' do + # d = create_driver(%[ + # path #{TMP_DIR}/out_file_test.*.txt + # time_slice_format %Y-%m-%d-%H + # utc true + # append true + # ]) + # time = Time.parse("2011-01-02 13:14:15 UTC").to_i + # d.emit({"a"=>1}, time) + # paths = d.run + # assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13.txt"], paths + # end + # end +end From 717047b7cd00a616ec811068a7ccd337d55bb0d0 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Thu, 11 Aug 2016 16:22:11 +0900 Subject: [PATCH 03/26] Fix failed test --- lib/fluent/plugin/output.rb | 2 ++ test/plugin/test_out_secondary_file.rb | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 9e0bf2a5bf..a577376a23 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -190,6 +190,8 @@ def initialize def acts_as_secondary(primary) @as_secondary = true @primary_instance = primary + + require_override = !self.class.instance_methods(false).include?(:extract_placeholders) (class << self; self; end).module_eval do if require_override define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) } diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index fad493ae6b..2d56252563 100644 --- a/test/plugin/test_out_secondary_file.rb +++ 
b/test/plugin/test_out_secondary_file.rb @@ -46,7 +46,7 @@ def test_asts_as_secondary def test_recieved_path assert_raise(Fluent::ConfigError) do - d = create_driver %[ + create_driver %[ compress gz ] end From 2325a804ed5f813b8fd6924ee959db00dc2792c6 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Thu, 11 Aug 2016 18:59:07 +0900 Subject: [PATCH 04/26] update secondary_out_file --- lib/fluent/plugin/buffer/chunk.rb | 3 +- lib/fluent/plugin/out_secondary_file.rb | 25 +-- test/plugin/test_out_secondary_file.rb | 269 ++++++++---------------- 3 files changed, 98 insertions(+), 199 deletions(-) diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index 157c3e809c..6d00339d62 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -49,6 +49,7 @@ class Chunk def initialize(metadata) super() @unique_id = generate_unique_id + @chunk_id = Fluent::UniqueId.hex(@unique_id) @metadata = metadata # state: unstaged/staged/queued/closed @@ -59,7 +60,7 @@ def initialize(metadata) @modified_at = Time.now end - attr_reader :unique_id, :metadata, :created_at, :modified_at, :state + attr_reader :unique_id, :chunk_id, :metadata, :created_at, :modified_at, :state # data is array of formatted record string def append(data) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index a6b4186ca6..2ad47f3ba4 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -15,20 +15,17 @@ # require "fileutils" -require 'fluent/plugin/output' +require "fluent/plugin/file_util" +require "fluent/plugin/output" require "fluent/config/error" -require "fluent/system_config" - module Fluent::Plugin class SecondaryFileOutput < Output - include Fluent::SystemConfig::Mixin - Fluent::Plugin.register_output("secondary_file", self) SUPPORTED_COMPRESS = { - 'gz' => :gz, - 'gzip' => :gz, + "gz" => :gz, + "gzip" => :gz, } FILE_PERMISSION = 0644 @@ -45,8 +42,6 @@ class SecondaryFileOutput < Output end def initialize - require "time" - require "fluent/plugin/file_util" super end @@ -63,11 +58,11 @@ def configure(conf) @path_prefix = @path[0,pos] @path_suffix = @path[pos+1..-1] else - @path_prefix = @path+"." + @path_prefix = @path + "." @path_suffix = ".log" end - test_path = generate_path(Time.now.strftime('%Y%m%d')) + test_path = generate_path(Time.now.strftime("%Y%m%d")) unless Fluent::FileUtil.writable_p?(test_path) raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable" end @@ -82,8 +77,6 @@ def configure(conf) def write(chunk) id = extract_placeholders('', chunk) path = generate_path(id) - - log.info path FileUtils.mkdir_p File.dirname(path), mode: @dir_perm case @compress @@ -106,7 +99,7 @@ def write(chunk) def extract_placeholders(str, chunk) if chunk.metadata.nil? 
- chunk.unique_id + chunk.chunk_id else super(str, chunk.metadata) end @@ -115,7 +108,7 @@ def extract_placeholders(str, chunk) private def genereate_tempalte - rvalue = '' + rvalue = "" if @chunk_key_tag rvalue += "{tag}_" end @@ -138,7 +131,7 @@ def genereate_tempalte def suffix case @compress when nil - '' + "" when :gz ".gz" end diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 2d56252563..5a5b51ec5c 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -7,7 +7,7 @@ require 'fluent/plugin/out_secondary_file' require 'fluent/plugin/buffer/memory_chunk' -class FileOutputTest < Test::Unit::TestCase +class FileOutputSecondaryTest < Test::Unit::TestCase def setup Fluent::Test.setup FileUtils.rm_rf(TMP_DIR) @@ -28,53 +28,54 @@ def create_driver(conf = CONFIG) c.configure(conf) end - def test_configure - d = create_driver %[ + sub_test_case 'configture' do + test 'should b econfigurable' do + d = create_driver %[ path test_path compress gz ] - assert_equal 'test_path', d.instance.path - assert_equal :gz, d.instance.compress - end + assert_equal 'test_path', d.instance.path + assert_equal :gz, d.instance.compress + end - def test_asts_as_secondary - c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) - assert_raise do - c.configure(conf) + test 'should only use in secondary' do + c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) + assert_raise do + c.configure(conf) + end end - end - def test_recieved_path - assert_raise(Fluent::ConfigError) do - create_driver %[ + test 'should receive a file path' do + assert_raise(Fluent::ConfigError) do + create_driver %[ compress gz ] + end end - end - def test_path_writable - assert_nothing_raised do - create_driver %[path #{TMP_DIR}/test_path] - end + test 'should be writable direcotry' do + assert_nothing_raised do + create_driver %[path #{TMP_DIR}/test_path] + end - assert_nothing_raised do - FileUtils.mkdir_p("#{TMP_DIR}/test_dir") - File.chmod(0777, "#{TMP_DIR}/test_dir") - create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] - end + assert_nothing_raised do + FileUtils.mkdir_p("#{TMP_DIR}/test_dir") + File.chmod(0777, "#{TMP_DIR}/test_dir") + create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + end - assert_raise(Fluent::ConfigError) do - FileUtils.mkdir_p("#{TMP_DIR}/test_dir") - File.chmod(0555, "#{TMP_DIR}/test_dir") - create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + assert_raise(Fluent::ConfigError) do + FileUtils.mkdir_p("#{TMP_DIR}/test_dir") + File.chmod(0555, "#{TMP_DIR}/test_dir") + create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + end end end - def check_gzipped_result(path, expect) # Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790 # Following code from https://www.ruby-forum.com/topic/971591#979520 - result = '' + result = "" File.open(path, "rb") { |io| loop do gzr = Zlib::GzipReader.new(io) @@ -97,169 +98,73 @@ def create_metadata(timekey=nil, tag=nil, variables=nil) def create_es_chunk(metadata, es) DummyMemoryChunk.new(metadata).tap do |c| - c.concat(es.to_msgpack_stream, es.size) # standard_format + c.concat(es.to_msgpack_stream, es.size) # to_msgpack_stream is standard_format c.commit end end - RECORD_VALUE = { "key" => "vlaue"} - MESSAGE_PACKED_RECORD_VALUE = "\x92\xCEM z'\x81\xA3key\xA5vlaue".force_encoding('ASCII-8BIT') - - def test_write - d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - es = 
Fluent::OneEventStream.new(time, RECORD_VALUE) - c = create_es_chunk(create_metadata, es) - path = d.instance.write(c) - - assert_equal "#{TMP_DIR}/out_file_test.#{c.unique_id}_0.log.gz", path - check_gzipped_result(path, MESSAGE_PACKED_RECORD_VALUE) - end - - # TODO - # write temaplte test if needed - - # class TestWithSystem < self - # TMP_DIR_WITH_SYSTEM = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_file_system#{ENV['TEST_ENV_NUMBER']}") - # # 0750 interprets as "488". "488".to_i(8) # => 4. So, it makes wrong permission. Umm.... - # OVERRIDE_DIR_PERMISSION = 750 - # OVERRIDE_FILE_PERMISSION = 0620 - # CONFIG_WITH_SYSTEM = %[ - # path #{TMP_DIR_WITH_SYSTEM}/out_file_test - # compress gz - # utc - # - # file_permission #{OVERRIDE_FILE_PERMISSION} - # dir_permission #{OVERRIDE_DIR_PERMISSION} - # - # ] - - # def setup - # omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? - # FileUtils.rm_rf(TMP_DIR_WITH_SYSTEM) - # end - - # def parse_system(text) - # basepath = File.expand_path(File.dirname(__FILE__) + '/../../') - # Fluent::Config.parse(text, '(test)', basepath, true).elements.find { |e| e.name == 'system' } - # end - - # def test_write_with_system - # system_conf = parse_system(CONFIG_WITH_SYSTEM) - # sc = Fluent::SystemConfig.new(system_conf) - # Fluent::Engine.init(sc) - # d = create_driver CONFIG_WITH_SYSTEM - - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # d.emit({"a"=>1}, time) - # d.emit({"a"=>2}, time) - - # # FileOutput#write returns path - # paths = d.run - # expect_paths = ["#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz"] - # assert_equal expect_paths, paths - - # check_gzipped_result(paths[0], %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) - # dir_mode = "%o" % File::stat(TMP_DIR_WITH_SYSTEM).mode - # assert_equal(OVERRIDE_DIR_PERMISSION, dir_mode[-3, 3].to_i) - # file_mode = "%o" % File::stat(paths[0]).mode - # assert_equal(OVERRIDE_FILE_PERMISSION, file_mode[-3, 3].to_i) - # end - # end - - # def test_write_with_format_json - # d = create_driver [CONFIG, 'format json', 'include_time_key true', 'time_as_epoch'].join("\n") - - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # d.emit({"a"=>1}, time) - # d.emit({"a"=>2}, time) - - # # FileOutput#write returns path - # paths = d.run - # check_gzipped_result(paths[0], %[#{Yajl.dump({"a" => 1, 'time' => time})}\n] + %[#{Yajl.dump({"a" => 2, 'time' => time})}\n]) - # end - - # def test_write_with_format_ltsv - # d = create_driver [CONFIG, 'format ltsv', 'include_time_key true'].join("\n") - - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # d.emit({"a"=>1}, time) - # d.emit({"a"=>2}, time) - - # # FileOutput#write returns path - # paths = d.run - # check_gzipped_result(paths[0], %[a:1\ttime:2011-01-02T13:14:15Z\n] + %[a:2\ttime:2011-01-02T13:14:15Z\n]) - # end - - # def test_write_with_format_single_value - # d = create_driver [CONFIG, 'format single_value', 'message_key a'].join("\n") - - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # d.emit({"a"=>1}, time) - # d.emit({"a"=>2}, time) - - # # FileOutput#write returns path - # paths = d.run - # check_gzipped_result(paths[0], %[1\n] + %[2\n]) - # end - - # def test_write_path_increment - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] - - # write_once = ->(){ - # d = create_driver - # d.emit({"a"=>1}, time) - # d.emit({"a"=>2}, time) - # d.run - # } + sub_test_case 
'write' do + setup do + @record = { "key" => "vlaue"} + @time = Time.parse("2011-01-02 13:14:15 UTC").to_i + @es = Fluent::OneEventStream.new(@time, @record) + end - # assert !File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") + test 'should be output with standarad format' do + d = create_driver + c = create_es_chunk(create_metadata, @es) + path = d.instance.write(c) - # # FileOutput#write returns path - # paths = write_once.call - # assert_equal ["#{TMP_DIR}/out_file_test.20110102_0.log.gz"], paths - # check_gzipped_result(paths[0], formatted_lines) - # assert_equal 1, Dir.glob("#{TMP_DIR}/out_file_test.*").size + assert_equal "#{TMP_DIR}/out_file_test.#{c.chunk_id}_0.log.gz", path + check_gzipped_result(path, @es.to_msgpack_stream.force_encoding('ASCII-8BIT')) + end - # paths = write_once.call - # assert_equal ["#{TMP_DIR}/out_file_test.20110102_1.log.gz"], paths - # check_gzipped_result(paths[0], formatted_lines) - # assert_equal 2, Dir.glob("#{TMP_DIR}/out_file_test.*").size + test 'should be output unzip file' do + d = create_driver %[ + path #{TMP_DIR}/out_file_test + ] + c = create_es_chunk(create_metadata, @es) + path = d.instance.write(c) - # paths = write_once.call - # assert_equal ["#{TMP_DIR}/out_file_test.20110102_2.log.gz"], paths - # check_gzipped_result(paths[0], formatted_lines) - # assert_equal 3, Dir.glob("#{TMP_DIR}/out_file_test.*").size - # end + assert_equal "#{TMP_DIR}/out_file_test.#{c.chunk_id}_0.log", path + assert_equal File.read(path), @es.to_msgpack_stream.force_encoding('ASCII-8BIT') + end - # def test_write_with_append - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + test 'path should be increment without append' do + d = create_driver %[ + path #{TMP_DIR}/out_file_test + compress gz + # ] + c = create_es_chunk(create_metadata, @es) + packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') + + 5.times do |i| + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/out_file_test.#{c.chunk_id}_#{i}.log.gz", path + check_gzipped_result(path, packed_value ) + end + end - # write_once = ->(){ - # d = create_driver %[ - # path #{TMP_DIR}/out_file_test - # compress gz - # utc - # append true - # ] - # d.emit({"a"=>1}, time) - # d.emit({"a"=>2}, time) - # d.run - # } + test 'path should be same with append' do + d = create_driver %[ + path #{TMP_DIR}/out_file_test + compress gz + append true + # ] + c = create_es_chunk(create_metadata, @es) + packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') + + [*1..5].each do |i| + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/out_file_test.#{c.chunk_id}.log.gz", path + check_gzipped_result(path, packed_value * i) + end + end + end - # # FileOutput#write returns path - # paths = write_once.call - # assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths - # check_gzipped_result(paths[0], formatted_lines) - # paths = write_once.call - # assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths - # check_gzipped_result(paths[0], formatted_lines * 2) - # paths = write_once.call - # assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths - # check_gzipped_result(paths[0], formatted_lines * 3) - # end + sub_test_case 'path tempalte' do + # test that uses generate_temaplte + end # sub_test_case 'path' do # test 'normal' do From 146ce2ba2ca651c2b473aff6a54ae0832650c933 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Fri, 12 Aug 2016 17:24:52 +0900 Subject: [PATCH 
05/26] Add conf that is `out_secondary_file` example --- example/secondary_file.conf | 40 +++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 example/secondary_file.conf diff --git a/example/secondary_file.conf b/example/secondary_file.conf new file mode 100644 index 0000000000..bea6971e93 --- /dev/null +++ b/example/secondary_file.conf @@ -0,0 +1,40 @@ + + rpc_endpoint 0.0.0.0:24444 + + + + @type dummy + tag test + + + + @type forward + @label @raw + + + + + + @type forward + + + type memory + timekey 2s + timekey_wait 1s + flush_interval 1s + + + + host 0.0.0.0 + port 24224 + + + + type secondary_file + path ./log/secondary.${tag}_%Y%m%d%L_${message} + + From 5283a7784c0167edc5ffcf33b74bc1f49fe65497 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Fri, 12 Aug 2016 17:25:50 +0900 Subject: [PATCH 06/26] Support `extract_placeholder` in out_secondary_file's path --- lib/fluent/plugin/out_secondary_file.rb | 56 ++++++++++++++----------- lib/fluent/plugin/output.rb | 8 ++++ 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 2ad47f3ba4..d62a6e3109 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -54,13 +54,7 @@ def configure(conf) raise Fluent::ConfigError, "'path' parameter is required on secondary file output" end - if pos = @path.index('*') - @path_prefix = @path[0,pos] - @path_suffix = @path[pos+1..-1] - else - @path_prefix = @path + "." - @path_suffix = ".log" - end + configure_path! test_path = generate_path(Time.now.strftime("%Y%m%d")) unless Fluent::FileUtil.writable_p?(test_path) @@ -70,12 +64,11 @@ def configure(conf) @dir_perm = system_config.dir_permission || DIR_PERMISSION @file_perm = system_config.file_permission || FILE_PERMISSION - # @tempalte = genereate_tempalte fix super end def write(chunk) - id = extract_placeholders('', chunk) + id = extract_placeholders(@path, chunk) path = generate_path(id) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm @@ -107,25 +100,38 @@ def extract_placeholders(str, chunk) private - def genereate_tempalte - rvalue = "" - if @chunk_key_tag - rvalue += "{tag}_" - end - - if @chunk_key_time - rvalue += "%Y%m%d_" - end - - @chunk_keys.each do |v| - rvalue += "${#{v}}_" + def configure_path! + matched = @path.scan(/\${([\w\d.@-]+)}/).flat_map(&:itself) + p matched + if matched.empty? + if pos = @path.index('*') + @path_prefix = @path[0,pos] + @path_suffix = @path[pos+1..-1] + else + @path_prefix = @path + "." 
+ @path_suffix = ".log" + end + else + validate_path_is_comptible_with_primary_buffer?(matched) + @path_prefix = "" + @path_suffix = ".log" end + end - if rvalue.end_with?("_") - rvalue = rvalue[0..-2] + def validate_path_is_comptible_with_primary_buffer?(matched) + raise "TimeFormat is not imcompatible with primary buffer's params" if !@chunk_key_time && @path != Time.now.strftime(@path) + matched.each do |e| + case + when @chunk_key_tag && e =~ /tag\w*/ + # nothing + when !@chunk_key_tag && e =~ /tag\w*/ + raise "#{e} is not imcompatible with primary buffer's params" + when @chunk_keys.include?(e) + # nothing + else + raise "#{e} is not imcompatible with primary buffer's params" + end end - - rvalue end def suffix diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index a577376a23..198f57bf58 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -191,6 +191,14 @@ def acts_as_secondary(primary) @as_secondary = true @primary_instance = primary + @chunk_keys = @primary_instance.chunk_keys if @primary_instance.chunk_keys + @chunk_key_tag = @primary_instance.chunk_key_tag if @primary_instance.chunk_key_tag + if @primary_instance.chunk_key_time + @chunk_key_time = @primary_instance.chunk_key_time + @timekey_zone = Time.now.strftime('%z') # TO fix + @output_time_formatter_cache = {} + end + require_override = !self.class.instance_methods(false).include?(:extract_placeholders) (class << self; self; end).module_eval do if require_override From 52cab7da77922a5708ae5eee1ed9f1da30ff867e Mon Sep 17 00:00:00 2001 From: ganmacs Date: Fri, 12 Aug 2016 19:24:22 +0900 Subject: [PATCH 07/26] Add configure_path tests --- lib/fluent/plugin/out_secondary_file.rb | 19 ++- test/plugin/test_out_secondary_file.rb | 191 +++++++++++++++++------- 2 files changed, 144 insertions(+), 66 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index d62a6e3109..cbb02b7818 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -101,11 +101,10 @@ def extract_placeholders(str, chunk) private def configure_path! - matched = @path.scan(/\${([\w\d.@-]+)}/).flat_map(&:itself) - p matched - if matched.empty? + matched = @path.scan(/\${([\w.@-]+)}/).flat_map { |e| e } # for suporting 2.1 or less + if matched.empty? && !path_has_time_format? if pos = @path.index('*') - @path_prefix = @path[0,pos] + @path_prefix = @path[0, pos] @path_suffix = @path[pos+1..-1] else @path_prefix = @path + "." @@ -119,21 +118,25 @@ def configure_path! end def validate_path_is_comptible_with_primary_buffer?(matched) - raise "TimeFormat is not imcompatible with primary buffer's params" if !@chunk_key_time && @path != Time.now.strftime(@path) + raise "TimeFormat is not imcompatible with primary buffer's params" if !@chunk_key_time && path_has_time_format? matched.each do |e| case when @chunk_key_tag && e =~ /tag\w*/ - # nothing + # ok when !@chunk_key_tag && e =~ /tag\w*/ raise "#{e} is not imcompatible with primary buffer's params" - when @chunk_keys.include?(e) - # nothing + when @chunk_keys.include?(e.to_sym) + # ok else raise "#{e} is not imcompatible with primary buffer's params" end end end + def path_has_time_format? 
+ @path != Time.now.strftime(@path) + end + def suffix case @compress when nil diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 5a5b51ec5c..7b1d855fae 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -21,8 +21,19 @@ def setup compress gz ] - def create_driver(conf = CONFIG) - primary = Class.new + class DummyOutput < Fluent::Plugin::Output + def register(name, &block) + instance_variable_set("@#{name}", block) + end + + def register_value(name, value) + instance_variable_set("@#{name}", value) + end + + def write(chunk); end + end + + def create_driver(conf = CONFIG, primary = DummyOutput.new) c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) c.instance.acts_as_secondary(primary) c.configure(conf) @@ -105,7 +116,7 @@ def create_es_chunk(metadata, es) sub_test_case 'write' do setup do - @record = { "key" => "vlaue"} + @record = { key: "vlaue" } @time = Time.parse("2011-01-02 13:14:15 UTC").to_i @es = Fluent::OneEventStream.new(@time, @record) end @@ -160,62 +171,126 @@ def create_es_chunk(metadata, es) check_gzipped_result(path, packed_value * i) end end - end - sub_test_case 'path tempalte' do - # test that uses generate_temaplte + # mtched data's test end - # sub_test_case 'path' do - # test 'normal' do - # d = create_driver(%[ - # path #{TMP_DIR}/out_file_test - # time_slice_format %Y-%m-%d-%H - # utc true - # ]) - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # d.emit({"a"=>1}, time) - # # FileOutput#write returns path - # paths = d.run - # assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13_0.log"], paths - # end - - # test 'normal with append' do - # d = create_driver(%[ - # path #{TMP_DIR}/out_file_test - # time_slice_format %Y-%m-%d-%H - # utc true - # append true - # ]) - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # d.emit({"a"=>1}, time) - # paths = d.run - # assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13.log"], paths - # end - - # test '*' do - # d = create_driver(%[ - # path #{TMP_DIR}/out_file_test.*.txt - # time_slice_format %Y-%m-%d-%H - # utc true - # ]) - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # d.emit({"a"=>1}, time) - # paths = d.run - # assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13_0.txt"], paths - # end - - # test '* with append' do - # d = create_driver(%[ - # path #{TMP_DIR}/out_file_test.*.txt - # time_slice_format %Y-%m-%d-%H - # utc true - # append true - # ]) - # time = Time.parse("2011-01-02 13:14:15 UTC").to_i - # d.emit({"a"=>1}, time) - # paths = d.run - # assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13.txt"], paths - # end - # end + sub_test_case 'path' do + setup do + @record = { key: "vlaue" } + @time = Time.parse("2011-01-02 13:14:15 UTC").to_i + @es = Fluent::OneEventStream.new(@time, @record) + @c = create_es_chunk(create_metadata, @es) + end + + test 'normal path' do + d = create_driver %[ + path #{TMP_DIR}/out_file_test + compress gz + ] + path = d.instance.write(@c) + assert_equal "#{TMP_DIR}/out_file_test.#{@c.chunk_id}_0.log.gz", path + end + + test 'path includes *' do + d = create_driver %[ + path #{TMP_DIR}/out_file_test/*.log + compress gz + ] + + path = d.instance.write(@c) + assert_equal "#{TMP_DIR}/out_file_test/#{@c.chunk_id}_0.log.gz", path + end + + data( + invalid_tag: "#{TMP_DIR}/${tag}", + invalid_tag0: "#{TMP_DIR}/${tag0}", + invalid_variable: "#{TMP_DIR}/${dummy}", + invalid_timeformat: "#{TMP_DIR}/%Y%m%d", + ) + test 'path includes impcompatible 
placeholder' do |invalid_path| + c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) + c.instance.acts_as_secondary(DummyOutput.new) + + assert_raise do + # mock + c.configure(%[ + path #{invalid_path} + compress gz + ]) + end + end + + test 'path includes tag' do + primary = DummyOutput.new.tap do |e| + e.register_value('chunk_key_tag', true) + e.register_value('chunk_keys', []) + end + + d = create_driver(%[ + path #{TMP_DIR}/out_file_test/cool_${tag} + compress gz + ], primary) + + c = create_es_chunk(create_metadata(nil, "test.dummy"), @es) + + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/out_file_test/cool_test.dummy_0.log.gz", path + end + # 複数タグ + + test 'path includes time format' do + primary = DummyOutput.new.tap do |e| + e.register_value('chunk_key_time', true) + e.register_value('chunk_keys', []) + end + + d = create_driver(%[ + path #{TMP_DIR}/out_file_test/cool_%Y%m%d + compress gz + ], primary) + + c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 UTC")), @es) + + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/out_file_test/cool_20110102_0.log.gz", path + end + + test 'path includes variable' do + pairs = { "test1".to_sym => "dummy" } + primary = DummyOutput.new.tap do |e| + e.register_value('chunk_keys', pairs.keys) + end + + d = create_driver(%[ + path #{TMP_DIR}/out_file_test/cool_${test1} + compress gz + ], primary) + + c = create_es_chunk(create_metadata(nil, nil, pairs), @es) + + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/out_file_test/cool_dummy_0.log.gz", path + end + + test 'path include tag, time format, variables' do + pairs = { "test1".to_sym => "dummy" } + primary = DummyOutput.new.tap do |e| + e.register_value('chunk_keys', pairs.keys) + e.register_value('chunk_key_time', true) + e.register_value('chunk_key_tag', true) + end + + d = create_driver(%[ + path #{TMP_DIR}/out_file_test/cool_%Y%m%d_${tag}_${test1} + compress gz + ], primary) + + metadata = create_metadata(Time.parse("2011-01-02 13:14:15 UTC"), 'test.tag', pairs) + c = create_es_chunk(metadata, @es) + + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/out_file_test/cool_20110102_test.tag_dummy_0.log.gz", path + end + end end From 0399cc03299352d96bf89d3e7f513ae7b29adfa3 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Fri, 12 Aug 2016 19:47:02 +0900 Subject: [PATCH 08/26] Fix a bug when path has tag[\d+] --- lib/fluent/plugin/out_secondary_file.rb | 6 +++--- test/plugin/test_out_secondary_file.rb | 21 +++++++++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index cbb02b7818..100dd1fb7a 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -101,7 +101,7 @@ def extract_placeholders(str, chunk) private def configure_path! - matched = @path.scan(/\${([\w.@-]+)}/).flat_map { |e| e } # for suporting 2.1 or less + matched = @path.scan(/\${([\w.@-]+(\[\d+\])?)}/).flat_map(&:first) # to trim suffix [\d+] if matched.empty? && !path_has_time_format? if pos = @path.index('*') @path_prefix = @path[0, pos] @@ -121,9 +121,9 @@ def validate_path_is_comptible_with_primary_buffer?(matched) raise "TimeFormat is not imcompatible with primary buffer's params" if !@chunk_key_time && path_has_time_format? 
matched.each do |e| case - when @chunk_key_tag && e =~ /tag\w*/ + when @chunk_key_tag && e =~ /tag(\[\d+\])?/ # ok - when !@chunk_key_tag && e =~ /tag\w*/ + when !@chunk_key_tag && e =~ /tag(\[\d+\])?/ raise "#{e} is not imcompatible with primary buffer's params" when @chunk_keys.include?(e.to_sym) # ok diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 7b1d855fae..4df0d5d6c7 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -204,7 +204,7 @@ def create_es_chunk(metadata, es) data( invalid_tag: "#{TMP_DIR}/${tag}", - invalid_tag0: "#{TMP_DIR}/${tag0}", + invalid_tag0: "#{TMP_DIR}/${tag[0]}", invalid_variable: "#{TMP_DIR}/${dummy}", invalid_timeformat: "#{TMP_DIR}/%Y%m%d", ) @@ -237,7 +237,24 @@ def create_es_chunk(metadata, es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/out_file_test/cool_test.dummy_0.log.gz", path end - # 複数タグ + + test 'path includes /tag[\d+]/' do + primary = DummyOutput.new.tap do |e| + e.register_value('chunk_key_tag', true) + e.register_value('chunk_keys', []) + end + + d = create_driver(%[ + path #{TMP_DIR}/out_file_test/cool_${tag[0]}_${tag[1]} + compress gz + ], primary) + + c = create_es_chunk(create_metadata(nil, "test.dummy"), @es) + + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/out_file_test/cool_test_dummy_0.log.gz", path + end + test 'path includes time format' do primary = DummyOutput.new.tap do |e| From 3a5644a46a992c501c12dd8003243be0c3465635 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Sat, 13 Aug 2016 13:45:12 +0900 Subject: [PATCH 09/26] Update error messages and test them --- lib/fluent/plugin/out_secondary_file.rb | 8 ++++---- test/plugin/test_out_secondary_file.rb | 19 ++++++++----------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 100dd1fb7a..c615ea45fa 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -118,17 +118,17 @@ def configure_path! end def validate_path_is_comptible_with_primary_buffer?(matched) - raise "TimeFormat is not imcompatible with primary buffer's params" if !@chunk_key_time && path_has_time_format? + raise "BUG: file path has imcompatible placeholder: Time" if !@chunk_key_time && path_has_time_format? 
matched.each do |e| case when @chunk_key_tag && e =~ /tag(\[\d+\])?/ # ok when !@chunk_key_tag && e =~ /tag(\[\d+\])?/ - raise "#{e} is not imcompatible with primary buffer's params" - when @chunk_keys.include?(e.to_sym) + raise "BUG: file path has imcompatible placeholder: #{e}" + when @chunk_keys && @chunk_keys.include?(e.to_sym) # ok else - raise "#{e} is not imcompatible with primary buffer's params" + raise "BUG: file path has imcompatible placeholder: #{e}" end end end diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 4df0d5d6c7..8ac87b4326 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -145,7 +145,7 @@ def create_es_chunk(metadata, es) d = create_driver %[ path #{TMP_DIR}/out_file_test compress gz - # ] + ] c = create_es_chunk(create_metadata, @es) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') @@ -161,7 +161,7 @@ def create_es_chunk(metadata, es) path #{TMP_DIR}/out_file_test compress gz append true - # ] + ] c = create_es_chunk(create_metadata, @es) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') @@ -171,8 +171,6 @@ def create_es_chunk(metadata, es) check_gzipped_result(path, packed_value * i) end end - - # mtched data's test end sub_test_case 'path' do @@ -203,17 +201,16 @@ def create_es_chunk(metadata, es) end data( - invalid_tag: "#{TMP_DIR}/${tag}", - invalid_tag0: "#{TMP_DIR}/${tag[0]}", - invalid_variable: "#{TMP_DIR}/${dummy}", - invalid_timeformat: "#{TMP_DIR}/%Y%m%d", + invalid_tag: ["tag", "#{TMP_DIR}/${tag}"], + invalid_tag0: ["tag[0]", "#{TMP_DIR}/${tag[0]}"], + invalid_variable: ["dummy", "#{TMP_DIR}/${dummy}"], + invalid_timeformat: ["Time", "#{TMP_DIR}/%Y%m%d"], ) - test 'path includes impcompatible placeholder' do |invalid_path| + test 'path includes impcompatible placeholder' do |(expected_message, invalid_path)| c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) c.instance.acts_as_secondary(DummyOutput.new) - assert_raise do - # mock + assert_raise_message("BUG: file path has imcompatible placeholder: #{expected_message}") do c.configure(%[ path #{invalid_path} compress gz From 8baf02720922e74fb6b095fc05bf5a450c7e4153 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Sun, 14 Aug 2016 19:38:39 +0900 Subject: [PATCH 10/26] Be able to set `timekey_zone` --- lib/fluent/plugin/output.rb | 9 ++++---- test/plugin/test_out_secondary_file.rb | 30 ++++++++++++++++++++------ 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 198f57bf58..70e110029f 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -138,7 +138,7 @@ def expired? 
end end - attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout + attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout, :timekey_zone attr_reader :num_errors, :emit_count, :emit_records, :write_count, :rollback_count # for tests @@ -185,6 +185,7 @@ def initialize @simple_chunking = nil @chunk_keys = @chunk_key_time = @chunk_key_tag = nil @flush_mode = nil + @timekey_zone = nil end def acts_as_secondary(primary) @@ -195,7 +196,7 @@ def acts_as_secondary(primary) @chunk_key_tag = @primary_instance.chunk_key_tag if @primary_instance.chunk_key_tag if @primary_instance.chunk_key_time @chunk_key_time = @primary_instance.chunk_key_time - @timekey_zone = Time.now.strftime('%z') # TO fix + @timekey_zone = @primary_instance.timekey_zone @output_time_formatter_cache = {} end @@ -263,7 +264,7 @@ def configure(conf) if @chunk_key_time raise Fluent::ConfigError, " argument includes 'time', but timekey is not configured" unless @buffer_config.timekey Fluent::Timezone.validate!(@buffer_config.timekey_zone) - @buffer_config.timekey_zone = '+0000' if @buffer_config.timekey_use_utc + @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone @output_time_formatter_cache = {} end @@ -490,7 +491,7 @@ def extract_placeholders(str, metadata) rvalue = str # strftime formatting if @chunk_key_time # this section MUST be earlier than rest to use raw 'str' - @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@buffer_config.timekey_zone, str) + @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str) rvalue = @output_time_formatter_cache[str].call(metadata.timekey) end # ${tag}, ${tag[0]}, ${tag[1]}, ... diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 8ac87b4326..eb53e865a1 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -252,7 +252,6 @@ def create_es_chunk(metadata, es) assert_equal "#{TMP_DIR}/out_file_test/cool_test_dummy_0.log.gz", path end - test 'path includes time format' do primary = DummyOutput.new.tap do |e| e.register_value('chunk_key_time', true) @@ -260,14 +259,31 @@ def create_es_chunk(metadata, es) end d = create_driver(%[ - path #{TMP_DIR}/out_file_test/cool_%Y%m%d + path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H + compress gz + ], primary) + + c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 JST")), @es) + + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/out_file_test/cool_2011010213_0.log.gz", path + end + + test 'path includes time format and with `timekey_use_utc`' do + primary = DummyOutput.new.tap do |e| + e.register_value('chunk_key_time', true) + e.register_value('chunk_keys', []) + end + + d = create_driver(%[ + path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H compress gz ], primary) - c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 UTC")), @es) + c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 JST")), @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_20110102_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test/cool_2011010204_0.log.gz", path end test 'path includes variable' do @@ -296,15 +312,15 @@ def create_es_chunk(metadata, es) end d = create_driver(%[ - path #{TMP_DIR}/out_file_test/cool_%Y%m%d_${tag}_${test1} + path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H_${tag}_${test1} compress gz ], primary) - metadata = create_metadata(Time.parse("2011-01-02 13:14:15 UTC"), 'test.tag', pairs) + metadata = 
create_metadata(Time.parse("2011-01-02 13:14:15 JST"), 'test.tag', pairs) c = create_es_chunk(metadata, @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_20110102_test.tag_dummy_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test/cool_2011010213_test.tag_dummy_0.log.gz", path end end end From a23c34d5f5cf0521058a413fc4d21b71039f3964 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Sun, 14 Aug 2016 19:39:34 +0900 Subject: [PATCH 11/26] Stop using `instance_variable_set` in test --- lib/fluent/plugin/out_secondary_file.rb | 2 +- test/plugin/test_out_secondary_file.rb | 52 +++++++------------------ 2 files changed, 16 insertions(+), 38 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index c615ea45fa..82ab87fb34 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -125,7 +125,7 @@ def validate_path_is_comptible_with_primary_buffer?(matched) # ok when !@chunk_key_tag && e =~ /tag(\[\d+\])?/ raise "BUG: file path has imcompatible placeholder: #{e}" - when @chunk_keys && @chunk_keys.include?(e.to_sym) + when @chunk_keys && @chunk_keys.include?(e) # ok else raise "BUG: file path has imcompatible placeholder: #{e}" diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index eb53e865a1..7ef2b3bfb3 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -22,18 +22,14 @@ def setup ] class DummyOutput < Fluent::Plugin::Output - def register(name, &block) - instance_variable_set("@#{name}", block) - end - - def register_value(name, value) - instance_variable_set("@#{name}", value) - end - def write(chunk); end end - def create_driver(conf = CONFIG, primary = DummyOutput.new) + def create_primary(buffer_cofig = config_element('buffer')) + DummyOutput.new.configure(config_element('ROOT','',{}, [buffer_cofig])) + end + + def create_driver(conf = CONFIG, primary = create_primary) c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) c.instance.acts_as_secondary(primary) c.configure(conf) @@ -219,10 +215,7 @@ def create_es_chunk(metadata, es) end test 'path includes tag' do - primary = DummyOutput.new.tap do |e| - e.register_value('chunk_key_tag', true) - e.register_value('chunk_keys', []) - end + primary = create_primary(config_element('buffer', 'tag')) d = create_driver(%[ path #{TMP_DIR}/out_file_test/cool_${tag} @@ -236,10 +229,7 @@ def create_es_chunk(metadata, es) end test 'path includes /tag[\d+]/' do - primary = DummyOutput.new.tap do |e| - e.register_value('chunk_key_tag', true) - e.register_value('chunk_keys', []) - end + primary = create_primary(config_element('buffer', 'tag')) d = create_driver(%[ path #{TMP_DIR}/out_file_test/cool_${tag[0]}_${tag[1]} @@ -253,10 +243,7 @@ def create_es_chunk(metadata, es) end test 'path includes time format' do - primary = DummyOutput.new.tap do |e| - e.register_value('chunk_key_time', true) - e.register_value('chunk_keys', []) - end + primary = create_primary(config_element('buffer', 'time', { 'timekey' => 1 })) d = create_driver(%[ path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H @@ -270,10 +257,9 @@ def create_es_chunk(metadata, es) end test 'path includes time format and with `timekey_use_utc`' do - primary = DummyOutput.new.tap do |e| - e.register_value('chunk_key_time', true) - e.register_value('chunk_keys', []) - end + primary = create_primary( + config_element('buffer', 'time', { 'timekey' => 1, 'timekey_use_utc' => true 
}) + ) d = create_driver(%[ path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H @@ -287,36 +273,28 @@ def create_es_chunk(metadata, es) end test 'path includes variable' do - pairs = { "test1".to_sym => "dummy" } - primary = DummyOutput.new.tap do |e| - e.register_value('chunk_keys', pairs.keys) - end + primary = create_primary(config_element('buffer', 'test1')) d = create_driver(%[ path #{TMP_DIR}/out_file_test/cool_${test1} compress gz ], primary) - c = create_es_chunk(create_metadata(nil, nil, pairs), @es) + c = create_es_chunk(create_metadata(nil, nil, { "test1".to_sym => "dummy" }), @es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/out_file_test/cool_dummy_0.log.gz", path end test 'path include tag, time format, variables' do - pairs = { "test1".to_sym => "dummy" } - primary = DummyOutput.new.tap do |e| - e.register_value('chunk_keys', pairs.keys) - e.register_value('chunk_key_time', true) - e.register_value('chunk_key_tag', true) - end + primary = create_primary(config_element('buffer', 'time,tag,test1', { 'timekey' => 1 })) d = create_driver(%[ path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H_${tag}_${test1} compress gz ], primary) - metadata = create_metadata(Time.parse("2011-01-02 13:14:15 JST"), 'test.tag', pairs) + metadata = create_metadata(Time.parse("2011-01-02 13:14:15 JST"), 'test.tag', { "test1".to_sym => "dummy" }) c = create_es_chunk(metadata, @es) path = d.instance.write(c) From 6bf16df0048c6eca666ab9ab895642d012ab12ca Mon Sep 17 00:00:00 2001 From: ganmacs Date: Sun, 14 Aug 2016 22:27:53 +0900 Subject: [PATCH 12/26] Add `keytime_zone` configuration to priamry_output To test time format correctly --- test/plugin/test_out_secondary_file.rb | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 7ef2b3bfb3..0840408ef4 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -243,22 +243,24 @@ def create_es_chunk(metadata, es) end test 'path includes time format' do - primary = create_primary(config_element('buffer', 'time', { 'timekey' => 1 })) + primary = create_primary( + config_element('buffer', 'time', { 'timekey_zone' => '+0900', 'timekey' => 1 }) + ) d = create_driver(%[ path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H compress gz ], primary) - c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 JST")), @es) + c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 UTC")), @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_2011010213_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test/cool_2011010222_0.log.gz", path end test 'path includes time format and with `timekey_use_utc`' do primary = create_primary( - config_element('buffer', 'time', { 'timekey' => 1, 'timekey_use_utc' => true }) + config_element('buffer', 'time', { 'timekey_zone' => '+0900', 'timekey' => 1, 'timekey_use_utc' => true }) ) d = create_driver(%[ @@ -266,10 +268,10 @@ def create_es_chunk(metadata, es) compress gz ], primary) - c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 JST")), @es) + c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 UTC")), @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_2011010204_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test/cool_2011010213_0.log.gz", path end test 'path includes variable' do @@ -287,14 +289,16 @@ def create_es_chunk(metadata, es) end test 'path include tag, time format, 
variables' do - primary = create_primary(config_element('buffer', 'time,tag,test1', { 'timekey' => 1 })) + primary = create_primary( + config_element('buffer', 'time,tag,test1', { 'timekey_zone' => '+0000', 'timekey' => 1 }) + ) d = create_driver(%[ path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H_${tag}_${test1} compress gz ], primary) - metadata = create_metadata(Time.parse("2011-01-02 13:14:15 JST"), 'test.tag', { "test1".to_sym => "dummy" }) + metadata = create_metadata(Time.parse("2011-01-02 13:14:15 UTC"), 'test.tag', { "test1".to_sym => "dummy" }) c = create_es_chunk(metadata, @es) path = d.instance.write(c) From 1dbcf72232dff98144a53db91a9710c43e7951e3 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Thu, 18 Aug 2016 19:01:46 +0900 Subject: [PATCH 13/26] Change method name nil? -> empty? --- lib/fluent/plugin/buffer.rb | 2 +- lib/fluent/plugin/output.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index b463870249..b41ec29ae0 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -56,7 +56,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD Metadata = Struct.new(:timekey, :tag, :variables) do - def nil? + def empty? timekey.nil? && tag.nil? && variables.nil? end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 70e110029f..1518b74be0 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -485,7 +485,7 @@ def implement?(feature) # TODO: optimize this code def extract_placeholders(str, metadata) - if metadata.nil? + if metadata.empty? str else rvalue = str From 235679464be90927f3ebb611c093b3dcd6aa916f Mon Sep 17 00:00:00 2001 From: ganmacs Date: Thu, 18 Aug 2016 19:02:23 +0900 Subject: [PATCH 14/26] Update out_secondary * Use `enum` to set `compress` * Remove unused code(`initialize`) * Change method name `extract_placeholders` -> `generate_path` * Fix some typos --- lib/fluent/plugin/out_secondary_file.rb | 65 ++++++++++--------------- lib/fluent/plugin/output.rb | 5 +- 2 files changed, 28 insertions(+), 42 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 82ab87fb34..eef66abd9e 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -23,11 +23,6 @@ module Fluent::Plugin class SecondaryFileOutput < Output Fluent::Plugin.register_output("secondary_file", self) - SUPPORTED_COMPRESS = { - "gz" => :gz, - "gzip" => :gz, - } - FILE_PERMISSION = 0644 DIR_PERMISSION = 0755 @@ -35,50 +30,38 @@ class SecondaryFileOutput < Output config_param :path, :string desc "The flushed chunk is appended to existence file or not." config_param :append, :bool, default: false - config_param :compress, default: nil do |val| - SUPPORTED_COMPRESS[val].tap do |e| - raise ConfigError, "Unsupported compression algorithm '#{val}'" unless e - end - end + config_param :compress, :enum, list: [:normal, :gz, :gzip], default: :normal - def initialize + def configure(conf) super - end - def configure(conf) unless @as_secondary raise Fluent::ConfigError, "This plugin can only be used in the section" end - unless @path = conf["path"] - raise Fluent::ConfigError, "'path' parameter is required on secondary file output" - end - configure_path! 
test_path = generate_path(Time.now.strftime("%Y%m%d")) unless Fluent::FileUtil.writable_p?(test_path) - raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable" + raise Fluent::ConfigError, "out_secondary_file: `#{test_path}` is not writable" end @dir_perm = system_config.dir_permission || DIR_PERMISSION @file_perm = system_config.file_permission || FILE_PERMISSION - - super end def write(chunk) - id = extract_placeholders(@path, chunk) + id = generate_id(chunk) path = generate_path(id) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm case @compress - when nil + when :normal File.open(path, "ab", @file_perm) {|f| f.flock(File::LOCK_EX) chunk.write_to(f) } - when :gz + when :gz, :gzip File.open(path, "ab", @file_perm) {|f| f.flock(File::LOCK_EX) gz = Zlib::GzipWriter.new(f) @@ -90,19 +73,25 @@ def write(chunk) path end - def extract_placeholders(str, chunk) - if chunk.metadata.nil? + private + + def generate_id(chunk) + if chunk.metadata.empty? chunk.chunk_id else - super(str, chunk.metadata) + extract_placeholders(@path, chunk.metadata) end end - private - def configure_path! + # Check @path includes ${tag} etc. matched = @path.scan(/\${([\w.@-]+(\[\d+\])?)}/).flat_map(&:first) # to trim suffix [\d+] - if matched.empty? && !path_has_time_format? + + if path_has_tags(matched) || path_has_time_format? + validate_path_is_comptible_with_primary_buffer!(matched) + @path_prefix = "" + @path_suffix = ".log" + else if pos = @path.index('*') @path_prefix = @path[0, pos] @path_suffix = @path[pos+1..-1] @@ -110,38 +99,38 @@ def configure_path! @path_prefix = @path + "." @path_suffix = ".log" end - else - validate_path_is_comptible_with_primary_buffer?(matched) - @path_prefix = "" - @path_suffix = ".log" end end - def validate_path_is_comptible_with_primary_buffer?(matched) + def validate_path_is_comptible_with_primary_buffer!(matched) raise "BUG: file path has imcompatible placeholder: Time" if !@chunk_key_time && path_has_time_format? matched.each do |e| case when @chunk_key_tag && e =~ /tag(\[\d+\])?/ - # ok + # ok when !@chunk_key_tag && e =~ /tag(\[\d+\])?/ raise "BUG: file path has imcompatible placeholder: #{e}" when @chunk_keys && @chunk_keys.include?(e) - # ok + # ok else raise "BUG: file path has imcompatible placeholder: #{e}" end end end + def path_has_tags(matched) + !matched.empty? + end + def path_has_time_format? 
       @path != Time.now.strftime(@path)
     end
 
     def suffix
       case @compress
-      when nil
+      when :normal
         ""
-      when :gz
+      when :gz, :gzip
         ".gz"
       end
     end
diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index 1518b74be0..acfba69da4 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -200,11 +200,8 @@ def acts_as_secondary(primary)
         @output_time_formatter_cache = {}
       end
 
-      require_override = !self.class.instance_methods(false).include?(:extract_placeholders)
       (class << self; self; end).module_eval do
-        if require_override
-          define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) }
-        end
+        define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) }
         define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
         define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) }
       end

From 2fa33ad98cfb77bbfc84e246b5cfcba947b16a2f Mon Sep 17 00:00:00 2001
From: ganmacs
Date: Thu, 18 Aug 2016 19:09:08 +0900
Subject: [PATCH 15/26] Use mixed-in method instead of `Fluent::UniqueId.hex`

---
 lib/fluent/plugin/buffer/chunk.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb
index 6d00339d62..3c3f5f9431 100644
--- a/lib/fluent/plugin/buffer/chunk.rb
+++ b/lib/fluent/plugin/buffer/chunk.rb
@@ -49,7 +49,7 @@ class Chunk
     def initialize(metadata)
       super()
       @unique_id = generate_unique_id
-      @chunk_id = Fluent::UniqueId.hex(@unique_id)
+      @chunk_id = dump_unique_id_hex(@unique_id)
       @metadata = metadata
 
       # state: unstaged/staged/queued/closed

From db2d715e21e95e9226d4c16ae31856ab22a47173 Mon Sep 17 00:00:00 2001
From: ganmacs
Date: Thu, 18 Aug 2016 19:22:36 +0900
Subject: [PATCH 16/26] `@chunk_keys` and `@chunk_key_tag` must be initialized

---
 lib/fluent/plugin/output.rb | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index acfba69da4..89956cc34b 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -191,9 +191,8 @@ def initialize
     def acts_as_secondary(primary)
       @as_secondary = true
       @primary_instance = primary
-
-      @chunk_keys = @primary_instance.chunk_keys if @primary_instance.chunk_keys
-      @chunk_key_tag = @primary_instance.chunk_key_tag if @primary_instance.chunk_key_tag
+      @chunk_keys = @primary_instance.chunk_keys || []
+      @chunk_key_tag = @primary_instance.chunk_key_tag || false
       if @primary_instance.chunk_key_time
         @chunk_key_time = @primary_instance.chunk_key_time
         @timekey_zone = @primary_instance.timekey_zone

From a8d39ecb3e2ae8ad8c9f7ce6a6fb8b6c7f7905a7 Mon Sep 17 00:00:00 2001
From: ganmacs
Date: Fri, 19 Aug 2016 10:25:31 +0900
Subject: [PATCH 17/26] Fix typo of test description.
--- test/plugin/test_out_secondary_file.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 0840408ef4..00c97504c2 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -36,7 +36,7 @@ def create_driver(conf = CONFIG, primary = create_primary) end sub_test_case 'configture' do - test 'should b econfigurable' do + test 'should be configurable' do d = create_driver %[ path test_path compress gz @@ -60,7 +60,7 @@ def create_driver(conf = CONFIG, primary = create_primary) end end - test 'should be writable direcotry' do + test 'should be the writable directory' do assert_nothing_raised do create_driver %[path #{TMP_DIR}/test_path] end @@ -117,7 +117,7 @@ def create_es_chunk(metadata, es) @es = Fluent::OneEventStream.new(@time, @record) end - test 'should be output with standarad format' do + test 'should output with the standard format' do d = create_driver c = create_es_chunk(create_metadata, @es) path = d.instance.write(c) @@ -137,7 +137,7 @@ def create_es_chunk(metadata, es) assert_equal File.read(path), @es.to_msgpack_stream.force_encoding('ASCII-8BIT') end - test 'path should be increment without append' do + test 'path should be incremental without `append`' do d = create_driver %[ path #{TMP_DIR}/out_file_test compress gz @@ -152,7 +152,7 @@ def create_es_chunk(metadata, es) end end - test 'path should be same with append' do + test 'path should be the same as `append`' do d = create_driver %[ path #{TMP_DIR}/out_file_test compress gz From ad89300eb36bff1b50bf7e33842040f43e8489de Mon Sep 17 00:00:00 2001 From: ganmacs Date: Mon, 22 Aug 2016 13:01:41 +0900 Subject: [PATCH 18/26] Remove `unique_id` from Buffer::Chunk --- lib/fluent/plugin/buffer/chunk.rb | 3 +-- lib/fluent/plugin/out_secondary_file.rb | 2 +- test/plugin/test_out_secondary_file.rb | 20 ++++++++++++++------ 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index 3c3f5f9431..157c3e809c 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -49,7 +49,6 @@ class Chunk def initialize(metadata) super() @unique_id = generate_unique_id - @chunk_id = dump_unique_id_hex(@unique_id) @metadata = metadata # state: unstaged/staged/queued/closed @@ -60,7 +59,7 @@ def initialize(metadata) @modified_at = Time.now end - attr_reader :unique_id, :chunk_id, :metadata, :created_at, :modified_at, :state + attr_reader :unique_id, :metadata, :created_at, :modified_at, :state # data is array of formatted record string def append(data) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index eef66abd9e..1b003818bc 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -77,7 +77,7 @@ def write(chunk) def generate_id(chunk) if chunk.metadata.empty? 
- chunk.chunk_id + dump_unique_id_hex(chunk.unique_id) else extract_placeholders(@path, chunk.metadata) end diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 00c97504c2..b8790c778a 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -3,11 +3,14 @@ require 'fileutils' require 'fluent/test' require 'fluent/event' +require 'fluent/unique_id' require 'fluent/plugin/buffer' require 'fluent/plugin/out_secondary_file' require 'fluent/plugin/buffer/memory_chunk' class FileOutputSecondaryTest < Test::Unit::TestCase + include Fluent::UniqueId::Mixin + def setup Fluent::Test.setup FileUtils.rm_rf(TMP_DIR) @@ -120,9 +123,10 @@ def create_es_chunk(metadata, es) test 'should output with the standard format' do d = create_driver c = create_es_chunk(create_metadata, @es) + chunk_id = dump_unique_id_hex(c.unique_id) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test.#{c.chunk_id}_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test.#{chunk_id}_0.log.gz", path check_gzipped_result(path, @es.to_msgpack_stream.force_encoding('ASCII-8BIT')) end @@ -131,9 +135,10 @@ def create_es_chunk(metadata, es) path #{TMP_DIR}/out_file_test ] c = create_es_chunk(create_metadata, @es) + chunk_id = dump_unique_id_hex(c.unique_id) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test.#{c.chunk_id}_0.log", path + assert_equal "#{TMP_DIR}/out_file_test.#{chunk_id}_0.log", path assert_equal File.read(path), @es.to_msgpack_stream.force_encoding('ASCII-8BIT') end @@ -143,11 +148,12 @@ def create_es_chunk(metadata, es) compress gz ] c = create_es_chunk(create_metadata, @es) + chunk_id = dump_unique_id_hex(c.unique_id) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') 5.times do |i| path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test.#{c.chunk_id}_#{i}.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test.#{chunk_id}_#{i}.log.gz", path check_gzipped_result(path, packed_value ) end end @@ -159,11 +165,12 @@ def create_es_chunk(metadata, es) append true ] c = create_es_chunk(create_metadata, @es) + chunk_id = dump_unique_id_hex(c.unique_id) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') [*1..5].each do |i| path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test.#{c.chunk_id}.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test.#{chunk_id}.log.gz", path check_gzipped_result(path, packed_value * i) end end @@ -175,6 +182,7 @@ def create_es_chunk(metadata, es) @time = Time.parse("2011-01-02 13:14:15 UTC").to_i @es = Fluent::OneEventStream.new(@time, @record) @c = create_es_chunk(create_metadata, @es) + @chunk_id = dump_unique_id_hex(@c.unique_id) end test 'normal path' do @@ -183,7 +191,7 @@ def create_es_chunk(metadata, es) compress gz ] path = d.instance.write(@c) - assert_equal "#{TMP_DIR}/out_file_test.#{@c.chunk_id}_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test.#{@chunk_id}_0.log.gz", path end test 'path includes *' do @@ -193,7 +201,7 @@ def create_es_chunk(metadata, es) ] path = d.instance.write(@c) - assert_equal "#{TMP_DIR}/out_file_test/#{@c.chunk_id}_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test/#{@chunk_id}_0.log.gz", path end data( From 7b376a8af16e20345facc10ba2990c2c1ac159b4 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Mon, 22 Aug 2016 13:04:50 +0900 Subject: [PATCH 19/26] Use :text as compassion options instead of :normal --- lib/fluent/plugin/out_secondary_file.rb | 6 +++--- 1 file changed, 3 
insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 1b003818bc..e8b1463596 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -30,7 +30,7 @@ class SecondaryFileOutput < Output config_param :path, :string desc "The flushed chunk is appended to existence file or not." config_param :append, :bool, default: false - config_param :compress, :enum, list: [:normal, :gz, :gzip], default: :normal + config_param :compress, :enum, list: [:text, :gz, :gzip], default: :text def configure(conf) super @@ -56,7 +56,7 @@ def write(chunk) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm case @compress - when :normal + when :text File.open(path, "ab", @file_perm) {|f| f.flock(File::LOCK_EX) chunk.write_to(f) @@ -128,7 +128,7 @@ def path_has_time_format? def suffix case @compress - when :normal + when :text "" when :gz, :gzip ".gz" From 1b67e1420b819d47fdcf91392c0d30235e586106 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Mon, 22 Aug 2016 13:06:49 +0900 Subject: [PATCH 20/26] Use Fluent::Test::Driver::Output Because `Fluent::Test::OutputTestDriver` is v0.12 API. --- lib/fluent/test/base.rb | 2 +- test/plugin/test_out_secondary_file.rb | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb index 3bb420191b..ac17cf4561 100644 --- a/lib/fluent/test/base.rb +++ b/lib/fluent/test/base.rb @@ -37,7 +37,7 @@ def initialize(klass, &block) else @instance = klass end - @instance.router = Engine.root_agent.event_router if @instance.respond_to?(:router=) + @instance.router = Engine.root_agent.event_router @instance.log = TestLogger.new Engine.root_agent.instance_variable_set(:@log, @instance.log) diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index b8790c778a..05f79ff2ea 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -1,12 +1,12 @@ require_relative '../helper' require 'time' require 'fileutils' -require 'fluent/test' require 'fluent/event' require 'fluent/unique_id' require 'fluent/plugin/buffer' require 'fluent/plugin/out_secondary_file' require 'fluent/plugin/buffer/memory_chunk' +require 'fluent/test/driver/output' class FileOutputSecondaryTest < Test::Unit::TestCase include Fluent::UniqueId::Mixin @@ -33,7 +33,7 @@ def create_primary(buffer_cofig = config_element('buffer')) end def create_driver(conf = CONFIG, primary = create_primary) - c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) + c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) c.instance.acts_as_secondary(primary) c.configure(conf) end @@ -49,7 +49,7 @@ def create_driver(conf = CONFIG, primary = create_primary) end test 'should only use in secondary' do - c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) + c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) assert_raise do c.configure(conf) end @@ -211,7 +211,7 @@ def create_es_chunk(metadata, es) invalid_timeformat: ["Time", "#{TMP_DIR}/%Y%m%d"], ) test 'path includes impcompatible placeholder' do |(expected_message, invalid_path)| - c = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::SecondaryFileOutput) + c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) c.instance.acts_as_secondary(DummyOutput.new) assert_raise_message("BUG: file path has imcompatible placeholder: 
#{expected_message}") do From 46be8fe8e1d8d3360430eb767e52bbeaf3f75ca9 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Mon, 22 Aug 2016 13:08:17 +0900 Subject: [PATCH 21/26] Simplify a validating logic. * Change some error messages --- lib/fluent/plugin/out_secondary_file.rb | 59 +++++++++++-------------- test/plugin/test_out_secondary_file.rb | 14 +++--- 2 files changed, 34 insertions(+), 39 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index e8b1463596..76415391c7 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -39,7 +39,22 @@ def configure(conf) raise Fluent::ConfigError, "This plugin can only be used in the section" end - configure_path! + @placeholders = @path.scan(/\${([\w.@-]+(\[\d+\])?)}/).flat_map(&:first) # to trim suffix [\d+] + + validate_path_is_comptible_with_primary_buffer! + + if !@placeholders.empty? || path_has_time_format? + @path_prefix = "" + @path_suffix = ".log" + else + if pos = @path.index('*') + @path_prefix = @path[0, pos] + @path_suffix = @path[pos+1..-1] + else + @path_prefix = @path + "." + @path_suffix = ".log" + end + end test_path = generate_path(Time.now.strftime("%Y%m%d")) unless Fluent::FileUtil.writable_p?(test_path) @@ -83,44 +98,24 @@ def generate_id(chunk) end end - def configure_path! - # Check @path includes ${tag} etc. - matched = @path.scan(/\${([\w.@-]+(\[\d+\])?)}/).flat_map(&:first) # to trim suffix [\d+] + def validate_path_is_comptible_with_primary_buffer! + if !@chunk_key_time && path_has_time_format? + raise Fluent::ConfigError, "out_secondary_file: File path has an incompatible placeholder, add time formats, like `%Y%m%d`, to `path`" + end - if path_has_tags(matched) || path_has_time_format? - validate_path_is_comptible_with_primary_buffer!(matched) - @path_prefix = "" - @path_suffix = ".log" - else - if pos = @path.index('*') - @path_prefix = @path[0, pos] - @path_suffix = @path[pos+1..-1] - else - @path_prefix = @path + "." - @path_suffix = ".log" - end + if !@chunk_key_tag && (ph = @placeholders.find { |p| p.match(/tag(\[\d+\])?/) }) + raise Fluent::ConfigError, "out_secondary_file: File path has an incompatible placeholder #{ph}, add tag placeholder, like `${tag}`, to `path`" end - end - def validate_path_is_comptible_with_primary_buffer!(matched) - raise "BUG: file path has imcompatible placeholder: Time" if !@chunk_key_time && path_has_time_format? - matched.each do |e| - case - when @chunk_key_tag && e =~ /tag(\[\d+\])?/ - # ok - when !@chunk_key_tag && e =~ /tag(\[\d+\])?/ - raise "BUG: file path has imcompatible placeholder: #{e}" - when @chunk_keys && @chunk_keys.include?(e) - # ok - else - raise "BUG: file path has imcompatible placeholder: #{e}" + if @chunk_keys.empty? + vars = @placeholders.reject { |p| p.match(/tag(\[\d+\])?/) } + + if ph = vars.find { |v| !@chunk_keys.include?(v) } + raise Fluent::ConfigError, "out_secondary_file: File path has an incompatible placeholder #{ph}, add variable placeholder, like `${varname}`, to `path`" end end end - def path_has_tags(matched) - !matched.empty? - end def path_has_time_format? 
@path != Time.now.strftime(@path) diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 05f79ff2ea..f443b6ebd2 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -56,7 +56,7 @@ def create_driver(conf = CONFIG, primary = create_primary) end test 'should receive a file path' do - assert_raise(Fluent::ConfigError) do + assert_raise Fluent::ConfigError do create_driver %[ compress gz ] @@ -74,7 +74,7 @@ def create_driver(conf = CONFIG, primary = create_primary) create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] end - assert_raise(Fluent::ConfigError) do + assert_raise Fluent::ConfigError do FileUtils.mkdir_p("#{TMP_DIR}/test_dir") File.chmod(0555, "#{TMP_DIR}/test_dir") create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] @@ -205,16 +205,16 @@ def create_es_chunk(metadata, es) end data( - invalid_tag: ["tag", "#{TMP_DIR}/${tag}"], - invalid_tag0: ["tag[0]", "#{TMP_DIR}/${tag[0]}"], - invalid_variable: ["dummy", "#{TMP_DIR}/${dummy}"], - invalid_timeformat: ["Time", "#{TMP_DIR}/%Y%m%d"], + invalid_tag: [/tag/, "#{TMP_DIR}/${tag}"], + invalid_tag0: [/tag\[0\]/, "#{TMP_DIR}/${tag[0]}"], + invalid_variable: [/dummy/, "#{TMP_DIR}/${dummy}"], + invalid_timeformat: [/time/, "#{TMP_DIR}/%Y%m%d"], ) test 'path includes impcompatible placeholder' do |(expected_message, invalid_path)| c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) c.instance.acts_as_secondary(DummyOutput.new) - assert_raise_message("BUG: file path has imcompatible placeholder: #{expected_message}") do + assert_raise_message(expected_message) do c.configure(%[ path #{invalid_path} compress gz From 846b4ac459674aac8533bbfaa0bef1a95d264f4a Mon Sep 17 00:00:00 2001 From: ganmacs Date: Mon, 22 Aug 2016 13:41:32 +0900 Subject: [PATCH 22/26] Use secondary_file's unique_id to generate id To append chunks in same file when `append` is true and `chunk.metadata` is empty --- lib/fluent/plugin/out_secondary_file.rb | 4 ++-- test/plugin/test_out_secondary_file.rb | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 76415391c7..f6245dd64d 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -40,6 +40,7 @@ def configure(conf) end @placeholders = @path.scan(/\${([\w.@-]+(\[\d+\])?)}/).flat_map(&:first) # to trim suffix [\d+] + @unique_id = dump_unique_id_hex(generate_unique_id) validate_path_is_comptible_with_primary_buffer! @@ -92,7 +93,7 @@ def write(chunk) def generate_id(chunk) if chunk.metadata.empty? - dump_unique_id_hex(chunk.unique_id) + @append ? @unique_id : dump_unique_id_hex(chunk.unique_id) else extract_placeholders(@path, chunk.metadata) end @@ -116,7 +117,6 @@ def validate_path_is_comptible_with_primary_buffer! end end - def path_has_time_format? 
@path != Time.now.strftime(@path) end diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index f443b6ebd2..071a6eb879 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -165,7 +165,7 @@ def create_es_chunk(metadata, es) append true ] c = create_es_chunk(create_metadata, @es) - chunk_id = dump_unique_id_hex(c.unique_id) + chunk_id = d.instance.instance_variable_get(:@unique_id) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') [*1..5].each do |i| From 3c31c020c026feb7cefe4c5f790762ba606fb9d4 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Tue, 23 Aug 2016 10:36:19 +0900 Subject: [PATCH 23/26] Change method name * `generate_id` -> `generate_basename` * Typo: comptible -> compatible --- lib/fluent/plugin/out_secondary_file.rb | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index f6245dd64d..6ed34f635f 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -42,7 +42,7 @@ def configure(conf) @placeholders = @path.scan(/\${([\w.@-]+(\[\d+\])?)}/).flat_map(&:first) # to trim suffix [\d+] @unique_id = dump_unique_id_hex(generate_unique_id) - validate_path_is_comptible_with_primary_buffer! + validate_path_is_compatible_with_primary_buffer! if !@placeholders.empty? || path_has_time_format? @path_prefix = "" @@ -67,8 +67,8 @@ def configure(conf) end def write(chunk) - id = generate_id(chunk) - path = generate_path(id) + base = generate_basename(chunk) + path = generate_path(base) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm case @compress @@ -91,7 +91,7 @@ def write(chunk) private - def generate_id(chunk) + def generate_basename(chunk) if chunk.metadata.empty? @append ? @unique_id : dump_unique_id_hex(chunk.unique_id) else @@ -99,7 +99,7 @@ def generate_id(chunk) end end - def validate_path_is_comptible_with_primary_buffer! + def validate_path_is_compatible_with_primary_buffer! if !@chunk_key_time && path_has_time_format? 
raise Fluent::ConfigError, "out_secondary_file: File path has an incompatible placeholder, add time formats, like `%Y%m%d`, to `path`" end @@ -130,13 +130,13 @@ def suffix end end - def generate_path(id) + def generate_path(base) if @append - "#{@path_prefix}#{id}#{@path_suffix}#{suffix}" + "#{@path_prefix}#{base}#{@path_suffix}#{suffix}" else i = 0 loop do - path = "#{@path_prefix}#{id}_#{i}#{@path_suffix}#{suffix}" + path = "#{@path_prefix}#{base}_#{i}#{@path_suffix}#{suffix}" return path unless File.exist?(path) i += 1; end From df0b7a9e4fdda832a531eb05a4b89f4fbd95ee85 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Tue, 23 Aug 2016 10:36:19 +0900 Subject: [PATCH 24/26] Update out_secondary_file * Add `directory` and `basename` config_pa rams(Remove `path` config_params) * Do not support a path that includes "*" * Add placeholder regex test * Do not use chunk_id as path name(pass default name) * Remove `gz` of compress type * Fix some typos --- example/secondary_file.conf | 3 +- lib/fluent/plugin/out_secondary_file.rb | 90 +++----- lib/fluent/plugin/output.rb | 1 - test/plugin/test_out_secondary_file.rb | 263 +++++++++++++++--------- 4 files changed, 206 insertions(+), 151 deletions(-) diff --git a/example/secondary_file.conf b/example/secondary_file.conf index bea6971e93..fe37d4c99f 100644 --- a/example/secondary_file.conf +++ b/example/secondary_file.conf @@ -35,6 +35,7 @@ type secondary_file - path ./log/secondary.${tag}_%Y%m%d%L_${message} + directory log/secondary/ + basename ${tag}_%Y%m%d%L_${message} diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 6ed34f635f..93e3124052 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -25,12 +25,15 @@ class SecondaryFileOutput < Output FILE_PERMISSION = 0644 DIR_PERMISSION = 0755 + PLACEHOLDER_REGEX = /\${(tag(\[\d+\])?|[\w.@-]+)}/ - desc "The Path of the file." - config_param :path, :string + desc "The directory path of the output file." + config_param :directory, :string + desc "The baseanme of the output file." + config_param :basename, :string, default: "dump.bin" desc "The flushed chunk is appended to existence file or not." config_param :append, :bool, default: false - config_param :compress, :enum, list: [:text, :gz, :gzip], default: :text + config_param :compress, :enum, list: [:text, :gzip], default: :text def configure(conf) super @@ -39,27 +42,19 @@ def configure(conf) raise Fluent::ConfigError, "This plugin can only be used in the section" end - @placeholders = @path.scan(/\${([\w.@-]+(\[\d+\])?)}/).flat_map(&:first) # to trim suffix [\d+] - @unique_id = dump_unique_id_hex(generate_unique_id) + @path_without_suffix = File.join(@directory, @basename) + validate_compatible_with_primary_buffer!(@path_without_suffix) - validate_path_is_compatible_with_primary_buffer! + @suffix = case @compress + when :text + "" + when :gzip + ".gz" + end - if !@placeholders.empty? || path_has_time_format? - @path_prefix = "" - @path_suffix = ".log" - else - if pos = @path.index('*') - @path_prefix = @path[0, pos] - @path_suffix = @path[pos+1..-1] - else - @path_prefix = @path + "." 
- @path_suffix = ".log" - end - end - - test_path = generate_path(Time.now.strftime("%Y%m%d")) + test_path = generate_path(File.join(@directory, Time.now.strftime("%Y%m%d"))) unless Fluent::FileUtil.writable_p?(test_path) - raise Fluent::ConfigError, "out_secondary_file: `#{test_path}` is not writable" + raise Fluent::ConfigError, "out_secondary_file: `#{@directory}` should be writable" end @dir_perm = system_config.dir_permission || DIR_PERMISSION @@ -67,8 +62,8 @@ def configure(conf) end def write(chunk) - base = generate_basename(chunk) - path = generate_path(base) + path_without_suffix = extract_placeholders(@path_without_suffix, chunk.metadata) + path = generate_path(path_without_suffix) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm case @compress @@ -77,7 +72,7 @@ def write(chunk) f.flock(File::LOCK_EX) chunk.write_to(f) } - when :gz, :gzip + when :gzip File.open(path, "ab", @file_perm) {|f| f.flock(File::LOCK_EX) gz = Zlib::GzipWriter.new(f) @@ -91,54 +86,37 @@ def write(chunk) private - def generate_basename(chunk) - if chunk.metadata.empty? - @append ? @unique_id : dump_unique_id_hex(chunk.unique_id) - else - extract_placeholders(@path, chunk.metadata) - end - end + def validate_compatible_with_primary_buffer!(path_without_suffix) + placeholders = path_without_suffix.scan(PLACEHOLDER_REGEX).flat_map(&:first) # to trim suffix [\d+] - def validate_path_is_compatible_with_primary_buffer! - if !@chunk_key_time && path_has_time_format? - raise Fluent::ConfigError, "out_secondary_file: File path has an incompatible placeholder, add time formats, like `%Y%m%d`, to `path`" + if !@chunk_key_time && has_time_format?(path_without_suffix) + raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder, remove time formats, like `%Y%m%d`, from basename or directory" end - if !@chunk_key_tag && (ph = @placeholders.find { |p| p.match(/tag(\[\d+\])?/) }) - raise Fluent::ConfigError, "out_secondary_file: File path has an incompatible placeholder #{ph}, add tag placeholder, like `${tag}`, to `path`" + if !@chunk_key_tag && (ph = placeholders.find { |placeholder| placeholder.match(/tag(\[\d+\])?/) }) + raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory" end - if @chunk_keys.empty? - vars = @placeholders.reject { |p| p.match(/tag(\[\d+\])?/) } + vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) } - if ph = vars.find { |v| !@chunk_keys.include?(v) } - raise Fluent::ConfigError, "out_secondary_file: File path has an incompatible placeholder #{ph}, add variable placeholder, like `${varname}`, to `path`" - end + if ph = vars.find { |v| !@chunk_keys.include?(v) } + raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory" end end - def path_has_time_format? 
- @path != Time.now.strftime(@path) - end - - def suffix - case @compress - when :text - "" - when :gz, :gzip - ".gz" - end + def has_time_format?(str) + str != Time.now.strftime(str) end - def generate_path(base) + def generate_path(path_without_suffix) if @append - "#{@path_prefix}#{base}#{@path_suffix}#{suffix}" + "#{path_without_suffix}#{@suffix}" else i = 0 loop do - path = "#{@path_prefix}#{base}_#{i}#{@path_suffix}#{suffix}" + path = "#{path_without_suffix}.#{i}#{@suffix}" return path unless File.exist?(path) - i += 1; + i += 1 end end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 89956cc34b..5c9c6c1696 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -200,7 +200,6 @@ def acts_as_secondary(primary) end (class << self; self; end).module_eval do - define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) } define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) } define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) } end diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 071a6eb879..1de165df47 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -17,11 +17,12 @@ def setup FileUtils.mkdir_p(TMP_DIR) end - TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_file#{ENV['TEST_ENV_NUMBER']}") + TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_secondary_file#{ENV['TEST_ENV_NUMBER']}") CONFIG = %[ - path #{TMP_DIR}/out_file_test - compress gz + directory #{TMP_DIR} + basename out_file_test + compress gzip ] class DummyOutput < Fluent::Plugin::Output @@ -39,13 +40,18 @@ def create_driver(conf = CONFIG, primary = create_primary) end sub_test_case 'configture' do + test 'default configuration' do + d = create_driver %[directory #{TMP_DIR}] + assert_equal 'dump.bin', d.instance.basename + assert_equal TMP_DIR, d.instance.directory + assert_equal :text, d.instance.compress + end + test 'should be configurable' do - d = create_driver %[ - path test_path - compress gz - ] - assert_equal 'test_path', d.instance.path - assert_equal :gz, d.instance.compress + d = create_driver + assert_equal 'out_file_test', d.instance.basename + assert_equal TMP_DIR, d.instance.directory + assert_equal :gzip, d.instance.compress end test 'should only use in secondary' do @@ -55,29 +61,31 @@ def create_driver(conf = CONFIG, primary = create_primary) end end - test 'should receive a file path' do - assert_raise Fluent::ConfigError do - create_driver %[ - compress gz - ] - end - end - - test 'should be the writable directory' do + test 'directory should be writable' do assert_nothing_raised do - create_driver %[path #{TMP_DIR}/test_path] + create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/] end assert_nothing_raised do FileUtils.mkdir_p("#{TMP_DIR}/test_dir") File.chmod(0777, "#{TMP_DIR}/test_dir") - create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/] end assert_raise Fluent::ConfigError do FileUtils.mkdir_p("#{TMP_DIR}/test_dir") File.chmod(0555, "#{TMP_DIR}/test_dir") - create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/] + end + end + + test 'should be passed directory' do + assert_raise Fluent::ConfigError do + create_driver %[] + end + + assert_nothing_raised do 
+ create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/] end end end @@ -120,197 +128,266 @@ def create_es_chunk(metadata, es) @es = Fluent::OneEventStream.new(@time, @record) end - test 'should output with the standard format' do + test 'should output compressed file with gzip option' do d = create_driver c = create_es_chunk(create_metadata, @es) - chunk_id = dump_unique_id_hex(c.unique_id) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test.#{chunk_id}_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test.0.gz", path check_gzipped_result(path, @es.to_msgpack_stream.force_encoding('ASCII-8BIT')) end - test 'should be output unzip file' do + test 'should be output plain text without gzip option' do d = create_driver %[ - path #{TMP_DIR}/out_file_test + directory #{TMP_DIR}/ + basename out_file_test ] c = create_es_chunk(create_metadata, @es) - chunk_id = dump_unique_id_hex(c.unique_id) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test.#{chunk_id}_0.log", path + assert_equal "#{TMP_DIR}/out_file_test.0", path assert_equal File.read(path), @es.to_msgpack_stream.force_encoding('ASCII-8BIT') end - test 'path should be incremental without `append`' do - d = create_driver %[ - path #{TMP_DIR}/out_file_test - compress gz - ] + test 'path should be incremental without append option' do + d = create_driver c = create_es_chunk(create_metadata, @es) - chunk_id = dump_unique_id_hex(c.unique_id) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') 5.times do |i| path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test.#{chunk_id}_#{i}.log.gz", path - check_gzipped_result(path, packed_value ) + assert_equal "#{TMP_DIR}/out_file_test.#{i}.gz", path + check_gzipped_result(path, packed_value) end end - test 'path should be the same as `append`' do - d = create_driver %[ - path #{TMP_DIR}/out_file_test - compress gz - append true - ] + test 'path should be the same with append option' do + d = create_driver CONFIG + %[append true] c = create_es_chunk(create_metadata, @es) - chunk_id = d.instance.instance_variable_get(:@unique_id) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') [*1..5].each do |i| path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test.#{chunk_id}.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test.gz", path check_gzipped_result(path, packed_value * i) end end end + sub_test_case 'placeholder regex' do + data( + tag: '${tag}', + tag_index: '${tag[0]}', + tag_index1: '${tag[10]}', + variable: '${key1}', + variable2: '${key@value}', + variable3: '${key_value}', + variable4: '${key.value}', + variable5: '${key-value}', + variable6: '${KEYVALUE}', + variable7: '${tags}' + ) + test 'matches with a valid placeholder' do |path| + assert Fluent::Plugin::SecondaryFileOutput::PLACEHOLDER_REGEX.match(path) + end + + data( + invalid_tag: 'tag', + invalid_tag2: '{tag}', + invalid_tag3: '${tag', + invalid_variable: '${key[0]}', + ) + test "doesn't match with an invalid placeholder" do |path| + assert !Fluent::Plugin::SecondaryFileOutput::PLACEHOLDER_REGEX.match(path) + end + end + sub_test_case 'path' do setup do - @record = { key: "vlaue" } - @time = Time.parse("2011-01-02 13:14:15 UTC").to_i + @record = { key: 'value' } + @time = Time.parse('2011-01-02 13:14:15 UTC').to_i @es = Fluent::OneEventStream.new(@time, @record) @c = create_es_chunk(create_metadata, @es) - @chunk_id = dump_unique_id_hex(@c.unique_id) end - test 'normal path' do - d = create_driver %[ - path #{TMP_DIR}/out_file_test - compress gz - ] + 
test 'normal path with gzip option' do + d = create_driver path = d.instance.write(@c) - assert_equal "#{TMP_DIR}/out_file_test.#{@chunk_id}_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test.0.gz", path end - test 'path includes *' do + test 'normal path without gzip option' do d = create_driver %[ - path #{TMP_DIR}/out_file_test/*.log - compress gz + directory #{TMP_DIR} + basename out_file_test ] - path = d.instance.write(@c) - assert_equal "#{TMP_DIR}/out_file_test/#{@chunk_id}_0.log.gz", path + assert_equal "#{TMP_DIR}/out_file_test.0", path end data( - invalid_tag: [/tag/, "#{TMP_DIR}/${tag}"], - invalid_tag0: [/tag\[0\]/, "#{TMP_DIR}/${tag[0]}"], - invalid_variable: [/dummy/, "#{TMP_DIR}/${dummy}"], - invalid_timeformat: [/time/, "#{TMP_DIR}/%Y%m%d"], + invalid_tag: [/tag/, '${tag}'], + invalid_tag0: [/tag\[0\]/, '${tag[0]}'], + invalid_variable: [/dummy/, '${dummy}'], + invalid_timeformat: [/time/, '%Y%m%d'], ) - test 'path includes impcompatible placeholder' do |(expected_message, invalid_path)| + test 'raise an error when basename includes incompatible placeholder' do |(expected_message, invalid_basename)| c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) c.instance.acts_as_secondary(DummyOutput.new) assert_raise_message(expected_message) do - c.configure(%[ - path #{invalid_path} - compress gz - ]) + c.configure %[ + directory #{TMP_DIR}/ + basename #{invalid_basename} + compress gzip + ] end end - test 'path includes tag' do + data( + invalid_tag: [/tag/, '${tag}'], + invalid_tag0: [/tag\[0\]/, '${tag[0]}'], + invalid_variable: [/dummy/, '${dummy}'], + invalid_timeformat: [/time/, '%Y%m%d'], + ) + test 'raise an error when directory includes incompatible placeholder' do |(expected_message, invalid_directory)| + c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) + c.instance.acts_as_secondary(DummyOutput.new) + + assert_raise_message(expected_message) do + c.configure %[ + directory #{invalid_directory}/ + compress gzip + ] + end + end + + test 'basename includes tag' do primary = create_primary(config_element('buffer', 'tag')) d = create_driver(%[ - path #{TMP_DIR}/out_file_test/cool_${tag} - compress gz + directory #{TMP_DIR}/ + basename cool_${tag} + compress gzip ], primary) - c = create_es_chunk(create_metadata(nil, "test.dummy"), @es) + c = create_es_chunk(create_metadata(nil, 'test.dummy'), @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_test.dummy_0.log.gz", path + assert_equal "#{TMP_DIR}/cool_test.dummy.0.gz", path end - test 'path includes /tag[\d+]/' do + test 'basename includes /tag[\d+]/' do primary = create_primary(config_element('buffer', 'tag')) d = create_driver(%[ - path #{TMP_DIR}/out_file_test/cool_${tag[0]}_${tag[1]} - compress gz + directory #{TMP_DIR}/ + basename cool_${tag[0]}_${tag[1]} + compress gzip ], primary) - c = create_es_chunk(create_metadata(nil, "test.dummy"), @es) + c = create_es_chunk(create_metadata(nil, 'test.dummy'), @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_test_dummy_0.log.gz", path + assert_equal "#{TMP_DIR}/cool_test_dummy.0.gz", path end - test 'path includes time format' do + test 'basename includes time format' do primary = create_primary( config_element('buffer', 'time', { 'timekey_zone' => '+0900', 'timekey' => 1 }) ) d = create_driver(%[ - path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H - compress gz + directory #{TMP_DIR}/ + basename cool_%Y%m%d%H + compress gzip ], primary) c = 
create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 UTC")), @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_2011010222_0.log.gz", path + assert_equal "#{TMP_DIR}/cool_2011010222.0.gz", path end - test 'path includes time format and with `timekey_use_utc`' do + test 'basename includes time format with timekey_use_utc option' do primary = create_primary( config_element('buffer', 'time', { 'timekey_zone' => '+0900', 'timekey' => 1, 'timekey_use_utc' => true }) ) d = create_driver(%[ - path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H - compress gz + directory #{TMP_DIR}/ + basename cool_%Y%m%d%H + compress gzip ], primary) c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 UTC")), @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_2011010213_0.log.gz", path + assert_equal "#{TMP_DIR}/cool_2011010213.0.gz", path end - test 'path includes variable' do + test 'basename includes variable' do primary = create_primary(config_element('buffer', 'test1')) d = create_driver(%[ - path #{TMP_DIR}/out_file_test/cool_${test1} - compress gz + directory #{TMP_DIR}/ + basename cool_${test1} + compress gzip ], primary) c = create_es_chunk(create_metadata(nil, nil, { "test1".to_sym => "dummy" }), @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_dummy_0.log.gz", path + assert_equal "#{TMP_DIR}/cool_dummy.0.gz", path + end + + test 'basename includes unnecessary variable' do + primary = create_primary(config_element('buffer', 'test1')) + c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) + c.instance.acts_as_secondary(primary) + + assert_raise_message(/test2/) do + c.configure %[ + directory #{TMP_DIR}/ + basename ${test1}_${test2} + compress gzip + ] + end + end + + test 'basename include tag, time format, and variables' do + primary = create_primary( + config_element('buffer', 'time,tag,test1', { 'timekey_zone' => '+0000', 'timekey' => 1 }) + ) + + d = create_driver(%[ + directory #{TMP_DIR}/ + basename cool_%Y%m%d%H_${tag}_${test1} + compress gzip + ], primary) + + metadata = create_metadata(Time.parse("2011-01-02 13:14:15 UTC"), 'test.tag', { "test1".to_sym => "dummy" }) + c = create_es_chunk(metadata, @es) + + path = d.instance.write(c) + assert_equal "#{TMP_DIR}/cool_2011010213_test.tag_dummy.0.gz", path end - test 'path include tag, time format, variables' do + test 'directory includes tag, time format, and variables' do primary = create_primary( config_element('buffer', 'time,tag,test1', { 'timekey_zone' => '+0000', 'timekey' => 1 }) ) d = create_driver(%[ - path #{TMP_DIR}/out_file_test/cool_%Y%m%d%H_${tag}_${test1} - compress gz + directory #{TMP_DIR}/%Y%m%d%H/${tag}/${test1} + compress gzip ], primary) metadata = create_metadata(Time.parse("2011-01-02 13:14:15 UTC"), 'test.tag', { "test1".to_sym => "dummy" }) c = create_es_chunk(metadata, @es) path = d.instance.write(c) - assert_equal "#{TMP_DIR}/out_file_test/cool_2011010213_test.tag_dummy_0.log.gz", path + assert_equal "#{TMP_DIR}/2011010213/test.tag/dummy/dump.bin.0.gz", path end end end From cbc1f1f724d3060680af4387133680746f61344b Mon Sep 17 00:00:00 2001 From: ganmacs Date: Fri, 26 Aug 2016 16:11:59 +0900 Subject: [PATCH 25/26] Fix test's description and added test cases --- test/plugin/test_out_secondary_file.rb | 126 +++++++++++++++---------- 1 file changed, 78 insertions(+), 48 deletions(-) diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 1de165df47..aef712fe54 
100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -45,19 +45,26 @@ def create_driver(conf = CONFIG, primary = create_primary) assert_equal 'dump.bin', d.instance.basename assert_equal TMP_DIR, d.instance.directory assert_equal :text, d.instance.compress + assert_equal false, d.instance.append end test 'should be configurable' do - d = create_driver + d = create_driver %[ + directory #{TMP_DIR} + basename out_file_test + compress gzip + append true + ] assert_equal 'out_file_test', d.instance.basename assert_equal TMP_DIR, d.instance.directory assert_equal :gzip, d.instance.compress + assert_equal true, d.instance.append end test 'should only use in secondary' do c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput) - assert_raise do - c.configure(conf) + assert_raise Fluent::ConfigError.new("This plugin can only be used in the section") do + c.configure(CONFIG) end end @@ -72,7 +79,7 @@ def create_driver(conf = CONFIG, primary = create_primary) create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/] end - assert_raise Fluent::ConfigError do + assert_raise Fluent::ConfigError.new("out_secondary_file: `#{TMP_DIR}/test_dir/foo/bar/` should be writable") do FileUtils.mkdir_p("#{TMP_DIR}/test_dir") File.chmod(0555, "#{TMP_DIR}/test_dir") create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/] @@ -108,14 +115,8 @@ def check_gzipped_result(path, expect) assert_equal expect, result end - class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk; end - - def create_metadata(timekey=nil, tag=nil, variables=nil) - Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables) - end - - def create_es_chunk(metadata, es) - DummyMemoryChunk.new(metadata).tap do |c| + def create_chunk(primary, metadata, es) + primary.buffer.generate_chunk(metadata).tap do |c| c.concat(es.to_msgpack_stream, es.size) # to_msgpack_stream is standard_format c.commit end @@ -123,58 +124,58 @@ def create_es_chunk(metadata, es) sub_test_case 'write' do setup do - @record = { key: "vlaue" } - @time = Time.parse("2011-01-02 13:14:15 UTC").to_i + @record = { 'key' => 'value' } + @time = event_time @es = Fluent::OneEventStream.new(@time, @record) + @primary = create_primary + metadata = @primary.buffer.new_metadata + @chunk = create_chunk(@primary, metadata, @es) end - test 'should output compressed file with gzip option' do - d = create_driver - c = create_es_chunk(create_metadata, @es) - path = d.instance.write(c) + test 'should output compressed file when compress option is gzip' do + d = create_driver(CONFIG, @primary) + path = d.instance.write(@chunk) assert_equal "#{TMP_DIR}/out_file_test.0.gz", path check_gzipped_result(path, @es.to_msgpack_stream.force_encoding('ASCII-8BIT')) end - test 'should be output plain text without gzip option' do - d = create_driver %[ + test 'should output plain text when compress option is default(text)' do + d = create_driver(%[ directory #{TMP_DIR}/ basename out_file_test - ] - c = create_es_chunk(create_metadata, @es) - path = d.instance.write(c) + ], @primary) + + path = d.instance.write(@chunk) assert_equal "#{TMP_DIR}/out_file_test.0", path assert_equal File.read(path), @es.to_msgpack_stream.force_encoding('ASCII-8BIT') end - test 'path should be incremental without append option' do - d = create_driver - c = create_es_chunk(create_metadata, @es) + test 'path should be incremental when append option is false' do + d = create_driver(CONFIG, @primary) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') 
5.times do |i| - path = d.instance.write(c) + path = d.instance.write(@chunk) assert_equal "#{TMP_DIR}/out_file_test.#{i}.gz", path check_gzipped_result(path, packed_value) end end - test 'path should be the same with append option' do - d = create_driver CONFIG + %[append true] - c = create_es_chunk(create_metadata, @es) + test 'path should be unchanged when append option is true' do + d = create_driver(CONFIG + %[append true], @primary) packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') [*1..5].each do |i| - path = d.instance.write(c) + path = d.instance.write(@chunk) assert_equal "#{TMP_DIR}/out_file_test.gz", path check_gzipped_result(path, packed_value * i) end end end - sub_test_case 'placeholder regex' do + sub_test_case 'Syntax of placeholders' do data( tag: '${tag}', tag_index: '${tag[0]}', @@ -185,7 +186,8 @@ def create_es_chunk(metadata, es) variable4: '${key.value}', variable5: '${key-value}', variable6: '${KEYVALUE}', - variable7: '${tags}' + variable7: '${tags}', + variable8: '${tag${key}', # matched ${key} ) test 'matches with a valid placeholder' do |path| assert Fluent::Plugin::SecondaryFileOutput::PLACEHOLDER_REGEX.match(path) @@ -195,7 +197,10 @@ def create_es_chunk(metadata, es) invalid_tag: 'tag', invalid_tag2: '{tag}', invalid_tag3: '${tag', + invalid_tag4: '${tag0]}', + invalid_tag5: '${tag[]]}', invalid_variable: '${key[0]}', + invalid_variable2: '${key{key2}}', ) test "doesn't match with an invalid placeholder" do |path| assert !Fluent::Plugin::SecondaryFileOutput::PLACEHOLDER_REGEX.match(path) @@ -204,19 +209,21 @@ def create_es_chunk(metadata, es) sub_test_case 'path' do setup do - @record = { key: 'value' } - @time = Time.parse('2011-01-02 13:14:15 UTC').to_i + @record = { 'key' => 'value' } + @time = event_time @es = Fluent::OneEventStream.new(@time, @record) - @c = create_es_chunk(create_metadata, @es) + primary = create_primary + m = primary.buffer.new_metadata + @c = create_chunk(primary, m, @es) end - test 'normal path with gzip option' do + test 'normal path when compress option is gzip' do d = create_driver path = d.instance.write(@c) assert_equal "#{TMP_DIR}/out_file_test.0.gz", path end - test 'normal path without gzip option' do + test 'normal path when compress option is default' do d = create_driver %[ directory #{TMP_DIR} basename out_file_test @@ -225,6 +232,15 @@ def create_es_chunk(metadata, es) assert_equal "#{TMP_DIR}/out_file_test.0", path end + test 'normal path when append option is true' do + d = create_driver %[ + directory #{TMP_DIR} + append true + ] + path = d.instance.write(@c) + assert_equal "#{TMP_DIR}/dump.bin", path + end + data( invalid_tag: [/tag/, '${tag}'], invalid_tag0: [/tag\[0\]/, '${tag[0]}'], @@ -271,7 +287,8 @@ def create_es_chunk(metadata, es) compress gzip ], primary) - c = create_es_chunk(create_metadata(nil, 'test.dummy'), @es) + m = primary.buffer.new_metadata(tag: 'test.dummy') + c = create_chunk(primary, m, @es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/cool_test.dummy.0.gz", path @@ -286,7 +303,8 @@ def create_es_chunk(metadata, es) compress gzip ], primary) - c = create_es_chunk(create_metadata(nil, 'test.dummy'), @es) + m = primary.buffer.new_metadata(tag: 'test.dummy') + c = create_chunk(primary, m, @es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/cool_test_dummy.0.gz", path @@ -303,7 +321,8 @@ def create_es_chunk(metadata, es) compress gzip ], primary) - c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 UTC")), @es) + m = primary.buffer.new_metadata(timekey: 
event_time("2011-01-02 13:14:15 UTC")) + c = create_chunk(primary, m, @es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/cool_2011010222.0.gz", path @@ -320,7 +339,8 @@ def create_es_chunk(metadata, es) compress gzip ], primary) - c = create_es_chunk(create_metadata(Time.parse("2011-01-02 13:14:15 UTC")), @es) + m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC")) + c = create_chunk(primary, m, @es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/cool_2011010213.0.gz", path @@ -335,7 +355,8 @@ def create_es_chunk(metadata, es) compress gzip ], primary) - c = create_es_chunk(create_metadata(nil, nil, { "test1".to_sym => "dummy" }), @es) + m = primary.buffer.new_metadata(variables: { "test1".to_sym => "dummy" }) + c = create_chunk(primary, m, @es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/cool_dummy.0.gz", path @@ -355,7 +376,7 @@ def create_es_chunk(metadata, es) end end - test 'basename include tag, time format, and variables' do + test 'basename includes tag, time format, and variables' do primary = create_primary( config_element('buffer', 'time,tag,test1', { 'timekey_zone' => '+0000', 'timekey' => 1 }) ) @@ -366,8 +387,13 @@ def create_es_chunk(metadata, es) compress gzip ], primary) - metadata = create_metadata(Time.parse("2011-01-02 13:14:15 UTC"), 'test.tag', { "test1".to_sym => "dummy" }) - c = create_es_chunk(metadata, @es) + m = primary.buffer.new_metadata( + timekey: event_time("2011-01-02 13:14:15 UTC"), + tag: 'test.tag', + variables: { "test1".to_sym => "dummy" } + ) + + c = create_chunk(primary, m, @es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/cool_2011010213_test.tag_dummy.0.gz", path @@ -383,8 +409,12 @@ def create_es_chunk(metadata, es) compress gzip ], primary) - metadata = create_metadata(Time.parse("2011-01-02 13:14:15 UTC"), 'test.tag', { "test1".to_sym => "dummy" }) - c = create_es_chunk(metadata, @es) + m = primary.buffer.new_metadata( + timekey: event_time("2011-01-02 13:14:15 UTC"), + tag: 'test.tag', + variables: { "test1".to_sym => "dummy" } + ) + c = create_chunk(primary, m, @es) path = d.instance.write(c) assert_equal "#{TMP_DIR}/2011010213/test.tag/dummy/dump.bin.0.gz", path From eec980342f408384948a47d9d9c79d228ef14ad7 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Mon, 29 Aug 2016 11:43:46 +0900 Subject: [PATCH 26/26] Add test basename include `/` or not --- lib/fluent/plugin/out_secondary_file.rb | 8 ++++++-- test/plugin/test_out_secondary_file.rb | 9 +++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 93e3124052..0c84949505 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -29,7 +29,7 @@ class SecondaryFileOutput < Output desc "The directory path of the output file." config_param :directory, :string - desc "The baseanme of the output file." + desc "The basename of the output file." config_param :basename, :string, default: "dump.bin" desc "The flushed chunk is appended to existence file or not." 
config_param :append, :bool, default: false @@ -42,6 +42,10 @@ def configure(conf) raise Fluent::ConfigError, "This plugin can only be used in the section" end + if @basename.include?("/") + raise Fluent::ConfigError, "basename should not include `/`" + end + @path_without_suffix = File.join(@directory, @basename) validate_compatible_with_primary_buffer!(@path_without_suffix) @@ -52,7 +56,7 @@ def configure(conf) ".gz" end - test_path = generate_path(File.join(@directory, Time.now.strftime("%Y%m%d"))) + test_path = @path_without_suffix unless Fluent::FileUtil.writable_p?(test_path) raise Fluent::ConfigError, "out_secondary_file: `#{@directory}` should be writable" end diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index aef712fe54..c7cb873850 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -68,6 +68,15 @@ def create_driver(conf = CONFIG, primary = create_primary) end end + test 'basename should not include `/`' do + assert_raise Fluent::ConfigError.new("basename should not include `/`") do + create_driver %[ + directory #{TMP_DIR} + basename out/file + ] + end + end + test 'directory should be writable' do assert_nothing_raised do create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
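
For readers trying out this series, a minimal configuration exercising the parameters it introduces might look like the sketch below. It is illustrative only: the match pattern, the forward destination, and the directory path are assumptions, while `secondary_file`, `directory`, `basename`, `compress`, `append`, and the placeholder rule (tag/time/variable placeholders must match the primary's `<buffer>` chunk keys) are the pieces added by these patches.

<match mydata.**>
  @type forward                       # assumed primary output
  <server>
    host 192.168.1.3                  # placeholder host
  </server>
  <buffer tag,time>
    timekey 3600
  </buffer>
  <secondary>
    @type secondary_file
    directory /var/log/fluent/error   # assumed path; must be writable
    basename ${tag}_%Y%m%d            # allowed because the buffer is keyed on tag and time
    compress gzip
    append true
  </secondary>
</match>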