Skip to content

Commit

Permalink
Python: add XPENDING command (#1704)
Browse files Browse the repository at this point in the history
* Python: add XPENDING command

* PR suggestions

* PR suggestions
  • Loading branch information
aaron-congo authored Jun 28, 2024
1 parent 187b2ba commit b231a72
Show file tree
Hide file tree
Showing 7 changed files with 542 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* Python: Added FLUSHDB command ([#1680](https://github.com/aws/glide-for-redis/pull/1680))
* Python: Added XGROUP SETID command ([#1683](https://github.com/aws/glide-for-redis/pull/1683))
* Python: Added FUNCTION LOAD command ([#1699](https://github.com/aws/glide-for-redis/pull/1699))
* Python: Added XPENDING command ([#1704](https://github.com/aws/glide-for-redis/pull/1704))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
MinId,
StreamAddOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
Expand Down Expand Up @@ -166,6 +167,7 @@
"MinId",
"StreamAddOptions",
"StreamGroupOptions",
"StreamPendingOptions",
"StreamReadGroupOptions",
"StreamRangeBound",
"StreamReadOptions",
Expand Down
85 changes: 85 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@
from glide.async_commands.stream import (
StreamAddOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
StreamTrimOptions,
_create_xpending_range_args,
)
from glide.constants import TOK, TResult
from glide.protobuf.redis_request_pb2 import RequestType
Expand Down Expand Up @@ -3043,6 +3045,89 @@ async def xack(
await self._execute_command(RequestType.XAck, [key, group_name] + ids),
)

async def xpending(
self,
key: str,
group_name: str,
) -> List[Union[int, str, List[List[str]], None]]:
"""
Returns stream message summary information for pending messages for the given consumer group.
See https://valkey.io/commands/xpending for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
Returns:
List[Union[int, str, List[List[str]], None]]: A list that includes the summary of pending messages, with the
format `[num_group_messages, start_id, end_id, [[consumer_name, num_consumer_messages]]]`, where:
- `num_group_messages`: The total number of pending messages for this consumer group.
- `start_id`: The smallest ID among the pending messages.
- `end_id`: The greatest ID among the pending messages.
- `[[consumer_name, num_consumer_messages]]`: A 2D list of every consumer in the consumer group with at
least one pending message, and the number of pending messages it has.
If there are no pending messages for the given consumer group, `[0, None, None, None]` will be returned.
Examples:
>>> await client.xpending("my_stream", "my_group")
[4, "1-0", "1-3", [["my_consumer1", "3"], ["my_consumer2", "1"]]
"""
return cast(
List[Union[int, str, List[List[str]], None]],
await self._execute_command(RequestType.XPending, [key, group_name]),
)

async def xpending_range(
self,
key: str,
group_name: str,
start: StreamRangeBound,
end: StreamRangeBound,
count: int,
options: Optional[StreamPendingOptions] = None,
) -> List[List[Union[str, int]]]:
"""
Returns an extended form of stream message information for pending messages matching a given range of IDs.
See https://valkey.io/commands/xpending for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
start (StreamRangeBound): The starting stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MinId` to start with the minimum available ID.
end (StreamRangeBound): The ending stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MaxId` to end with the maximum available ID.
count (int): Limits the number of messages returned.
options (Optional[StreamPendingOptions]): The stream pending options.
Returns:
List[List[Union[str, int]]]: A list of lists, where each inner list is a length 4 list containing extended
message information with the format `[[id, consumer_name, time_elapsed, num_delivered]]`, where:
- `id`: The ID of the message.
- `consumer_name`: The name of the consumer that fetched the message and has still to acknowledge it. We
call it the current owner of the message.
- `time_elapsed`: The number of milliseconds that elapsed since the last time this message was delivered
to this consumer.
- `num_delivered`: The number of times this message was delivered.
Examples:
>>> await client.xpending_range("my_stream", "my_group", MinId(), MaxId(), 10, StreamPendingOptions(consumer_name="my_consumer"))
[["1-0", "my_consumer", 1234, 1], ["1-1", "my_consumer", 1123, 1]]
# Extended stream entry information for the pending entries associated with "my_consumer".
"""
args = _create_xpending_range_args(key, group_name, start, end, count, options)
return cast(
List[List[Union[str, int]]],
await self._execute_command(RequestType.XPending, args),
)

async def geoadd(
self,
key: str,
Expand Down
45 changes: 43 additions & 2 deletions python/python/glide/async_commands/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ def to_args(self) -> List[str]:

class StreamRangeBound(ABC):
"""
Abstract Base Class used in the `XRANGE` and `XREVRANGE` commands to specify the starting and ending range bound for
the stream search by stream ID.
Abstract Base Class used in the `XPENDING`, `XRANGE`, and `XREVRANGE` commands to specify the starting and ending
range bound for the stream search by stream entry ID.
"""

@abstractmethod
Expand Down Expand Up @@ -206,6 +206,8 @@ class ExclusiveIdBound(StreamRangeBound):
Exclusive (open) stream ID boundary used to specify a range of IDs to search. Stream ID bounds can be complete with
a timestamp and sequence number separated by a dash ("-"), for example "1526985054069-0". Stream ID bounds can also
be incomplete, with just a timestamp.
Since: Redis version 6.2.0.
"""

EXCLUSIVE_BOUND_REDIS_API = "("
Expand Down Expand Up @@ -335,3 +337,42 @@ def to_args(self) -> List[str]:
args.append(self.READ_NOACK_REDIS_API)

return args


class StreamPendingOptions:
IDLE_TIME_REDIS_API = "IDLE"

def __init__(
self,
min_idle_time_ms: Optional[int] = None,
consumer_name: Optional[str] = None,
):
"""
Options for `XPENDING` that can be used to filter returned items by minimum idle time and consumer name.
Args:
min_idle_time_ms (Optional[int]): Filters pending entries by their minimum idle time in milliseconds. This
option can only be specified if you are using Redis version 6.2.0 or above.
consumer_name (Optional[str]): Filters pending entries by consumer name.
"""
self.min_idle_time = min_idle_time_ms
self.consumer_name = consumer_name


def _create_xpending_range_args(
key: str,
group_name: str,
start: StreamRangeBound,
end: StreamRangeBound,
count: int,
options: Optional[StreamPendingOptions],
) -> List[str]:
args = [key, group_name]
if options is not None and options.min_idle_time is not None:
args.extend([options.IDLE_TIME_REDIS_API, str(options.min_idle_time)])

args.extend([start.to_arg(), end.to_arg(), str(count)])
if options is not None and options.consumer_name is not None:
args.append(options.consumer_name)

return args
70 changes: 70 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
from glide.async_commands.stream import (
StreamAddOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
StreamTrimOptions,
_create_xpending_range_args,
)
from glide.protobuf.redis_request_pb2 import RequestType

Expand Down Expand Up @@ -2150,6 +2152,74 @@ def xack(
"""
return self.append_command(RequestType.XAck, [key, group_name] + ids)

def xpending(
self: TTransaction,
key: str,
group_name: str,
) -> TTransaction:
"""
Returns stream message summary information for pending messages for the given consumer group.
See https://valkey.io/commands/xpending for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
Command response:
List[Union[int, str, List[List[str]], None]]: A list that includes the summary of pending messages, with the
format `[num_group_messages, start_id, end_id, [[consumer_name, num_consumer_messages]]]`, where:
- `num_group_messages`: The total number of pending messages for this consumer group.
- `start_id`: The smallest ID among the pending messages.
- `end_id`: The greatest ID among the pending messages.
- `[[consumer_name, num_consumer_messages]]`: A 2D list of every consumer in the consumer group with at
least one pending message, and the number of pending messages it has.
If there are no pending messages for the given consumer group, `[0, None, None, None]` will be returned.
"""
return self.append_command(RequestType.XPending, [key, group_name])

def xpending_range(
self: TTransaction,
key: str,
group_name: str,
start: StreamRangeBound,
end: StreamRangeBound,
count: int,
options: Optional[StreamPendingOptions] = None,
) -> TTransaction:
"""
Returns an extended form of stream message information for pending messages matching a given range of IDs.
See https://valkey.io/commands/xpending for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
start (StreamRangeBound): The starting stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MinId` to start with the minimum available ID.
end (StreamRangeBound): The ending stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MaxId` to end with the maximum available ID.
count (int): Limits the number of messages returned.
options (Optional[StreamPendingOptions]): The stream pending options.
Command response:
List[List[Union[str, int]]]: A list of lists, where each inner list is a length 4 list containing extended
message information with the format `[[id, consumer_name, time_elapsed, num_delivered]]`, where:
- `id`: The ID of the message.
- `consumer_name`: The name of the consumer that fetched the message and has still to acknowledge it. We
call it the current owner of the message.
- `time_elapsed`: The number of milliseconds that elapsed since the last time this message was delivered
to this consumer.
- `num_delivered`: The number of times this message was delivered.
"""
args = _create_xpending_range_args(key, group_name, start, end, count, options)
return self.append_command(RequestType.XPending, args)

def geoadd(
self: TTransaction,
key: str,
Expand Down
Loading

0 comments on commit b231a72

Please sign in to comment.