Skip to content

Commit

Permalink
Python: add XREVRANGE command (#1625)
Browse files Browse the repository at this point in the history
* Python: add XREVRANGE command

* Update doc for xrevrange

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>

* Update transaction docs

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>

---------

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
Co-authored-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
  • Loading branch information
aaron-congo and acarbonetto authored Jun 22, 2024
1 parent 2438d8b commit c921f13
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* Python: Added XDEL command ([#1619](https://github.com/aws/glide-for-redis/pull/1619))
* Python: Added XRANGE command ([#1624](https://github.com/aws/glide-for-redis/pull/1624))
* Python: Added COPY command ([#1626](https://github.com/aws/glide-for-redis/pull/1626))
* Python: Added XREVRANGE command ([#1625](https://github.com/aws/glide-for-redis/pull/1625))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
52 changes: 51 additions & 1 deletion python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2683,7 +2683,8 @@ async def xrange(
Returns:
Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a
list of pairings with format `[[field, entry], [field, entry], ...]`.
list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range
arguments are not applicable.
Examples:
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="0-1"))
Expand All @@ -2703,6 +2704,55 @@ async def xrange(
await self._execute_command(RequestType.XRange, args),
)

async def xrevrange(
self,
key: str,
end: StreamRangeBound,
start: StreamRangeBound,
count: Optional[int] = None,
) -> Optional[Mapping[str, List[List[str]]]]:
"""
Returns stream entries matching a given range of IDs in reverse order. Equivalent to `XRANGE` but returns the
entries in reverse order.
See https://valkey.io/commands/xrevrange for more details.
Args:
key (str): The key of the stream.
end (StreamRangeBound): The ending stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MaxId` to end with the maximum available ID.
start (StreamRangeBound): The starting stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MinId` to start with the minimum available ID.
count (Optional[int]): An optional argument specifying the maximum count of stream entries to return.
If `count` is not provided, all stream entries in the range will be returned.
Returns:
Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a
list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range
arguments are not applicable.
Examples:
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="0-1"))
>>> await client.xadd("mystream", [("field2", "value2"), ("field2", "value3")], StreamAddOptions(id="0-2"))
>>> await client.xrevrange("mystream", MaxId(), MinId())
{
"0-2": [["field2", "value2"], ["field2", "value3"]],
"0-1": [["field1", "value1"]],
} # Indicates the stream IDs and their associated field-value pairs for all stream entries in "mystream".
"""
args = [key, end.to_arg(), start.to_arg()]
if count is not None:
args.extend(["COUNT", str(count)])

return cast(
Optional[Mapping[str, List[List[str]]]],
await self._execute_command(RequestType.XRevRange, args),
)

async def geoadd(
self,
key: str,
Expand Down
40 changes: 39 additions & 1 deletion python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1879,14 +1879,52 @@ def xrange(
Command response:
Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a
list of pairings with format `[[field, entry], [field, entry], ...]`.
list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range arguments
are not applicable.
"""
args = [key, start.to_arg(), end.to_arg()]
if count is not None:
args.extend(["COUNT", str(count)])

return self.append_command(RequestType.XRange, args)

def xrevrange(
self: TTransaction,
key: str,
end: StreamRangeBound,
start: StreamRangeBound,
count: Optional[int] = None,
) -> TTransaction:
"""
Returns stream entries matching a given range of IDs in reverse order. Equivalent to `XRANGE` but returns the
entries in reverse order.
See https://valkey.io/commands/xrevrange for more details.
Args:
key (str): The key of the stream.
end (StreamRangeBound): The ending stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MaxId` to end with the maximum available ID.
start (StreamRangeBound): The starting stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MinId` to start with the minimum available ID.
count (Optional[int]): An optional argument specifying the maximum count of stream entries to return.
If `count` is not provided, all stream entries in the range will be returned.
Command response:
Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a
list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range arguments
are not applicable.
"""
args = [key, end.to_arg(), start.to_arg()]
if count is not None:
args.extend(["COUNT", str(count)])

return self.append_command(RequestType.XRevRange, args)

def geoadd(
self: TTransaction,
key: str,
Expand Down
24 changes: 22 additions & 2 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4811,7 +4811,7 @@ async def test_xdel(self, redis_client: TRedisClient):

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xrange(self, redis_client: TRedisClient):
async def test_xrange_and_xrevrange(self, redis_client: TRedisClient):
key = get_random_string(10)
non_existing_key = get_random_string(10)
string_key = get_random_string(10)
Expand All @@ -4838,43 +4838,63 @@ async def test_xrange(self, redis_client: TRedisClient):
stream_id1: [["f1", "v1"]],
stream_id2: [["f2", "v2"]],
}
assert await redis_client.xrevrange(key, MaxId(), MinId()) == {
stream_id2: [["f2", "v2"]],
stream_id1: [["f1", "v1"]],
}

# returns empty mapping if + before -
assert await redis_client.xrange(key, MaxId(), MinId()) == {}
# rev search returns empty mapping if - before +
assert await redis_client.xrevrange(key, MinId(), MaxId()) == {}

assert (
await redis_client.xadd(
key, [("f3", "v3")], StreamAddOptions(id=stream_id3)
)
== stream_id3
)

# get the newest entry
assert await redis_client.xrange(
key, ExclusiveIdBound(stream_id2), ExclusiveIdBound.from_timestamp(5), 1
) == {stream_id3: [["f3", "v3"]]}
assert await redis_client.xrevrange(
key, ExclusiveIdBound.from_timestamp(5), ExclusiveIdBound(stream_id2), 1
) == {stream_id3: [["f3", "v3"]]}

# xrange against an emptied stream
# xrange/xrevrange against an emptied stream
assert await redis_client.xdel(key, [stream_id1, stream_id2, stream_id3]) == 3
assert await redis_client.xrange(key, MinId(), MaxId(), 10) == {}
assert await redis_client.xrevrange(key, MaxId(), MinId(), 10) == {}

assert await redis_client.xrange(non_existing_key, MinId(), MaxId()) == {}
assert await redis_client.xrevrange(non_existing_key, MaxId(), MinId()) == {}

# count value < 1 returns None
assert await redis_client.xrange(key, MinId(), MaxId(), 0) is None
assert await redis_client.xrange(key, MinId(), MaxId(), -1) is None
assert await redis_client.xrevrange(key, MaxId(), MinId(), 0) is None
assert await redis_client.xrevrange(key, MaxId(), MinId(), -1) is None

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo")
with pytest.raises(RequestError):
await redis_client.xrange(string_key, MinId(), MaxId())
with pytest.raises(RequestError):
await redis_client.xrevrange(string_key, MaxId(), MinId())

# invalid start bound
with pytest.raises(RequestError):
await redis_client.xrange(key, IdBound("not_a_stream_id"), MaxId())
with pytest.raises(RequestError):
await redis_client.xrevrange(key, MaxId(), IdBound("not_a_stream_id"))

# invalid end bound
with pytest.raises(RequestError):
await redis_client.xrange(key, MinId(), IdBound("not_a_stream_id"))
with pytest.raises(RequestError):
await redis_client.xrevrange(key, IdBound("not_a_stream_id"), MinId())

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
Expand Down
2 changes: 2 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ async def transaction_test(
args.append(2)
transaction.xrange(key11, IdBound("0-1"), IdBound("0-1"))
args.append({"0-1": [["foo", "bar"]]})
transaction.xrevrange(key11, IdBound("0-1"), IdBound("0-1"))
args.append({"0-1": [["foo", "bar"]]})
transaction.xtrim(key11, TrimByMinId(threshold="0-2", exact=True))
args.append(1)
transaction.xdel(key11, ["0-2", "0-3"])
Expand Down

0 comments on commit c921f13

Please sign in to comment.