Skip to content

Commit

Permalink
Python: add WATCH and UNWATCH commands (#1736)
Browse files Browse the repository at this point in the history
* Python: add WATCH and UNWATCH commands

* isort

* update redis_client type for test_unwatch_with_route()

* add watch to the correct cluster test

* add cluster mode note

* address comments from GumpacG

* Python: added XAUTOCLAIM command (#1718)

* Python: add XAUTOCLAIM command

* minor doc update

* Minor doc update

* PR suggestions

* Update test assertions with string conversions

* PR suggestions

* Add clarifying comments

* Python: Add ZSCAN and HSCAN commands (#1732)

* Added zscan command

* Added HSCAN command

* Fix tests

* Fix tests

* Fix tests

* Fix tests

* Fix tests

* Fix tests

* Debug tests

* Debug tests

* Debug tests

* Debug tests

* Debug tests

* Debug tests

* Debug tests

* Debug tests

* Debug tests

* Fixed tests and addressed comments

* Update python/python/tests/test_async_client.py

Co-authored-by: Yury-Fridlyand <yury.fridlyand@improving.com>

---------

Co-authored-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Python: add WATCH and UNWATCH commands

* Python: add WATCH and UNWATCH commands

* add changelog

* fix broken syntax after solving conflicts

---------

Co-authored-by: Aaron <69273634+aaron-congo@users.noreply.github.com>
Co-authored-by: Guian Gumpac <guian.gumpac@improving.com>
Co-authored-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
4 people authored Jul 1, 2024
1 parent eb2201c commit 8e048b1
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* Python: Added XAUTOCLAIM command ([#1718](https://github.com/aws/glide-for-redis/pull/1718))
* Python: Add ZSCAN and HSCAN commands ([#1732](https://github.com/aws/glide-for-redis/pull/1732))
* Python: Added FCALL_RO command ([#1721](https://github.com/aws/glide-for-redis/pull/1721))
* Python: Added WATCH and UNWATCH command ([#1736](https://github.com/aws/glide-for-redis/pull/1736))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
23 changes: 23 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,3 +809,26 @@ async def wait(
int,
await self._execute_command(RequestType.Wait, args, route),
)

async def unwatch(self, route: Optional[Route] = None) -> TOK:
"""
Flushes all the previously watched keys for a transaction. Executing a transaction will
automatically flush all previously watched keys.
See https://valkey.io/commands/unwatch for more details.
Args:
route (Optional[Route]): The command will be routed to all primary nodes, unless `route` is provided,
in which case the client will route the command to the nodes defined by `route`.
Returns:
TOK: A simple "OK" response.
Examples:
>>> await client.unwatch()
'OK'
"""
return cast(
TOK,
await self._execute_command(RequestType.UnWatch, [], route),
)
38 changes: 38 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5743,6 +5743,44 @@ async def fcall_ro(
await self._execute_command(RequestType.FCallReadOnly, args),
)

async def watch(self, keys: List[str]) -> TOK:
"""
Marks the given keys to be watched for conditional execution of a transaction. Transactions
will only execute commands if the watched keys are not modified before execution of the
transaction.
See https://valkey.io/commands/watch for more details.
Note:
When in cluster mode, the command may route to multiple nodes when `keys` map to different hash slots.
Args:
keys (List[str]): The keys to watch.
Returns:
TOK: A simple "OK" response.
Examples:
>>> await client.watch("sampleKey")
'OK'
>>> transaction.set("sampleKey", "foobar")
>>> await redis_client.exec(transaction)
'OK' # Executes successfully and keys are unwatched.
>>> await client.watch("sampleKey")
'OK'
>>> transaction.set("sampleKey", "foobar")
>>> await client.set("sampleKey", "hello world")
'OK'
>>> await redis_client.exec(transaction)
None # None is returned when the watched key is modified before transaction execution.
"""

return cast(
TOK,
await self._execute_command(RequestType.Watch, keys),
)

@dataclass
class PubSubMsg:
"""
Expand Down
21 changes: 21 additions & 0 deletions python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,3 +695,24 @@ async def wait(
int,
await self._execute_command(RequestType.Wait, args),
)

async def unwatch(self) -> TOK:
"""
Flushes all the previously watched keys for a transaction. Executing a transaction will
automatically flush all previously watched keys.
See https://valkey.io/commands/unwatch for more details.
Returns:
TOK: A simple "OK" response.
Examples:
>>> await client.watch("sampleKey")
'OK'
>>> await client.unwatch()
'OK'
"""
return cast(
TOK,
await self._execute_command(RequestType.UnWatch, []),
)
89 changes: 69 additions & 20 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@
TrimByMaxLen,
TrimByMinId,
)
from glide.async_commands.transaction import (
BaseTransaction,
ClusterTransaction,
Transaction,
)
from glide.async_commands.transaction import ClusterTransaction, Transaction
from glide.config import (
ClusterClientConfiguration,
GlideClientConfiguration,
Expand Down Expand Up @@ -7420,18 +7416,18 @@ async def test_flushall(self, redis_client: TGlideClient):

await redis_client.set(key, value)
assert await redis_client.dbsize() > 0
assert await redis_client.flushall() is OK
assert await redis_client.flushall(FlushMode.ASYNC) is OK
assert await redis_client.flushall() == OK
assert await redis_client.flushall(FlushMode.ASYNC) == OK
if not await check_if_server_version_lt(redis_client, min_version):
assert await redis_client.flushall(FlushMode.SYNC) is OK
assert await redis_client.flushall(FlushMode.SYNC) == OK
assert await redis_client.dbsize() == 0

if isinstance(redis_client, GlideClusterClient):
await redis_client.set(key, value)
assert await redis_client.flushall(route=AllPrimaries()) is OK
assert await redis_client.flushall(FlushMode.ASYNC, AllPrimaries()) is OK
assert await redis_client.flushall(route=AllPrimaries()) == OK
assert await redis_client.flushall(FlushMode.ASYNC, AllPrimaries()) == OK
if not await check_if_server_version_lt(redis_client, min_version):
assert await redis_client.flushall(FlushMode.SYNC, AllPrimaries()) is OK
assert await redis_client.flushall(FlushMode.SYNC, AllPrimaries()) == OK
assert await redis_client.dbsize() == 0

@pytest.mark.parametrize("cluster_mode", [False])
Expand All @@ -7443,30 +7439,30 @@ async def test_standalone_flushdb(self, redis_client: GlideClient):
value = get_random_string(5)

# fill DB 0 and check size non-empty
assert await redis_client.select(0) is OK
assert await redis_client.select(0) == OK
await redis_client.set(key1, value)
assert await redis_client.dbsize() > 0

# fill DB 1 and check size non-empty
assert await redis_client.select(1) is OK
assert await redis_client.select(1) == OK
await redis_client.set(key2, value)
assert await redis_client.dbsize() > 0

# flush DB 1 and check again
assert await redis_client.flushdb() is OK
assert await redis_client.flushdb() == OK
assert await redis_client.dbsize() == 0

# swith to DB 0, flush, and check
assert await redis_client.select(0) is OK
assert await redis_client.select(0) == OK
assert await redis_client.dbsize() > 0
assert await redis_client.flushdb(FlushMode.ASYNC) is OK
assert await redis_client.flushdb(FlushMode.ASYNC) == OK
assert await redis_client.dbsize() == 0

# verify flush SYNC
if not await check_if_server_version_lt(redis_client, min_version):
await redis_client.set(key2, value)
assert await redis_client.dbsize() > 0
assert await redis_client.flushdb(FlushMode.SYNC) is OK
assert await redis_client.flushdb(FlushMode.SYNC) == OK
assert await redis_client.dbsize() == 0

@pytest.mark.parametrize("cluster_mode", [True, False])
Expand Down Expand Up @@ -7851,6 +7847,58 @@ async def test_lcs_idx(self, redis_client: GlideClient):
with pytest.raises(RequestError):
await redis_client.lcs_idx(key1, lcs_non_string_key)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_watch(self, redis_client: GlideClient):
# watched key didn't change outside of transaction before transaction execution, transaction will execute
assert await redis_client.set("key1", "original_value") == OK
assert await redis_client.watch(["key1"]) == OK
transaction = Transaction()
transaction.set("key1", "transaction_value")
transaction.get("key1")
assert await redis_client.exec(transaction) is not None

# watched key changed outside of transaction before transaction execution, transaction will not execute
assert await redis_client.set("key1", "original_value") == OK
assert await redis_client.watch(["key1"]) == OK
transaction = Transaction()
transaction.set("key1", "transaction_value")
assert await redis_client.set("key1", "standalone_value") == OK
transaction.get("key1")
assert await redis_client.exec(transaction) is None

# empty list not supported
with pytest.raises(RequestError):
await redis_client.watch([])

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_unwatch(self, redis_client: GlideClient):

# watched key unwatched before transaction execution even if changed
# outside of transaction, transaction will still execute
assert await redis_client.set("key1", "original_value") == OK
assert await redis_client.watch(["key1"]) == OK
transaction = Transaction()
transaction.set("key1", "transaction_value")
assert await redis_client.set("key1", "standalone_value") == OK
transaction.get("key1")
assert await redis_client.unwatch() == OK
result = await redis_client.exec(transaction)
assert result is not None
assert isinstance(result, list)
assert len(result) == 2
assert result[0] == "OK"
assert result[1] == b"transaction_value"

# UNWATCH returns OK when there no watched keys
assert await redis_client.unwatch() == OK

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_unwatch_with_route(self, redis_client: GlideClusterClient):
assert await redis_client.unwatch(RandomNode()) == OK


class TestMultiKeyCommandCrossSlot:
@pytest.mark.parametrize("cluster_mode", [True])
Expand Down Expand Up @@ -7942,6 +7990,7 @@ async def test_multi_key_command_routed_to_multiple_nodes(
await redis_client.mget(["abc", "zxy", "lkn"])
await redis_client.mset({"abc": "1", "zxy": "2", "lkn": "3"})
await redis_client.touch(["abc", "zxy", "lkn"])
await redis_client.watch(["abc", "zxy", "lkn"])


class TestCommandsUnitTests:
Expand Down Expand Up @@ -8216,18 +8265,18 @@ async def test_cluster_flushdb(self, redis_client: GlideClusterClient):

await redis_client.set(key, value)
assert await redis_client.dbsize() > 0
assert await redis_client.flushdb(route=AllPrimaries()) is OK
assert await redis_client.flushdb(route=AllPrimaries()) == OK
assert await redis_client.dbsize() == 0

await redis_client.set(key, value)
assert await redis_client.dbsize() > 0
assert await redis_client.flushdb(FlushMode.ASYNC, AllPrimaries()) is OK
assert await redis_client.flushdb(FlushMode.ASYNC, AllPrimaries()) == OK
assert await redis_client.dbsize() == 0

if not await check_if_server_version_lt(redis_client, min_version):
await redis_client.set(key, value)
assert await redis_client.dbsize() > 0
assert await redis_client.flushdb(FlushMode.SYNC, AllPrimaries()) is OK
assert await redis_client.flushdb(FlushMode.SYNC, AllPrimaries()) == OK
assert await redis_client.dbsize() == 0

@pytest.mark.parametrize("cluster_mode", [True, False])
Expand Down
2 changes: 1 addition & 1 deletion python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ async def test_can_return_null_on_watch_transaction_failures(
keyslot = get_random_string(3)
transaction = ClusterTransaction() if is_cluster else Transaction()
transaction.get(keyslot)
result1 = await redis_client.custom_command(["WATCH", keyslot])
result1 = await redis_client.watch([keyslot])
assert result1 == OK

result2 = await client2.set(keyslot, "foo")
Expand Down

0 comments on commit 8e048b1

Please sign in to comment.