Skip to content

Commit

Permalink
Python: add PubSub commands (#2043)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Shoham Elias <shohame@amazon.com>
  • Loading branch information
shohamazon authored Jul 31, 2024
1 parent 55b776b commit b9e7cef
Show file tree
Hide file tree
Showing 8 changed files with 769 additions and 1 deletion.
4 changes: 4 additions & 0 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,10 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
})
}
}
b"PUBSUB NUMSUB" | b"PUBSUB SHARDNUMSUB" => Some(ExpectedReturnType::Map {
key_type: &None,
value_type: &None,
}),
_ => None,
}
}
Expand Down
5 changes: 5 additions & 0 deletions glide-core/src/protobuf/command_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ enum RequestType {
Scan = 206;
Wait = 208;
XClaim = 209;
PubSubChannels = 210;
PubSubNumPat = 211;
PubSubNumSub = 212;
PubSubSChannels = 213;
PubSubSNumSub = 214;
}

message Command {
Expand Down
15 changes: 15 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ pub enum RequestType {
Scan = 206,
Wait = 208,
XClaim = 209,
PubSubChannels = 210,
PubSubNumPat = 211,
PubSubNumSub = 212,
PubSubSChannels = 213,
PubSubSNumSub = 214,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -439,6 +444,11 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::Wait => RequestType::Wait,
ProtobufRequestType::XClaim => RequestType::XClaim,
ProtobufRequestType::Scan => RequestType::Scan,
ProtobufRequestType::PubSubChannels => RequestType::PubSubChannels,
ProtobufRequestType::PubSubNumSub => RequestType::PubSubNumSub,
ProtobufRequestType::PubSubNumPat => RequestType::PubSubNumPat,
ProtobufRequestType::PubSubSChannels => RequestType::PubSubSChannels,
ProtobufRequestType::PubSubSNumSub => RequestType::PubSubSNumSub,
}
}
}
Expand Down Expand Up @@ -658,6 +668,11 @@ impl RequestType {
RequestType::Wait => Some(cmd("WAIT")),
RequestType::XClaim => Some(cmd("XCLAIM")),
RequestType::Scan => Some(cmd("SCAN")),
RequestType::PubSubChannels => Some(get_two_word_command("PUBSUB", "CHANNELS")),
RequestType::PubSubNumSub => Some(get_two_word_command("PUBSUB", "NUMSUB")),
RequestType::PubSubNumPat => Some(get_two_word_command("PUBSUB", "NUMPAT")),
RequestType::PubSubSChannels => Some(get_two_word_command("PUBSUB", "SHARDCHANNELS")),
RequestType::PubSubSNumSub => Some(get_two_word_command("PUBSUB", "SHARDNUMSUB")),
}
}
}
62 changes: 62 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,68 @@ async def publish(
)
return cast(int, result)

async def pubsub_shardchannels(
self, pattern: Optional[TEncodable] = None
) -> List[bytes]:
"""
Lists the currently active shard channels.
The command is routed to all nodes, and aggregates the response to a single array.
See https://valkey.io/commands/pubsub-shardchannels for more details.
Args:
pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels.
If not provided, all active shard channels are returned.
Returns:
List[bytes]: A list of currently active shard channels matching the given pattern.
If no pattern is specified, all active shard channels are returned.
Examples:
>>> await client.pubsub_shardchannels()
[b'channel1', b'channel2']
>>> await client.pubsub_shardchannels("channel*")
[b'channel1', b'channel2']
"""
command_args = [pattern] if pattern is not None else []
return cast(
List[bytes],
await self._execute_command(RequestType.PubSubSChannels, command_args),
)

async def pubsub_shardnumsub(
self, channels: Optional[List[TEncodable]] = None
) -> Mapping[bytes, int]:
"""
Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.
Note that it is valid to call this command without channels. In this case, it will just return an empty map.
The command is routed to all nodes, and aggregates the response to a single map of the channels and their number of subscriptions.
See https://valkey.io/commands/pubsub-shardnumsub for more details.
Args:
channels (Optional[List[TEncodable]]): The list of shard channels to query for the number of subscribers.
If not provided, returns an empty map.
Returns:
Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.
Examples:
>>> await client.pubsub_shardnumsub(["channel1", "channel2"])
{b'channel1': 3, b'channel2': 5}
>>> await client.pubsub_shardnumsub()
{}
"""
return cast(
Mapping[bytes, int],
await self._execute_command(
RequestType.PubSubSNumSub, channels if channels else []
),
)

async def flushall(
self, flush_mode: Optional[FlushMode] = None, route: Optional[Route] = None
) -> TOK:
Expand Down
83 changes: 83 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6628,3 +6628,86 @@ async def lpos(
Union[int, List[int], None],
await self._execute_command(RequestType.LPos, args),
)

async def pubsub_channels(
self, pattern: Optional[TEncodable] = None
) -> List[bytes]:
"""
Lists the currently active channels.
The command is routed to all nodes, and aggregates the response to a single array.
See https://valkey.io/commands/pubsub-channels for more details.
Args:
pattern (Optional[TEncodable]): A glob-style pattern to match active channels.
If not provided, all active channels are returned.
Returns:
List[bytes]: A list of currently active channels matching the given pattern.
If no pattern is specified, all active channels are returned.
Examples:
>>> await client.pubsub_channels()
[b"channel1", b"channel2"]
>>> await client.pubsub_channels("news.*")
[b"news.sports", "news.weather"]
"""

return cast(
List[bytes],
await self._execute_command(
RequestType.PubSubChannels, [pattern] if pattern else []
),
)

async def pubsub_numpat(self) -> int:
"""
Returns the number of unique patterns that are subscribed to by clients.
Note: This is the total number of unique patterns all the clients are subscribed to,
not the count of clients subscribed to patterns.
The command is routed to all nodes, and aggregates the response the sum of all pattern subscriptions.
See https://valkey.io/commands/pubsub-numpat for more details.
Returns:
int: The number of unique patterns.
Examples:
>>> await client.pubsub_numpat()
3
"""
return cast(int, await self._execute_command(RequestType.PubSubNumPat, []))

async def pubsub_numsub(
self, channels: Optional[List[TEncodable]] = None
) -> Mapping[bytes, int]:
"""
Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels.
Note that it is valid to call this command without channels. In this case, it will just return an empty map.
The command is routed to all nodes, and aggregates the response to a single map of the channels and their number of subscriptions.
See https://valkey.io/commands/pubsub-numsub for more details.
Args:
channels (Optional[List[TEncodable]]): The list of channels to query for the number of subscribers.
If not provided, returns an empty map.
Returns:
Mapping[bytes, int]: A map where keys are the channel names and values are the number of subscribers.
Examples:
>>> await client.pubsub_numsub(["channel1", "channel2"])
{b'channel1': 3, b'channel2': 5}
>>> await client.pubsub_numsub()
{}
"""
return cast(
Mapping[bytes, int],
await self._execute_command(
RequestType.PubSubNumSub, channels if channels else []
),
)
96 changes: 96 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4780,6 +4780,62 @@ def xclaim_just_id(

return self.append_command(RequestType.XClaim, args)

def pubsub_channels(
self: TTransaction, pattern: Optional[TEncodable] = None
) -> TTransaction:
"""
Lists the currently active channels.
See https://valkey.io/commands/pubsub-channels for details.
Args:
pattern (Optional[TEncodable]): A glob-style pattern to match active channels.
If not provided, all active channels are returned.
Command response:
List[bytes]: A list of currently active channels matching the given pattern.
If no pattern is specified, all active channels are returned.
"""

return self.append_command(
RequestType.PubSubChannels, [pattern] if pattern else []
)

def pubsub_numpat(self: TTransaction) -> TTransaction:
"""
Returns the number of unique patterns that are subscribed to by clients.
Note: This is the total number of unique patterns all the clients are subscribed to,
not the count of clients subscribed to patterns.
See https://valkey.io/commands/pubsub-numpat for details.
Command response:
int: The number of unique patterns.
"""
return self.append_command(RequestType.PubSubNumPat, [])

def pubsub_numsub(
self: TTransaction, channels: Optional[List[TEncodable]] = None
) -> TTransaction:
"""
Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels.
Note that it is valid to call this command without channels. In this case, it will just return an empty map.
See https://valkey.io/commands/pubsub-numsub for details.
Args:
channels (Optional[List[str]]): The list of channels to query for the number of subscribers.
If not provided, returns an empty map.
Command response:
Mapping[bytes, int]: A map where keys are the channel names and values are the number of subscribers.
"""
return self.append_command(
RequestType.PubSubNumSub, channels if channels else []
)


class Transaction(BaseTransaction):
"""
Expand Down Expand Up @@ -5172,4 +5228,44 @@ def publish(
RequestType.SPublish if sharded else RequestType.Publish, [channel, message]
)

def pubsub_shardchannels(
self, pattern: Optional[TEncodable] = None
) -> "ClusterTransaction":
"""
Lists the currently active shard channels.
See https://valkey.io/commands/pubsub-shardchannels for details.
Args:
pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels.
If not provided, all active shard channels are returned.
Command response:
List[bytes]: A list of currently active shard channels matching the given pattern.
If no pattern is specified, all active shard channels are returned.
"""
command_args = [pattern] if pattern is not None else []
return self.append_command(RequestType.PubSubSChannels, command_args)

def pubsub_shardnumsub(
self, channels: Optional[List[TEncodable]] = None
) -> "ClusterTransaction":
"""
Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.
Note that it is valid to call this command without channels. In this case, it will just return an empty map.
See https://valkey.io/commands/pubsub-shardnumsub for details.
Args:
channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers.
If not provided, returns an empty map.
Command response:
Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.
"""
return self.append_command(
RequestType.PubSubSNumSub, channels if channels else []
)

# TODO: add all CLUSTER commands
Loading

0 comments on commit b9e7cef

Please sign in to comment.