From 8abde0330f08fa94d08f97ad2986ea39ab04ab77 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Fri, 19 Aug 2022 11:44:40 +0200 Subject: [PATCH] Refactor pubusb to allow subscribing and unsubscribing from another thread redis-rb pubsub always has been very hard to work with, because once you are subscribed, you can no longer manage the subscription outside of the various callback. But the `SUBSCRIBE` family of command returns no response, and the subscribed client only reads the socket. So we can actually send `SUBSCRIBE` style of commands from another thread. --- CHANGELOG.md | 2 + lib/redis.rb | 30 +++++--- lib/redis/commands/pubsub.rb | 28 ++----- lib/redis/distributed.rb | 2 +- lib/redis/errors.rb | 3 + lib/redis/subscribe.rb | 16 ++-- test/distributed/publish_subscribe_test.rb | 2 +- test/redis/publish_subscribe_test.rb | 86 +++++++++++++++++++++- 8 files changed, 123 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1897b160..87e68b896 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Unreleased +- Allow to call `subscribe`, `unsubscribe`, `psubscribe` and `punsubscribe` from a subscribed client. See #1131. + # 5.0.0.beta3 - Use `MD5` for hashing server nodes in `Redis::Distributed`. This should improve keys distribution among servers. See #1089. diff --git a/lib/redis.rb b/lib/redis.rb index c56900626..42197dbac 100644 --- a/lib/redis.rb +++ b/lib/redis.rb @@ -165,19 +165,27 @@ def send_blocking_command(command, timeout, &block) end def _subscription(method, timeout, channels, block) - if @subscription_client - return @subscription_client.call_v([method] + channels) - end + if block + if @subscription_client + raise SubscriptionError, "This client is already subscribed" + end - begin - @subscription_client = SubscribedClient.new(@client.pubsub) - if timeout > 0 - @subscription_client.send(method, timeout, *channels, &block) - else - @subscription_client.send(method, *channels, &block) + begin + @subscription_client = SubscribedClient.new(@client.pubsub) + if timeout > 0 + @subscription_client.send(method, timeout, *channels, &block) + else + @subscription_client.send(method, *channels, &block) + end + ensure + @subscription_client = nil end - ensure - @subscription_client = nil + else + unless @subscription_client + raise SubscriptionError, "This client is not subscribed" + end + + @subscription_client.call_v([method].concat(channels)) end end end diff --git a/lib/redis/commands/pubsub.rb b/lib/redis/commands/pubsub.rb index 57da2a429..5d84e3fab 100644 --- a/lib/redis/commands/pubsub.rb +++ b/lib/redis/commands/pubsub.rb @@ -14,50 +14,34 @@ def subscribed? # Listen for messages published to the given channels. def subscribe(*channels, &block) - synchronize do |_client| - _subscription(:subscribe, 0, channels, block) - end + _subscription(:subscribe, 0, channels, block) end # Listen for messages published to the given channels. Throw a timeout error # if there is no messages for a timeout period. def subscribe_with_timeout(timeout, *channels, &block) - synchronize do |_client| - _subscription(:subscribe_with_timeout, timeout, channels, block) - end + _subscription(:subscribe_with_timeout, timeout, channels, block) end # Stop listening for messages posted to the given channels. def unsubscribe(*channels) - raise "Can't unsubscribe if not subscribed." unless subscribed? - - synchronize do |_client| - _subscription(:unsubscribe, 0, channels, nil) - end + _subscription(:unsubscribe, 0, channels, nil) end # Listen for messages published to channels matching the given patterns. def psubscribe(*channels, &block) - synchronize do |_client| - _subscription(:psubscribe, 0, channels, block) - end + _subscription(:psubscribe, 0, channels, block) end # Listen for messages published to channels matching the given patterns. # Throw a timeout error if there is no messages for a timeout period. def psubscribe_with_timeout(timeout, *channels, &block) - synchronize do |_client| - _subscription(:psubscribe_with_timeout, timeout, channels, block) - end + _subscription(:psubscribe_with_timeout, timeout, channels, block) end # Stop listening for messages posted to channels matching the given patterns. def punsubscribe(*channels) - raise "Can't unsubscribe if not subscribed." unless subscribed? - - synchronize do |_client| - _subscription(:punsubscribe, 0, channels, nil) - end + _subscription(:punsubscribe, 0, channels, nil) end # Inspect the state of the Pub/Sub subsystem. diff --git a/lib/redis/distributed.rb b/lib/redis/distributed.rb index 0c55ce6c0..c238d2939 100644 --- a/lib/redis/distributed.rb +++ b/lib/redis/distributed.rb @@ -904,7 +904,7 @@ def subscribe(channel, *channels, &block) # Stop listening for messages posted to the given channels. def unsubscribe(*channels) - raise "Can't unsubscribe if not subscribed." unless subscribed? + raise SubscriptionError, "Can't unsubscribe if not subscribed." unless subscribed? @subscribed_node.unsubscribe(*channels) end diff --git a/lib/redis/errors.rb b/lib/redis/errors.rb index ab74813a2..f7b9e0cce 100644 --- a/lib/redis/errors.rb +++ b/lib/redis/errors.rb @@ -52,4 +52,7 @@ class InheritedError < BaseConnectionError # Raised when client options are invalid. class InvalidClientOptionError < BaseError end + + class SubscriptionError < BaseError + end end diff --git a/lib/redis/subscribe.rb b/lib/redis/subscribe.rb index 41d612167..94f0f0267 100644 --- a/lib/redis/subscribe.rb +++ b/lib/redis/subscribe.rb @@ -4,10 +4,13 @@ class Redis class SubscribedClient def initialize(client) @client = client + @write_monitor = Monitor.new end def call_v(command) - @client.call_v(command) + @write_monitor.synchronize do + @client.call_v(command) + end end def subscribe(*channels, &block) @@ -43,14 +46,16 @@ def close def subscription(start, stop, channels, block, timeout = 0) sub = Subscription.new(&block) - @client.call_v([start, *channels]) + call_v([start, *channels]) while event = @client.next_event(timeout) if event.is_a?(::RedisClient::CommandError) raise Client::ERROR_MAPPING.fetch(event.class), event.message end type, *rest = event - sub.callbacks[type].call(*rest) + if callback = sub.callbacks[type] + callback.call(*rest) + end break if type == stop && rest.last == 0 end # No need to unsubscribe here. The real client closes the connection @@ -62,10 +67,7 @@ class Subscription attr :callbacks def initialize - @callbacks = Hash.new do |hash, key| - hash[key] = ->(*_) {} - end - + @callbacks = {} yield(self) end diff --git a/test/distributed/publish_subscribe_test.rb b/test/distributed/publish_subscribe_test.rb index a5a3979ac..34f8d69b6 100644 --- a/test/distributed/publish_subscribe_test.rb +++ b/test/distributed/publish_subscribe_test.rb @@ -83,7 +83,7 @@ def test_other_commands_within_a_subscribe end def test_subscribe_without_a_block - assert_raises LocalJumpError do + assert_raises Redis::SubscriptionError do r.subscribe("foo") end end diff --git a/test/redis/publish_subscribe_test.rb b/test/redis/publish_subscribe_test.rb index 9259f72d1..262bf4a64 100644 --- a/test/redis/publish_subscribe_test.rb +++ b/test/redis/publish_subscribe_test.rb @@ -205,19 +205,22 @@ def test_other_commands_within_a_subscribe end def test_subscribe_without_a_block - assert_raises LocalJumpError do + error = assert_raises Redis::SubscriptionError do r.subscribe(channel_name) end + assert_includes "This client is not subscribed", error.message end def test_unsubscribe_without_a_subscribe - assert_raises RuntimeError do + error = assert_raises Redis::SubscriptionError do r.unsubscribe end + assert_includes "This client is not subscribed", error.message - assert_raises RuntimeError do + error = assert_raises Redis::SubscriptionError do r.punsubscribe end + assert_includes "This client is not subscribed", error.message end def test_subscribe_past_a_timeout @@ -264,12 +267,87 @@ def test_psubscribe_with_timeout refute received end + def test_unsubscribe_from_another_thread + @unsubscribed = @subscribed = false + @subscribed_redis = nil + @messages = [] + @messages_count = 0 + thread = new_thread do |r| + @subscribed_redis = r + r.subscribe(channel_name) do |on| + on.subscribe do |_channel, _total| + @subscribed = true + end + + on.message do |channel, message| + @messages << [channel, message] + @messages_count += 1 + end + + on.unsubscribe do |_channel, _total| + @unsubscribed = true + end + end + end + + Thread.pass until @subscribed + + redis.publish(channel_name, "test") + Thread.pass until @messages_count == 1 + assert_equal [channel_name, "test"], @messages.last + + @subscribed_redis.unsubscribe # this shouldn't block + refute_nil thread.join(2) + assert_equal true, @unsubscribed + end + + def test_subscribe_from_another_thread + @events = [] + @subscribed_redis = nil + thread = new_thread do |r| + r.subscribe(channel_name) do |on| + @subscribed_redis = r + on.subscribe do |channel, _total| + @events << ["subscribed", channel] + end + + on.message do |channel, message| + @events << ["message", channel, message] + end + + on.unsubscribe do |channel, _total| + @events << ["unsubscribed", channel] + end + end + end + + Thread.pass until @subscribed_redis&.subscribed? + + assert_equal 1, redis.publish(channel_name, "test") + @subscribed_redis.subscribe("#{channel_name}:2") + redis.publish("#{channel_name}:2", "test-2") + + @subscribed_redis.unsubscribe(channel_name) + @subscribed_redis.unsubscribe # this shouldn't block + + refute_nil thread.join(2) + expected = [ + ["subscribed", channel_name], + ["message", channel_name, "test"], + ["subscribed", "#{channel_name}:2"], + ["message", "#{channel_name}:2", "test-2"], + ["unsubscribed", channel_name], + ["unsubscribed", "#{channel_name}:2"] + ] + assert_equal expected, @events + end + private def new_thread(&block) redis = Redis.new(OPTIONS) thread = Thread.new(redis, &block) - thread.report_on_exception = false + thread.report_on_exception = true @threads[thread] = redis thread end