Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new plugin helper: CompatParameters #969

Merged
merged 2 commits into from
May 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 8 additions & 46 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
require 'fluent/compat/output_chain'
require 'fluent/timezone'

require 'fluent/plugin_helper/compat_parameters'

require 'time'

module Fluent
Expand Down Expand Up @@ -208,21 +210,7 @@ def support_in_v12_style?(feature)

config_param :flush_at_shutdown, :bool, default: true

PARAMS_MAP = {
"buffer_type" => "@type",
"buffer_path" => "path",
"num_threads" => "flush_threads",
"flush_interval" => "flush_interval",
"try_flush_interval" => "flush_thread_interval",
"queued_chunk_flush_interval" => "flush_burst_interval",
"disable_retry_limit" => "retry_forever",
"retry_limit" => "retry_max_times",
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"flush_at_shutdown" => "flush_at_shutdown",
}
PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP

def configure(conf)
bufconf = CompatOutputUtils.buffer_section(conf)
Expand All @@ -233,6 +221,7 @@ def configure(conf)
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
end

Expand Down Expand Up @@ -344,21 +333,7 @@ def support_in_v12_style?(feature)

config_set_default :time_as_integer, true

PARAMS_MAP = {
"buffer_type" => "@type",
"buffer_path" => "path",
"num_threads" => "flush_threads",
"flush_interval" => "flush_interval",
"try_flush_interval" => "flush_thread_interval",
"queued_chunk_flush_interval" => "flush_burst_interval",
"disable_retry_limit" => "retry_forever",
"retry_limit" => "retry_max_times",
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"flush_at_shutdown" => "flush_at_shutdown",
}
PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP

def configure(conf)
bufconf = CompatOutputUtils.buffer_section(conf)
Expand All @@ -369,6 +344,7 @@ def configure(conf)
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
end

Expand Down Expand Up @@ -471,22 +447,7 @@ def support_in_v12_style?(feature)
config_set_default :@type, 'file'
end

PARAMS_MAP = {
"buffer_type" => "@type",
"buffer_path" => "path",
"num_threads" => "flush_threads",
"flush_interval" => "flush_interval",
"try_flush_interval" => "flush_thread_interval",
"queued_chunk_flush_interval" => "flush_burst_interval",
"disable_retry_limit" => "retry_forever",
"retry_limit" => "retry_max_times",
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"flush_at_shutdown" => "flush_at_shutdown",
"time_slice_wait" => "timekey_wait",
}
PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP.merge(Fluent::PluginHelper::CompatParameters::TIME_SLICED_PARAMS)

def initialize
super
Expand All @@ -509,6 +470,7 @@ def configure(conf)
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
end
unless buf_params.has_key?("@type")
Expand Down
12 changes: 7 additions & 5 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,13 @@ def after_shutdown
@buffer.after_shutdown

@output_flush_threads_running = false
@output_flush_threads.each do |state|
state.thread.run if state.thread.alive? # to wakeup thread and make it to stop by itself
end
@output_flush_threads.each do |state|
state.thread.join
if @output_flush_threads && !@output_flush_threads.empty?
@output_flush_threads.each do |state|
state.thread.run if state.thread.alive? # to wakeup thread and make it to stop by itself
end
@output_flush_threads.each do |state|
state.thread.join
end
end
end
end
Expand Down
93 changes: 93 additions & 0 deletions lib/fluent/plugin_helper/compat_parameters.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/config/element'

module Fluent
module PluginHelper
module CompatParameters
# This plugin helper is to bring old-fashioned buffer/other
# configuration parameters to v0.14 plugin API configurations.
# This helper is mainly to convert plugins from v0.12 API
# to v0.14 API safely, without breaking user deployment.

PARAMS_MAP = {
"buffer_type" => "@type",
"buffer_path" => "path",
"num_threads" => "flush_threads",
"flush_interval" => "flush_interval",
"try_flush_interval" => "flush_thread_interval",
"queued_chunk_flush_interval" => "flush_burst_interval",
"disable_retry_limit" => "retry_forever",
"retry_limit" => "retry_max_times",
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"flush_at_shutdown" => "flush_at_shutdown",
}

TIME_SLICED_PARAMS = {
"time_slice_format" => nil,
"time_slice_wait" => "timekey_wait",
}

def compat_parameters_default_chunk_key
# '', 'time' or 'tag'
raise NotImplementedError, "return one of '', 'time' or 'tag'"
end

def configure(conf)
if conf.elements('buffer').empty? && PARAMS_MAP.keys.any?{|k| conf.has_key?(k) } || TIME_SLICED_PARAMS.keys.any?{|k| conf.has_key?(k) }
# TODO: warn obsolete parameters if these are deprecated
attr = {}
PARAMS_MAP.each do |compat, current|
next unless current
attr[current] = conf[compat] if conf.has_key?(compat)
end
TIME_SLICED_PARAMS.each do |compat, current|
next unless current
attr[current] = conf[compat] if conf.has_key?(compat)
end

chunk_key = nil

if conf.has_key?('time_slice_format')
chunk_key = 'time'
attr['timekey_range'] = case conf['time_slice_format']
when /\%S/ then 1
when /\%M/ then 60
when /\%H/ then 3600
when /\%d/ then 86400
else
raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
end
else
chunk_key = compat_parameters_default_chunk_key
if chunk_key == 'time'
attr['timekey_range'] = 86400 # TimeSliceOutput.time_slice_format default value is '%Y%m%d'
end
end

e = Fluent::Config::Element.new('buffer', chunk_key, attr, [])
conf.elements << e
end

super
end
end
end
end
175 changes: 175 additions & 0 deletions test/plugin_helper/test_compat_parameters.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
require_relative '../helper'
require 'fluent/plugin_helper/compat_parameters'
require 'fluent/plugin/base'

class CompatParameterTest < Test::Unit::TestCase
setup do
Fluent::Test.setup
@i = nil
end

teardown do
if @i
@i.stop unless @i.stopped?
@i.before_shutdown unless @i.before_shutdown?
@i.shutdown unless @i.shutdown?
@i.after_shutdown unless @i.after_shutdown?
@i.close unless @i.closed?
@i.terminate unless @i.terminated?
end
end

class Dummy0 < Fluent::Plugin::Output
helpers :compat_parameters
def compat_parameters_default_chunk_key
''
end
def write(chunk)
# dummy
end
end
class Dummy1 < Fluent::Plugin::Output
helpers :compat_parameters
def compat_parameters_default_chunk_key
'time'
end
def write(chunk)
# dummy
end
end
class Dummy2 < Fluent::Plugin::Output
helpers :compat_parameters
# for test to assume default key time by 'time_slice_format'
def write(chunk)
# dummy
end
end
class Dummy3 < Fluent::Plugin::Output
helpers :compat_parameters
def compat_parameters_default_chunk_key
'tag'
end
def write(chunk)
# dummy
end
end

sub_test_case 'plugins which does not have default chunk key' do
setup do
@p = Dummy0
end

test 'plugin helper converts parameters into plugin configuration parameters' do
hash = {
'num_threads' => 8,
'flush_interval' => '10s',
'buffer_chunk_limit' => '8m',
'buffer_queue_limit' => '1024',
'flush_at_shutdown' => 'yes',
}
conf = config_element('ROOT', '', hash)
@i = @p.new
@i.configure(conf)

assert_equal 'memory', @i.buffer_config[:@type]
assert_equal [], @i.buffer_config.chunk_keys
assert_equal 8, @i.buffer_config.flush_threads
assert_equal 10, @i.buffer_config.flush_interval
assert @i.buffer_config.flush_at_shutdown

assert_equal 8*1024*1024, @i.buffer.chunk_bytes_limit
assert_equal 1024, @i.buffer.queue_length_limit
end
end

sub_test_case 'plugins which has default chunk key: time' do
setup do
@p = Dummy1
end

test 'plugin helper converts parameters into plugin configuration parameters' do
hash = {
'buffer_type' => 'file',
'buffer_path' => '/tmp/mybuffer',
'disable_retry_limit' => 'yes',
'max_retry_wait' => '1h',
'buffer_queue_full_action' => 'block',
}
conf = config_element('ROOT', '', hash)
@i = @p.new
@i.configure(conf)

assert_equal 'file', @i.buffer_config[:@type]
assert_equal 24*60*60, @i.buffer_config.timekey_range
assert @i.buffer_config.retry_forever
assert_equal 60*60, @i.buffer_config.retry_max_interval
assert_equal :block, @i.buffer_config.overflow_action

assert !@i.chunk_key_tag
assert_equal [], @i.chunk_keys

assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path
end
end

sub_test_case 'plugins which does not have default chunk key' do
setup do
@p = Dummy2
end

test 'plugin helper converts parameters into plugin configuration parameters' do
hash = {
'buffer_type' => 'file',
'buffer_path' => '/tmp/mybuffer',
'time_slice_format' => '%Y%m%d%H',
'time_slice_wait' => '10',
'retry_limit' => '1024',
'buffer_queue_full_action' => 'drop_oldest_chunk',
}
conf = config_element('ROOT', '', hash)
@i = @p.new
@i.configure(conf)

assert_equal 'file', @i.buffer_config[:@type]
assert_equal 60*60, @i.buffer_config.timekey_range
assert_equal 10, @i.buffer_config.timekey_wait
assert_equal 1024, @i.buffer_config.retry_max_times
assert_equal :drop_oldest_chunk, @i.buffer_config.overflow_action

assert @i.chunk_key_time
assert !@i.chunk_key_tag
assert_equal [], @i.chunk_keys

assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path
end
end

sub_test_case 'plugins which has default chunk key: tag' do
setup do
@p = Dummy3
end

test 'plugin helper converts parameters into plugin configuration parameters' do
hash = {
'buffer_type' => 'memory',
'num_threads' => '10',
'flush_interval' => '10s',
'try_flush_interval' => '0.1',
'queued_chunk_flush_interval' => '0.5',
}
conf = config_element('ROOT', '', hash)
@i = @p.new
@i.configure(conf)

assert_equal 'memory', @i.buffer_config[:@type]
assert_equal 10, @i.buffer_config.flush_threads
assert_equal 10, @i.buffer_config.flush_interval
assert_equal 0.1, @i.buffer_config.flush_thread_interval
assert_equal 0.5, @i.buffer_config.flush_burst_interval

assert !@i.chunk_key_time
assert @i.chunk_key_tag
assert_equal [], @i.chunk_keys
end
end
end