From 07dead8293bd9f5b61521ff9965fa9df67037270 Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu <komamitsu@gmail.com> Date: Sun, 29 Nov 2015 00:24:08 +0900 Subject: [PATCH] Fix DelayedForwarder#run and add unit test --- lib/fluent/process.rb | 2 +- test/test_process.rb | 47 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 test/test_process.rb diff --git a/lib/fluent/process.rb b/lib/fluent/process.rb index d49a41973f..517a5e4b8f 100644 --- a/lib/fluent/process.rb +++ b/lib/fluent/process.rb @@ -254,7 +254,7 @@ def run pairs = [] @mutex.synchronize do @buffer.keys.each do |tag| - if msg = @buffer.delete(tag) + if ms = @buffer.delete(tag) pairs << [tag, ms] end end diff --git a/test/test_process.rb b/test/test_process.rb new file mode 100644 index 0000000000..ae846c3529 --- /dev/null +++ b/test/test_process.rb @@ -0,0 +1,47 @@ +require_relative 'helper' +require 'fluent/test' +require 'fluent/event' +require 'stringio' +require 'msgpack' + +module FluentProcessTest + class DelayedForwarderTest < Test::Unit::TestCase + include Fluent + + test 'run and emit' do + io = StringIO.new + fwd = Fluent::DetachProcessManager::DelayedForwarder.new(io, 0.001) + + num_events_per_tag = 5000 + num_tags = 20 + + now = Time.now.to_i + record = {'key' => 'value'} + (num_tags * num_events_per_tag).times do |i| + tag = "foo.bar#{i % num_tags}" + fwd.emit(tag, OneEventStream.new(now, record)) + end + sleep 0.5 + + io.pos = 0 + + tags = {} + MessagePack::Unpacker.new(io).each do |tag_and_msgpacks| + tag, ms = *tag_and_msgpacks + tags[tag] ||= '' + tags[tag] << ms + end + + assert_equal(num_tags, tags.size) + num_tags.times do |i| + tag = "foo.bar#{i % num_tags}" + ms = tags[tag] + count = 0 + MessagePack::Unpacker.new(StringIO.new(ms)).each do |x| + count += 1 + end + assert_equal(num_events_per_tag, count) + end + end + end +end