diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bdce0d49d..e6342ddcf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ * Python: Added FLUSHDB command ([#1680](https://github.com/aws/glide-for-redis/pull/1680)) * Python: Added XGROUP SETID command ([#1683](https://github.com/aws/glide-for-redis/pull/1683)) * Python: Added FUNCTION LOAD command ([#1699](https://github.com/aws/glide-for-redis/pull/1699)) +* Python: Added XPENDING command ([#1704](https://github.com/aws/glide-for-redis/pull/1704)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/__init__.py b/python/python/glide/__init__.py index 7b0510dbb1..2e89d3cc1a 100644 --- a/python/python/glide/__init__.py +++ b/python/python/glide/__init__.py @@ -53,6 +53,7 @@ MinId, StreamAddOptions, StreamGroupOptions, + StreamPendingOptions, StreamRangeBound, StreamReadGroupOptions, StreamReadOptions, @@ -166,6 +167,7 @@ "MinId", "StreamAddOptions", "StreamGroupOptions", + "StreamPendingOptions", "StreamReadGroupOptions", "StreamRangeBound", "StreamReadOptions", diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index d88fd8352e..d27f7f5205 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -48,10 +48,12 @@ from glide.async_commands.stream import ( StreamAddOptions, StreamGroupOptions, + StreamPendingOptions, StreamRangeBound, StreamReadGroupOptions, StreamReadOptions, StreamTrimOptions, + _create_xpending_range_args, ) from glide.constants import TOK, TResult from glide.protobuf.redis_request_pb2 import RequestType @@ -3043,6 +3045,89 @@ async def xack( await self._execute_command(RequestType.XAck, [key, group_name] + ids), ) + async def xpending( + self, + key: str, + group_name: str, + ) -> List[Union[int, str, List[List[str]], None]]: + """ + Returns stream message summary information for pending messages for the given consumer group. + + See https://valkey.io/commands/xpending for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + + Returns: + List[Union[int, str, List[List[str]], None]]: A list that includes the summary of pending messages, with the + format `[num_group_messages, start_id, end_id, [[consumer_name, num_consumer_messages]]]`, where: + - `num_group_messages`: The total number of pending messages for this consumer group. + - `start_id`: The smallest ID among the pending messages. + - `end_id`: The greatest ID among the pending messages. + - `[[consumer_name, num_consumer_messages]]`: A 2D list of every consumer in the consumer group with at + least one pending message, and the number of pending messages it has. + + If there are no pending messages for the given consumer group, `[0, None, None, None]` will be returned. + + Examples: + >>> await client.xpending("my_stream", "my_group") + [4, "1-0", "1-3", [["my_consumer1", "3"], ["my_consumer2", "1"]] + """ + return cast( + List[Union[int, str, List[List[str]], None]], + await self._execute_command(RequestType.XPending, [key, group_name]), + ) + + async def xpending_range( + self, + key: str, + group_name: str, + start: StreamRangeBound, + end: StreamRangeBound, + count: int, + options: Optional[StreamPendingOptions] = None, + ) -> List[List[Union[str, int]]]: + """ + Returns an extended form of stream message information for pending messages matching a given range of IDs. + + See https://valkey.io/commands/xpending for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + start (StreamRangeBound): The starting stream ID bound for the range. + - Use `IdBound` to specify a stream ID. + - Use `ExclusiveIdBound` to specify an exclusive bounded stream ID. + - Use `MinId` to start with the minimum available ID. + end (StreamRangeBound): The ending stream ID bound for the range. + - Use `IdBound` to specify a stream ID. + - Use `ExclusiveIdBound` to specify an exclusive bounded stream ID. + - Use `MaxId` to end with the maximum available ID. + count (int): Limits the number of messages returned. + options (Optional[StreamPendingOptions]): The stream pending options. + + Returns: + List[List[Union[str, int]]]: A list of lists, where each inner list is a length 4 list containing extended + message information with the format `[[id, consumer_name, time_elapsed, num_delivered]]`, where: + - `id`: The ID of the message. + - `consumer_name`: The name of the consumer that fetched the message and has still to acknowledge it. We + call it the current owner of the message. + - `time_elapsed`: The number of milliseconds that elapsed since the last time this message was delivered + to this consumer. + - `num_delivered`: The number of times this message was delivered. + + Examples: + >>> await client.xpending_range("my_stream", "my_group", MinId(), MaxId(), 10, StreamPendingOptions(consumer_name="my_consumer")) + [["1-0", "my_consumer", 1234, 1], ["1-1", "my_consumer", 1123, 1]] + # Extended stream entry information for the pending entries associated with "my_consumer". + """ + args = _create_xpending_range_args(key, group_name, start, end, count, options) + return cast( + List[List[Union[str, int]]], + await self._execute_command(RequestType.XPending, args), + ) + async def geoadd( self, key: str, diff --git a/python/python/glide/async_commands/stream.py b/python/python/glide/async_commands/stream.py index 5a4ea33042..8a25337066 100644 --- a/python/python/glide/async_commands/stream.py +++ b/python/python/glide/async_commands/stream.py @@ -135,8 +135,8 @@ def to_args(self) -> List[str]: class StreamRangeBound(ABC): """ - Abstract Base Class used in the `XRANGE` and `XREVRANGE` commands to specify the starting and ending range bound for - the stream search by stream ID. + Abstract Base Class used in the `XPENDING`, `XRANGE`, and `XREVRANGE` commands to specify the starting and ending + range bound for the stream search by stream entry ID. """ @abstractmethod @@ -206,6 +206,8 @@ class ExclusiveIdBound(StreamRangeBound): Exclusive (open) stream ID boundary used to specify a range of IDs to search. Stream ID bounds can be complete with a timestamp and sequence number separated by a dash ("-"), for example "1526985054069-0". Stream ID bounds can also be incomplete, with just a timestamp. + + Since: Redis version 6.2.0. """ EXCLUSIVE_BOUND_REDIS_API = "(" @@ -335,3 +337,42 @@ def to_args(self) -> List[str]: args.append(self.READ_NOACK_REDIS_API) return args + + +class StreamPendingOptions: + IDLE_TIME_REDIS_API = "IDLE" + + def __init__( + self, + min_idle_time_ms: Optional[int] = None, + consumer_name: Optional[str] = None, + ): + """ + Options for `XPENDING` that can be used to filter returned items by minimum idle time and consumer name. + + Args: + min_idle_time_ms (Optional[int]): Filters pending entries by their minimum idle time in milliseconds. This + option can only be specified if you are using Redis version 6.2.0 or above. + consumer_name (Optional[str]): Filters pending entries by consumer name. + """ + self.min_idle_time = min_idle_time_ms + self.consumer_name = consumer_name + + +def _create_xpending_range_args( + key: str, + group_name: str, + start: StreamRangeBound, + end: StreamRangeBound, + count: int, + options: Optional[StreamPendingOptions], +) -> List[str]: + args = [key, group_name] + if options is not None and options.min_idle_time is not None: + args.extend([options.IDLE_TIME_REDIS_API, str(options.min_idle_time)]) + + args.extend([start.to_arg(), end.to_arg(), str(count)]) + if options is not None and options.consumer_name is not None: + args.append(options.consumer_name) + + return args diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 5add3172d6..68c64ecf38 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -45,10 +45,12 @@ from glide.async_commands.stream import ( StreamAddOptions, StreamGroupOptions, + StreamPendingOptions, StreamRangeBound, StreamReadGroupOptions, StreamReadOptions, StreamTrimOptions, + _create_xpending_range_args, ) from glide.protobuf.redis_request_pb2 import RequestType @@ -2150,6 +2152,74 @@ def xack( """ return self.append_command(RequestType.XAck, [key, group_name] + ids) + def xpending( + self: TTransaction, + key: str, + group_name: str, + ) -> TTransaction: + """ + Returns stream message summary information for pending messages for the given consumer group. + + See https://valkey.io/commands/xpending for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + + Command response: + List[Union[int, str, List[List[str]], None]]: A list that includes the summary of pending messages, with the + format `[num_group_messages, start_id, end_id, [[consumer_name, num_consumer_messages]]]`, where: + - `num_group_messages`: The total number of pending messages for this consumer group. + - `start_id`: The smallest ID among the pending messages. + - `end_id`: The greatest ID among the pending messages. + - `[[consumer_name, num_consumer_messages]]`: A 2D list of every consumer in the consumer group with at + least one pending message, and the number of pending messages it has. + + If there are no pending messages for the given consumer group, `[0, None, None, None]` will be returned. + """ + return self.append_command(RequestType.XPending, [key, group_name]) + + def xpending_range( + self: TTransaction, + key: str, + group_name: str, + start: StreamRangeBound, + end: StreamRangeBound, + count: int, + options: Optional[StreamPendingOptions] = None, + ) -> TTransaction: + """ + Returns an extended form of stream message information for pending messages matching a given range of IDs. + + See https://valkey.io/commands/xpending for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + start (StreamRangeBound): The starting stream ID bound for the range. + - Use `IdBound` to specify a stream ID. + - Use `ExclusiveIdBound` to specify an exclusive bounded stream ID. + - Use `MinId` to start with the minimum available ID. + end (StreamRangeBound): The ending stream ID bound for the range. + - Use `IdBound` to specify a stream ID. + - Use `ExclusiveIdBound` to specify an exclusive bounded stream ID. + - Use `MaxId` to end with the maximum available ID. + count (int): Limits the number of messages returned. + options (Optional[StreamPendingOptions]): The stream pending options. + + Command response: + List[List[Union[str, int]]]: A list of lists, where each inner list is a length 4 list containing extended + message information with the format `[[id, consumer_name, time_elapsed, num_delivered]]`, where: + - `id`: The ID of the message. + - `consumer_name`: The name of the consumer that fetched the message and has still to acknowledge it. We + call it the current owner of the message. + - `time_elapsed`: The number of milliseconds that elapsed since the last time this message was delivered + to this consumer. + - `num_delivered`: The number of times this message was delivered. + """ + args = _create_xpending_range_args(key, group_name, start, end, count, options) + return self.append_command(RequestType.XPending, args) + def geoadd( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index a85ead46f3..579d6eedc8 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -62,6 +62,7 @@ MinId, StreamAddOptions, StreamGroupOptions, + StreamPendingOptions, StreamReadGroupOptions, StreamReadOptions, TrimByMaxLen, @@ -5563,6 +5564,340 @@ async def test_xack( with pytest.raises(RequestError): await redis_client.xack(string_key, group_name, [stream_id1_0]) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xpending(self, redis_client: TGlideClient): + key = get_random_string(10) + group_name = get_random_string(10) + consumer1 = get_random_string(10) + consumer2 = get_random_string(10) + stream_id0 = "0" + stream_id1_0 = "1-0" + stream_id1_1 = "1-1" + stream_id1_2 = "1-2" + stream_id1_3 = "1-3" + stream_id1_4 = "1-4" + + # create group and consumer for group + assert ( + await redis_client.xgroup_create( + key, group_name, stream_id0, StreamGroupOptions(make_stream=True) + ) + == OK + ) + assert ( + await redis_client.xgroup_create_consumer(key, group_name, consumer1) + is True + ) + assert ( + await redis_client.xgroup_create_consumer(key, group_name, consumer2) + is True + ) + + # add two stream entries for consumer1 + assert ( + await redis_client.xadd( + key, [("f1_0", "v1_0")], StreamAddOptions(stream_id1_0) + ) + == stream_id1_0 + ) + assert ( + await redis_client.xadd( + key, [("f1_1", "v1_1")], StreamAddOptions(stream_id1_1) + ) + == stream_id1_1 + ) + + # read the entire stream with consumer1 and mark messages as pending + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer1) == { + key: { + stream_id1_0: [["f1_0", "v1_0"]], + stream_id1_1: [["f1_1", "v1_1"]], + } + } + + # add three stream entries for consumer2 + assert ( + await redis_client.xadd( + key, [("f1_2", "v1_2")], StreamAddOptions(stream_id1_2) + ) + == stream_id1_2 + ) + assert ( + await redis_client.xadd( + key, [("f1_3", "v1_3")], StreamAddOptions(stream_id1_3) + ) + == stream_id1_3 + ) + assert ( + await redis_client.xadd( + key, [("f1_4", "v1_4")], StreamAddOptions(stream_id1_4) + ) + == stream_id1_4 + ) + + # read the entire stream with consumer2 and mark messages as pending + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer2) == { + key: { + stream_id1_2: [["f1_2", "v1_2"]], + stream_id1_3: [["f1_3", "v1_3"]], + stream_id1_4: [["f1_4", "v1_4"]], + } + } + + # inner array order is non-deterministic, so we have to assert against it separately from the other info + result = await redis_client.xpending(key, group_name) + consumer_results = cast(List, result[3]) + assert [consumer1, "2"] in consumer_results + assert [consumer2, "3"] in consumer_results + + result.remove(consumer_results) + assert result == [5, stream_id1_0, stream_id1_4] + + range_result = await redis_client.xpending_range( + key, group_name, MinId(), MaxId(), 10 + ) + # the inner lists of the result have format [stream_entry_id, consumer, idle_time, times_delivered] + # because the idle time return value is not deterministic, we have to assert against it separately + idle_time = cast(int, range_result[0][2]) + assert idle_time > 0 + range_result[0].remove(idle_time) + assert range_result[0] == [stream_id1_0, consumer1, 1] + + idle_time = cast(int, range_result[1][2]) + assert idle_time > 0 + range_result[1].remove(idle_time) + assert range_result[1] == [stream_id1_1, consumer1, 1] + + idle_time = cast(int, range_result[2][2]) + assert idle_time > 0 + range_result[2].remove(idle_time) + assert range_result[2] == [stream_id1_2, consumer2, 1] + + idle_time = cast(int, range_result[3][2]) + assert idle_time > 0 + range_result[3].remove(idle_time) + assert range_result[3] == [stream_id1_3, consumer2, 1] + + idle_time = cast(int, range_result[4][2]) + assert idle_time > 0 + range_result[4].remove(idle_time) + assert range_result[4] == [stream_id1_4, consumer2, 1] + + # acknowledge streams 1-1 to 1-3 and remove them from the xpending results + assert ( + await redis_client.xack( + key, group_name, [stream_id1_1, stream_id1_2, stream_id1_3] + ) + == 3 + ) + + range_result = await redis_client.xpending_range( + key, group_name, IdBound(stream_id1_4), MaxId(), 10 + ) + assert len(range_result) == 1 + assert range_result[0][0] == stream_id1_4 + assert range_result[0][1] == consumer2 + + range_result = await redis_client.xpending_range( + key, group_name, MinId(), IdBound(stream_id1_3), 10 + ) + assert len(range_result) == 1 + assert range_result[0][0] == stream_id1_0 + assert range_result[0][1] == consumer1 + + # passing an empty StreamPendingOptions object should have no effect + range_result = await redis_client.xpending_range( + key, group_name, MinId(), IdBound(stream_id1_3), 10, StreamPendingOptions() + ) + assert len(range_result) == 1 + assert range_result[0][0] == stream_id1_0 + assert range_result[0][1] == consumer1 + + range_result = await redis_client.xpending_range( + key, + group_name, + MinId(), + MaxId(), + 10, + StreamPendingOptions(min_idle_time_ms=1, consumer_name=consumer2), + ) + assert len(range_result) == 1 + assert range_result[0][0] == stream_id1_4 + assert range_result[0][1] == consumer2 + + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xpending_edge_cases_and_failures(self, redis_client: TGlideClient): + key = get_random_string(10) + non_existing_key = get_random_string(10) + string_key = get_random_string(10) + group_name = get_random_string(10) + consumer = get_random_string(10) + stream_id0 = "0" + stream_id1_0 = "1-0" + stream_id1_1 = "1-1" + + # create group and consumer for the group + assert ( + await redis_client.xgroup_create( + key, group_name, stream_id0, StreamGroupOptions(make_stream=True) + ) + == OK + ) + assert ( + await redis_client.xgroup_create_consumer(key, group_name, consumer) is True + ) + + # add two stream entries for consumer + assert ( + await redis_client.xadd( + key, [("f1_0", "v1_0")], StreamAddOptions(stream_id1_0) + ) + == stream_id1_0 + ) + assert ( + await redis_client.xadd( + key, [("f1_1", "v1_1")], StreamAddOptions(stream_id1_1) + ) + == stream_id1_1 + ) + + # no pending messages yet... + assert await redis_client.xpending(key, group_name) == [0, None, None, None] + assert ( + await redis_client.xpending_range(key, group_name, MinId(), MaxId(), 10) + == [] + ) + + # read the entire stream with consumer and mark messages as pending + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer) == { + key: { + stream_id1_0: [["f1_0", "v1_0"]], + stream_id1_1: [["f1_1", "v1_1"]], + } + } + + # sanity check - expect some results + assert await redis_client.xpending(key, group_name) == [ + 2, + stream_id1_0, + stream_id1_1, + [[consumer, "2"]], + ] + result = await redis_client.xpending_range( + key, group_name, MinId(), MaxId(), 10 + ) + assert len(result[0]) > 0 + + # returns empty if + before - + assert ( + await redis_client.xpending_range(key, group_name, MaxId(), MinId(), 10) + == [] + ) + assert ( + await redis_client.xpending_range( + key, + group_name, + MaxId(), + MinId(), + 10, + StreamPendingOptions(consumer_name=consumer), + ) + == [] + ) + + # min idle time of 100 seconds shouldn't produce any results + assert ( + await redis_client.xpending_range( + key, + group_name, + MinId(), + MaxId(), + 10, + StreamPendingOptions(min_idle_time_ms=100_000), + ) + == [] + ) + + # non-existing consumer: no results + assert ( + await redis_client.xpending_range( + key, + group_name, + MinId(), + MaxId(), + 10, + StreamPendingOptions(consumer_name="non_existing_consumer"), + ) + == [] + ) + + # xpending when range bound is not a valid ID raises a RequestError + with pytest.raises(RequestError): + await redis_client.xpending_range( + key, group_name, IdBound("invalid_stream_id_format"), MaxId(), 10 + ) + with pytest.raises(RequestError): + await redis_client.xpending_range( + key, group_name, MinId(), IdBound("invalid_stream_id_format"), 10 + ) + + # non-positive count returns no results + assert ( + await redis_client.xpending_range(key, group_name, MinId(), MaxId(), -10) + == [] + ) + assert ( + await redis_client.xpending_range(key, group_name, MinId(), MaxId(), 0) + == [] + ) + + # non-positive min-idle-time values are allowed + result = await redis_client.xpending_range( + key, + group_name, + MinId(), + MaxId(), + 10, + StreamPendingOptions(min_idle_time_ms=-100), + ) + assert len(result[0]) > 0 + result = await redis_client.xpending_range( + key, + group_name, + MinId(), + MaxId(), + 10, + StreamPendingOptions(min_idle_time_ms=0), + ) + assert len(result[0]) > 0 + + # non-existing group name raises a RequestError (NOGROUP) + with pytest.raises(RequestError): + await redis_client.xpending(key, "non_existing_group") + with pytest.raises(RequestError): + await redis_client.xpending_range( + key, "non_existing_group", MinId(), MaxId(), 10 + ) + + # non-existing key raises a RequestError + with pytest.raises(RequestError): + await redis_client.xpending(non_existing_key, group_name) + with pytest.raises(RequestError): + await redis_client.xpending_range( + non_existing_key, group_name, MinId(), MaxId(), 10 + ) + + # key exists but it is not a stream + assert await redis_client.set(string_key, "foo") == OK + with pytest.raises(RequestError): + await redis_client.xpending(string_key, group_name) + with pytest.raises(RequestError): + await redis_client.xpending_range( + string_key, group_name, MinId(), MaxId(), 10 + ) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_xgroup_set_id( diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 15638826a3..5c2e141f72 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -39,6 +39,8 @@ ) from glide.async_commands.stream import ( IdBound, + MaxId, + MinId, StreamAddOptions, StreamGroupOptions, StreamReadGroupOptions, @@ -517,8 +519,12 @@ async def transaction_test( {key11: ">"}, group_name1, consumer, StreamReadGroupOptions(count=5) ) args.append({key11: {"0-2": [["foo", "bar"]]}}) + transaction.xpending(key11, group_name1) + args.append([1, "0-2", "0-2", [[consumer, "1"]]]) transaction.xack(key11, group_name1, ["0-2"]) args.append(1) + transaction.xpending_range(key11, group_name1, MinId(), MaxId(), 1) + args.append([]) transaction.xgroup_set_id(key11, group_name1, "0-2") args.append(OK) transaction.xgroup_del_consumer(key11, group_name1, consumer)