Skip to content

Commit

Permalink
Merge pull request #598 from bbascarevic-tti/issue-597
Browse files Browse the repository at this point in the history
Fixes #597 - Session error handler object which responds to #raise
  • Loading branch information
michaelklishin authored Aug 13, 2020
2 parents 86c7f09 + 85bc00b commit c78dd67
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 22 deletions.
22 changes: 11 additions & 11 deletions lib/bunny/reader_loop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ module Bunny
# @private
class ReaderLoop

def initialize(transport, session, session_thread)
@transport = transport
@session = session
@session_thread = session_thread
@logger = @session.logger
def initialize(transport, session, session_error_handler)
@transport = transport
@session = session
@session_error_handler = session_error_handler
@logger = @session.logger

@mutex = Mutex.new
@mutex = Mutex.new

@stopping = false
@stopped = false
@network_is_down = false
@stopping = false
@stopped = false
@network_is_down = false
end


Expand Down Expand Up @@ -47,7 +47,7 @@ def run_loop
@session.handle_network_failure(e)
else
log_exception(e)
@session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
@session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
end
rescue ShutdownSignal => _
@mutex.synchronize { @stopping = true }
Expand All @@ -58,7 +58,7 @@ def run_loop
log_exception(e)

@network_is_down = true
@session_thread.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e))
@session_error_handler.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e))
end
rescue Errno::EBADF => _ebadf
break if terminate?
Expand Down
13 changes: 7 additions & 6 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class Session
# @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever
# @option connection_string_or_opts [Integer] :reset_recovery_attempts_after_reconnection (true) Should recovery attempt counter be reset after successful reconnection? When set to false, the attempt counter will last through the entire lifetime of the connection object.
# @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Should this connection recover after receiving a server-sent connection.close (e.g. connection was force closed)?
# @option connection_string_or_opts [Object] :session_error_handler (Thread.current) Object which responds to #raise that will act as a session error handler. Defaults to Thread.current, which will raise asynchronous exceptions in the thread that created the session.
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
Expand Down Expand Up @@ -213,9 +214,9 @@ def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new)
@address_index_mutex = @mutex_impl.new

@channels = Hash.new
@recovery_completed = opts[:recovery_completed]
@recovery_completed = opts[:recovery_completed]

@origin_thread = Thread.current
@session_error_handler = opts.fetch(:session_error_handler, Thread.current)

self.reset_continuations
self.initialize_transport
Expand Down Expand Up @@ -862,7 +863,7 @@ def clean_up_and_fail_on_connection_close!(method)

clean_up_on_shutdown
if threaded?
@origin_thread.raise(@last_connection_error)
@session_error_handler.raise(@last_connection_error)
else
raise @last_connection_error
end
Expand Down Expand Up @@ -1019,7 +1020,7 @@ def start_reader_loop

# @private
def reader_loop
@reader_loop ||= ReaderLoop.new(@transport, self, Thread.current)
@reader_loop ||= ReaderLoop.new(@transport, self, @session_error_handler)
end

# @private
Expand Down Expand Up @@ -1282,7 +1283,7 @@ def open_connection
end

if threaded?
@origin_thread.raise(e)
@session_error_handler.raise(e)
else
raise e
end
Expand Down Expand Up @@ -1328,7 +1329,7 @@ def initialize_transport
@transport = Transport.new(self,
host_from_address(address),
port_from_address(address),
@opts.merge(:session_thread => @origin_thread)
@opts.merge(:session_error_handler => @session_error_handler)
)

# Reset the cached progname for the logger only when no logger was provided
Expand Down
10 changes: 5 additions & 5 deletions lib/bunny/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def read_timeout=(v)

def initialize(session, host, port, opts)
@session = session
@session_thread = opts[:session_thread]
@session_error_handler = opts[:session_error_handler]
@host = host
@port = port
@opts = opts
Expand Down Expand Up @@ -146,7 +146,7 @@ def write(data)
if @session.automatically_recover?
@session.handle_network_failure(e)
else
@session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
@session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
end
end
end
Expand All @@ -170,7 +170,7 @@ def write(data)
if @session.automatically_recover?
@session.handle_network_failure(e)
else
@session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
@session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
end
end
end
Expand All @@ -188,7 +188,7 @@ def write_without_timeout(data, raise_exceptions = false)
if @session.automatically_recover?
@session.handle_network_failure(e)
else
@session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
@session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
end
end
end
Expand Down Expand Up @@ -245,7 +245,7 @@ def read_fully(count)
if @session.automatically_recover?
raise
else
@session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
@session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
end
end
end
Expand Down

0 comments on commit c78dd67

Please sign in to comment.