From ccc2e497e62166fec5ac1994bf6f1eac646d0e47 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Mon, 29 Jul 2024 14:48:09 +0000 Subject: [PATCH] add pubsub commands Signed-off-by: Shoham Elias --- .../glide/async_commands/cluster_commands.py | 52 ++ python/python/glide/async_commands/core.py | 75 +++ .../glide/async_commands/transaction.py | 92 ++++ python/python/tests/test_pubsub.py | 491 +++++++++++++++++- python/python/tests/test_transaction.py | 14 + 5 files changed, 723 insertions(+), 1 deletion(-) diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index a0044c3e92..4a5cbf6470 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -920,6 +920,58 @@ async def publish( ) return cast(int, result) + async def pubsub_shardchannels( + self, pattern: Optional[TEncodable] = None + ) -> List[bytes]: + """ + Lists the currently active shard channels. + + See https://valkey.io/commands/pubsub-shardchannels for details. + + Args: + pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels. + If not provided, all active shard channels are returned. + + Returns: + List[bytes]: A list of currently active shard channels matching the given pattern. + If no pattern is specified, all active shard channels are returned. + + Examples: + >>> await client.pubsub_shardchannels() + [b'channel1', b'channel2'] + + >>> await client.pubsub_shardchannels("channel*") + [b'channel1', b'channel2'] + """ + command_args = [pattern] if pattern is not None else [] + return await self._execute_command(RequestType.PubSubSChannels, command_args) + + async def pubsub_shardnumsub( + self, channels: List[TEncodable] = [] + ) -> Mapping[bytes, int]: + """ + Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels. + + Note that it is valid to call this command without channels. In this case, it will just return an empty map. + + See https://valkey.io/commands/pubsub-shardnumsub for details. + + Args: + channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers. + If not provided, returns an empty map. + + Returns: + Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers. + + Examples: + >>> subscribers = await client.pubsub_shardnumsub(["channel1", "channel2"]) + {b'channel1': 3, b'channel2': 5} + + >>> await client.pubsub_shardnumsub() + {} + """ + return await self._execute_command(RequestType.PubSubSNumSub, channels) + async def flushall( self, flush_mode: Optional[FlushMode] = None, route: Optional[Route] = None ) -> TOK: diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 35d02a7eff..79a6bc8cef 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -6628,3 +6628,78 @@ async def lpos( Union[int, List[int], None], await self._execute_command(RequestType.LPos, args), ) + + async def pubsub_channels( + self, pattern: Optional[TEncodable] = None + ) -> List[bytes]: + """ + Lists the currently active channels. + + See https://valkey.io/commands/pubsub-channels for details. + + Args: + pattern (Optional[TEncodable]): A glob-style pattern to match active channels. + If not provided, all active channels are returned. + + Returns: + List[bytes]: A list of currently active channels matching the given pattern. + If no pattern is specified, all active channels are returned. + + Examples: + >>> await client.pubsub_channels() + [b"channel1", b"channel2"] + + >>> await client.pubsub_channels("news.*") + [b"news.sports", "news.weather"] + """ + + return cast( + List[str], + await self._execute_command( + RequestType.PubSubChannels, [pattern] if pattern else [] + ), + ) + + async def pubsub_numpat(self) -> int: + """ + Returns the number of unique patterns that are subscribed to by clients. + + Note: This is the total number of unique patterns all the clients are subscribed to, + not the count of clients subscribed to patterns. + + See https://valkey.io/commands/pubsub-numpat for details. + + Returns: + int: The number of unique patterns. + + Examples: + >>> await client.pubsub_numpat() + 3 + """ + return cast(int, await self._execute_command(RequestType.PubSubNumPat, [])) + + async def pubsub_numsub( + self, channels: List[TEncodable] = [] + ) -> Mapping[bytes, int]: + """ + Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels. + + Note that it is valid to call this command without channels. In this case, it will just return an empty map. + + See https://valkey.io/commands/pubsub-numsub for details. + + Args: + channels (Optional[List[str]]): The list of channels to query for the number of subscribers. + If not provided, returns an empty map. + + Returns: + Mapping[bytes, int]: A map where keys are the channel names and values are the number of subscribers. + + Examples: + >>> subscribers = await client.pubsub_numsub(["channel1", "channel2"]) + {b'channel1': 3, b'channel2': 5} + + >>> await client.pubsub_numsub() + {} + """ + return await self._execute_command(RequestType.PubSubNumSub, channels) diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 6167b549c1..d99281d1c8 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -4780,6 +4780,60 @@ def xclaim_just_id( return self.append_command(RequestType.XClaim, args) + async def pubsub_channels( + self, pattern: Optional[TEncodable] = None + ) -> List[bytes]: + """ + Lists the currently active channels. + + See https://valkey.io/commands/pubsub-channels for details. + + Args: + pattern (Optional[TEncodable]): A glob-style pattern to match active channels. + If not provided, all active channels are returned. + + Command response: + List[bytes]: A list of currently active channels matching the given pattern. + If no pattern is specified, all active channels are returned. + """ + + return self.append_command( + RequestType.PubSubChannels, [pattern] if pattern else [] + ) + + def pubsub_numpat(self: TTransaction) -> TTransaction: + """ + Returns the number of unique patterns that are subscribed to by clients. + + Note: This is the total number of unique patterns all the clients are subscribed to, + not the count of clients subscribed to patterns. + + See https://valkey.io/commands/pubsub-numpat for details. + + Command response: + int: The number of unique patterns. + """ + return self.append_command(RequestType.PubSubNumPat, []) + + def pubsub_numsub( + self: TTransaction, channels: List[TEncodable] = [] + ) -> TTransaction: + """ + Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels. + + Note that it is valid to call this command without channels. In this case, it will just return an empty map. + + See https://valkey.io/commands/pubsub-numsub for details. + + Args: + channels (Optional[List[str]]): The list of channels to query for the number of subscribers. + If not provided, returns an empty map. + + Command response: + Mapping[bytes, int]: A map where keys are the channel names and values are the number of subscribers. + """ + return self.append_command(RequestType.PubSubNumSub, channels) + class Transaction(BaseTransaction): """ @@ -5172,4 +5226,42 @@ def publish( RequestType.SPublish if sharded else RequestType.Publish, [channel, message] ) + def pubsub_shardchannels( + self, pattern: Optional[TEncodable] = None + ) -> "ClusterTransaction": + """ + Lists the currently active shard channels. + + See https://valkey.io/commands/pubsub-shardchannels for details. + + Args: + pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels. + If not provided, all active shard channels are returned. + + Command response: + List[bytes]: A list of currently active shard channels matching the given pattern. + If no pattern is specified, all active shard channels are returned. + """ + command_args = [pattern] if pattern is not None else [] + return self.append_command(RequestType.PubSubSChannels, command_args) + + def pubsub_shardnumsub( + self, channels: List[TEncodable] = [] + ) -> "ClusterTransaction": + """ + Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels. + + Note that it is valid to call this command without channels. In this case, it will just return an empty map. + + See https://valkey.io/commands/pubsub-shardnumsub for details. + + Args: + channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers. + If not provided, returns an empty map. + + Command response: + Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers. + """ + return self.append_command(RequestType.PubSubSNumSub, channels) + # TODO: add all CLUSTER commands diff --git a/python/python/tests/test_pubsub.py b/python/python/tests/test_pubsub.py index 23b5bb6709..29593ef0bd 100644 --- a/python/python/tests/test_pubsub.py +++ b/python/python/tests/test_pubsub.py @@ -13,7 +13,7 @@ GlideClusterClientConfiguration, ProtocolVersion, ) -from glide.constants import OK +from glide.constants import OK, TEncodable from glide.exceptions import ConfigurationError from glide.glide_client import BaseClient, GlideClient, GlideClusterClient, TGlideClient from tests.conftest import create_client @@ -2257,3 +2257,492 @@ async def test_pubsub_context_with_no_callback_raise_error( with pytest.raises(ConfigurationError): await create_two_clients_with_pubsub(request, cluster_mode, pub_sub_exact) + + @pytest.mark.parametrize("cluster_mode", [True, False]) + async def test_pubsub_channels(self, request, cluster_mode: bool): + """ + Tests the pubsub_channels command functionality. + + This test verifies that the pubsub_channels command correctly returns + the active channels matching a specified pattern. + """ + client1, client2 = None, None + try: + channel1 = "test_channel1" + channel2 = "test_channel2" + channel3 = "some_channel3" + pattern = "test_*" + + client = await create_client(request, cluster_mode) + # Assert no channels exists yet + assert await client.pubsub_channels() == [] + await client.close() + + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + channel1, + channel2, + channel3, + } + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: { + channel1, + channel2, + channel3, + } + }, + ) + + channel1_bytes = channel1.encode() + channel2_bytes = channel2.encode() + channel3_bytes = channel3.encode() + + client1, client2 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) + + # Test pubsub_channels without pattern + channels = await client2.pubsub_channels() + assert set(channels) == {channel1_bytes, channel2_bytes, channel3_bytes} + + # Test pubsub_channels with pattern + channels_with_pattern = await client2.pubsub_channels(pattern) + assert set(channels_with_pattern) == {channel1_bytes, channel2_bytes} + + # Test with non-matching pattern + non_matching_channels = await client2.pubsub_channels("non_matching_*") + assert len(non_matching_channels) == 0 + + finally: + await client_cleanup(client1, pub_sub if cluster_mode else None) + await client_cleanup(client2, None) + + @pytest.mark.parametrize("cluster_mode", [True, False]) + async def test_pubsub_numpat(self, request, cluster_mode: bool): + """ + Tests the pubsub_numpat command functionality. + + This test verifies that the pubsub_numpat command correctly returns + the number of unique patterns that are subscribed to by clients. + """ + client1, client2 = None, None + try: + pattern1 = "test_*" + pattern2 = "another_*" + + # Create a client and check initial number of patterns + client = await create_client(request, cluster_mode) + assert await client.pubsub_numpat() == 0 + await client.close() + + # Set up subscriptions with patterns + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + pattern1, + pattern2, + } + }, + { + GlideClientConfiguration.PubSubChannelModes.Pattern: { + pattern1, + pattern2, + } + }, + ) + + client1, client2 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) + + # Test pubsub_numpat + num_patterns = await client2.pubsub_numpat() + assert num_patterns == 2 + + finally: + await client_cleanup(client1, pub_sub if cluster_mode else None) + await client_cleanup(client2, None) + + @pytest.mark.parametrize("cluster_mode", [True, False]) + async def test_pubsub_numsub(self, request, cluster_mode: bool): + """ + Tests the pubsub_numsub command functionality. + + This test verifies that the pubsub_numsub command correctly returns + the number of subscribers for specified channels. + """ + client1, client2, client3, client4 = None, None, None, None + try: + channel1 = "test_channel1" + channel2 = "test_channel2" + channel3 = "test_channel3" + channel4 = "test_channel4" + + # Set up subscriptions + pub_sub1 = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + channel1, + channel2, + channel3, + } + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: { + channel1, + channel2, + channel3, + } + }, + ) + pub_sub2 = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + channel2, + channel3, + } + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: { + channel2, + channel3, + } + }, + ) + pub_sub3 = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel3}}, + {GlideClientConfiguration.PubSubChannelModes.Exact: {channel3}}, + ) + + channel1_bytes = channel1.encode() + channel2_bytes = channel2.encode() + channel3_bytes = channel3.encode() + channel4_bytes = channel4.encode() + + # Create a client and check initial subscribers + client = await create_client(request, cluster_mode) + assert await client.pubsub_numsub([channel1, channel2, channel3]) == { + channel1_bytes: 0, + channel2_bytes: 0, + channel3_bytes: 0, + } + await client.close() + + client1, client2 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub1, pub_sub2 + ) + client3, client4 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub3 + ) + + # Test pubsub_numsub + subscribers = await client2.pubsub_numsub( + [channel1_bytes, channel2_bytes, channel3_bytes, channel4_bytes] + ) + assert subscribers == { + channel1_bytes: 1, + channel2_bytes: 2, + channel3_bytes: 3, + channel4_bytes: 0, + } + + # Test pubsub_numsub with no channels + empty_subscribers = await client2.pubsub_numsub() + assert empty_subscribers == {} + + finally: + await client_cleanup(client1, pub_sub1 if cluster_mode else None) + await client_cleanup(client2, pub_sub2 if cluster_mode else None) + await client_cleanup(client3, pub_sub3 if cluster_mode else None) + await client_cleanup(client4, None) + + @pytest.mark.parametrize("cluster_mode", [True]) + async def test_pubsub_shardchannels(self, request, cluster_mode: bool): + """ + Tests the pubsub_shardchannels command functionality. + + This test verifies that the pubsub_shardchannels command correctly returns + the active sharded channels matching a specified pattern. + """ + client1, client2 = None, None + try: + channel1 = "test_shardchannel1" + channel2 = "test_shardchannel2" + channel3 = "some_shardchannel3" + pattern = "test_*" + + client = await create_client(request, cluster_mode) + assert type(client) == GlideClusterClient + # Assert no sharded channels exist yet + assert await client.pubsub_shardchannels() == [] + await client.close() + + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + channel1, + channel2, + channel3, + } + }, + {}, # Empty dict for non-cluster mode as sharded channels are not supported + ) + + channel1_bytes = channel1.encode() + channel2_bytes = channel2.encode() + channel3_bytes = channel3.encode() + + client1, client2 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) + + min_version = "7.0.0" + if await check_if_server_version_lt(client1, min_version): + pytest.skip(reason=f"Valkey version required >= {min_version}") + + assert type(client2) == GlideClusterClient + + # Test pubsub_shardchannels without pattern + channels = await client2.pubsub_shardchannels() + assert set(channels) == {channel1_bytes, channel2_bytes, channel3_bytes} + + # Test pubsub_shardchannels with pattern + channels_with_pattern = await client2.pubsub_shardchannels(pattern) + assert set(channels_with_pattern) == {channel1_bytes, channel2_bytes} + + # Test with non-matching pattern + assert await client2.pubsub_shardchannels("non_matching_*") == [] + + finally: + await client_cleanup(client1, pub_sub if cluster_mode else None) + await client_cleanup(client2, None) + + @pytest.mark.parametrize("cluster_mode", [True]) + async def test_pubsub_shardnumsub(self, request, cluster_mode: bool): + """ + Tests the pubsub_shardnumsub command functionality. + + This test verifies that the pubsub_shardnumsub command correctly returns + the number of subscribers for specified sharded channels. + """ + client1, client2, client3, client4 = None, None, None, None + try: + channel1 = "test_shardchannel1" + channel2 = "test_shardchannel2" + channel3 = "test_shardchannel3" + channel4 = "test_shardchannel4" + + # Set up subscriptions + pub_sub1 = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + channel1, + channel2, + channel3, + } + }, + {}, + ) + pub_sub2 = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + channel2, + channel3, + } + }, + {}, + ) + pub_sub3 = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + channel3 + } + }, + {}, + ) + + channel1_bytes = channel1.encode() + channel2_bytes = channel2.encode() + channel3_bytes = channel3.encode() + channel4_bytes = channel4.encode() + + # Create a client and check initial subscribers + client = await create_client(request, cluster_mode) + assert type(client) == GlideClusterClient + assert await client.pubsub_shardnumsub([channel1, channel2, channel3]) == { + channel1_bytes: 0, + channel2_bytes: 0, + channel3_bytes: 0, + } + await client.close() + + client1, client2 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub1, pub_sub2 + ) + + min_version = "7.0.0" + if await check_if_server_version_lt(client1, min_version): + pytest.skip(reason=f"Valkey version required >= {min_version}") + + client3, client4 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub3 + ) + + assert type(client4) == GlideClusterClient + + # Test pubsub_shardnumsub + subscribers = await client4.pubsub_shardnumsub( + [channel1, channel2, channel3, channel4] + ) + assert subscribers == { + channel1_bytes: 1, + channel2_bytes: 2, + channel3_bytes: 3, + channel4_bytes: 0, + } + + # Test pubsub_shardnumsub with no channels + empty_subscribers = await client4.pubsub_shardnumsub() + assert empty_subscribers == {} + + finally: + await client_cleanup(client1, pub_sub1 if cluster_mode else None) + await client_cleanup(client2, pub_sub2 if cluster_mode else None) + await client_cleanup(client3, pub_sub3 if cluster_mode else None) + await client_cleanup(client4, None) + + @pytest.mark.parametrize("cluster_mode", [True]) + async def test_pubsub_channels_and_shardchannels_separation( + self, request, cluster_mode: bool + ): + """ + Tests that pubsub_channels doesn't return sharded channels and pubsub_shardchannels + doesn't return regular channels. + """ + client1, client2 = None, None + try: + regular_channel = "regular_channel" + shard_channel = "shard_channel" + + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + regular_channel + }, + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + shard_channel + }, + }, + {GlideClientConfiguration.PubSubChannelModes.Exact: {regular_channel}}, + ) + + regular_channel_bytes, shard_channel_bytes = ( + regular_channel.encode(), + shard_channel.encode(), + ) + + client1, client2 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) + + min_version = "7.0.0" + if await check_if_server_version_lt(client1, min_version): + pytest.skip(reason=f"Valkey version required >= {min_version}") + + assert type(client2) == GlideClusterClient + # Test pubsub_channels + assert await client2.pubsub_channels() == [regular_channel_bytes] + + # Test pubsub_shardchannels + assert await client2.pubsub_shardchannels() == [shard_channel_bytes] + + finally: + await client_cleanup(client1, pub_sub if cluster_mode else None) + await client_cleanup(client2, None) + + @pytest.mark.parametrize("cluster_mode", [True]) + async def test_pubsub_numsub_and_shardnumsub_separation( + self, request, cluster_mode: bool + ): + """ + Tests that pubsub_numsub doesn't count sharded channel subscribers and pubsub_shardnumsub + doesn't count regular channel subscribers. + """ + client1, client2 = None, None + try: + regular_channel = "regular_channel" + shard_channel = "shard_channel" + + pub_sub1 = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + regular_channel + }, + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + shard_channel + }, + }, + {}, + ) + pub_sub2 = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + regular_channel + }, + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + shard_channel + }, + }, + {}, + ) + + regular_channel_bytes: bytes = regular_channel.encode() + shard_channel_bytes: bytes = shard_channel.encode() + + client1, client2 = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub1, pub_sub2 + ) + + min_version = "7.0.0" + if await check_if_server_version_lt(client1, min_version): + pytest.skip(reason=f"Valkey version required >= {min_version}") + + assert type(client2) == GlideClusterClient + + # Test pubsub_numsub + regular_subscribers = await client2.pubsub_numsub( + [regular_channel_bytes, shard_channel_bytes] + ) + + assert regular_subscribers == { + regular_channel_bytes: 2, + shard_channel_bytes: 0, + } + + # Test pubsub_shardnumsub + shard_subscribers = await client2.pubsub_shardnumsub( + [regular_channel_bytes, shard_channel_bytes] + ) + + assert shard_subscribers == { + regular_channel_bytes: 0, + shard_channel_bytes: 2, + } + + finally: + await client_cleanup(client1, pub_sub1 if cluster_mode else None) + await client_cleanup(client2, pub_sub2 if cluster_mode else None) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 2b1293b943..9d8f09f865 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -772,6 +772,13 @@ async def transaction_test( transaction.lcs_idx(key23, key24, with_match_len=True) args.append({b"matches": [[[4, 7], [5, 8], 4], [[1, 3], [0, 2], 3]], b"len": 7}) + transaction.pubsub_channels(pattern="*") + args.append([]) + transaction.pubsub_numpat() + args.append(0) + transaction.pubsub_numsub() + args.append({}) + return args @@ -882,6 +889,13 @@ async def test_cluster_transaction(self, glide_client: GlideClusterClient): else: transaction.publish("test_message", keyslot, True) expected = await transaction_test(transaction, keyslot, glide_client) + + if not await check_if_server_version_lt(glide_client, "7.0.0"): + transaction.pubsub_shardchannels() + expected.append([]) + transaction.pubsub_shardnumsub() + expected.append({}) + result = await glide_client.exec(transaction) assert isinstance(result, list) assert isinstance(result[0], bytes)