diff --git a/lib/fluent/command/cat.rb b/lib/fluent/command/cat.rb index 935db5dfd1..f218ac1776 100644 --- a/lib/fluent/command/cat.rb +++ b/lib/fluent/command/cat.rb @@ -34,6 +34,7 @@ format = 'json' message_key = 'message' time_as_integer = false +retry_limit = 5 op.on('-p', '--port PORT', "fluent tcp port (default: #{port})", Integer) {|i| port = i @@ -75,6 +76,10 @@ time_as_integer = true } +op.on('--retry-limit N', "Specify the number of retry limit (default: #{retry_limit})", Integer) {|n| + retry_limit = n +} + singleton_class.module_eval do define_method(:usage) do |msg| puts op.to_s @@ -107,6 +112,8 @@ class Writer include MonitorMixin + RetryLimitError = Class.new(StandardError) + class TimerThread def initialize(writer) @writer = writer @@ -130,7 +137,7 @@ def run end end - def initialize(tag, connector, time_as_integer: false) + def initialize(tag, connector, time_as_integer: false, retry_limit: 5) @tag = tag @connector = connector @socket = false @@ -142,7 +149,7 @@ def initialize(tag, connector, time_as_integer: false) @pending = [] @pending_limit = 1024 # TODO @retry_wait = 1 - @retry_limit = 5 # TODO + @retry_limit = retry_limit @time_as_integer = time_as_integer super() @@ -236,21 +243,24 @@ def get_socket end def try_connect - now = Time.now.to_i - - unless @error_history.empty? - # wait before re-connecting - wait = @retry_wait * (2 ** (@error_history.size-1)) - if now <= @socket_time + wait - return false + begin + now = Time.now.to_i + + unless @error_history.empty? + # wait before re-connecting + wait = 1 #@retry_wait * (2 ** (@error_history.size-1)) + if now <= @socket_time + wait + sleep(wait) + try_connect + end end - end - begin @socket = @connector.call @error_history.clear return true + rescue RetryLimitError => ex + raise ex rescue $stderr.puts "connect failed: #{$!}" @error_history << $! @@ -263,9 +273,10 @@ def try_connect } @pending.clear @error_history.clear + raise RetryLimitError, "exceed retry limit" + else + retry end - - return false end end @@ -285,7 +296,7 @@ def abort_message(time, record) } end -w = Writer.new(tag, connector, time_as_integer: time_as_integer) +w = Writer.new(tag, connector, time_as_integer: time_as_integer, retry_limit: retry_limit) w.start case format