Skip to content

Commit

Permalink
Merge pull request #1200 from fluent/merge-buffered-and-non-buffered-…
Browse files Browse the repository at this point in the history
…outputs

Merge buffered and non buffered outputs
  • Loading branch information
tagomoris authored Sep 2, 2016
2 parents 81b4330 + 1119944 commit 2d4f730
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 392 deletions.
3 changes: 3 additions & 0 deletions example/in_forward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@

<match test>
@type stdout
# <buffer>
# flush_interval 10s
# </buffer>
</match>
16 changes: 10 additions & 6 deletions example/out_buffered_null.conf → example/out_null.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
</source>

<match dummy.**>
@type buffered_null
try_flush_interval 60
flush_interval 60
buffer_chunk_limit 1k
buffer_queue_limit 2
@type null
<buffer>
flush_interval 60s
chunk_limit_size 1k
total_limit_size 4k
</buffer>
</match>

<label error_log>
<match **>
@type stdout # or buffered_stdout
@type stdout
# <buffer>
# flush_interval 1s
# </buffer>
</match>
</label>

Expand Down
59 changes: 0 additions & 59 deletions lib/fluent/plugin/out_buffered_null.rb

This file was deleted.

70 changes: 0 additions & 70 deletions lib/fluent/plugin/out_buffered_stdout.rb

This file was deleted.

37 changes: 37 additions & 0 deletions lib/fluent/plugin/out_null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,47 @@

module Fluent::Plugin
class NullOutput < Output
# This plugin is for tests of non-buffered/buffered plugins
Fluent::Plugin.register_output('null', self)

config_section :buffer do
config_set_default :chunk_keys, ['tag']
config_set_default :flush_at_shutdown, true
config_set_default :chunk_limit_size, 10 * 1024
end

def prefer_buffered_processing
false
end

def prefer_delayed_commit
@delayed
end

attr_accessor :feed_proc, :delayed

def initialize
super
@delayed = false
@feed_proc = nil
end

def process(tag, es)
# Do nothing
end

def write(chunk)
if @feed_proc
@feed_proc.call(chunk)
end
end

def try_write(chunk)
if @feed_proc
@feed_proc.call(chunk)
end
# not to commit chunks for testing
# commit_write(chunk.unique_id)
end
end
end
40 changes: 38 additions & 2 deletions lib/fluent/plugin/out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,61 @@ class StdoutOutput < Output

DEFAULT_FORMAT_TYPE = 'json'

config_section :buffer do
config_set_default :chunk_keys, ['tag']
config_set_default :flush_at_shutdown, true
config_set_default :chunk_limit_size, 10 * 1024
end

config_section :format do
config_set_default :@type, DEFAULT_FORMAT_TYPE
end

def prefer_buffered_processing
false
end

def prefer_delayed_commit
@delayed
end

attr_accessor :delayed

def initialize
super
@delayed = false
end

def configure(conf)
if conf['output_type'] && !conf['format']
conf['format'] = conf['output_type']
end
compat_parameters_convert(conf, :inject, :formatter)

super

@formatter = formatter_create(conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
end

def process(tag, es)
es.each {|time,record|
r = inject_values_to_record(tag, time, record)
$log.write "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, r).chomp}\n"
$log.write(format(tag, time, record))
}
$log.flush
end

def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
"#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n"
end

def write(chunk)
chunk.write_to($log)
end

def try_write(chunk)
chunk.write_to($log)
commit_write(chunk.unique_id)
end
end
end
4 changes: 4 additions & 0 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ def test_configure
"@id"=>"null",
"@type" => "null"
},
"buffer_queue_length" => 0,
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "null",
Expand Down Expand Up @@ -285,6 +287,8 @@ def get(uri, header = {})
"@id" => "null",
"@type" => "null"
},
"buffer_queue_length" => 0,
"buffer_total_queued_size" => 0,
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "null",
Expand Down
79 changes: 0 additions & 79 deletions test/plugin/test_out_buffered_null.rb

This file was deleted.

Loading

0 comments on commit 2d4f730

Please sign in to comment.