Skip to content

Commit

Permalink
Merge pull request #710 from tagomoris/safe-detach-process
Browse files Browse the repository at this point in the history
fix bug not to protect in-memory-buffer for multi thread emitting / run loop
  • Loading branch information
repeatedly committed Nov 25, 2015
2 parents 480d0b2 + 66e618e commit aab268a
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions lib/fluent/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -232,26 +232,36 @@ def initialize(w, interval)
@w = w
@interval = interval
@buffer = {}
@mutex = Mutex.new
Thread.new(&method(:run))
end

def emit(tag, es)
if ms = @buffer[tag]
ms << es.to_msgpack_stream
else
@buffer[tag] = es.to_msgpack_stream
stream = es.to_msgpack_stream
@mutex.synchronize do
if @buffer[tag]
@buffer[tag] << stream
else
@buffer[tag] = stream
end
end
end

def run
while true
sleep @interval
@buffer.keys.each {|tag|
if ms = @buffer.delete(tag)
[tag, ms].to_msgpack(@w)
#@w.write [tag, ms].to_msgpack

pairs = []
@mutex.synchronize do
@buffer.keys.each do |tag|
if msg = @buffer.delete(tag)
pairs << [tag, ms]
end
end
}
end
pairs.each do |pair|
pair.to_msgpack(@w)
end
end
rescue
$log.error "error on forwerder thread", :error=>$!.to_s
Expand Down

0 comments on commit aab268a

Please sign in to comment.