Skip to content

Commit

Permalink
Python: add XREADGROUP command (#1679)
Browse files Browse the repository at this point in the history
* Python: add XREADGROUP command

* Fix mypy error

* PR suggestions
  • Loading branch information
aaron-congo authored Jun 26, 2024
1 parent b969cd9 commit e83f9da
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* Python: Added XGROUP CREATE and XGROUP DESTROY commands ([#1646](https://github.com/aws/glide-for-redis/pull/1646))
* Python: Added XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands ([#1658](https://github.com/aws/glide-for-redis/pull/1658))
* Python: Added LOLWUT command ([#1657](https://github.com/aws/glide-for-redis/pull/1657))
* Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
StreamTrimOptions,
TrimByMaxLen,
Expand Down Expand Up @@ -161,6 +162,7 @@
"MinId",
"StreamAddOptions",
"StreamGroupOptions",
"StreamReadGroupOptions",
"StreamRangeBound",
"StreamReadOptions",
"StreamTrimOptions",
Expand Down
52 changes: 52 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
StreamTrimOptions,
)
Expand Down Expand Up @@ -2916,6 +2917,57 @@ async def xgroup_del_consumer(
),
)

async def xreadgroup(
self,
keys_and_ids: Mapping[str, str],
group_name: str,
consumer_name: str,
options: Optional[StreamReadGroupOptions] = None,
) -> Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]:
"""
Reads entries from the given streams owned by a consumer group.
See https://valkey.io/commands/xreadgroup for more details.
Note:
When in cluster mode, all keys in `keys_and_ids` must map to the same hash slot.
Args:
keys_and_ids (Mapping[str, str]): A mapping of stream keys to stream entry IDs to read from. The special ">"
ID returns messages that were never delivered to any other consumer. Any other valid ID will return
entries pending for the consumer with IDs greater than the one provided.
group_name (str): The consumer group name.
consumer_name (str): The consumer name. The consumer will be auto-created if it does not already exist.
options (Optional[StreamReadGroupOptions]): Options detailing how to read the stream.
Returns:
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]: A mapping of stream keys, to a mapping of
stream IDs, to a list of pairings with format `[[field, entry], [field, entry], ...]`.
Returns None if the BLOCK option is given and a timeout occurs, or if there is no stream that can be served.
Examples:
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="1-0"))
>>> await client.xgroup_create("mystream", "mygroup", "0-0")
>>> await client.xreadgroup({"mystream": ">"}, "mygroup", "myconsumer", StreamReadGroupOptions(count=1))
{
"mystream": {
"1-0": [["field1", "value1"]],
}
} # Read one stream entry from "mystream" using "myconsumer" in the consumer group "mygroup".
"""
args = ["GROUP", group_name, consumer_name]
if options is not None:
args.extend(options.to_args())

args.append("STREAMS")
args.extend([key for key in keys_and_ids.keys()])
args.extend([value for value in keys_and_ids.values()])

return cast(
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]],
await self._execute_command(RequestType.XReadGroup, args),
)

async def geoadd(
self,
key: str,
Expand Down
34 changes: 34 additions & 0 deletions python/python/glide/async_commands/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,37 @@ def to_args(self) -> List[str]:
args.extend([self.ENTRIES_READ_REDIS_API, self.entries_read_id])

return args


class StreamReadGroupOptions(StreamReadOptions):
READ_NOACK_REDIS_API = "NOACK"

def __init__(
self, no_ack=False, block_ms: Optional[int] = None, count: Optional[int] = None
):
"""
Options for reading entries from streams using a consumer group. Can be used as an optional argument to
`XREADGROUP`.
Args:
no_ack (bool): If set, messages are not added to the Pending Entries List (PEL). This is equivalent to
acknowledging the message when it is read. Equivalent to `NOACK` in the Redis API.
block_ms (Optional[int]): If provided, the request will be blocked for the set amount of milliseconds or
until the server has the required number of entries. Equivalent to `BLOCK` in the Redis API.
count (Optional[int]): The maximum number of elements requested. Equivalent to `COUNT` in the Redis API.
"""
super().__init__(block_ms=block_ms, count=count)
self.no_ack = no_ack

def to_args(self) -> List[str]:
"""
Returns the options as a list of string arguments to be used in the `XREADGROUP` command.
Returns:
List[str]: The options as a list of arguments for the `XREADGROUP` command.
"""
args = super().to_args()
if self.no_ack:
args.append(self.READ_NOACK_REDIS_API)

return args
36 changes: 36 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
StreamAddOptions,
StreamGroupOptions,
StreamRangeBound,
StreamReadGroupOptions,
StreamReadOptions,
StreamTrimOptions,
)
Expand Down Expand Up @@ -2039,6 +2040,41 @@ def xgroup_del_consumer(
RequestType.XGroupDelConsumer, [key, group_name, consumer_name]
)

def xreadgroup(
self: TTransaction,
keys_and_ids: Mapping[str, str],
group_name: str,
consumer_name: str,
options: Optional[StreamReadGroupOptions] = None,
) -> TTransaction:
"""
Reads entries from the given streams owned by a consumer group.
See https://valkey.io/commands/xreadgroup for more details.
Args:
keys_and_ids (Mapping[str, str]): A mapping of stream keys to stream entry IDs to read from. The special ">"
ID returns messages that were never delivered to any other consumer. Any other valid ID will return
entries pending for the consumer with IDs greater than the one provided.
group_name (str): The consumer group name.
consumer_name (str): The consumer name. The consumer will be auto-created if it does not already exist.
options (Optional[StreamReadGroupOptions]): Options detailing how to read the stream.
Command response:
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]: A mapping of stream keys, to a mapping of
stream IDs, to a list of pairings with format `[[field, entry], [field, entry], ...]`.
Returns None if the BLOCK option is given and a timeout occurs, or if there is no stream that can be served.
"""
args = ["GROUP", group_name, consumer_name]
if options is not None:
args.extend(options.to_args())

args.append("STREAMS")
args.extend([key for key in keys_and_ids.keys()])
args.extend([value for value in keys_and_ids.values()])

return self.append_command(RequestType.XReadGroup, args)

def geoadd(
self: TTransaction,
key: str,
Expand Down
Loading

0 comments on commit e83f9da

Please sign in to comment.