Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add XINFO STREAM command #1816

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
* Java: Added SCAN command ([#1751](https://github.com/aws/glide-for-redis/pull/1751))
* Python: Type migration for entries_read ([#1768](https://github.com/aws/glide-for-redis/pull/1768))
* Python: Added FUNCTION DUMP and FUNCTION RESTORE commands ([#1769](https://github.com/aws/glide-for-redis/pull/1769))
* Python: Added FUNCTION STATS command ([#1794](https://github.com/aws/glide-for-redis/pull/1794))
* Python: Added XINFO STREAM command ([#1816](https://github.com/aws/glide-for-redis/pull/1816))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down Expand Up @@ -208,7 +210,6 @@
* Node: Added LINDEX command ([#999](https://github.com/aws/glide-for-redis/pull/999))
* Python, Node: Added ZPOPMAX command ([#996](https://github.com/aws/glide-for-redis/pull/996), [#1009](https://github.com/aws/glide-for-redis/pull/1009))
* Python: Added DBSIZE command ([#1040](https://github.com/aws/glide-for-redis/pull/1040))
* Python: Added FUNCTION STATS command ([#1794](https://github.com/aws/glide-for-redis/pull/1794))

#### Features
* Python, Node: Added support in Lua Scripts ([#775](https://github.com/aws/glide-for-redis/pull/775), [#860](https://github.com/aws/glide-for-redis/pull/860))
Expand Down
410 changes: 410 additions & 0 deletions glide-core/src/client/value_conversion.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ enum RequestType {
XAutoClaim = 203;
XInfoGroups = 204;
XInfoConsumers = 205;
XInfoStream = 207;
Scan = 206;
Wait = 208;
XClaim = 209;
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub enum RequestType {
XAutoClaim = 203,
XInfoGroups = 204,
XInfoConsumers = 205,
XInfoStream = 207,
Scan = 206,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ordering is unacceptable!

Copy link
Collaborator Author

@aaron-congo aaron-congo Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lol 206 was stolen from me, it was either group the XInfo commands together but have weird numerical ordering or group them apart but have consistently increasing number values. I didn't want to change the scan numerical value to 207 because that could mess with other people's local code if they forget to rebuild after pulling this change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could've kept the numbers while swapping the ordering 😆 - welp, too late.

Wait = 208,
XClaim = 209,
Expand Down Expand Up @@ -434,6 +435,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim,
ProtobufRequestType::XInfoGroups => RequestType::XInfoGroups,
ProtobufRequestType::XInfoConsumers => RequestType::XInfoConsumers,
ProtobufRequestType::XInfoStream => RequestType::XInfoStream,
ProtobufRequestType::Wait => RequestType::Wait,
ProtobufRequestType::XClaim => RequestType::XClaim,
ProtobufRequestType::Scan => RequestType::Scan,
Expand Down Expand Up @@ -652,6 +654,7 @@ impl RequestType {
RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")),
RequestType::XInfoGroups => Some(get_two_word_command("XINFO", "GROUPS")),
RequestType::XInfoConsumers => Some(get_two_word_command("XINFO", "CONSUMERS")),
RequestType::XInfoStream => Some(get_two_word_command("XINFO", "STREAM")),
RequestType::Wait => Some(cmd("WAIT")),
RequestType::XClaim => Some(cmd("XCLAIM")),
RequestType::Scan => Some(cmd("SCAN")),
Expand Down
148 changes: 147 additions & 1 deletion python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@
StreamTrimOptions,
_create_xpending_range_args,
)
from glide.constants import TOK, TEncodable, TResult
from glide.constants import (
TOK,
TEncodable,
TResult,
TXInfoStreamFullResponse,
TXInfoStreamResponse,
)
from glide.protobuf.redis_request_pb2 import RequestType
from glide.routes import Route

Expand Down Expand Up @@ -3545,6 +3551,146 @@ async def xinfo_consumers(
await self._execute_command(RequestType.XInfoConsumers, [key, group_name]),
)

async def xinfo_stream(
self,
key: TEncodable,
) -> TXInfoStreamResponse:
"""
Returns information about the stream stored at `key`. To get more detailed information, use `xinfo_stream_full`.
See https://valkey.io/commands/xinfo-stream for more details.
Args:
key (TEncodable): The key of the stream.
Returns:
TXInfoStreamResponse: A mapping of stream information for the given `key`. See the example for a sample
response.
Examples:
>>> await client.xinfo_stream("my_stream")
{
b"length": 4,
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
b"radix-tree-keys": 1L,
b"radix-tree-nodes": 2L,
b"last-generated-id": b"1719877599564-0",
b"max-deleted-entry-id": b"0-0", # This field was added in Redis version 7.0.0.
b"entries-added": 4L, # This field was added in Redis version 7.0.0.
b"recorded-first-entry-id": b"1719710679916-0", # This field was added in Redis version 7.0.0.
b"groups": 1L,
b"first-entry": [
b"1719710679916-0",
[b"foo1", b"bar1", b"foo2", b"bar2"],
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
],
b"last-entry": [
b"1719877599564-0",
[b"field1", b"value1"],
],
}
# Stream information for "my_stream". Note that "first-entry" and "last-entry" could both be `None` if
# the stream is empty.
"""
return cast(
TXInfoStreamResponse,
await self._execute_command(RequestType.XInfoStream, [key]),
)

async def xinfo_stream_full(
self,
key: TEncodable,
count: Optional[int] = None,
) -> TXInfoStreamFullResponse:
"""
Returns verbose information about the stream stored at `key`.
See https://valkey.io/commands/xinfo-stream for more details.
Args:
key (TEncodable): The key of the stream.
count (Optional[int]): The number of stream and PEL entries that are returned. A value of `0` means that all
entries will be returned. If not provided, defaults to `10`.
Returns:
TXInfoStreamFullResponse: A mapping of detailed stream information for the given `key`. See the example for
a sample response.
Examples:
>>> await client.xinfo_stream_full("my_stream")
{
b"length": 4,
b"radix-tree-keys": 1L,
b"radix-tree-nodes": 2L,
b"last-generated-id": b"1719877599564-0",
b"max-deleted-entry-id": b"0-0", # This field was added in Redis version 7.0.0.
b"entries-added": 4L, # This field was added in Redis version 7.0.0.
b"recorded-first-entry-id": b"1719710679916-0", # This field was added in Redis version 7.0.0.
b"entries": [
[
b"1719710679916-0",
[b"foo1", b"bar1", b"foo2", b"bar2"],
],
[
b"1719877599564-0":
[b"field1", b"value1"],
]
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
],
b"groups": [
{
b"name": b"mygroup",
b"last-delivered-id": b"1719710688676-0",
b"entries-read": 2, # This field was added in Redis version 7.0.0.
b"lag": 0, # This field was added in Redis version 7.0.0.
b"pel-count": 2,
b"pending": [
[
b"1719710679916-0",
b"Alice",
1719710707260,
1,
],
[
b"1719710688676-0",
b"Alice",
1719710718373,
1,
],
],
b"consumers": [
{
b"name": b"Alice",
b"seen-time": 1719710718373,
b"active-time": 1719710718373, # This field was added in Redis version 7.2.0.
b"pel-count": 2,
b"pending": [
[
b"1719710679916-0",
1719710707260,
1
],
[
b"1719710688676-0",
1719710718373,
1
]
]
}
]
}
]
}
# Detailed stream information for "my_stream".
Since: Redis version 6.0.0.
"""
args = [key, "FULL"]
if count is not None:
args.extend(["COUNT", str(count)])

return cast(
TXInfoStreamFullResponse,
await self._execute_command(RequestType.XInfoStream, args),
)

async def geoadd(
self,
key: TEncodable,
Expand Down
41 changes: 41 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2591,6 +2591,47 @@ def xinfo_consumers(
"""
return self.append_command(RequestType.XInfoConsumers, [key, group_name])

def xinfo_stream(
self: TTransaction,
key: TEncodable,
) -> TTransaction:
"""
Returns information about the stream stored at `key`. To get more detailed information, use `xinfo_stream_full`.
See https://valkey.io/commands/xinfo-stream for more details.
Args:
key (TEncodable): The key of the stream.
Command response:
TXInfoStreamResponse: A mapping of stream information for the given `key`.
"""
return self.append_command(RequestType.XInfoStream, [key])

def xinfo_stream_full(
self: TTransaction,
key: TEncodable,
count: Optional[int] = None,
) -> TTransaction:
"""
Returns verbose information about the stream stored at `key`.
See https://valkey.io/commands/xinfo-stream for more details.
Args:
key (TEncodable): The key of the stream.
count (Optional[int]): The number of stream and PEL entries that are returned. A value of `0` means that all
entries will be returned. If not provided, defaults to `10`.
Command response:
TXInfoStreamFullResponse: A mapping of detailed stream information for the given `key`.
"""
args = [key, "FULL"]
if count is not None:
args.extend(["COUNT", str(count)])

return self.append_command(RequestType.XInfoStream, args)

def geoadd(
self: TTransaction,
key: TEncodable,
Expand Down
13 changes: 13 additions & 0 deletions python/python/glide/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,16 @@
],
],
]

TXInfoStreamResponse = Mapping[
bytes, Union[bytes, int, Mapping[bytes, Optional[List[List[bytes]]]]]
]
TXInfoStreamFullResponse = Mapping[
bytes,
Union[
bytes,
int,
Mapping[bytes, List[List[bytes]]],
List[Mapping[bytes, Union[bytes, int, List[List[Union[bytes, int]]]]]],
],
]
117 changes: 117 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6814,6 +6814,123 @@ async def test_xinfo_groups_xinfo_consumers_edge_cases_and_failures(
with pytest.raises(RequestError):
await glide_client.xinfo_consumers(string_key, group_name)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xinfo_stream(
self, glide_client: TGlideClient, cluster_mode, protocol
):
key = get_random_string(10)
group_name = get_random_string(10)
consumer = get_random_string(10)
stream_id0_0 = "0-0"
stream_id1_0 = "1-0"
stream_id1_1 = "1-1"

# setup: add stream entry, create consumer group and consumer, read from stream with consumer
assert (
await glide_client.xadd(
key, [("a", "b"), ("c", "d")], StreamAddOptions(stream_id1_0)
)
== stream_id1_0.encode()
)
assert await glide_client.xgroup_create(key, group_name, stream_id0_0) == OK
assert await glide_client.xreadgroup({key: ">"}, group_name, consumer) == {
key.encode(): {stream_id1_0.encode(): [[b"a", b"b"], [b"c", b"d"]]}
}

result = await glide_client.xinfo_stream(key)
assert result.get(b"length") == 1
expected_first_entry = [stream_id1_0.encode(), [b"a", b"b", b"c", b"d"]]
assert result.get(b"first-entry") == expected_first_entry

# only one entry exists, so first and last entry should be the same
assert result.get(b"last-entry") == expected_first_entry

# call XINFO STREAM with a byte string arg
result2 = await glide_client.xinfo_stream(key.encode())
assert result2 == result

# add one more entry
assert (
await glide_client.xadd(
key, [("foo", "bar")], StreamAddOptions(stream_id1_1)
)
== stream_id1_1.encode()
)

result_full = await glide_client.xinfo_stream_full(key, count=1)
print(result_full)
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
assert result_full.get(b"length") == 2
entries = cast(list, result_full.get(b"entries"))
# only the first entry will be returned since we passed count=1
assert len(entries) == 1
assert entries[0] == expected_first_entry

groups = cast(list, result_full.get(b"groups"))
assert len(groups) == 1
group_info = groups[0]
assert group_info.get(b"name") == group_name.encode()
pending = group_info.get(b"pending")
assert len(pending) == 1
assert stream_id1_0.encode() in pending[0]

consumers = group_info.get(b"consumers")
assert len(consumers) == 1
consumer_info = consumers[0]
assert consumer_info.get(b"name") == consumer.encode()
consumer_pending = consumer_info.get(b"pending")
assert len(consumer_pending) == 1
assert stream_id1_0.encode() in consumer_pending[0]

# call XINFO STREAM FULL with byte arg
result_full2 = await glide_client.xinfo_stream_full(key.encode())
# 2 entries should be returned, since we didn't pass the COUNT arg this time
assert len(cast(list, result_full2.get(b"entries"))) == 2

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xinfo_stream_edge_cases_and_failures(
self, glide_client: TGlideClient, cluster_mode, protocol
):
key = get_random_string(10)
string_key = get_random_string(10)
non_existing_key = get_random_string(10)
stream_id1_0 = "1-0"

# setup: create empty stream
assert (
await glide_client.xadd(
key, [("field", "value")], StreamAddOptions(stream_id1_0)
)
== stream_id1_0.encode()
)
assert await glide_client.xdel(key, [stream_id1_0]) == 1

# XINFO STREAM called against empty stream
result = await glide_client.xinfo_stream(key)
assert result.get(b"length") == 0
assert result.get(b"first-entry") is None
assert result.get(b"last-entry") is None

# XINFO STREAM FULL called against empty stream. Negative count values are ignored.
result_full = await glide_client.xinfo_stream_full(key, count=-3)
assert result_full.get(b"length") == 0
assert result_full.get(b"entries") == []
assert result_full.get(b"groups") == []

# calling XINFO STREAM with a non-existing key raises an error
with pytest.raises(RequestError):
await glide_client.xinfo_stream(non_existing_key)
with pytest.raises(RequestError):
await glide_client.xinfo_stream_full(non_existing_key)

# key exists, but it is not a stream
assert await glide_client.set(string_key, "foo")
with pytest.raises(RequestError):
await glide_client.xinfo_stream(string_key)
with pytest.raises(RequestError):
await glide_client.xinfo_stream_full(string_key)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xgroup_set_id(
Expand Down
Loading
Loading