diff --git a/lib/bunny/reader_loop.rb b/lib/bunny/reader_loop.rb index 04277934d..a8bb29a4e 100644 --- a/lib/bunny/reader_loop.rb +++ b/lib/bunny/reader_loop.rb @@ -9,17 +9,17 @@ module Bunny # @private class ReaderLoop - def initialize(transport, session, session_thread) - @transport = transport - @session = session - @session_thread = session_thread - @logger = @session.logger + def initialize(transport, session, session_error_handler) + @transport = transport + @session = session + @session_error_handler = session_error_handler + @logger = @session.logger - @mutex = Mutex.new + @mutex = Mutex.new - @stopping = false - @stopped = false - @network_is_down = false + @stopping = false + @stopped = false + @network_is_down = false end @@ -47,7 +47,7 @@ def run_loop @session.handle_network_failure(e) else log_exception(e) - @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) + @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) end rescue ShutdownSignal => _ @mutex.synchronize { @stopping = true } @@ -58,7 +58,7 @@ def run_loop log_exception(e) @network_is_down = true - @session_thread.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e)) + @session_error_handler.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e)) end rescue Errno::EBADF => _ebadf break if terminate? diff --git a/lib/bunny/session.rb b/lib/bunny/session.rb index 6363f059b..6c86347e9 100644 --- a/lib/bunny/session.rb +++ b/lib/bunny/session.rb @@ -124,6 +124,7 @@ class Session # @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever # @option connection_string_or_opts [Integer] :reset_recovery_attempts_after_reconnection (true) Should recovery attempt counter be reset after successful reconnection? When set to false, the attempt counter will last through the entire lifetime of the connection object. # @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Should this connection recover after receiving a server-sent connection.close (e.g. connection was force closed)? + # @option connection_string_or_opts [Object] :session_error_handler (Thread.current) Object which responds to #raise that will act as a session error handler. Defaults to Thread.current, which will raise asynchronous exceptions in the thread that created the session. # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use @@ -213,9 +214,9 @@ def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) @address_index_mutex = @mutex_impl.new @channels = Hash.new - @recovery_completed = opts[:recovery_completed] + @recovery_completed = opts[:recovery_completed] - @origin_thread = Thread.current + @session_error_handler = opts.fetch(:session_error_handler, Thread.current) self.reset_continuations self.initialize_transport @@ -862,7 +863,7 @@ def clean_up_and_fail_on_connection_close!(method) clean_up_on_shutdown if threaded? - @origin_thread.raise(@last_connection_error) + @session_error_handler.raise(@last_connection_error) else raise @last_connection_error end @@ -1019,7 +1020,7 @@ def start_reader_loop # @private def reader_loop - @reader_loop ||= ReaderLoop.new(@transport, self, Thread.current) + @reader_loop ||= ReaderLoop.new(@transport, self, @session_error_handler) end # @private @@ -1282,7 +1283,7 @@ def open_connection end if threaded? - @origin_thread.raise(e) + @session_error_handler.raise(e) else raise e end @@ -1328,7 +1329,7 @@ def initialize_transport @transport = Transport.new(self, host_from_address(address), port_from_address(address), - @opts.merge(:session_thread => @origin_thread) + @opts.merge(:session_error_handler => @session_error_handler) ) # Reset the cached progname for the logger only when no logger was provided diff --git a/lib/bunny/transport.rb b/lib/bunny/transport.rb index 4894fcf1d..bb30a5005 100644 --- a/lib/bunny/transport.rb +++ b/lib/bunny/transport.rb @@ -35,7 +35,7 @@ def read_timeout=(v) def initialize(session, host, port, opts) @session = session - @session_thread = opts[:session_thread] + @session_error_handler = opts[:session_error_handler] @host = host @port = port @opts = opts @@ -146,7 +146,7 @@ def write(data) if @session.automatically_recover? @session.handle_network_failure(e) else - @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) + @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) end end end @@ -170,7 +170,7 @@ def write(data) if @session.automatically_recover? @session.handle_network_failure(e) else - @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) + @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) end end end @@ -188,7 +188,7 @@ def write_without_timeout(data, raise_exceptions = false) if @session.automatically_recover? @session.handle_network_failure(e) else - @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) + @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) end end end @@ -245,7 +245,7 @@ def read_fully(count) if @session.automatically_recover? raise else - @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) + @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) end end end