From e46f9e3a61e3a450004152f2e97a30ee68bbabf1 Mon Sep 17 00:00:00 2001 From: Aaron <69273634+aaron-congo@users.noreply.github.com> Date: Fri, 21 Jun 2024 21:47:01 -0700 Subject: [PATCH] Python: add XREVRANGE command (#1625) * Python: add XREVRANGE command * Update doc for xrevrange Signed-off-by: Andrew Carbonetto * Update transaction docs Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto Co-authored-by: Andrew Carbonetto --- CHANGELOG.md | 1 + python/python/glide/async_commands/core.py | 52 ++++++++++++++++++- .../glide/async_commands/transaction.py | 40 +++++++++++++- python/python/tests/test_async_client.py | 24 ++++++++- python/python/tests/test_transaction.py | 2 + 5 files changed, 115 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 807b09c548..d1d018d210 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ * Python: Added XDEL command ([#1619](https://github.com/aws/glide-for-redis/pull/1619)) * Python: Added XRANGE command ([#1624](https://github.com/aws/glide-for-redis/pull/1624)) * Python: Added COPY command ([#1626](https://github.com/aws/glide-for-redis/pull/1626)) +* Python: Added XREVRANGE command ([#1625](https://github.com/aws/glide-for-redis/pull/1625)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index d38b8a185d..c131127d6e 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -2683,7 +2683,8 @@ async def xrange( Returns: Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a - list of pairings with format `[[field, entry], [field, entry], ...]`. + list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range + arguments are not applicable. Examples: >>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="0-1")) @@ -2703,6 +2704,55 @@ async def xrange( await self._execute_command(RequestType.XRange, args), ) + async def xrevrange( + self, + key: str, + end: StreamRangeBound, + start: StreamRangeBound, + count: Optional[int] = None, + ) -> Optional[Mapping[str, List[List[str]]]]: + """ + Returns stream entries matching a given range of IDs in reverse order. Equivalent to `XRANGE` but returns the + entries in reverse order. + + See https://valkey.io/commands/xrevrange for more details. + + Args: + key (str): The key of the stream. + end (StreamRangeBound): The ending stream ID bound for the range. + - Use `IdBound` to specify a stream ID. + - Use `ExclusiveIdBound` to specify an exclusive bounded stream ID. + - Use `MaxId` to end with the maximum available ID. + start (StreamRangeBound): The starting stream ID bound for the range. + - Use `IdBound` to specify a stream ID. + - Use `ExclusiveIdBound` to specify an exclusive bounded stream ID. + - Use `MinId` to start with the minimum available ID. + count (Optional[int]): An optional argument specifying the maximum count of stream entries to return. + If `count` is not provided, all stream entries in the range will be returned. + + Returns: + Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a + list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range + arguments are not applicable. + + Examples: + >>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="0-1")) + >>> await client.xadd("mystream", [("field2", "value2"), ("field2", "value3")], StreamAddOptions(id="0-2")) + >>> await client.xrevrange("mystream", MaxId(), MinId()) + { + "0-2": [["field2", "value2"], ["field2", "value3"]], + "0-1": [["field1", "value1"]], + } # Indicates the stream IDs and their associated field-value pairs for all stream entries in "mystream". + """ + args = [key, end.to_arg(), start.to_arg()] + if count is not None: + args.extend(["COUNT", str(count)]) + + return cast( + Optional[Mapping[str, List[List[str]]]], + await self._execute_command(RequestType.XRevRange, args), + ) + async def geoadd( self, key: str, diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index adced407f8..ba8e685f37 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -1879,7 +1879,8 @@ def xrange( Command response: Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a - list of pairings with format `[[field, entry], [field, entry], ...]`. + list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range arguments + are not applicable. """ args = [key, start.to_arg(), end.to_arg()] if count is not None: @@ -1887,6 +1888,43 @@ def xrange( return self.append_command(RequestType.XRange, args) + def xrevrange( + self: TTransaction, + key: str, + end: StreamRangeBound, + start: StreamRangeBound, + count: Optional[int] = None, + ) -> TTransaction: + """ + Returns stream entries matching a given range of IDs in reverse order. Equivalent to `XRANGE` but returns the + entries in reverse order. + + See https://valkey.io/commands/xrevrange for more details. + + Args: + key (str): The key of the stream. + end (StreamRangeBound): The ending stream ID bound for the range. + - Use `IdBound` to specify a stream ID. + - Use `ExclusiveIdBound` to specify an exclusive bounded stream ID. + - Use `MaxId` to end with the maximum available ID. + start (StreamRangeBound): The starting stream ID bound for the range. + - Use `IdBound` to specify a stream ID. + - Use `ExclusiveIdBound` to specify an exclusive bounded stream ID. + - Use `MinId` to start with the minimum available ID. + count (Optional[int]): An optional argument specifying the maximum count of stream entries to return. + If `count` is not provided, all stream entries in the range will be returned. + + Command response: + Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a + list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range arguments + are not applicable. + """ + args = [key, end.to_arg(), start.to_arg()] + if count is not None: + args.extend(["COUNT", str(count)]) + + return self.append_command(RequestType.XRevRange, args) + def geoadd( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index cdaa33f8b7..20be6a28f6 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -4811,7 +4811,7 @@ async def test_xdel(self, redis_client: TRedisClient): @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) - async def test_xrange(self, redis_client: TRedisClient): + async def test_xrange_and_xrevrange(self, redis_client: TRedisClient): key = get_random_string(10) non_existing_key = get_random_string(10) string_key = get_random_string(10) @@ -4838,9 +4838,15 @@ async def test_xrange(self, redis_client: TRedisClient): stream_id1: [["f1", "v1"]], stream_id2: [["f2", "v2"]], } + assert await redis_client.xrevrange(key, MaxId(), MinId()) == { + stream_id2: [["f2", "v2"]], + stream_id1: [["f1", "v1"]], + } # returns empty mapping if + before - assert await redis_client.xrange(key, MaxId(), MinId()) == {} + # rev search returns empty mapping if - before + + assert await redis_client.xrevrange(key, MinId(), MaxId()) == {} assert ( await redis_client.xadd( @@ -4848,33 +4854,47 @@ async def test_xrange(self, redis_client: TRedisClient): ) == stream_id3 ) + # get the newest entry assert await redis_client.xrange( key, ExclusiveIdBound(stream_id2), ExclusiveIdBound.from_timestamp(5), 1 ) == {stream_id3: [["f3", "v3"]]} + assert await redis_client.xrevrange( + key, ExclusiveIdBound.from_timestamp(5), ExclusiveIdBound(stream_id2), 1 + ) == {stream_id3: [["f3", "v3"]]} - # xrange against an emptied stream + # xrange/xrevrange against an emptied stream assert await redis_client.xdel(key, [stream_id1, stream_id2, stream_id3]) == 3 assert await redis_client.xrange(key, MinId(), MaxId(), 10) == {} + assert await redis_client.xrevrange(key, MaxId(), MinId(), 10) == {} assert await redis_client.xrange(non_existing_key, MinId(), MaxId()) == {} + assert await redis_client.xrevrange(non_existing_key, MaxId(), MinId()) == {} # count value < 1 returns None assert await redis_client.xrange(key, MinId(), MaxId(), 0) is None assert await redis_client.xrange(key, MinId(), MaxId(), -1) is None + assert await redis_client.xrevrange(key, MaxId(), MinId(), 0) is None + assert await redis_client.xrevrange(key, MaxId(), MinId(), -1) is None # key exists, but it is not a stream assert await redis_client.set(string_key, "foo") with pytest.raises(RequestError): await redis_client.xrange(string_key, MinId(), MaxId()) + with pytest.raises(RequestError): + await redis_client.xrevrange(string_key, MaxId(), MinId()) # invalid start bound with pytest.raises(RequestError): await redis_client.xrange(key, IdBound("not_a_stream_id"), MaxId()) + with pytest.raises(RequestError): + await redis_client.xrevrange(key, MaxId(), IdBound("not_a_stream_id")) # invalid end bound with pytest.raises(RequestError): await redis_client.xrange(key, MinId(), IdBound("not_a_stream_id")) + with pytest.raises(RequestError): + await redis_client.xrevrange(key, IdBound("not_a_stream_id"), MinId()) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index e4b86b1e83..9f5acb64bd 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -476,6 +476,8 @@ async def transaction_test( args.append(2) transaction.xrange(key11, IdBound("0-1"), IdBound("0-1")) args.append({"0-1": [["foo", "bar"]]}) + transaction.xrevrange(key11, IdBound("0-1"), IdBound("0-1")) + args.append({"0-1": [["foo", "bar"]]}) transaction.xtrim(key11, TrimByMinId(threshold="0-2", exact=True)) args.append(1) transaction.xdel(key11, ["0-2", "0-3"])