Skip to content

Commit

Permalink
Fix DelayedForwarder#run and add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
komamitsu committed Nov 28, 2015
1 parent 17576d7 commit 07dead8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/fluent/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def run
pairs = []
@mutex.synchronize do
@buffer.keys.each do |tag|
if msg = @buffer.delete(tag)
if ms = @buffer.delete(tag)

This comment has been minimized.

Copy link
@nahi

nahi Dec 21, 2015

Contributor

👍

pairs << [tag, ms]
end
end
Expand Down
47 changes: 47 additions & 0 deletions test/test_process.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require_relative 'helper'
require 'fluent/test'
require 'fluent/event'
require 'stringio'
require 'msgpack'

module FluentProcessTest
class DelayedForwarderTest < Test::Unit::TestCase
include Fluent

test 'run and emit' do
io = StringIO.new
fwd = Fluent::DetachProcessManager::DelayedForwarder.new(io, 0.001)

num_events_per_tag = 5000
num_tags = 20

now = Time.now.to_i
record = {'key' => 'value'}
(num_tags * num_events_per_tag).times do |i|
tag = "foo.bar#{i % num_tags}"
fwd.emit(tag, OneEventStream.new(now, record))
end
sleep 0.5

io.pos = 0

tags = {}
MessagePack::Unpacker.new(io).each do |tag_and_msgpacks|
tag, ms = *tag_and_msgpacks
tags[tag] ||= ''
tags[tag] << ms
end

assert_equal(num_tags, tags.size)
num_tags.times do |i|
tag = "foo.bar#{i % num_tags}"
ms = tags[tag]
count = 0
MessagePack::Unpacker.new(StringIO.new(ms)).each do |x|
count += 1
end
assert_equal(num_events_per_tag, count)
end
end
end
end

0 comments on commit 07dead8

Please sign in to comment.