From d245454658d16170431d276fcd5849fb0d88ab2b Mon Sep 17 00:00:00 2001
From: Yuta Iwama
Date: Wed, 14 Aug 2019 17:26:31 +0900
Subject: [PATCH] Use Mutex instead of Monitor

Signed-off-by: Yuta Iwama
---
 lib/fluent/compat/output.rb       |  6 +++---
 lib/fluent/plugin/bare_output.rb  |  8 +++----
 lib/fluent/plugin/multi_output.rb |  8 +++----
 lib/fluent/plugin/output.rb       | 36 +++++++++++++++----------------
 4 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb
index 10097edc75..20e7d4cc71 100644
--- a/lib/fluent/compat/output.rb
+++ b/lib/fluent/compat/output.rb
@@ -352,7 +352,7 @@ def handle_stream_simple(tag, es, enqueue: false)
         write_guard do
           @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
         end
-        @counters_monitor.synchronize{ @emit_records += size }
+        @counter_mutex.synchronize{ @emit_records += size }
         return [meta]
       end
 
@@ -363,7 +363,7 @@ def handle_stream_simple(tag, es, enqueue: false)
         write_guard do
           @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
         end
-        @counters_monitor.synchronize{ @emit_records += size }
+        @counter_mutex.synchronize{ @emit_records += size }
         return [meta]
       end
 
@@ -373,7 +373,7 @@ def handle_stream_simple(tag, es, enqueue: false)
       write_guard do
         @buffer.write({meta => data}, enqueue: enqueue)
       end
-      @counters_monitor.synchronize{ @emit_records += size }
+      @counter_mutex.synchronize{ @emit_records += size }
       [meta]
     end
diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb
index cef034e9e3..56b1034e61 100644
--- a/lib/fluent/plugin/bare_output.rb
+++ b/lib/fluent/plugin/bare_output.rb
@@ -40,7 +40,7 @@ def process(tag, es)
 
     def initialize
       super
-      @counters_monitor = Monitor.new
+      @counter_mutex = Mutex.new
       # TODO: well organized counters
       @num_errors = 0
       @emit_count = 0
@@ -48,12 +48,12 @@ def initialize
     end
 
     def emit_sync(tag, es)
-      @counters_monitor.synchronize{ @emit_count += 1 }
+      @counter_mutex.synchronize{ @emit_count += 1 }
      begin
        process(tag, es)
-        @counters_monitor.synchronize{ @emit_records += es.size }
+        @counter_mutex.synchronize{ @emit_records += es.size }
      rescue
-        @counters_monitor.synchronize{ @num_errors += 1 }
+        @counter_mutex.synchronize{ @num_errors += 1 }
        raise
      end
    end
diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb
index 6c5784e86e..46d706c3df 100644
--- a/lib/fluent/plugin/multi_output.rb
+++ b/lib/fluent/plugin/multi_output.rb
@@ -44,7 +44,7 @@ def initialize
       @outputs = []
       @outputs_statically_created = false
 
-      @counters_monitor = Monitor.new
+      @counter_mutex = Mutex.new
       # TODO: well organized counters
       @num_errors = 0
       @emit_count = 0
@@ -143,12 +143,12 @@ def terminate
     end
 
     def emit_sync(tag, es)
-      @counters_monitor.synchronize{ @emit_count += 1 }
+      @counter_mutex.synchronize{ @emit_count += 1 }
      begin
        process(tag, es)
-        @counters_monitor.synchronize{ @emit_records += es.size }
+        @counter_mutex.synchronize{ @emit_records += es.size }
      rescue
-        @counters_monitor.synchronize{ @num_errors += 1 }
+        @counter_mutex.synchronize{ @num_errors += 1 }
        raise
      end
    end
diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index be44e3990b..e427c78274 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -173,7 +173,7 @@ def expired?
 
     def initialize
       super
-      @counters_monitor = Monitor.new
+      @counter_mutex = Mutex.new
       @buffering = false
       @delayed_commit = false
       @as_secondary = false
@@ -780,18 +780,18 @@ def emit_events(tag, es)
     end
 
     def emit_sync(tag, es)
-      @counters_monitor.synchronize{ @emit_count += 1 }
+      @counter_mutex.synchronize{ @emit_count += 1 }
       begin
         process(tag, es)
-        @counters_monitor.synchronize{ @emit_records += es.size }
+        @counter_mutex.synchronize{ @emit_records += es.size }
       rescue
-        @counters_monitor.synchronize{ @num_errors += 1 }
+        @counter_mutex.synchronize{ @num_errors += 1 }
         raise
       end
     end
 
     def emit_buffered(tag, es)
-      @counters_monitor.synchronize{ @emit_count += 1 }
+      @counter_mutex.synchronize{ @emit_count += 1 }
       begin
         execute_chunking(tag, es, enqueue: (@flush_mode == :immediate))
         if !@retry && @buffer.queued?
@@ -799,7 +799,7 @@ def emit_buffered(tag, es)
         end
       rescue
         # TODO: separate number of errors into emit errors and write/flush errors
-        @counters_monitor.synchronize{ @num_errors += 1 }
+        @counter_mutex.synchronize{ @num_errors += 1 }
         raise
       end
     end
@@ -956,7 +956,7 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
       write_guard do
         @buffer.write(meta_and_data, enqueue: enqueue)
       end
-      @counters_monitor.synchronize{ @emit_records += records }
+      @counter_mutex.synchronize{ @emit_records += records }
       true
     end
 
@@ -973,7 +973,7 @@ def handle_stream_with_standard_format(tag, es, enqueue: false)
       write_guard do
         @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
       end
-      @counters_monitor.synchronize{ @emit_records += records }
+      @counter_mutex.synchronize{ @emit_records += records }
       true
     end
 
@@ -998,7 +998,7 @@ def handle_stream_simple(tag, es, enqueue: false)
       write_guard do
         @buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
       end
-      @counters_monitor.synchronize{ @emit_records += records }
+      @counter_mutex.synchronize{ @emit_records += records }
       true
     end
 
@@ -1036,7 +1036,7 @@ def rollback_write(chunk_id, update_retry: true)
       # false if chunk was already flushed and couldn't be rollbacked unexpectedly
       # in many cases, false can be just ignored
       if @buffer.takeback_chunk(chunk_id)
-        @counters_monitor.synchronize{ @rollback_count += 1 }
+        @counter_mutex.synchronize{ @rollback_count += 1 }
         if update_retry
           primary = @as_secondary ? @primary_instance : self
           primary.update_retry_state(chunk_id, @as_secondary)
@@ -1052,7 +1052,7 @@ def try_rollback_write
         while @dequeued_chunks.first && @dequeued_chunks.first.expired?
           info = @dequeued_chunks.shift
           if @buffer.takeback_chunk(info.chunk_id)
-            @counters_monitor.synchronize{ @rollback_count += 1 }
+            @counter_mutex.synchronize{ @rollback_count += 1 }
             log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
             primary = @as_secondary ? @primary_instance : self
             primary.update_retry_state(info.chunk_id, @as_secondary)
@@ -1067,7 +1067,7 @@ def try_rollback_all
         until @dequeued_chunks.empty?
           info = @dequeued_chunks.shift
           if @buffer.takeback_chunk(info.chunk_id)
-            @counters_monitor.synchronize{ @rollback_count += 1 }
+            @counter_mutex.synchronize{ @rollback_count += 1 }
             log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id)
             primary = @as_secondary ? @primary_instance : self
             primary.update_retry_state(info.chunk_id, @as_secondary)
@@ -1110,7 +1110,7 @@ def try_flush
           if output.delayed_commit
             log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
-            @counters_monitor.synchronize{ @write_count += 1 }
+            @counter_mutex.synchronize{ @write_count += 1 }
             @dequeued_chunks_mutex.synchronize do
               # delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
               @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
             end
@@ -1122,7 +1122,7 @@ def try_flush
           chunk_id = chunk.unique_id
           dump_chunk_id = dump_unique_id_hex(chunk_id)
           log.trace "adding write count", instance: self.object_id
-          @counters_monitor.synchronize{ @write_count += 1 }
+          @counter_mutex.synchronize{ @write_count += 1 }
           log.trace "executing sync write", chunk: dump_chunk_id
 
           output.write(chunk)
@@ -1178,7 +1178,7 @@ def try_flush
       end
 
       if @buffer.takeback_chunk(chunk.unique_id)
-        @counters_monitor.synchronize { @rollback_count += 1 }
+        @counter_mutex.synchronize { @rollback_count += 1 }
       end
 
       update_retry_state(chunk.unique_id, using_secondary, e)
@@ -1209,9 +1209,9 @@ def backup_chunk(chunk, using_secondary, delayed_commit)
 
     def check_slow_flush(start)
       elapsed_time = Fluent::Clock.now - start
       elapsed_millsec = (elapsed_time * 1000).to_i
-      @counters_monitor.synchronize { @flush_time_count += elapsed_millsec }
+      @counter_mutex.synchronize { @flush_time_count += elapsed_millsec }
       if elapsed_time > @slow_flush_log_threshold
-        @counters_monitor.synchronize { @slow_flush_count += 1 }
+        @counter_mutex.synchronize { @slow_flush_count += 1 }
         log.warn "buffer flush took longer time than slow_flush_log_threshold:", elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id
       end
@@ -1219,7 +1219,7 @@ def check_slow_flush(start)
 
     def update_retry_state(chunk_id, using_secondary, error = nil)
       @retry_mutex.synchronize do
-        @counters_monitor.synchronize{ @num_errors += 1 }
+        @counter_mutex.synchronize{ @num_errors += 1 }
         chunk_id_hex = dump_unique_id_hex(chunk_id)
 
         unless @retry
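Note (not part of the patch): a minimal Ruby sketch of the stdlib behavior this
change relies on, with illustrative variable names only. Monitor (from
require "monitor") is a reentrant lock, while Mutex is not reentrant but is
generally lighter-weight; every critical section touched above is a flat,
non-nested counter update, so a plain Mutex appears sufficient.

  require "monitor"

  monitor = Monitor.new
  # Monitor lets the owning thread lock again (reentrant).
  monitor.synchronize do
    monitor.synchronize { puts "nested synchronize is fine with Monitor" }
  end

  counter_mutex = Mutex.new
  emit_records = 0
  # Mutex is enough for flat, single-level sections like the counters above.
  counter_mutex.synchronize { emit_records += 1 }

  # But Mutex refuses re-entrant locking from the same thread.
  begin
    counter_mutex.synchronize { counter_mutex.synchronize { } }
  rescue ThreadError => e
    puts "Mutex is not reentrant: #{e.message}"
  end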