From d82c156d5059dc626ec3d85cf4a8c0aee1f09c70 Mon Sep 17 00:00:00 2001 From: abicky Date: Mon, 11 Jun 2018 03:38:53 +0900 Subject: [PATCH] Wait for all chunks being purged before deleting @queued_num items This commit resolves https://github.com/fluent/fluentd/issues/1999. Signed-off-by: abicky --- lib/fluent/plugin/buffer.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 9c8fbfda9e..58154a9d93 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -151,6 +151,7 @@ def initialize @queue = [] #=> Array (chunks) : already flushed (not written) @dequeued = {} #=> Hash (unique_id -> chunk): already written (not purged) @queued_num = {} # metadata => int (number of queued chunks) + @dequeued_num = {} # metadata => int (number of dequeued chunks) @stage_size = @queue_size = 0 @metadata_list = [] # keys of @stage @@ -462,6 +463,8 @@ def dequeue_chunk @dequeued[chunk.unique_id] = chunk @queued_num[chunk.metadata] -= 1 # BUG if nil, 0 or subzero + @dequeued_num[chunk.metadata] ||= 0 + @dequeued_num[chunk.metadata] += 1 log.trace "chunk dequeued", instance: self.object_id, metadata: chunk.metadata chunk end @@ -476,6 +479,7 @@ def takeback_chunk(chunk_id) @queue.unshift(chunk) log.trace "chunk taken back", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: chunk.metadata @queued_num[chunk.metadata] += 1 # BUG if nil + @dequeued_num[chunk.metadata] -= 1 end true end @@ -497,9 +501,11 @@ def purge_chunk(chunk_id) log.error_backtrace end - if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) + @dequeued_num[chunk.metadata] -= 1 + if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) && @dequeued_num[metadata].zero? @metadata_list.delete(metadata) @queued_num.delete(metadata) + @dequeued_num.delete(metadata) end log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata end