Skip to content

Commit

Permalink
Merge pull request #1154 from ganmacs/create-new-out_file-plugins-onl…
Browse files Browse the repository at this point in the history
…y-for-secondary

Create new out file plugins only for secondary
  • Loading branch information
tagomoris authored Sep 1, 2016
2 parents c2ee949 + eec9803 commit 0dd02c8
Show file tree
Hide file tree
Showing 5 changed files with 619 additions and 6 deletions.
41 changes: 41 additions & 0 deletions example/secondary_file.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<system>
rpc_endpoint 0.0.0.0:24444
</system>

<source>
@type dummy
tag test
</source>

<source>
@type forward
@label @raw
</source>

<label @raw>
<match>
@type stdout
</match>
</label>

<match test>
@type forward

<buffer time,tag,message>
type memory
timekey 2s
timekey_wait 1s
flush_interval 1s
</buffer>

<server>
host 0.0.0.0
port 24224
</server>

<secondary>
type secondary_file
directory log/secondary/
basename ${tag}_%Y%m%d%L_${message}
</secondary>
</match>
6 changes: 5 additions & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
# if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued
config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD

Metadata = Struct.new(:timekey, :tag, :variables)
Metadata = Struct.new(:timekey, :tag, :variables) do
def empty?
timekey.nil? && tag.nil? && variables.nil?
end
end

# for tests
attr_accessor :stage_size, :queue_size
Expand Down
128 changes: 128 additions & 0 deletions lib/fluent/plugin/out_secondary_file.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require "fileutils"
require "fluent/plugin/file_util"
require "fluent/plugin/output"
require "fluent/config/error"

module Fluent::Plugin
class SecondaryFileOutput < Output
Fluent::Plugin.register_output("secondary_file", self)

FILE_PERMISSION = 0644
DIR_PERMISSION = 0755
PLACEHOLDER_REGEX = /\${(tag(\[\d+\])?|[\w.@-]+)}/

desc "The directory path of the output file."
config_param :directory, :string
desc "The basename of the output file."
config_param :basename, :string, default: "dump.bin"
desc "The flushed chunk is appended to existence file or not."
config_param :append, :bool, default: false
config_param :compress, :enum, list: [:text, :gzip], default: :text

def configure(conf)
super

unless @as_secondary
raise Fluent::ConfigError, "This plugin can only be used in the <secondary> section"
end

if @basename.include?("/")
raise Fluent::ConfigError, "basename should not include `/`"
end

@path_without_suffix = File.join(@directory, @basename)
validate_compatible_with_primary_buffer!(@path_without_suffix)

@suffix = case @compress
when :text
""
when :gzip
".gz"
end

test_path = @path_without_suffix
unless Fluent::FileUtil.writable_p?(test_path)
raise Fluent::ConfigError, "out_secondary_file: `#{@directory}` should be writable"
end

@dir_perm = system_config.dir_permission || DIR_PERMISSION
@file_perm = system_config.file_permission || FILE_PERMISSION
end

def write(chunk)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk.metadata)
path = generate_path(path_without_suffix)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
end

path
end

private

def validate_compatible_with_primary_buffer!(path_without_suffix)
placeholders = path_without_suffix.scan(PLACEHOLDER_REGEX).flat_map(&:first) # to trim suffix [\d+]

if !@chunk_key_time && has_time_format?(path_without_suffix)
raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder, remove time formats, like `%Y%m%d`, from basename or directory"
end

if !@chunk_key_tag && (ph = placeholders.find { |placeholder| placeholder.match(/tag(\[\d+\])?/) })
raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory"
end

vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) }

if ph = vars.find { |v| !@chunk_keys.include?(v) }
raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory"
end
end

def has_time_format?(str)
str != Time.now.strftime(str)
end

def generate_path(path_without_suffix)
if @append
"#{path_without_suffix}#{@suffix}"
else
i = 0
loop do
path = "#{path_without_suffix}.#{i}#{@suffix}"
return path unless File.exist?(path)
i += 1
end
end
end
end
end
18 changes: 13 additions & 5 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def expired?
end
end

attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout
attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout, :timekey_zone
attr_reader :num_errors, :emit_count, :emit_records, :write_count, :rollback_count

# for tests
Expand Down Expand Up @@ -185,13 +185,21 @@ def initialize
@simple_chunking = nil
@chunk_keys = @chunk_key_time = @chunk_key_tag = nil
@flush_mode = nil
@timekey_zone = nil
end

def acts_as_secondary(primary)
@as_secondary = true
@primary_instance = primary
@chunk_keys = @primary_instance.chunk_keys || []
@chunk_key_tag = @primary_instance.chunk_key_tag || false
if @primary_instance.chunk_key_time
@chunk_key_time = @primary_instance.chunk_key_time
@timekey_zone = @primary_instance.timekey_zone
@output_time_formatter_cache = {}
end

(class << self; self; end).module_eval do
define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) }
define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) }
end
Expand Down Expand Up @@ -251,7 +259,7 @@ def configure(conf)
if @chunk_key_time
raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
Fluent::Timezone.validate!(@buffer_config.timekey_zone)
@buffer_config.timekey_zone = '+0000' if @buffer_config.timekey_use_utc
@timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
@output_time_formatter_cache = {}
end

Expand Down Expand Up @@ -477,13 +485,13 @@ def implement?(feature)

# TODO: optimize this code
def extract_placeholders(str, metadata)
if metadata.timekey.nil? && metadata.tag.nil? && metadata.variables.nil?
if metadata.empty?
str
else
rvalue = str
# strftime formatting
if @chunk_key_time # this section MUST be earlier than rest to use raw 'str'
@output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@buffer_config.timekey_zone, str)
@output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str)
rvalue = @output_time_formatter_cache[str].call(metadata.timekey)
end
# ${tag}, ${tag[0]}, ${tag[1]}, ...
Expand Down
Loading

0 comments on commit 0dd02c8

Please sign in to comment.