From daa372ad20f98354ff22fbed10db584097103371 Mon Sep 17 00:00:00 2001 From: Stefan Sedich Date: Thu, 6 Oct 2016 23:15:37 -0700 Subject: [PATCH 1/4] Implement gracefull consumer shutdown by allow a configurable timeout to be set on the worker pool --- lib/bunny/channel.rb | 2 +- lib/bunny/consumer_work_pool.rb | 17 +++++++++++++++-- lib/bunny/session.rb | 4 ++-- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/lib/bunny/channel.rb b/lib/bunny/channel.rb index 984701023..79fe4a138 100644 --- a/lib/bunny/channel.rb +++ b/lib/bunny/channel.rb @@ -952,7 +952,7 @@ def basic_cancel(consumer_tag) @last_basic_cancel_ok = wait_on_continuations end - maybe_kill_consumer_work_pool! unless any_consumers? + @work_pool.shutdown(true) unless any_consumers? @last_basic_cancel_ok end diff --git a/lib/bunny/consumer_work_pool.rb b/lib/bunny/consumer_work_pool.rb index b45106d16..51efda9ef 100644 --- a/lib/bunny/consumer_work_pool.rb +++ b/lib/bunny/consumer_work_pool.rb @@ -17,9 +17,12 @@ class ConsumerWorkPool attr_reader :size attr_reader :abort_on_exception - def initialize(size = 1, abort_on_exception = false) + def initialize(size = 1, abort_on_exception = false, shutdown_timeout = nil) @size = size @abort_on_exception = abort_on_exception + @shutdown_timeout = shutdown_timeout + @shutdown_mutex = ::Mutex.new + @shutdown_conditional = ::ConditionVariable.new @queue = ::Queue.new @paused = false end @@ -53,7 +56,7 @@ def busy? !@queue.empty? end - def shutdown + def shutdown(wait_for_workers = false) @running = false @size.times do @@ -61,6 +64,12 @@ def shutdown throw :terminate end end + + return unless wait_for_workers + + @shutdown_mutex.synchronize do + @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout) + end end def join(timeout = nil) @@ -102,6 +111,10 @@ def run_loop end end end + + @shutdown_mutex.synchronize do + @shutdown_conditional.signal unless busy? + end end end end diff --git a/lib/bunny/session.rb b/lib/bunny/session.rb index a77d839cc..3925667e9 100644 --- a/lib/bunny/session.rb +++ b/lib/bunny/session.rb @@ -337,14 +337,14 @@ def transport_write_timeout # opened (this operation is very fast and inexpensive). # # @return [Bunny::Channel] Newly opened channel - def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false) + def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = nil) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n @channel_mutex.synchronize do if n && (ch = @channels[n]) ch else - ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception)) + ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout)) ch.open ch end From 47f496010404ccac83ed5e6c942117d924aac409 Mon Sep 17 00:00:00 2001 From: Stefan Sedich Date: Thu, 6 Oct 2016 23:42:20 -0700 Subject: [PATCH 2/4] Add spec and fix condition check in worker pool shutdown --- lib/bunny/consumer_work_pool.rb | 2 +- .../integration/basic_cancel_spec.rb | 33 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/lib/bunny/consumer_work_pool.rb b/lib/bunny/consumer_work_pool.rb index 51efda9ef..7d977232c 100644 --- a/lib/bunny/consumer_work_pool.rb +++ b/lib/bunny/consumer_work_pool.rb @@ -65,7 +65,7 @@ def shutdown(wait_for_workers = false) end end - return unless wait_for_workers + return unless wait_for_workers && @shutdown_timeout @shutdown_mutex.synchronize do @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout) diff --git a/spec/higher_level_api/integration/basic_cancel_spec.rb b/spec/higher_level_api/integration/basic_cancel_spec.rb index bae846154..7e52b14a4 100644 --- a/spec/higher_level_api/integration/basic_cancel_spec.rb +++ b/spec/higher_level_api/integration/basic_cancel_spec.rb @@ -73,4 +73,37 @@ expect(delivered_data).to be_empty end end + + context "with a worker pool shutdown timeout configured" do + let(:queue_name) { "bunny.queues.#{rand}" } + + it "allows the existing message to be complate processing" do + delivered_data = [] + consumer = nil + + t = Thread.new do + ch = connection.create_channel(nil, 1, false, 5) + q = ch.queue(queue_name, :auto_delete => true, :durable => false) + + consumer = Bunny::Consumer.new(ch, q) + consumer.on_delivery do |_, _, payload| + sleep 2 + delivered_data << payload + end + + q.subscribe_with(consumer, :block => false) + end + t.abort_on_exception = true + sleep 1.0 + + ch = connection.create_channel + ch.default_exchange.publish("", :routing_key => queue_name) + sleep 0.7 + + consumer.cancel + sleep 1.0 + + expect(delivered_data).to_not be_empty + end + end end From 24df388ad9f47d8369f2e7a9614c6cc9ab543e3f Mon Sep 17 00:00:00 2001 From: Stefan Sedich Date: Thu, 6 Oct 2016 23:57:19 -0700 Subject: [PATCH 3/4] Add spec covering the case where the message processing takes longer than the timeout --- .../integration/basic_cancel_spec.rb | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/spec/higher_level_api/integration/basic_cancel_spec.rb b/spec/higher_level_api/integration/basic_cancel_spec.rb index 7e52b14a4..cff8dbde2 100644 --- a/spec/higher_level_api/integration/basic_cancel_spec.rb +++ b/spec/higher_level_api/integration/basic_cancel_spec.rb @@ -77,7 +77,7 @@ context "with a worker pool shutdown timeout configured" do let(:queue_name) { "bunny.queues.#{rand}" } - it "allows the existing message to be complate processing" do + it "processes the message if processing completes within the timeout" do delivered_data = [] consumer = nil @@ -105,5 +105,34 @@ expect(delivered_data).to_not be_empty end + + it "kills the consumer if processing takes longer than the timeout" do + delivered_data = [] + consumer = nil + + t = Thread.new do + ch = connection.create_channel(nil, 1, false, 1) + q = ch.queue(queue_name, :auto_delete => true, :durable => false) + + consumer = Bunny::Consumer.new(ch, q) + consumer.on_delivery do |_, _, payload| + sleep 3 + delivered_data << payload + end + + q.subscribe_with(consumer, :block => false) + end + t.abort_on_exception = true + sleep 1.0 + + ch = connection.create_channel + ch.default_exchange.publish("", :routing_key => queue_name) + sleep 0.7 + + consumer.cancel + sleep 1.0 + + expect(delivered_data).to be_empty + end end end From 6df5d46e28af91a9a4bff8e9ac92a4441d0be9e5 Mon Sep 17 00:00:00 2001 From: Stefan Sedich Date: Fri, 7 Oct 2016 00:04:48 -0700 Subject: [PATCH 4/4] Change default shutdown timeout to 60 seconds --- lib/bunny/consumer_work_pool.rb | 2 +- lib/bunny/session.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/bunny/consumer_work_pool.rb b/lib/bunny/consumer_work_pool.rb index 7d977232c..b0bde745a 100644 --- a/lib/bunny/consumer_work_pool.rb +++ b/lib/bunny/consumer_work_pool.rb @@ -17,7 +17,7 @@ class ConsumerWorkPool attr_reader :size attr_reader :abort_on_exception - def initialize(size = 1, abort_on_exception = false, shutdown_timeout = nil) + def initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60) @size = size @abort_on_exception = abort_on_exception @shutdown_timeout = shutdown_timeout diff --git a/lib/bunny/session.rb b/lib/bunny/session.rb index 3925667e9..f44447936 100644 --- a/lib/bunny/session.rb +++ b/lib/bunny/session.rb @@ -337,7 +337,7 @@ def transport_write_timeout # opened (this operation is very fast and inexpensive). # # @return [Bunny::Channel] Newly opened channel - def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = nil) + def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n @channel_mutex.synchronize do