Skip to content

Commit

Permalink
Merge pull request #847 from cosmo0920/add-buffer-plugins-for-tests
Browse files Browse the repository at this point in the history
Add buffered output plugins for tests
  • Loading branch information
tagomoris authored Jun 15, 2016
2 parents fee8d31 + 9584807 commit 15801a1
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 1 deletion.
32 changes: 32 additions & 0 deletions example/out_buffered_null.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# bundle exec bin/fluentd -c example/out_buffered_null.conf
# (+ --emit-error-log-interval 10)
<source>
@type dummy
tag dummy
rate 500000000
dummy [
{"message": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
{"message": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"},
{"message": "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"}
]
</source>

<match dummy.**>
@type buffered_null
try_flush_interval 60
flush_interval 60
buffer_chunk_limit 1k
buffer_queue_limit 2
</match>

<label error_log>
<match **>
@type stdout # or buffered_stdout
</match>
</label>

<match fluent.**>
@type relabel
@label error_log
</match>
6 changes: 5 additions & 1 deletion lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ def run
end

def emit(num)
num.times { router.emit(@tag, Fluent::Engine.now, generate()) }
begin
num.times { router.emit(@tag, Fluent::Engine.now, generate()) }
rescue => e
# ignore all errors not to stop emits by emit errors
end
end

def generate
Expand Down
60 changes: 60 additions & 0 deletions lib/fluent/plugin/out_buffered_stdout.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/output'

module Fluent::Plugin
class BufferedStdoutOutput < Output
Fluent::Plugin.register_output('buffered_stdout', self)

desc 'Output format.(json,hash)'
config_param :output_type, default: 'json'
config_section :buffer do
config_set_default :chunk_keys, ['tag']
config_set_default :flush_at_shutdown, true
config_set_default :chunk_limit_size, 10 * 1024
end

attr_accessor :delayed

def initialize
super
@delayed = false
end

def prefer_delayed_commit
@delayed
end

def configure(conf)
super
@formatter = Fluent::Plugin.new_formatter(@output_type, parent: self)
@formatter.configure(conf)
end

def write(chunk)
chunk.write_to($log)
end

def try_write(chunk)
chunk.write_to($log)
end

def format(tag, time, record)
"#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n"
end
end
end
108 changes: 108 additions & 0 deletions test/plugin/test_out_buffered_stdout.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
require_relative '../helper'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_buffered_stdout'

class BufferedStdoutOutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end

CONFIG = %[
]

def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedStdoutOutput).configure(conf)
end

test 'default configure' do
d = create_driver
assert_equal 'json', d.instance.output_type
assert_equal 10 * 1024, d.instance.buffer_config.chunk_limit_size
assert d.instance.buffer_config.flush_at_shutdown
assert_equal ['tag'], d.instance.buffer_config.chunk_keys
assert d.instance.chunk_key_tag
assert !d.instance.chunk_key_time
assert_equal [], d.instance.chunk_keys
end

test 'configure with output_type' do
d = create_driver(CONFIG + "\noutput_type json")
assert_equal 'json', d.instance.output_type

d = create_driver(CONFIG + "\noutput_type hash")
assert_equal 'hash', d.instance.output_type

assert_raise(Fluent::ConfigError) do
d = create_driver(CONFIG + "\noutput_type foo")
end
end

sub_test_case "emit json" do
data('oj' => 'oj', 'yajl' => 'yajl')
test '#write(synchronous)' do |data|
d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}")
time = event_time()

out = capture_log do
d.run(default_tag: 'test', flush: true) do
d.feed(time, {'test' => 'test'})
end
end
assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out
end

data('oj' => 'oj', 'yajl' => 'yajl')
test '#try_write(asynchronous)' do |data|
d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}")
time = event_time()
d.instance.delayed = true

out = capture_log do
d.run(default_tag: 'test', flush: true, shutdown: false) do
d.feed(time, {'test' => 'test'})
end
end

assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out
end
end

sub_test_case 'emit hash' do
test '#write(synchronous)' do
d = create_driver(CONFIG + "\noutput_type hash")
time = event_time()

out = capture_log do
d.run(default_tag: 'test', flush: true) do
d.feed(time, {'test' => 'test'})
end
end

assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out
end

test '#try_write(asynchronous)' do
d = create_driver(CONFIG + "\noutput_type hash")
time = event_time()
d.instance.delayed = true

out = capture_log do
d.run(default_tag: 'test', flush: true, shutdown: false) do
d.feed(time, {'test' => 'test'})
end
end

assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out
end
end

# Capture the log output of the block given
def capture_log(&block)
tmp = $log
$log = StringIO.new
yield
return $log.string
ensure
$log = tmp
end
end

0 comments on commit 15801a1

Please sign in to comment.