Skip to content

Commit

Permalink
add pubsub commands
Browse files Browse the repository at this point in the history
Signed-off-by: Shoham Elias <shohame@amazon.com>
  • Loading branch information
shohamazon committed Jul 29, 2024
1 parent 1aaec1c commit 5ca1b6c
Show file tree
Hide file tree
Showing 5 changed files with 730 additions and 1 deletion.
58 changes: 58 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,64 @@ async def publish(
)
return cast(int, result)

async def pubsub_shardchannels(
self, pattern: Optional[TEncodable] = None
) -> List[bytes]:
"""
Lists the currently active shard channels.
See https://valkey.io/commands/pubsub-shardchannels for details.
Args:
pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels.
If not provided, all active shard channels are returned.
Returns:
List[bytes]: A list of currently active shard channels matching the given pattern.
If no pattern is specified, all active shard channels are returned.
Examples:
>>> await client.pubsub_shardchannels()
[b'channel1', b'channel2']
>>> await client.pubsub_shardchannels("channel*")
[b'channel1', b'channel2']
"""
command_args = [pattern] if pattern is not None else []
return cast(
List[bytes],
await self._execute_command(RequestType.PubSubSChannels, command_args),
)

async def pubsub_shardnumsub(
self, channels: List[TEncodable] = []
) -> Mapping[bytes, int]:
"""
Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.
Note that it is valid to call this command without channels. In this case, it will just return an empty map.
See https://valkey.io/commands/pubsub-shardnumsub for details.
Args:
channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers.
If not provided, returns an empty map.
Returns:
Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.
Examples:
>>> subscribers = await client.pubsub_shardnumsub(["channel1", "channel2"])
{b'channel1': 3, b'channel2': 5}
>>> await client.pubsub_shardnumsub()
{}
"""
return cast(
Mapping[bytes, int],
await self._execute_command(RequestType.PubSubSNumSub, channels),
)

async def flushall(
self, flush_mode: Optional[FlushMode] = None, route: Optional[Route] = None
) -> TOK:
Expand Down
78 changes: 78 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6628,3 +6628,81 @@ async def lpos(
Union[int, List[int], None],
await self._execute_command(RequestType.LPos, args),
)

async def pubsub_channels(
self, pattern: Optional[TEncodable] = None
) -> List[bytes]:
"""
Lists the currently active channels.
See https://valkey.io/commands/pubsub-channels for details.
Args:
pattern (Optional[TEncodable]): A glob-style pattern to match active channels.
If not provided, all active channels are returned.
Returns:
List[bytes]: A list of currently active channels matching the given pattern.
If no pattern is specified, all active channels are returned.
Examples:
>>> await client.pubsub_channels()
[b"channel1", b"channel2"]
>>> await client.pubsub_channels("news.*")
[b"news.sports", "news.weather"]
"""

return cast(
List[bytes],
await self._execute_command(
RequestType.PubSubChannels, [pattern] if pattern else []
),
)

async def pubsub_numpat(self) -> int:
"""
Returns the number of unique patterns that are subscribed to by clients.
Note: This is the total number of unique patterns all the clients are subscribed to,
not the count of clients subscribed to patterns.
See https://valkey.io/commands/pubsub-numpat for details.
Returns:
int: The number of unique patterns.
Examples:
>>> await client.pubsub_numpat()
3
"""
return cast(int, await self._execute_command(RequestType.PubSubNumPat, []))

async def pubsub_numsub(
self, channels: List[TEncodable] = []
) -> Mapping[bytes, int]:
"""
Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels.
Note that it is valid to call this command without channels. In this case, it will just return an empty map.
See https://valkey.io/commands/pubsub-numsub for details.
Args:
channels (Optional[List[str]]): The list of channels to query for the number of subscribers.
If not provided, returns an empty map.
Returns:
Mapping[bytes, int]: A map where keys are the channel names and values are the number of subscribers.
Examples:
>>> subscribers = await client.pubsub_numsub(["channel1", "channel2"])
{b'channel1': 3, b'channel2': 5}
>>> await client.pubsub_numsub()
{}
"""
return cast(
Mapping[bytes, int],
await self._execute_command(RequestType.PubSubNumSub, channels),
)
90 changes: 90 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4780,6 +4780,58 @@ def xclaim_just_id(

return self.append_command(RequestType.XClaim, args)

def pubsub_channels(self, pattern: Optional[TEncodable] = None) -> List[bytes]:
"""
Lists the currently active channels.
See https://valkey.io/commands/pubsub-channels for details.
Args:
pattern (Optional[TEncodable]): A glob-style pattern to match active channels.
If not provided, all active channels are returned.
Command response:
List[bytes]: A list of currently active channels matching the given pattern.
If no pattern is specified, all active channels are returned.
"""

return self.append_command(
RequestType.PubSubChannels, [pattern] if pattern else []
)

def pubsub_numpat(self: TTransaction) -> TTransaction:
"""
Returns the number of unique patterns that are subscribed to by clients.
Note: This is the total number of unique patterns all the clients are subscribed to,
not the count of clients subscribed to patterns.
See https://valkey.io/commands/pubsub-numpat for details.
Command response:
int: The number of unique patterns.
"""
return self.append_command(RequestType.PubSubNumPat, [])

def pubsub_numsub(
self: TTransaction, channels: List[TEncodable] = []
) -> TTransaction:
"""
Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels.
Note that it is valid to call this command without channels. In this case, it will just return an empty map.
See https://valkey.io/commands/pubsub-numsub for details.
Args:
channels (Optional[List[str]]): The list of channels to query for the number of subscribers.
If not provided, returns an empty map.
Command response:
Mapping[bytes, int]: A map where keys are the channel names and values are the number of subscribers.
"""
return self.append_command(RequestType.PubSubNumSub, channels)


class Transaction(BaseTransaction):
"""
Expand Down Expand Up @@ -5172,4 +5224,42 @@ def publish(
RequestType.SPublish if sharded else RequestType.Publish, [channel, message]
)

def pubsub_shardchannels(
self, pattern: Optional[TEncodable] = None
) -> "ClusterTransaction":
"""
Lists the currently active shard channels.
See https://valkey.io/commands/pubsub-shardchannels for details.
Args:
pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels.
If not provided, all active shard channels are returned.
Command response:
List[bytes]: A list of currently active shard channels matching the given pattern.
If no pattern is specified, all active shard channels are returned.
"""
command_args = [pattern] if pattern is not None else []
return self.append_command(RequestType.PubSubSChannels, command_args)

def pubsub_shardnumsub(
self, channels: List[TEncodable] = []
) -> "ClusterTransaction":
"""
Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.
Note that it is valid to call this command without channels. In this case, it will just return an empty map.
See https://valkey.io/commands/pubsub-shardnumsub for details.
Args:
channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers.
If not provided, returns an empty map.
Command response:
Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.
"""
return self.append_command(RequestType.PubSubSNumSub, channels)

# TODO: add all CLUSTER commands
Loading

0 comments on commit 5ca1b6c

Please sign in to comment.