diff --git a/Gemfile b/Gemfile index 22decae60..b348fca0d 100644 --- a/Gemfile +++ b/Gemfile @@ -34,6 +34,7 @@ end group :test do gem "rspec", "~> 3.5.0" gem "rabbitmq_http_api_client", "~> 1.9.1", require: "rabbitmq/http/client" + gem "toxiproxy", "~> 1.0.3" end gemspec diff --git a/docker-compose.yml b/docker-compose.yml index 59041d7e7..f8fb53318 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,4 +17,12 @@ services: - 5671-5672:5671-5672 - 15672:15672 volumes: - - ./spec:/spec:ro \ No newline at end of file + - ./spec:/spec:ro + toxiproxy: + container_name: toxiproxy + image: shopify/toxiproxy + ports: + - 8474:8474 + - 11111:11111 + depends_on: + - rabbitmq \ No newline at end of file diff --git a/lib/bunny/session.rb b/lib/bunny/session.rb index 95d6a449a..cbc55aaba 100644 --- a/lib/bunny/session.rb +++ b/lib/bunny/session.rb @@ -1079,9 +1079,13 @@ def send_frameset(frames, channel) # still recommend not sharing channels between threads except for consumer-only cases in the docs. MK. channel.synchronize do # see rabbitmq/rabbitmq-server#156 - data = frames.reduce("") { |acc, frame| acc << frame.encode } - @transport.write(data) - signal_activity! + if open? + data = frames.reduce("") { |acc, frame| acc << frame.encode } + @transport.write(data) + signal_activity! + else + raise ConnectionClosedError.new(frames) + end end end # send_frameset(frames) @@ -1097,8 +1101,12 @@ def send_frameset_without_timeout(frames, channel) # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. See a note about "single frame" methods in a comment in `send_frameset`. MK. channel.synchronize do - frames.each { |frame| self.send_frame_without_timeout(frame, false) } - signal_activity! + if open? + frames.each { |frame| self.send_frame_without_timeout(frame, false) } + signal_activity! + else + raise ConnectionClosedError.new(frames) + end end end # send_frameset_without_timeout(frames) diff --git a/spec/higher_level_api/integration/toxiproxy_spec.rb b/spec/higher_level_api/integration/toxiproxy_spec.rb new file mode 100644 index 000000000..5f464f3e2 --- /dev/null +++ b/spec/higher_level_api/integration/toxiproxy_spec.rb @@ -0,0 +1,37 @@ +require "spec_helper" +require_relative "../../toxiproxy_helper" + +if ::Toxiproxy.running? + describe Bunny::Channel, "#basic_publish" do + include RabbitMQ::Toxiproxy + + before(:all) do + setup_toxiproxy + @connection = Bunny.new(:user => "bunny_gem", :password => "bunny_password", :vhost => "bunny_testbed", + host: "localhost:11111", heartbeat_timeout: 1) + @connection.start + end + + after :all do + @connection.close if @connection.open? + end + + context "when the the connection detects missed heartbeats" do + let(:queue_name) { "bunny.basic.publish.queue#{rand}" } + + it "raises a ConnectionClosedError" do + ch = @connection.create_channel + begin + rabbitmq_toxiproxy.down do + sleep 2 + expect { ch.default_exchange.publish("", :routing_key => queue_name) }.to raise_error(Bunny::ConnectionClosedError) + end + ensure + cleanup_toxiproxy + end + end + end + end +else + puts "Toxiproxy isn't running, some examples will be skipped" +end diff --git a/spec/toxiproxy_helper.rb b/spec/toxiproxy_helper.rb new file mode 100644 index 000000000..1ba23635a --- /dev/null +++ b/spec/toxiproxy_helper.rb @@ -0,0 +1,20 @@ +module RabbitMQ + module Toxiproxy + def setup_toxiproxy + ::Toxiproxy.populate([{ + name: "rabbitmq", + listen: "0.0.0.0:11111", + upstream: "rabbitmq:5672" + }]) + rabbitmq_toxiproxy.enable + end + + def cleanup_toxiproxy + ::Toxiproxy.populate() + end + + def rabbitmq_toxiproxy + ::Toxiproxy[/rabbitmq/] + end + end +end