Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new out file plugins only for secondary #1154

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f112cdd
Add #nil? method to Metadata
ganmacs Aug 10, 2016
e8b318b
Create `out_seconary_file` plugins
ganmacs Aug 10, 2016
717047b
Fix failed test
ganmacs Aug 11, 2016
2325a80
update secondary_out_file
ganmacs Aug 11, 2016
146ce2b
Add conf that is `out_secondary_file` example
ganmacs Aug 12, 2016
5283a77
Support `extract_placeholder` in out_secondary_file's path
ganmacs Aug 12, 2016
52cab7d
Add configure_path tests
ganmacs Aug 12, 2016
0399cc0
Fix a bug when path has tag[\d+]
ganmacs Aug 12, 2016
3a5644a
Update error messages and test them
ganmacs Aug 13, 2016
8baf027
Be able to set `timekey_zone`
ganmacs Aug 14, 2016
a23c34d
Stop using `instance_variable_set` in test
ganmacs Aug 14, 2016
6bf16df
Add `keytime_zone` configuration to priamry_output
ganmacs Aug 14, 2016
1dbcf72
Change method name nil? -> empty?
ganmacs Aug 18, 2016
2356794
Update out_secondary
ganmacs Aug 18, 2016
2fa33ad
Use mixeined method instead of `Fluent::UniqueId.hex`
ganmacs Aug 18, 2016
db2d715
`@chunk_keys` and `@chunk_key_tag` must be initilized
ganmacs Aug 18, 2016
a8d39ec
Fix typo of test descrption.
ganmacs Aug 19, 2016
ad89300
Remove `unique_id` from Buffer::Chunk
ganmacs Aug 22, 2016
7b376a8
Use :text as compassion options instead of :normal
ganmacs Aug 22, 2016
1b67e14
Use Fluent::Test::Driver::Output
ganmacs Aug 22, 2016
46be8fe
Simplify a validating logic.
ganmacs Aug 22, 2016
846b4ac
Use secondary_file's unique_id to generate id
ganmacs Aug 22, 2016
3c31c02
Change method name
ganmacs Aug 23, 2016
df0b7a9
Update out_secondary_file
ganmacs Aug 23, 2016
cbc1f1f
Fix test's description and added test cases
ganmacs Aug 26, 2016
eec9803
Add test basename include `/` or not
ganmacs Aug 29, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions example/secondary_file.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<system>
rpc_endpoint 0.0.0.0:24444
</system>

<source>
@type dummy
tag test
</source>

<source>
@type forward
@label @raw
</source>

<label @raw>
<match>
@type stdout
</match>
</label>

<match test>
@type forward

<buffer time,tag,message>
type memory
timekey 2s
timekey_wait 1s
flush_interval 1s
</buffer>

<server>
host 0.0.0.0
port 24224
</server>

<secondary>
type secondary_file
directory log/secondary/
basename ${tag}_%Y%m%d%L_${message}
</secondary>
</match>
6 changes: 5 additions & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
# if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued
config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD

Metadata = Struct.new(:timekey, :tag, :variables)
Metadata = Struct.new(:timekey, :tag, :variables) do
def empty?
timekey.nil? && tag.nil? && variables.nil?
end
end

# for tests
attr_accessor :stage_size, :queue_size
Expand Down
128 changes: 128 additions & 0 deletions lib/fluent/plugin/out_secondary_file.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require "fileutils"
require "fluent/plugin/file_util"
require "fluent/plugin/output"
require "fluent/config/error"

module Fluent::Plugin
class SecondaryFileOutput < Output
Fluent::Plugin.register_output("secondary_file", self)

FILE_PERMISSION = 0644
DIR_PERMISSION = 0755
PLACEHOLDER_REGEX = /\${(tag(\[\d+\])?|[\w.@-]+)}/

desc "The directory path of the output file."
config_param :directory, :string
desc "The basename of the output file."
config_param :basename, :string, default: "dump.bin"
desc "The flushed chunk is appended to existence file or not."
config_param :append, :bool, default: false
config_param :compress, :enum, list: [:text, :gzip], default: :text

def configure(conf)
super

unless @as_secondary
raise Fluent::ConfigError, "This plugin can only be used in the <secondary> section"
end

if @basename.include?("/")
raise Fluent::ConfigError, "basename should not include `/`"
end

@path_without_suffix = File.join(@directory, @basename)
validate_compatible_with_primary_buffer!(@path_without_suffix)

@suffix = case @compress
when :text
""
when :gzip
".gz"
end

test_path = @path_without_suffix
unless Fluent::FileUtil.writable_p?(test_path)
raise Fluent::ConfigError, "out_secondary_file: `#{@directory}` should be writable"
end

@dir_perm = system_config.dir_permission || DIR_PERMISSION
@file_perm = system_config.file_permission || FILE_PERMISSION
end

def write(chunk)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk.metadata)
path = generate_path(path_without_suffix)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
Copy link
Member

@tagomoris tagomoris Aug 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to confirm: is this lock freed when f is closed?

Copy link
Member Author

@ganmacs ganmacs Aug 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is.
http://docs.ruby-lang.org/ja/2.3.0/method/File/i/flock.html (Sorry, This is Japanse article. I can't find English one)
It describes that locked object will be freed by closing file.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
end

path
end

private

def validate_compatible_with_primary_buffer!(path_without_suffix)
placeholders = path_without_suffix.scan(PLACEHOLDER_REGEX).flat_map(&:first) # to trim suffix [\d+]

if !@chunk_key_time && has_time_format?(path_without_suffix)
raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder, remove time formats, like `%Y%m%d`, from basename or directory"
end

if !@chunk_key_tag && (ph = placeholders.find { |placeholder| placeholder.match(/tag(\[\d+\])?/) })
raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory"
end

vars = placeholders.reject { |placeholder| placeholder.match(/tag(\[\d+\])?/) }

if ph = vars.find { |v| !@chunk_keys.include?(v) }
raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory"
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this if block checking?
If @chunk_keys.empty? is true, !@chunk_keys.include?(v) is always true.
Check both of code and test code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end

def has_time_format?(str)
str != Time.now.strftime(str)
end

def generate_path(path_without_suffix)
if @append
"#{path_without_suffix}#{@suffix}"
else
i = 0
loop do
path = "#{path_without_suffix}.#{i}#{@suffix}"
return path unless File.exist?(path)
i += 1
end
end
end
end
end
18 changes: 13 additions & 5 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def expired?
end
end

attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout
attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout, :timekey_zone
attr_reader :num_errors, :emit_count, :emit_records, :write_count, :rollback_count

# for tests
Expand Down Expand Up @@ -185,13 +185,21 @@ def initialize
@simple_chunking = nil
@chunk_keys = @chunk_key_time = @chunk_key_tag = nil
@flush_mode = nil
@timekey_zone = nil
end

def acts_as_secondary(primary)
@as_secondary = true
@primary_instance = primary
@chunk_keys = @primary_instance.chunk_keys || []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overwriting this method is to use @chunk_keys and some other instance variables of primary plugin instances.
But your update brings these instance variable and these values into secondary plugin instances too.
So I think we can just remove this L194.

@chunk_key_tag = @primary_instance.chunk_key_tag || false
if @primary_instance.chunk_key_time
@chunk_key_time = @primary_instance.chunk_key_time
@timekey_zone = @primary_instance.timekey_zone
@output_time_formatter_cache = {}
end

(class << self; self; end).module_eval do
define_method(:extract_placeholders){ |str, metadata| @primary_instance.extract_placeholders(str, metadata) }
define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) }
define_method(:rollback_write){ |chunk_id| @primary_instance.rollback_write(chunk_id) }
end
Expand Down Expand Up @@ -251,7 +259,7 @@ def configure(conf)
if @chunk_key_time
raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
Fluent::Timezone.validate!(@buffer_config.timekey_zone)
@buffer_config.timekey_zone = '+0000' if @buffer_config.timekey_use_utc
@timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
@output_time_formatter_cache = {}
end

Expand Down Expand Up @@ -472,13 +480,13 @@ def implement?(feature)

# TODO: optimize this code
def extract_placeholders(str, metadata)
if metadata.timekey.nil? && metadata.tag.nil? && metadata.variables.nil?
if metadata.empty?
str
else
rvalue = str
# strftime formatting
if @chunk_key_time # this section MUST be earlier than rest to use raw 'str'
@output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@buffer_config.timekey_zone, str)
@output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str)
rvalue = @output_time_formatter_cache[str].call(metadata.timekey)
end
# ${tag}, ${tag[0]}, ${tag[1]}, ...
Expand Down
Loading