diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c934181ce..affe447d9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ * Python: Added WAIT command ([#1710](https://github.com/aws/glide-for-redis/pull/1710)) * Python: Added XAUTOCLAIM command ([#1718](https://github.com/aws/glide-for-redis/pull/1718)) * Python: Add ZSCAN and HSCAN commands ([#1732](https://github.com/aws/glide-for-redis/pull/1732)) +* Python: Added FCALL_RO command ([#1721](https://github.com/aws/glide-for-redis/pull/1721)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index 29bc7ff137..74e41b064f 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -415,6 +415,41 @@ async def function_delete( ), ) + async def fcall_ro_route( + self, + function: str, + arguments: Optional[List[str]] = None, + route: Optional[Route] = None, + ) -> TClusterResponse[TResult]: + """ + Invokes a previously loaded read-only function. + + See https://valkey.io/commands/fcall_ro for more details. + + Args: + function (str): The function name. + arguments (List[str]): An `array` of `function` arguments. `arguments` should not + represent names of keys. + route (Optional[Route]): Specifies the routing configuration of the command. The client + will route the command to the nodes defined by `route`. + + Returns: + TClusterResponse[TResult]: The return value depends on the function that was executed. + + Examples: + >>> await client.fcall_ro_route("Deep_Thought", ALL_NODES) + 42 # The return value on the function that was executed + + Since: Redis version 7.0.0. + """ + args = [function, "0"] + if arguments is not None: + args.extend(arguments) + return cast( + TClusterResponse[TResult], + await self._execute_command(RequestType.FCallReadOnly, args, route), + ) + async def time(self, route: Optional[Route] = None) -> TClusterResponse[List[str]]: """ Returns the server time. diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 799e0175d8..d56b367534 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -5700,6 +5700,49 @@ async def hscan( await self._execute_command(RequestType.HScan, args), ) + async def fcall_ro( + self, + function: str, + keys: Optional[List[str]] = None, + arguments: Optional[List[str]] = None, + ) -> TResult: + """ + Invokes a previously loaded read-only function. + + See https://valkey.io/commands/fcall_ro for more details. + + When in cluster mode, all keys in `keys` must map to the same hash slot. + + Args: + function (str): The function name. + keys (List[str]): An `array` of keys accessed by the function. To ensure the correct + execution of functions, all names of keys that a function accesses must be + explicitly provided as `keys`. + arguments (List[str]): An `array` of `function` arguments. `arguments` should not + represent names of keys. + + Returns: + TResult: The return value depends on the function that was executed. + + Examples: + >>> await client.fcall_ro("Deep_Thought", ["key1"], ["Answer", "to", "the", + "Ultimate", "Question", "of", "Life,", "the", "Universe,", "and", "Everything"]) + 42 # The return value on the function that was executed + + Since: Redis version 7.0.0. + """ + args = [] + if keys is not None: + args.extend([function, str(len(keys))] + keys) + else: + args.extend([function, str(0)]) + if arguments is not None: + args.extend(arguments) + return cast( + TResult, + await self._execute_command(RequestType.FCallReadOnly, args), + ) + @dataclass class PubSubMsg: """ diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 631f2003fa..fdbd25dded 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -1841,6 +1841,39 @@ def function_delete(self: TTransaction, library_name: str) -> TTransaction: [library_name], ) + def fcall_ro( + self: TTransaction, + function: str, + keys: Optional[List[str]] = None, + arguments: Optional[List[str]] = None, + ) -> TTransaction: + """ + Invokes a previously loaded read-only function. + + See https://valkey.io/commands/fcall_ro for more details. + + Args: + function (str): The function name. + keys (List[str]): An `array` of keys accessed by the function. To ensure the correct + execution of functions, all names of keys that a function accesses must be + explicitly provided as `keys`. + arguments (List[str]): An `array` of `function` arguments. `arguments` should not + represent names of keys. + + Command Response: + TResult: The return value depends on the function that was executed. + + Since: Redis version 7.0.0. + """ + args = [] + if keys is not None: + args.extend([function, str(len(keys))] + keys) + else: + args.extend([function, str(0)]) + if arguments is not None: + args.extend(arguments) + return self.append_command(RequestType.FCallReadOnly, args) + def xadd( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index d253c72d12..b189d24689 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -68,6 +68,11 @@ TrimByMaxLen, TrimByMinId, ) +from glide.async_commands.transaction import ( + BaseTransaction, + ClusterTransaction, + Transaction, +) from glide.config import ( ClusterClientConfiguration, GlideClientConfiguration, @@ -7052,16 +7057,13 @@ async def test_function_load(self, redis_client: TGlideClient): assert await redis_client.function_load(code) == lib_name.encode() - # TODO: change when FCALL, FCALL_RO is implemented + # TODO: change when FCALL is implemented assert ( await redis_client.custom_command(["FCALL", func_name, "0", "one", "two"]) == b"one" ) assert ( - await redis_client.custom_command( - ["FCALL_RO", func_name, "0", "one", "two"] - ) - == b"one" + await redis_client.fcall_ro(func_name, arguments=["one", "two"]) == b"one" ) # TODO: add FUNCTION LIST once implemented @@ -7082,6 +7084,11 @@ async def test_function_load(self, redis_client: TGlideClient): assert await redis_client.function_load(new_code, True) == lib_name.encode() + # TODO: add when FCALL is implemented + assert await redis_client.fcall_ro(func2_name, arguments=["one", "two"]) == 2 + + assert await redis_client.function_flush(FlushMode.SYNC) is OK + @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) @pytest.mark.parametrize("single_route", [True, False]) @@ -7101,7 +7108,7 @@ async def test_function_load_cluster_with_route( assert await redis_client.function_load(code, False, route) == lib_name.encode() - # TODO: change when FCALL, FCALL_RO is implemented. + # TODO: change when FCALL is implemented. assert ( await redis_client.custom_command( ["FCALL", func_name, "0", "one", "two"], @@ -7109,14 +7116,17 @@ async def test_function_load_cluster_with_route( ) == b"one" ) - assert ( - await redis_client.custom_command( - ["FCALL_RO", func_name, "0", "one", "two"], - SlotKeyRoute(SlotType.PRIMARY, "1"), - ) - == b"one" + result = await redis_client.fcall_ro_route( + func_name, arguments=["one", "two"], route=route ) + if single_route: + assert result == b"one" + else: + assert isinstance(result, dict) + for nodeResponse in result.values(): + assert nodeResponse == b"one" + # TODO: add FUNCTION LIST once implemented # re-load library without replace @@ -7137,6 +7147,20 @@ async def test_function_load_cluster_with_route( await redis_client.function_load(new_code, True, route) == lib_name.encode() ) + # TODO: add when FCALL is implemented. + result = await redis_client.fcall_ro_route( + func2_name, arguments=["one", "two"], route=route + ) + + if single_route: + assert result == 2 + else: + assert isinstance(result, dict) + for nodeResponse in result.values(): + assert nodeResponse == 2 + + assert await redis_client.function_flush(FlushMode.SYNC, route) is OK + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_function_flush(self, redis_client: TGlideClient): @@ -7253,6 +7277,91 @@ async def test_function_delete_with_routing( await redis_client.function_delete(lib_name) assert "Library not found" in str(e) + @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_fcall_with_key(self, redis_client: GlideClusterClient): + min_version = "7.0.0" + if await check_if_server_version_lt(redis_client, min_version): + return pytest.mark.skip(reason=f"Redis version required >= {min_version}") + + key1 = f"{{testKey}}:1-{get_random_string(10)}" + key2 = f"{{testKey}}:2-{get_random_string(10)}" + keys = [key1, key2] + route = SlotKeyRoute(SlotType.PRIMARY, key1) + lib_name = f"mylib1C{get_random_string(5)}" + func_name = f"myfunc1c{get_random_string(5)}" + code = generate_lua_lib_code(lib_name, {func_name: "return keys[1]"}, True) + + assert await redis_client.function_flush(FlushMode.SYNC, route) is OK + assert await redis_client.function_load(code, False, route) == lib_name.encode() + + # TODO: add when FCALL is implemented. + assert ( + await redis_client.fcall_ro(func_name, keys=keys, arguments=[]) + == key1.encode() + ) + + transaction = ClusterTransaction() + # TODO: add when FCALL is implemented. + transaction.fcall_ro(func_name, keys=keys, arguments=[]) + + # check response from a routed transaction request + result = await redis_client.exec(transaction, route) + assert result is not None + assert result[0] == key1.encode() + + # if no route given, GLIDE should detect it automatically + result = await redis_client.exec(transaction) + assert result is not None + assert result[0] == key1.encode() + + assert await redis_client.function_flush(FlushMode.SYNC, route) is OK + + @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_fcall_readonly_function(self, redis_client: GlideClusterClient): + min_version = "7.0.0" + if await check_if_server_version_lt(redis_client, min_version): + return pytest.mark.skip(reason=f"Redis version required >= {min_version}") + + lib_name = f"fcall_readonly_function{get_random_string(5)}" + # intentionally using a REPLICA route + replicaRoute = SlotKeyRoute(SlotType.REPLICA, lib_name) + primaryRoute = SlotKeyRoute(SlotType.PRIMARY, lib_name) + func_name = f"fcall_readonly_function{get_random_string(5)}" + + # function $funcName returns a magic number + code = generate_lua_lib_code(lib_name, {func_name: "return 42"}, False) + + assert await redis_client.function_load(code, False) == lib_name.encode() + + # On a replica node should fail, because a function isn't guaranteed to be RO + # TODO: add when FCALL is implemented. + with pytest.raises(RequestError) as e: + assert await redis_client.fcall_ro_route( + func_name, arguments=[], route=replicaRoute + ) + assert "You can't write against a read only replica." in str(e) + + # fcall_ro also fails to run it even on primary - another error + with pytest.raises(RequestError) as e: + assert await redis_client.fcall_ro_route( + func_name, arguments=[], route=primaryRoute + ) + assert "Can not execute a script with write flag using *_ro command." in str(e) + + # create the same function, but with RO flag + code = generate_lua_lib_code(lib_name, {func_name: "return 42"}, True) + assert await redis_client.function_load(code, True) == lib_name.encode() + + # fcall should succeed now + assert ( + await redis_client.fcall_ro_route( + func_name, arguments=[], route=replicaRoute + ) + == 42 + ) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_srandmember(self, redis_client: TGlideClient): @@ -7810,6 +7919,7 @@ async def test_multi_key_command_returns_cross_slot_error( redis_client.lcs("abc", "def"), redis_client.lcs_len("abc", "def"), redis_client.lcs_idx("abc", "def"), + redis_client.fcall_ro("func", ["abc", "zxy", "lkn"], []), ] ) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index c7cf93638a..5e27c74c2d 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -108,6 +108,10 @@ async def transaction_test( args.append(lib_name.encode()) transaction.function_load(code, True) args.append(lib_name.encode()) + transaction.fcall_ro(func_name, [], arguments=["one", "two"]) + args.append(b"one") + transaction.fcall_ro(func_name, [key], arguments=["one", "two"]) + args.append(b"one") transaction.function_delete(lib_name) args.append(OK) transaction.function_flush()