Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add XACK command #1681

Merged
merged 2 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* Python: Added XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands ([#1658](https://github.com/aws/glide-for-redis/pull/1658))
* Python: Added LOLWUT command ([#1657](https://github.com/aws/glide-for-redis/pull/1657))
* Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679))
* Python: Added XACK command ([#1681](https://github.com/aws/glide-for-redis/pull/1681))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
39 changes: 39 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2968,6 +2968,45 @@ async def xreadgroup(
await self._execute_command(RequestType.XReadGroup, args),
)

async def xack(
self,
key: str,
group_name: str,
ids: List[str],
) -> int:
"""
Removes one or multiple messages from the Pending Entries List (PEL) of a stream consumer group.
This command should be called on pending messages so that such messages do not get processed again by the
consumer group.

See https://valkey.io/commands/xack for more details.

Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
ids (List[str]): The stream entry IDs to acknowledge and consume for the given consumer group.

Returns:
int: The number of messages that were successfully acknowledged.

Examples:
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="1-0"))
>>> await client.xgroup_create("mystream", "mygroup", "0-0")
>>> await client.xreadgroup({"mystream": ">"}, "mygroup", "myconsumer")
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
{
"mystream": {
"1-0": [["field1", "value1"]],
}
} # Read one stream entry, the entry is now in the Pending Entries List for "mygroup".
>>> await client.xack("mystream", "mygroup", ["1-0"])
1 # 1 pending message was acknowledged and removed from the Pending Entries List for "mygroup".
"""

return cast(
int,
await self._execute_command(RequestType.XAck, [key, group_name] + ids),
)

async def geoadd(
self,
key: str,
Expand Down
23 changes: 23 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2075,6 +2075,29 @@ def xreadgroup(

return self.append_command(RequestType.XReadGroup, args)

def xack(
self: TTransaction,
key: str,
group_name: str,
ids: List[str],
) -> TTransaction:
"""
Removes one or multiple messages from the Pending Entries List (PEL) of a stream consumer group.
This command should be called on pending messages so that such messages do not get processed again by the
consumer group.

See https://valkey.io/commands/xack for more details.

Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
ids (List[str]): The stream entry IDs to acknowledge and consume for the given consumer group.

Command response:
int: The number of messages that were successfully acknowledged.
"""
return self.append_command(RequestType.XAck, [key, group_name] + ids)

def geoadd(
self: TTransaction,
key: str,
Expand Down
77 changes: 77 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5485,6 +5485,83 @@ async def endless_xreadgroup_call():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_xreadgroup_call(), timeout=3)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xack(
self, redis_client: TRedisClient, cluster_mode, protocol, request
):
key = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:{get_random_string(10)}"
string_key = f"{{testKey}}:{get_random_string(10)}"
group_name = get_random_string(10)
consumer_name = get_random_string(10)
stream_id0 = "0"
stream_id1_0 = "1-0"
stream_id1_1 = "1-1"
stream_id1_2 = "1-2"

# setup: add 2 entries to the stream, create consumer group, read to mark them as pending
assert (
await redis_client.xadd(key, [("f0", "v0")], StreamAddOptions(stream_id1_0))
== stream_id1_0
)
assert (
await redis_client.xadd(key, [("f1", "v1")], StreamAddOptions(stream_id1_1))
== stream_id1_1
)
assert await redis_client.xgroup_create(key, group_name, stream_id0) == OK
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key: {
stream_id1_0: [["f0", "v0"]],
stream_id1_1: [["f1", "v1"]],
}
}

# add one more entry
assert (
await redis_client.xadd(key, [("f2", "v2")], StreamAddOptions(stream_id1_2))
== stream_id1_2
)

# acknowledge the first 2 entries
assert (
await redis_client.xack(key, group_name, [stream_id1_0, stream_id1_1]) == 2
)
# attempting to acknowledge the first 2 entries again returns 0 since they were already acknowledged
assert (
await redis_client.xack(key, group_name, [stream_id1_0, stream_id1_1]) == 0
)
# read the last, unacknowledged entry
assert await redis_client.xreadgroup({key: ">"}, group_name, consumer_name) == {
key: {stream_id1_2: [["f2", "v2"]]}
}
# deleting the consumer returns 1 since the last entry still hasn't been acknowledged
assert (
await redis_client.xgroup_del_consumer(key, group_name, consumer_name) == 1
)

# attempting to acknowledge a non-existing key returns 0
assert (
await redis_client.xack(non_existing_key, group_name, [stream_id1_0]) == 0
)
# attempting to acknowledge a non-existing group returns 0
assert await redis_client.xack(key, "non_existing_group", [stream_id1_0]) == 0
# attempting to acknowledge a non-existing ID returns 0
assert await redis_client.xack(key, group_name, ["99-99"]) == 0

# invalid arg - ID list must not be empty
with pytest.raises(RequestError):
await redis_client.xack(key, group_name, [])

# invalid arg - invalid stream ID format
with pytest.raises(RequestError):
await redis_client.xack(key, group_name, ["invalid_ID_format"])

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
await redis_client.xack(string_key, group_name, [stream_id1_0])

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TRedisClient):
Expand Down
4 changes: 3 additions & 1 deletion python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,10 @@ async def transaction_test(
{key11: ">"}, group_name1, consumer, StreamReadGroupOptions(count=5)
)
args.append({key11: {"0-2": [["foo", "bar"]]}})
transaction.xgroup_del_consumer(key11, group_name1, consumer)
transaction.xack(key11, group_name1, ["0-2"])
args.append(1)
transaction.xgroup_del_consumer(key11, group_name1, consumer)
args.append(0)
transaction.xgroup_destroy(key11, group_name1)
args.append(True)
transaction.xgroup_destroy(key11, group_name2)
Expand Down
Loading