Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add XREVRANGE command #1625

Merged
merged 4 commits into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* Python: Added XDEL command ([#1619](https://github.com/aws/glide-for-redis/pull/1619))
* Python: Added XRANGE command ([#1624](https://github.com/aws/glide-for-redis/pull/1624))
* Python: Added COPY command ([#1626](https://github.com/aws/glide-for-redis/pull/1626))
* Python: Added XREVRANGE command ([#1625](https://github.com/aws/glide-for-redis/pull/1625))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
52 changes: 51 additions & 1 deletion python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2683,7 +2683,8 @@ async def xrange(

Returns:
Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a
list of pairings with format `[[field, entry], [field, entry], ...]`.
list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range
arguments are not applicable.

Examples:
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="0-1"))
Expand All @@ -2703,6 +2704,55 @@ async def xrange(
await self._execute_command(RequestType.XRange, args),
)

async def xrevrange(
self,
key: str,
end: StreamRangeBound,
start: StreamRangeBound,
count: Optional[int] = None,
) -> Optional[Mapping[str, List[List[str]]]]:
"""
Returns stream entries matching a given range of IDs in reverse order. Equivalent to `XRANGE` but returns the
entries in reverse order.

See https://valkey.io/commands/xrevrange for more details.

Args:
key (str): The key of the stream.
end (StreamRangeBound): The ending stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MaxId` to end with the maximum available ID.
start (StreamRangeBound): The starting stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MinId` to start with the minimum available ID.
count (Optional[int]): An optional argument specifying the maximum count of stream entries to return.
If `count` is not provided, all stream entries in the range will be returned.

Returns:
Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range
arguments are not applicable.

Examples:
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="0-1"))
>>> await client.xadd("mystream", [("field2", "value2"), ("field2", "value3")], StreamAddOptions(id="0-2"))
>>> await client.xrevrange("mystream", MaxId(), MinId())
{
"0-2": [["field2", "value2"], ["field2", "value3"]],
"0-1": [["field1", "value1"]],
} # Indicates the stream IDs and their associated field-value pairs for all stream entries in "mystream".
"""
args = [key, end.to_arg(), start.to_arg()]
if count is not None:
args.extend(["COUNT", str(count)])

return cast(
Optional[Mapping[str, List[List[str]]]],
await self._execute_command(RequestType.XRevRange, args),
)

async def geoadd(
self,
key: str,
Expand Down
40 changes: 39 additions & 1 deletion python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1879,14 +1879,52 @@ def xrange(

Command response:
Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a
list of pairings with format `[[field, entry], [field, entry], ...]`.
list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range arguments
are not applicable.
"""
args = [key, start.to_arg(), end.to_arg()]
if count is not None:
args.extend(["COUNT", str(count)])

return self.append_command(RequestType.XRange, args)

def xrevrange(
self: TTransaction,
key: str,
end: StreamRangeBound,
start: StreamRangeBound,
count: Optional[int] = None,
) -> TTransaction:
"""
Returns stream entries matching a given range of IDs in reverse order. Equivalent to `XRANGE` but returns the
entries in reverse order.

See https://valkey.io/commands/xrevrange for more details.

Args:
key (str): The key of the stream.
end (StreamRangeBound): The ending stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MaxId` to end with the maximum available ID.
start (StreamRangeBound): The starting stream ID bound for the range.
- Use `IdBound` to specify a stream ID.
- Use `ExclusiveIdBound` to specify an exclusive bounded stream ID.
- Use `MinId` to start with the minimum available ID.
count (Optional[int]): An optional argument specifying the maximum count of stream entries to return.
If `count` is not provided, all stream entries in the range will be returned.

Command response:
Optional[Mapping[str, List[List[str]]]]: A mapping of stream IDs to stream entry data, where entry data is a
list of pairings with format `[[field, entry], [field, entry], ...]`. Returns null if the range arguments
are not applicable.
"""
args = [key, end.to_arg(), start.to_arg()]
if count is not None:
args.extend(["COUNT", str(count)])

return self.append_command(RequestType.XRevRange, args)

def geoadd(
self: TTransaction,
key: str,
Expand Down
24 changes: 22 additions & 2 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4811,7 +4811,7 @@ async def test_xdel(self, redis_client: TRedisClient):

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xrange(self, redis_client: TRedisClient):
async def test_xrange_and_xrevrange(self, redis_client: TRedisClient):
key = get_random_string(10)
non_existing_key = get_random_string(10)
string_key = get_random_string(10)
Expand All @@ -4838,43 +4838,63 @@ async def test_xrange(self, redis_client: TRedisClient):
stream_id1: [["f1", "v1"]],
stream_id2: [["f2", "v2"]],
}
assert await redis_client.xrevrange(key, MaxId(), MinId()) == {
stream_id2: [["f2", "v2"]],
stream_id1: [["f1", "v1"]],
}

# returns empty mapping if + before -
assert await redis_client.xrange(key, MaxId(), MinId()) == {}
# rev search returns empty mapping if - before +
assert await redis_client.xrevrange(key, MinId(), MaxId()) == {}

assert (
await redis_client.xadd(
key, [("f3", "v3")], StreamAddOptions(id=stream_id3)
)
== stream_id3
)

# get the newest entry
assert await redis_client.xrange(
key, ExclusiveIdBound(stream_id2), ExclusiveIdBound.from_timestamp(5), 1
) == {stream_id3: [["f3", "v3"]]}
assert await redis_client.xrevrange(
key, ExclusiveIdBound.from_timestamp(5), ExclusiveIdBound(stream_id2), 1
) == {stream_id3: [["f3", "v3"]]}

# xrange against an emptied stream
# xrange/xrevrange against an emptied stream
assert await redis_client.xdel(key, [stream_id1, stream_id2, stream_id3]) == 3
assert await redis_client.xrange(key, MinId(), MaxId(), 10) == {}
assert await redis_client.xrevrange(key, MaxId(), MinId(), 10) == {}

assert await redis_client.xrange(non_existing_key, MinId(), MaxId()) == {}
assert await redis_client.xrevrange(non_existing_key, MaxId(), MinId()) == {}

# count value < 1 returns None
assert await redis_client.xrange(key, MinId(), MaxId(), 0) is None
assert await redis_client.xrange(key, MinId(), MaxId(), -1) is None
assert await redis_client.xrevrange(key, MaxId(), MinId(), 0) is None
assert await redis_client.xrevrange(key, MaxId(), MinId(), -1) is None

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo")
with pytest.raises(RequestError):
await redis_client.xrange(string_key, MinId(), MaxId())
with pytest.raises(RequestError):
await redis_client.xrevrange(string_key, MaxId(), MinId())

# invalid start bound
with pytest.raises(RequestError):
await redis_client.xrange(key, IdBound("not_a_stream_id"), MaxId())
with pytest.raises(RequestError):
await redis_client.xrevrange(key, MaxId(), IdBound("not_a_stream_id"))

# invalid end bound
with pytest.raises(RequestError):
await redis_client.xrange(key, MinId(), IdBound("not_a_stream_id"))
with pytest.raises(RequestError):
await redis_client.xrevrange(key, IdBound("not_a_stream_id"), MinId())

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
Expand Down
2 changes: 2 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ async def transaction_test(
args.append(2)
transaction.xrange(key11, IdBound("0-1"), IdBound("0-1"))
args.append({"0-1": [["foo", "bar"]]})
transaction.xrevrange(key11, IdBound("0-1"), IdBound("0-1"))
args.append({"0-1": [["foo", "bar"]]})
transaction.xtrim(key11, TrimByMinId(threshold="0-2", exact=True))
args.append(1)
transaction.xdel(key11, ["0-2", "0-3"])
Expand Down
Loading