Skip to content

Commit

Permalink
Merge branch 'sbonebrake-send_frameset_exception'
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Aug 15, 2018
2 parents 01867cb + 389d08f commit cf2b1a4
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 6 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ end
group :test do
gem "rspec", "~> 3.5.0"
gem "rabbitmq_http_api_client", "~> 1.9.1", require: "rabbitmq/http/client"
gem "toxiproxy", "~> 1.0.3"
end

gemspec
Expand Down
10 changes: 9 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,12 @@ services:
- 5671-5672:5671-5672
- 15672:15672
volumes:
- ./spec:/spec:ro
- ./spec:/spec:ro
toxiproxy:
container_name: toxiproxy
image: shopify/toxiproxy
ports:
- 8474:8474
- 11111:11111
depends_on:
- rabbitmq
18 changes: 13 additions & 5 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,13 @@ def send_frameset(frames, channel)
# still recommend not sharing channels between threads except for consumer-only cases in the docs. MK.
channel.synchronize do
# see rabbitmq/rabbitmq-server#156
data = frames.reduce("") { |acc, frame| acc << frame.encode }
@transport.write(data)
signal_activity!
if open?
data = frames.reduce("") { |acc, frame| acc << frame.encode }
@transport.write(data)
signal_activity!
else
raise ConnectionClosedError.new(frames)
end
end
end # send_frameset(frames)

Expand All @@ -1097,8 +1101,12 @@ def send_frameset_without_timeout(frames, channel)
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
# locking. See a note about "single frame" methods in a comment in `send_frameset`. MK.
channel.synchronize do
frames.each { |frame| self.send_frame_without_timeout(frame, false) }
signal_activity!
if open?
frames.each { |frame| self.send_frame_without_timeout(frame, false) }
signal_activity!
else
raise ConnectionClosedError.new(frames)
end
end
end # send_frameset_without_timeout(frames)

Expand Down
37 changes: 37 additions & 0 deletions spec/higher_level_api/integration/toxiproxy_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require "spec_helper"
require_relative "../../toxiproxy_helper"

if ::Toxiproxy.running?
describe Bunny::Channel, "#basic_publish" do
include RabbitMQ::Toxiproxy

before(:all) do
setup_toxiproxy
@connection = Bunny.new(:user => "bunny_gem", :password => "bunny_password", :vhost => "bunny_testbed",
host: "localhost:11111", heartbeat_timeout: 1)
@connection.start
end

after :all do
@connection.close if @connection.open?
end

context "when the the connection detects missed heartbeats" do
let(:queue_name) { "bunny.basic.publish.queue#{rand}" }

it "raises a ConnectionClosedError" do
ch = @connection.create_channel
begin
rabbitmq_toxiproxy.down do
sleep 2
expect { ch.default_exchange.publish("", :routing_key => queue_name) }.to raise_error(Bunny::ConnectionClosedError)
end
ensure
cleanup_toxiproxy
end
end
end
end
else
puts "Toxiproxy isn't running, some examples will be skipped"
end
20 changes: 20 additions & 0 deletions spec/toxiproxy_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module RabbitMQ
module Toxiproxy
def setup_toxiproxy
::Toxiproxy.populate([{
name: "rabbitmq",
listen: "0.0.0.0:11111",
upstream: "rabbitmq:5672"
}])
rabbitmq_toxiproxy.enable
end

def cleanup_toxiproxy
::Toxiproxy.populate()
end

def rabbitmq_toxiproxy
::Toxiproxy[/rabbitmq/]
end
end
end

0 comments on commit cf2b1a4

Please sign in to comment.