diff --git a/example/secondary_file.conf b/example/secondary_file.conf
new file mode 100644
index 0000000000..fe37d4c99f
--- /dev/null
+++ b/example/secondary_file.conf
@@ -0,0 +1,41 @@
+
+ rpc_endpoint 0.0.0.0:24444
+
+
+
+
+
+
+
+
+
+ @type forward
+
+
+ type memory
+ timekey 2s
+ timekey_wait 1s
+ flush_interval 1s
+
+
+
+ host 0.0.0.0
+ port 24224
+
+
+
+ type secondary_file
+ directory log/secondary/
+ basename ${tag}_%Y%m%d%L_${message}
+
+
diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 1eff6de52f..b41ec29ae0 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -55,7 +55,11 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
# if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued
config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD
- Metadata = Struct.new(:timekey, :tag, :variables)
+ Metadata = Struct.new(:timekey, :tag, :variables) do
+ def empty?
+ timekey.nil? && tag.nil? && variables.nil?
+ end
+ end
# for tests
attr_accessor :stage_size, :queue_size
diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb
new file mode 100644
index 0000000000..0c84949505
--- /dev/null
+++ b/lib/fluent/plugin/out_secondary_file.rb
@@ -0,0 +1,128 @@
+#
+# Fluentd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require "fileutils"
+require "fluent/plugin/file_util"
+require "fluent/plugin/output"
+require "fluent/config/error"
+
+module Fluent::Plugin
+ class SecondaryFileOutput < Output
+ Fluent::Plugin.register_output("secondary_file", self)
+
+ FILE_PERMISSION = 0644
+ DIR_PERMISSION = 0755
+ PLACEHOLDER_REGEX = /\${(tag(\[\d+\])?|[\w.@-]+)}/
+
+ desc "The directory path of the output file."
+ config_param :directory, :string
+ desc "The basename of the output file."
+ config_param :basename, :string, default: "dump.bin"
+ desc "The flushed chunk is appended to existence file or not."
+ config_param :append, :bool, default: false
+ config_param :compress, :enum, list: [:text, :gzip], default: :text
+
+ def configure(conf)
+ super
+
+ unless @as_secondary
+ raise Fluent::ConfigError, "This plugin can only be used in the section"
+ end
+
+ if @basename.include?("/")
+ raise Fluent::ConfigError, "basename should not include `/`"
+ end
+
+ @path_without_suffix = File.join(@directory, @basename)
+ validate_compatible_with_primary_buffer!(@path_without_suffix)
+
+ @suffix = case @compress
+ when :text
+ ""
+ when :gzip
+ ".gz"
+ end
+
+ test_path = @path_without_suffix
+ unless Fluent::FileUtil.writable_p?(test_path)
+ raise Fluent::ConfigError, "out_secondary_file: `#{@directory}` should be writable"
+ end
+
+ @dir_perm = system_config.dir_permission || DIR_PERMISSION
+ @file_perm = system_config.file_permission || FILE_PERMISSION
+ end
+
+ def write(chunk)
+ path_without_suffix = extract_placeholders(@path_without_suffix, chunk.metadata)
+ path = generate_path(path_without_suffix)
+ FileUtils.mkdir_p File.dirname(path), mode: @dir_perm
+
+ case @compress
+ when :text
+ File.open(path, "ab", @file_perm) {|f|
+ f.flock(File::LOCK_EX)
+ chunk.write_to(f)
+ }
+ when :gzip
+ File.open(path, "ab", @file_perm) {|f|
+ f.flock(File::LOCK_EX)
+ gz = Zlib::GzipWriter.new(f)
+ chunk.write_to(gz)
+ gz.close
+ }
+ end
+
+ path
+ end
+
+ private
+
+ def validate_compatible_with_primary_buffer!(path_without_suffix)
+ placeholders = path_without_suffix.scan(PLACEHOLDER_REGEX).flat_map(&:first) # to trim suffix [\d+]
+
+ if !@chunk_key_time && has_time_format?(path_without_suffix)
+ raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder, remove time formats, like `%Y%m%d`, from basename or directory"
+ end
+
+ if !@chunk_key_tag && (ph = placeholders.find { |placeholder| placeholder.match(/tag(\[\d+\])?/) })
+ raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory"
+ end
+
+ vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) }
+
+ if ph = vars.find { |v| !@chunk_keys.include?(v) }
+ raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory"
+ end
+ end
+
+ def has_time_format?(str)
+ str != Time.now.strftime(str)
+ end
+
+ def generate_path(path_without_suffix)
+ if @append
+ "#{path_without_suffix}#{@suffix}"
+ else
+ i = 0
+ loop do
+ path = "#{path_without_suffix}.#{i}#{@suffix}"
+ return path unless File.exist?(path)
+ i += 1
+ end
+ end
+ end
+ end
+end
diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index cbace7f684..7829460ffa 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -138,7 +138,7 @@ def expired?
end
end
- attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout
+ attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout, :timekey_zone
attr_reader :num_errors, :emit_count, :emit_records, :write_count, :rollback_count
# for tests
@@ -185,13 +185,21 @@ def initialize
@simple_chunking = nil
@chunk_keys = @chunk_key_time = @chunk_key_tag = nil
@flush_mode = nil
+ @timekey_zone = nil
end
def acts_as_secondary(primary)
@as_secondary = true
@primary_instance = primary
+ @chunk_keys = @primary_instance.chunk_keys || []
+ @chunk_key_tag = @primary_instance.chunk_key_tag || false
+ if @primary_instance.chunk_key_time
+ @chunk_key_time = @primary_instance.chunk_key_time
+ @timekey_zone = @primary_instance.timekey_zone
+ @output_time_formatter_cache = {}
+ end
+
(class << self; self; end).module_eval do
- define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) }
define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) }
end
@@ -251,7 +259,7 @@ def configure(conf)
if @chunk_key_time
raise Fluent::ConfigError, " argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
Fluent::Timezone.validate!(@buffer_config.timekey_zone)
- @buffer_config.timekey_zone = '+0000' if @buffer_config.timekey_use_utc
+ @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
@output_time_formatter_cache = {}
end
@@ -477,13 +485,13 @@ def implement?(feature)
# TODO: optimize this code
def extract_placeholders(str, metadata)
- if metadata.timekey.nil? && metadata.tag.nil? && metadata.variables.nil?
+ if metadata.empty?
str
else
rvalue = str
# strftime formatting
if @chunk_key_time # this section MUST be earlier than rest to use raw 'str'
- @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@buffer_config.timekey_zone, str)
+ @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str)
rvalue = @output_time_formatter_cache[str].call(metadata.timekey)
end
# ${tag}, ${tag[0]}, ${tag[1]}, ...
diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb
new file mode 100644
index 0000000000..c7cb873850
--- /dev/null
+++ b/test/plugin/test_out_secondary_file.rb
@@ -0,0 +1,432 @@
+require_relative '../helper'
+require 'time'
+require 'fileutils'
+require 'fluent/event'
+require 'fluent/unique_id'
+require 'fluent/plugin/buffer'
+require 'fluent/plugin/out_secondary_file'
+require 'fluent/plugin/buffer/memory_chunk'
+require 'fluent/test/driver/output'
+
+class FileOutputSecondaryTest < Test::Unit::TestCase
+ include Fluent::UniqueId::Mixin
+
+ def setup
+ Fluent::Test.setup
+ FileUtils.rm_rf(TMP_DIR)
+ FileUtils.mkdir_p(TMP_DIR)
+ end
+
+ TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_secondary_file#{ENV['TEST_ENV_NUMBER']}")
+
+ CONFIG = %[
+ directory #{TMP_DIR}
+ basename out_file_test
+ compress gzip
+ ]
+
+ class DummyOutput < Fluent::Plugin::Output
+ def write(chunk); end
+ end
+
+ def create_primary(buffer_cofig = config_element('buffer'))
+ DummyOutput.new.configure(config_element('ROOT','',{}, [buffer_cofig]))
+ end
+
+ def create_driver(conf = CONFIG, primary = create_primary)
+ c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
+ c.instance.acts_as_secondary(primary)
+ c.configure(conf)
+ end
+
+ sub_test_case 'configture' do
+ test 'default configuration' do
+ d = create_driver %[directory #{TMP_DIR}]
+ assert_equal 'dump.bin', d.instance.basename
+ assert_equal TMP_DIR, d.instance.directory
+ assert_equal :text, d.instance.compress
+ assert_equal false, d.instance.append
+ end
+
+ test 'should be configurable' do
+ d = create_driver %[
+ directory #{TMP_DIR}
+ basename out_file_test
+ compress gzip
+ append true
+ ]
+ assert_equal 'out_file_test', d.instance.basename
+ assert_equal TMP_DIR, d.instance.directory
+ assert_equal :gzip, d.instance.compress
+ assert_equal true, d.instance.append
+ end
+
+ test 'should only use in secondary' do
+ c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
+ assert_raise Fluent::ConfigError.new("This plugin can only be used in the section") do
+ c.configure(CONFIG)
+ end
+ end
+
+ test 'basename should not include `/`' do
+ assert_raise Fluent::ConfigError.new("basename should not include `/`") do
+ create_driver %[
+ directory #{TMP_DIR}
+ basename out/file
+ ]
+ end
+ end
+
+ test 'directory should be writable' do
+ assert_nothing_raised do
+ create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
+ end
+
+ assert_nothing_raised do
+ FileUtils.mkdir_p("#{TMP_DIR}/test_dir")
+ File.chmod(0777, "#{TMP_DIR}/test_dir")
+ create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
+ end
+
+ assert_raise Fluent::ConfigError.new("out_secondary_file: `#{TMP_DIR}/test_dir/foo/bar/` should be writable") do
+ FileUtils.mkdir_p("#{TMP_DIR}/test_dir")
+ File.chmod(0555, "#{TMP_DIR}/test_dir")
+ create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
+ end
+ end
+
+ test 'should be passed directory' do
+ assert_raise Fluent::ConfigError do
+ create_driver %[]
+ end
+
+ assert_nothing_raised do
+ create_driver %[directory #{TMP_DIR}/test_dir/foo/bar/]
+ end
+ end
+ end
+
+ def check_gzipped_result(path, expect)
+ # Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790
+ # Following code from https://www.ruby-forum.com/topic/971591#979520
+ result = ""
+ File.open(path, "rb") { |io|
+ loop do
+ gzr = Zlib::GzipReader.new(io)
+ result << gzr.read
+ unused = gzr.unused
+ gzr.finish
+ break if unused.nil?
+ io.pos -= unused.length
+ end
+ }
+
+ assert_equal expect, result
+ end
+
+ def create_chunk(primary, metadata, es)
+ primary.buffer.generate_chunk(metadata).tap do |c|
+ c.concat(es.to_msgpack_stream, es.size) # to_msgpack_stream is standard_format
+ c.commit
+ end
+ end
+
+ sub_test_case 'write' do
+ setup do
+ @record = { 'key' => 'value' }
+ @time = event_time
+ @es = Fluent::OneEventStream.new(@time, @record)
+ @primary = create_primary
+ metadata = @primary.buffer.new_metadata
+ @chunk = create_chunk(@primary, metadata, @es)
+ end
+
+ test 'should output compressed file when compress option is gzip' do
+ d = create_driver(CONFIG, @primary)
+ path = d.instance.write(@chunk)
+
+ assert_equal "#{TMP_DIR}/out_file_test.0.gz", path
+ check_gzipped_result(path, @es.to_msgpack_stream.force_encoding('ASCII-8BIT'))
+ end
+
+ test 'should output plain text when compress option is default(text)' do
+ d = create_driver(%[
+ directory #{TMP_DIR}/
+ basename out_file_test
+ ], @primary)
+
+ path = d.instance.write(@chunk)
+
+ assert_equal "#{TMP_DIR}/out_file_test.0", path
+ assert_equal File.read(path), @es.to_msgpack_stream.force_encoding('ASCII-8BIT')
+ end
+
+ test 'path should be incremental when append option is false' do
+ d = create_driver(CONFIG, @primary)
+ packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')
+
+ 5.times do |i|
+ path = d.instance.write(@chunk)
+ assert_equal "#{TMP_DIR}/out_file_test.#{i}.gz", path
+ check_gzipped_result(path, packed_value)
+ end
+ end
+
+ test 'path should be unchanged when append option is true' do
+ d = create_driver(CONFIG + %[append true], @primary)
+ packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT')
+
+ [*1..5].each do |i|
+ path = d.instance.write(@chunk)
+ assert_equal "#{TMP_DIR}/out_file_test.gz", path
+ check_gzipped_result(path, packed_value * i)
+ end
+ end
+ end
+
+ sub_test_case 'Syntax of placeholders' do
+ data(
+ tag: '${tag}',
+ tag_index: '${tag[0]}',
+ tag_index1: '${tag[10]}',
+ variable: '${key1}',
+ variable2: '${key@value}',
+ variable3: '${key_value}',
+ variable4: '${key.value}',
+ variable5: '${key-value}',
+ variable6: '${KEYVALUE}',
+ variable7: '${tags}',
+ variable8: '${tag${key}', # matched ${key}
+ )
+ test 'matches with a valid placeholder' do |path|
+ assert Fluent::Plugin::SecondaryFileOutput::PLACEHOLDER_REGEX.match(path)
+ end
+
+ data(
+ invalid_tag: 'tag',
+ invalid_tag2: '{tag}',
+ invalid_tag3: '${tag',
+ invalid_tag4: '${tag0]}',
+ invalid_tag5: '${tag[]]}',
+ invalid_variable: '${key[0]}',
+ invalid_variable2: '${key{key2}}',
+ )
+ test "doesn't match with an invalid placeholder" do |path|
+ assert !Fluent::Plugin::SecondaryFileOutput::PLACEHOLDER_REGEX.match(path)
+ end
+ end
+
+ sub_test_case 'path' do
+ setup do
+ @record = { 'key' => 'value' }
+ @time = event_time
+ @es = Fluent::OneEventStream.new(@time, @record)
+ primary = create_primary
+ m = primary.buffer.new_metadata
+ @c = create_chunk(primary, m, @es)
+ end
+
+ test 'normal path when compress option is gzip' do
+ d = create_driver
+ path = d.instance.write(@c)
+ assert_equal "#{TMP_DIR}/out_file_test.0.gz", path
+ end
+
+ test 'normal path when compress option is default' do
+ d = create_driver %[
+ directory #{TMP_DIR}
+ basename out_file_test
+ ]
+ path = d.instance.write(@c)
+ assert_equal "#{TMP_DIR}/out_file_test.0", path
+ end
+
+ test 'normal path when append option is true' do
+ d = create_driver %[
+ directory #{TMP_DIR}
+ append true
+ ]
+ path = d.instance.write(@c)
+ assert_equal "#{TMP_DIR}/dump.bin", path
+ end
+
+ data(
+ invalid_tag: [/tag/, '${tag}'],
+ invalid_tag0: [/tag\[0\]/, '${tag[0]}'],
+ invalid_variable: [/dummy/, '${dummy}'],
+ invalid_timeformat: [/time/, '%Y%m%d'],
+ )
+ test 'raise an error when basename includes incompatible placeholder' do |(expected_message, invalid_basename)|
+ c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
+ c.instance.acts_as_secondary(DummyOutput.new)
+
+ assert_raise_message(expected_message) do
+ c.configure %[
+ directory #{TMP_DIR}/
+ basename #{invalid_basename}
+ compress gzip
+ ]
+ end
+ end
+
+ data(
+ invalid_tag: [/tag/, '${tag}'],
+ invalid_tag0: [/tag\[0\]/, '${tag[0]}'],
+ invalid_variable: [/dummy/, '${dummy}'],
+ invalid_timeformat: [/time/, '%Y%m%d'],
+ )
+ test 'raise an error when directory includes incompatible placeholder' do |(expected_message, invalid_directory)|
+ c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
+ c.instance.acts_as_secondary(DummyOutput.new)
+
+ assert_raise_message(expected_message) do
+ c.configure %[
+ directory #{invalid_directory}/
+ compress gzip
+ ]
+ end
+ end
+
+ test 'basename includes tag' do
+ primary = create_primary(config_element('buffer', 'tag'))
+
+ d = create_driver(%[
+ directory #{TMP_DIR}/
+ basename cool_${tag}
+ compress gzip
+ ], primary)
+
+ m = primary.buffer.new_metadata(tag: 'test.dummy')
+ c = create_chunk(primary, m, @es)
+
+ path = d.instance.write(c)
+ assert_equal "#{TMP_DIR}/cool_test.dummy.0.gz", path
+ end
+
+ test 'basename includes /tag[\d+]/' do
+ primary = create_primary(config_element('buffer', 'tag'))
+
+ d = create_driver(%[
+ directory #{TMP_DIR}/
+ basename cool_${tag[0]}_${tag[1]}
+ compress gzip
+ ], primary)
+
+ m = primary.buffer.new_metadata(tag: 'test.dummy')
+ c = create_chunk(primary, m, @es)
+
+ path = d.instance.write(c)
+ assert_equal "#{TMP_DIR}/cool_test_dummy.0.gz", path
+ end
+
+ test 'basename includes time format' do
+ primary = create_primary(
+ config_element('buffer', 'time', { 'timekey_zone' => '+0900', 'timekey' => 1 })
+ )
+
+ d = create_driver(%[
+ directory #{TMP_DIR}/
+ basename cool_%Y%m%d%H
+ compress gzip
+ ], primary)
+
+ m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC"))
+ c = create_chunk(primary, m, @es)
+
+ path = d.instance.write(c)
+ assert_equal "#{TMP_DIR}/cool_2011010222.0.gz", path
+ end
+
+ test 'basename includes time format with timekey_use_utc option' do
+ primary = create_primary(
+ config_element('buffer', 'time', { 'timekey_zone' => '+0900', 'timekey' => 1, 'timekey_use_utc' => true })
+ )
+
+ d = create_driver(%[
+ directory #{TMP_DIR}/
+ basename cool_%Y%m%d%H
+ compress gzip
+ ], primary)
+
+ m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC"))
+ c = create_chunk(primary, m, @es)
+
+ path = d.instance.write(c)
+ assert_equal "#{TMP_DIR}/cool_2011010213.0.gz", path
+ end
+
+ test 'basename includes variable' do
+ primary = create_primary(config_element('buffer', 'test1'))
+
+ d = create_driver(%[
+ directory #{TMP_DIR}/
+ basename cool_${test1}
+ compress gzip
+ ], primary)
+
+ m = primary.buffer.new_metadata(variables: { "test1".to_sym => "dummy" })
+ c = create_chunk(primary, m, @es)
+
+ path = d.instance.write(c)
+ assert_equal "#{TMP_DIR}/cool_dummy.0.gz", path
+ end
+
+ test 'basename includes unnecessary variable' do
+ primary = create_primary(config_element('buffer', 'test1'))
+ c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
+ c.instance.acts_as_secondary(primary)
+
+ assert_raise_message(/test2/) do
+ c.configure %[
+ directory #{TMP_DIR}/
+ basename ${test1}_${test2}
+ compress gzip
+ ]
+ end
+ end
+
+ test 'basename includes tag, time format, and variables' do
+ primary = create_primary(
+ config_element('buffer', 'time,tag,test1', { 'timekey_zone' => '+0000', 'timekey' => 1 })
+ )
+
+ d = create_driver(%[
+ directory #{TMP_DIR}/
+ basename cool_%Y%m%d%H_${tag}_${test1}
+ compress gzip
+ ], primary)
+
+ m = primary.buffer.new_metadata(
+ timekey: event_time("2011-01-02 13:14:15 UTC"),
+ tag: 'test.tag',
+ variables: { "test1".to_sym => "dummy" }
+ )
+
+ c = create_chunk(primary, m, @es)
+
+ path = d.instance.write(c)
+ assert_equal "#{TMP_DIR}/cool_2011010213_test.tag_dummy.0.gz", path
+ end
+
+ test 'directory includes tag, time format, and variables' do
+ primary = create_primary(
+ config_element('buffer', 'time,tag,test1', { 'timekey_zone' => '+0000', 'timekey' => 1 })
+ )
+
+ d = create_driver(%[
+ directory #{TMP_DIR}/%Y%m%d%H/${tag}/${test1}
+ compress gzip
+ ], primary)
+
+ m = primary.buffer.new_metadata(
+ timekey: event_time("2011-01-02 13:14:15 UTC"),
+ tag: 'test.tag',
+ variables: { "test1".to_sym => "dummy" }
+ )
+ c = create_chunk(primary, m, @es)
+
+ path = d.instance.write(c)
+ assert_equal "#{TMP_DIR}/2011010213/test.tag/dummy/dump.bin.0.gz", path
+ end
+ end
+end