Skip to content

Commit

Permalink
Python: add FCALL_RO command (#1721)
Browse files Browse the repository at this point in the history
* Python: add FCALL_RO command

* Updated CHANGELOG.md

* Renamed cluster fcall_ro to fcall_ro_route to address review comments

* Modified tests with correct encoding type

* Addressed review comment

* Fixed encode issue in transacation test

* Changed return type to Optional

* Reverted changes of Optional return
  • Loading branch information
yipin-chen authored Jul 1, 2024
1 parent 397c641 commit d5bafac
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* Python: Added WAIT command ([#1710](https://github.com/aws/glide-for-redis/pull/1710))
* Python: Added XAUTOCLAIM command ([#1718](https://github.com/aws/glide-for-redis/pull/1718))
* Python: Add ZSCAN and HSCAN commands ([#1732](https://github.com/aws/glide-for-redis/pull/1732))
* Python: Added FCALL_RO command ([#1721](https://github.com/aws/glide-for-redis/pull/1721))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
35 changes: 35 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,41 @@ async def function_delete(
),
)

async def fcall_ro_route(
self,
function: str,
arguments: Optional[List[str]] = None,
route: Optional[Route] = None,
) -> TClusterResponse[TResult]:
"""
Invokes a previously loaded read-only function.
See https://valkey.io/commands/fcall_ro for more details.
Args:
function (str): The function name.
arguments (List[str]): An `array` of `function` arguments. `arguments` should not
represent names of keys.
route (Optional[Route]): Specifies the routing configuration of the command. The client
will route the command to the nodes defined by `route`.
Returns:
TClusterResponse[TResult]: The return value depends on the function that was executed.
Examples:
>>> await client.fcall_ro_route("Deep_Thought", ALL_NODES)
42 # The return value on the function that was executed
Since: Redis version 7.0.0.
"""
args = [function, "0"]
if arguments is not None:
args.extend(arguments)
return cast(
TClusterResponse[TResult],
await self._execute_command(RequestType.FCallReadOnly, args, route),
)

async def time(self, route: Optional[Route] = None) -> TClusterResponse[List[str]]:
"""
Returns the server time.
Expand Down
43 changes: 43 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5700,6 +5700,49 @@ async def hscan(
await self._execute_command(RequestType.HScan, args),
)

async def fcall_ro(
self,
function: str,
keys: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
) -> TResult:
"""
Invokes a previously loaded read-only function.
See https://valkey.io/commands/fcall_ro for more details.
When in cluster mode, all keys in `keys` must map to the same hash slot.
Args:
function (str): The function name.
keys (List[str]): An `array` of keys accessed by the function. To ensure the correct
execution of functions, all names of keys that a function accesses must be
explicitly provided as `keys`.
arguments (List[str]): An `array` of `function` arguments. `arguments` should not
represent names of keys.
Returns:
TResult: The return value depends on the function that was executed.
Examples:
>>> await client.fcall_ro("Deep_Thought", ["key1"], ["Answer", "to", "the",
"Ultimate", "Question", "of", "Life,", "the", "Universe,", "and", "Everything"])
42 # The return value on the function that was executed
Since: Redis version 7.0.0.
"""
args = []
if keys is not None:
args.extend([function, str(len(keys))] + keys)
else:
args.extend([function, str(0)])
if arguments is not None:
args.extend(arguments)
return cast(
TResult,
await self._execute_command(RequestType.FCallReadOnly, args),
)

@dataclass
class PubSubMsg:
"""
Expand Down
33 changes: 33 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,39 @@ def function_delete(self: TTransaction, library_name: str) -> TTransaction:
[library_name],
)

def fcall_ro(
self: TTransaction,
function: str,
keys: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
) -> TTransaction:
"""
Invokes a previously loaded read-only function.
See https://valkey.io/commands/fcall_ro for more details.
Args:
function (str): The function name.
keys (List[str]): An `array` of keys accessed by the function. To ensure the correct
execution of functions, all names of keys that a function accesses must be
explicitly provided as `keys`.
arguments (List[str]): An `array` of `function` arguments. `arguments` should not
represent names of keys.
Command Response:
TResult: The return value depends on the function that was executed.
Since: Redis version 7.0.0.
"""
args = []
if keys is not None:
args.extend([function, str(len(keys))] + keys)
else:
args.extend([function, str(0)])
if arguments is not None:
args.extend(arguments)
return self.append_command(RequestType.FCallReadOnly, args)

def xadd(
self: TTransaction,
key: str,
Expand Down
134 changes: 122 additions & 12 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@
TrimByMaxLen,
TrimByMinId,
)
from glide.async_commands.transaction import (
BaseTransaction,
ClusterTransaction,
Transaction,
)
from glide.config import (
ClusterClientConfiguration,
GlideClientConfiguration,
Expand Down Expand Up @@ -7052,16 +7057,13 @@ async def test_function_load(self, redis_client: TGlideClient):

assert await redis_client.function_load(code) == lib_name.encode()

# TODO: change when FCALL, FCALL_RO is implemented
# TODO: change when FCALL is implemented
assert (
await redis_client.custom_command(["FCALL", func_name, "0", "one", "two"])
== b"one"
)
assert (
await redis_client.custom_command(
["FCALL_RO", func_name, "0", "one", "two"]
)
== b"one"
await redis_client.fcall_ro(func_name, arguments=["one", "two"]) == b"one"
)

# TODO: add FUNCTION LIST once implemented
Expand All @@ -7082,6 +7084,11 @@ async def test_function_load(self, redis_client: TGlideClient):

assert await redis_client.function_load(new_code, True) == lib_name.encode()

# TODO: add when FCALL is implemented
assert await redis_client.fcall_ro(func2_name, arguments=["one", "two"]) == 2

assert await redis_client.function_flush(FlushMode.SYNC) is OK

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
@pytest.mark.parametrize("single_route", [True, False])
Expand All @@ -7101,22 +7108,25 @@ async def test_function_load_cluster_with_route(

assert await redis_client.function_load(code, False, route) == lib_name.encode()

# TODO: change when FCALL, FCALL_RO is implemented.
# TODO: change when FCALL is implemented.
assert (
await redis_client.custom_command(
["FCALL", func_name, "0", "one", "two"],
SlotKeyRoute(SlotType.PRIMARY, "1"),
)
== b"one"
)
assert (
await redis_client.custom_command(
["FCALL_RO", func_name, "0", "one", "two"],
SlotKeyRoute(SlotType.PRIMARY, "1"),
)
== b"one"
result = await redis_client.fcall_ro_route(
func_name, arguments=["one", "two"], route=route
)

if single_route:
assert result == b"one"
else:
assert isinstance(result, dict)
for nodeResponse in result.values():
assert nodeResponse == b"one"

# TODO: add FUNCTION LIST once implemented

# re-load library without replace
Expand All @@ -7137,6 +7147,20 @@ async def test_function_load_cluster_with_route(
await redis_client.function_load(new_code, True, route) == lib_name.encode()
)

# TODO: add when FCALL is implemented.
result = await redis_client.fcall_ro_route(
func2_name, arguments=["one", "two"], route=route
)

if single_route:
assert result == 2
else:
assert isinstance(result, dict)
for nodeResponse in result.values():
assert nodeResponse == 2

assert await redis_client.function_flush(FlushMode.SYNC, route) is OK

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_function_flush(self, redis_client: TGlideClient):
Expand Down Expand Up @@ -7253,6 +7277,91 @@ async def test_function_delete_with_routing(
await redis_client.function_delete(lib_name)
assert "Library not found" in str(e)

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_fcall_with_key(self, redis_client: GlideClusterClient):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

key1 = f"{{testKey}}:1-{get_random_string(10)}"
key2 = f"{{testKey}}:2-{get_random_string(10)}"
keys = [key1, key2]
route = SlotKeyRoute(SlotType.PRIMARY, key1)
lib_name = f"mylib1C{get_random_string(5)}"
func_name = f"myfunc1c{get_random_string(5)}"
code = generate_lua_lib_code(lib_name, {func_name: "return keys[1]"}, True)

assert await redis_client.function_flush(FlushMode.SYNC, route) is OK
assert await redis_client.function_load(code, False, route) == lib_name.encode()

# TODO: add when FCALL is implemented.
assert (
await redis_client.fcall_ro(func_name, keys=keys, arguments=[])
== key1.encode()
)

transaction = ClusterTransaction()
# TODO: add when FCALL is implemented.
transaction.fcall_ro(func_name, keys=keys, arguments=[])

# check response from a routed transaction request
result = await redis_client.exec(transaction, route)
assert result is not None
assert result[0] == key1.encode()

# if no route given, GLIDE should detect it automatically
result = await redis_client.exec(transaction)
assert result is not None
assert result[0] == key1.encode()

assert await redis_client.function_flush(FlushMode.SYNC, route) is OK

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_fcall_readonly_function(self, redis_client: GlideClusterClient):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

lib_name = f"fcall_readonly_function{get_random_string(5)}"
# intentionally using a REPLICA route
replicaRoute = SlotKeyRoute(SlotType.REPLICA, lib_name)
primaryRoute = SlotKeyRoute(SlotType.PRIMARY, lib_name)
func_name = f"fcall_readonly_function{get_random_string(5)}"

# function $funcName returns a magic number
code = generate_lua_lib_code(lib_name, {func_name: "return 42"}, False)

assert await redis_client.function_load(code, False) == lib_name.encode()

# On a replica node should fail, because a function isn't guaranteed to be RO
# TODO: add when FCALL is implemented.
with pytest.raises(RequestError) as e:
assert await redis_client.fcall_ro_route(
func_name, arguments=[], route=replicaRoute
)
assert "You can't write against a read only replica." in str(e)

# fcall_ro also fails to run it even on primary - another error
with pytest.raises(RequestError) as e:
assert await redis_client.fcall_ro_route(
func_name, arguments=[], route=primaryRoute
)
assert "Can not execute a script with write flag using *_ro command." in str(e)

# create the same function, but with RO flag
code = generate_lua_lib_code(lib_name, {func_name: "return 42"}, True)
assert await redis_client.function_load(code, True) == lib_name.encode()

# fcall should succeed now
assert (
await redis_client.fcall_ro_route(
func_name, arguments=[], route=replicaRoute
)
== 42
)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_srandmember(self, redis_client: TGlideClient):
Expand Down Expand Up @@ -7810,6 +7919,7 @@ async def test_multi_key_command_returns_cross_slot_error(
redis_client.lcs("abc", "def"),
redis_client.lcs_len("abc", "def"),
redis_client.lcs_idx("abc", "def"),
redis_client.fcall_ro("func", ["abc", "zxy", "lkn"], []),
]
)

Expand Down
4 changes: 4 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ async def transaction_test(
args.append(lib_name.encode())
transaction.function_load(code, True)
args.append(lib_name.encode())
transaction.fcall_ro(func_name, [], arguments=["one", "two"])
args.append(b"one")
transaction.fcall_ro(func_name, [key], arguments=["one", "two"])
args.append(b"one")
transaction.function_delete(lib_name)
args.append(OK)
transaction.function_flush()
Expand Down

0 comments on commit d5bafac

Please sign in to comment.