From 8e048b108267361feadcb8a6df5c98375406c785 Mon Sep 17 00:00:00 2001 From: James Xin <126831592+jamesx-improving@users.noreply.github.com> Date: Sun, 30 Jun 2024 20:58:01 -0700 Subject: [PATCH] Python: add WATCH and UNWATCH commands (#1736) * Python: add WATCH and UNWATCH commands * isort * update redis_client type for test_unwatch_with_route() * add watch to the correct cluster test * add cluster mode note * address comments from GumpacG * Python: added XAUTOCLAIM command (#1718) * Python: add XAUTOCLAIM command * minor doc update * Minor doc update * PR suggestions * Update test assertions with string conversions * PR suggestions * Add clarifying comments * Python: Add ZSCAN and HSCAN commands (#1732) * Added zscan command * Added HSCAN command * Fix tests * Fix tests * Fix tests * Fix tests * Fix tests * Fix tests * Debug tests * Debug tests * Debug tests * Debug tests * Debug tests * Debug tests * Debug tests * Debug tests * Debug tests * Fixed tests and addressed comments * Update python/python/tests/test_async_client.py Co-authored-by: Yury-Fridlyand --------- Co-authored-by: Yury-Fridlyand * Python: add WATCH and UNWATCH commands * Python: add WATCH and UNWATCH commands * add changelog * fix broken syntax after solving conflicts --------- Co-authored-by: Aaron <69273634+aaron-congo@users.noreply.github.com> Co-authored-by: Guian Gumpac Co-authored-by: Yury-Fridlyand --- CHANGELOG.md | 1 + .../glide/async_commands/cluster_commands.py | 23 +++++ python/python/glide/async_commands/core.py | 38 ++++++++ .../async_commands/standalone_commands.py | 21 +++++ python/python/tests/test_async_client.py | 89 ++++++++++++++----- python/python/tests/test_transaction.py | 2 +- 6 files changed, 153 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index affe447d9b..3b90d258d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ * Python: Added XAUTOCLAIM command ([#1718](https://github.com/aws/glide-for-redis/pull/1718)) * Python: Add ZSCAN and HSCAN commands ([#1732](https://github.com/aws/glide-for-redis/pull/1732)) * Python: Added FCALL_RO command ([#1721](https://github.com/aws/glide-for-redis/pull/1721)) +* Python: Added WATCH and UNWATCH command ([#1736](https://github.com/aws/glide-for-redis/pull/1736)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index 74e41b064f..467a93cc83 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -809,3 +809,26 @@ async def wait( int, await self._execute_command(RequestType.Wait, args, route), ) + + async def unwatch(self, route: Optional[Route] = None) -> TOK: + """ + Flushes all the previously watched keys for a transaction. Executing a transaction will + automatically flush all previously watched keys. + + See https://valkey.io/commands/unwatch for more details. + + Args: + route (Optional[Route]): The command will be routed to all primary nodes, unless `route` is provided, + in which case the client will route the command to the nodes defined by `route`. + + Returns: + TOK: A simple "OK" response. + + Examples: + >>> await client.unwatch() + 'OK' + """ + return cast( + TOK, + await self._execute_command(RequestType.UnWatch, [], route), + ) diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index d56b367534..4b0ded7d46 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -5743,6 +5743,44 @@ async def fcall_ro( await self._execute_command(RequestType.FCallReadOnly, args), ) + async def watch(self, keys: List[str]) -> TOK: + """ + Marks the given keys to be watched for conditional execution of a transaction. Transactions + will only execute commands if the watched keys are not modified before execution of the + transaction. + + See https://valkey.io/commands/watch for more details. + + Note: + When in cluster mode, the command may route to multiple nodes when `keys` map to different hash slots. + + Args: + keys (List[str]): The keys to watch. + + Returns: + TOK: A simple "OK" response. + + Examples: + >>> await client.watch("sampleKey") + 'OK' + >>> transaction.set("sampleKey", "foobar") + >>> await redis_client.exec(transaction) + 'OK' # Executes successfully and keys are unwatched. + + >>> await client.watch("sampleKey") + 'OK' + >>> transaction.set("sampleKey", "foobar") + >>> await client.set("sampleKey", "hello world") + 'OK' + >>> await redis_client.exec(transaction) + None # None is returned when the watched key is modified before transaction execution. + """ + + return cast( + TOK, + await self._execute_command(RequestType.Watch, keys), + ) + @dataclass class PubSubMsg: """ diff --git a/python/python/glide/async_commands/standalone_commands.py b/python/python/glide/async_commands/standalone_commands.py index 859f4ae0fc..00d969862a 100644 --- a/python/python/glide/async_commands/standalone_commands.py +++ b/python/python/glide/async_commands/standalone_commands.py @@ -695,3 +695,24 @@ async def wait( int, await self._execute_command(RequestType.Wait, args), ) + + async def unwatch(self) -> TOK: + """ + Flushes all the previously watched keys for a transaction. Executing a transaction will + automatically flush all previously watched keys. + + See https://valkey.io/commands/unwatch for more details. + + Returns: + TOK: A simple "OK" response. + + Examples: + >>> await client.watch("sampleKey") + 'OK' + >>> await client.unwatch() + 'OK' + """ + return cast( + TOK, + await self._execute_command(RequestType.UnWatch, []), + ) diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index b189d24689..f66263beff 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -68,11 +68,7 @@ TrimByMaxLen, TrimByMinId, ) -from glide.async_commands.transaction import ( - BaseTransaction, - ClusterTransaction, - Transaction, -) +from glide.async_commands.transaction import ClusterTransaction, Transaction from glide.config import ( ClusterClientConfiguration, GlideClientConfiguration, @@ -7420,18 +7416,18 @@ async def test_flushall(self, redis_client: TGlideClient): await redis_client.set(key, value) assert await redis_client.dbsize() > 0 - assert await redis_client.flushall() is OK - assert await redis_client.flushall(FlushMode.ASYNC) is OK + assert await redis_client.flushall() == OK + assert await redis_client.flushall(FlushMode.ASYNC) == OK if not await check_if_server_version_lt(redis_client, min_version): - assert await redis_client.flushall(FlushMode.SYNC) is OK + assert await redis_client.flushall(FlushMode.SYNC) == OK assert await redis_client.dbsize() == 0 if isinstance(redis_client, GlideClusterClient): await redis_client.set(key, value) - assert await redis_client.flushall(route=AllPrimaries()) is OK - assert await redis_client.flushall(FlushMode.ASYNC, AllPrimaries()) is OK + assert await redis_client.flushall(route=AllPrimaries()) == OK + assert await redis_client.flushall(FlushMode.ASYNC, AllPrimaries()) == OK if not await check_if_server_version_lt(redis_client, min_version): - assert await redis_client.flushall(FlushMode.SYNC, AllPrimaries()) is OK + assert await redis_client.flushall(FlushMode.SYNC, AllPrimaries()) == OK assert await redis_client.dbsize() == 0 @pytest.mark.parametrize("cluster_mode", [False]) @@ -7443,30 +7439,30 @@ async def test_standalone_flushdb(self, redis_client: GlideClient): value = get_random_string(5) # fill DB 0 and check size non-empty - assert await redis_client.select(0) is OK + assert await redis_client.select(0) == OK await redis_client.set(key1, value) assert await redis_client.dbsize() > 0 # fill DB 1 and check size non-empty - assert await redis_client.select(1) is OK + assert await redis_client.select(1) == OK await redis_client.set(key2, value) assert await redis_client.dbsize() > 0 # flush DB 1 and check again - assert await redis_client.flushdb() is OK + assert await redis_client.flushdb() == OK assert await redis_client.dbsize() == 0 # swith to DB 0, flush, and check - assert await redis_client.select(0) is OK + assert await redis_client.select(0) == OK assert await redis_client.dbsize() > 0 - assert await redis_client.flushdb(FlushMode.ASYNC) is OK + assert await redis_client.flushdb(FlushMode.ASYNC) == OK assert await redis_client.dbsize() == 0 # verify flush SYNC if not await check_if_server_version_lt(redis_client, min_version): await redis_client.set(key2, value) assert await redis_client.dbsize() > 0 - assert await redis_client.flushdb(FlushMode.SYNC) is OK + assert await redis_client.flushdb(FlushMode.SYNC) == OK assert await redis_client.dbsize() == 0 @pytest.mark.parametrize("cluster_mode", [True, False]) @@ -7851,6 +7847,58 @@ async def test_lcs_idx(self, redis_client: GlideClient): with pytest.raises(RequestError): await redis_client.lcs_idx(key1, lcs_non_string_key) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_watch(self, redis_client: GlideClient): + # watched key didn't change outside of transaction before transaction execution, transaction will execute + assert await redis_client.set("key1", "original_value") == OK + assert await redis_client.watch(["key1"]) == OK + transaction = Transaction() + transaction.set("key1", "transaction_value") + transaction.get("key1") + assert await redis_client.exec(transaction) is not None + + # watched key changed outside of transaction before transaction execution, transaction will not execute + assert await redis_client.set("key1", "original_value") == OK + assert await redis_client.watch(["key1"]) == OK + transaction = Transaction() + transaction.set("key1", "transaction_value") + assert await redis_client.set("key1", "standalone_value") == OK + transaction.get("key1") + assert await redis_client.exec(transaction) is None + + # empty list not supported + with pytest.raises(RequestError): + await redis_client.watch([]) + + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_unwatch(self, redis_client: GlideClient): + + # watched key unwatched before transaction execution even if changed + # outside of transaction, transaction will still execute + assert await redis_client.set("key1", "original_value") == OK + assert await redis_client.watch(["key1"]) == OK + transaction = Transaction() + transaction.set("key1", "transaction_value") + assert await redis_client.set("key1", "standalone_value") == OK + transaction.get("key1") + assert await redis_client.unwatch() == OK + result = await redis_client.exec(transaction) + assert result is not None + assert isinstance(result, list) + assert len(result) == 2 + assert result[0] == "OK" + assert result[1] == b"transaction_value" + + # UNWATCH returns OK when there no watched keys + assert await redis_client.unwatch() == OK + + @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_unwatch_with_route(self, redis_client: GlideClusterClient): + assert await redis_client.unwatch(RandomNode()) == OK + class TestMultiKeyCommandCrossSlot: @pytest.mark.parametrize("cluster_mode", [True]) @@ -7942,6 +7990,7 @@ async def test_multi_key_command_routed_to_multiple_nodes( await redis_client.mget(["abc", "zxy", "lkn"]) await redis_client.mset({"abc": "1", "zxy": "2", "lkn": "3"}) await redis_client.touch(["abc", "zxy", "lkn"]) + await redis_client.watch(["abc", "zxy", "lkn"]) class TestCommandsUnitTests: @@ -8216,18 +8265,18 @@ async def test_cluster_flushdb(self, redis_client: GlideClusterClient): await redis_client.set(key, value) assert await redis_client.dbsize() > 0 - assert await redis_client.flushdb(route=AllPrimaries()) is OK + assert await redis_client.flushdb(route=AllPrimaries()) == OK assert await redis_client.dbsize() == 0 await redis_client.set(key, value) assert await redis_client.dbsize() > 0 - assert await redis_client.flushdb(FlushMode.ASYNC, AllPrimaries()) is OK + assert await redis_client.flushdb(FlushMode.ASYNC, AllPrimaries()) == OK assert await redis_client.dbsize() == 0 if not await check_if_server_version_lt(redis_client, min_version): await redis_client.set(key, value) assert await redis_client.dbsize() > 0 - assert await redis_client.flushdb(FlushMode.SYNC, AllPrimaries()) is OK + assert await redis_client.flushdb(FlushMode.SYNC, AllPrimaries()) == OK assert await redis_client.dbsize() == 0 @pytest.mark.parametrize("cluster_mode", [True, False]) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 5e27c74c2d..635115fc22 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -783,7 +783,7 @@ async def test_can_return_null_on_watch_transaction_failures( keyslot = get_random_string(3) transaction = ClusterTransaction() if is_cluster else Transaction() transaction.get(keyslot) - result1 = await redis_client.custom_command(["WATCH", keyslot]) + result1 = await redis_client.watch([keyslot]) assert result1 == OK result2 = await client2.set(keyslot, "foo")