Skip to content

Commit

Permalink
Python: add FUNCTION DUMP and FUNCTION RESTORE commands (valkey-io#1769)
Browse files Browse the repository at this point in the history
* Initial commit for function dump and restore commands

* Split UT to standalone and cluster tests

* Fixed mypy errors

* Addressed review comments

* Addressed review comment

* Addressed review comments

* Updated examples
  • Loading branch information
yipin-chen authored and cyip10 committed Jul 16, 2024
1 parent 16ccf5a commit 755e63a
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
* Python: Added DUMP and Restore commands ([#1733](https://github.com/aws/glide-for-redis/pull/1733))
* Java: Added SCAN command ([#1751](https://github.com/aws/glide-for-redis/pull/1751))
* Python: Type migration for entries_read ([#1768](https://github.com/aws/glide-for-redis/pull/1768))

* Python: Added FUNCTION DUMP and FUNCTION RESTORE commands ([#1769](https://github.com/aws/glide-for-redis/pull/1769))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ExpiryType,
ExpiryTypeGetEx,
FlushMode,
FunctionRestorePolicy,
InfoSection,
InsertPosition,
UpdateOptions,
Expand Down Expand Up @@ -144,6 +145,7 @@
"ExpiryType",
"ExpiryTypeGetEx",
"FlushMode",
"FunctionRestorePolicy",
"GeoSearchByBox",
"GeoSearchByRadius",
"GeoSearchCount",
Expand Down
71 changes: 71 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from glide.async_commands.core import (
CoreCommands,
FlushMode,
FunctionRestorePolicy,
InfoSection,
_build_sort_args,
)
Expand Down Expand Up @@ -554,6 +555,76 @@ async def fcall_ro_route(
await self._execute_command(RequestType.FCallReadOnly, args, route),
)

async def function_dump(
self, route: Optional[Route] = None
) -> TClusterResponse[bytes]:
"""
Returns the serialized payload of all loaded libraries.
See https://valkey.io/commands/function-dump/ for more details.
Args:
route (Optional[Route]): The command will be routed to a random node, unless
`route` is provided, in which case the client will route the command to the
nodes defined by `route`.
Returns:
TClusterResponse[bytes]: The serialized payload of all loaded libraries.
Examples:
>>> payload = await client.function_dump()
# The serialized payload of all loaded libraries. This response can
# be used to restore loaded functions on any Valkey instance.
>>> await client.function_restore(payload)
"OK" # The serialized dump response was used to restore the libraries.
Since: Redis 7.0.0.
"""
return cast(
TClusterResponse[bytes],
await self._execute_command(RequestType.FunctionDump, [], route),
)

async def function_restore(
self,
payload: TEncodable,
policy: Optional[FunctionRestorePolicy] = None,
route: Optional[Route] = None,
) -> TOK:
"""
Restores libraries from the serialized payload returned by the `function_dump` command.
See https://valkey.io/commands/function-restore/ for more details.
Args:
payload (bytes): The serialized data from the `function_dump` command.
policy (Optional[FunctionRestorePolicy]): A policy for handling existing libraries.
route (Optional[Route]): The command will be sent to all primaries, unless
`route` is provided, in which case the client will route the command to the
nodes defined by `route`.
Returns:
TOK: OK.
Examples:
>>> payload = await client.function_dump()
# The serialized payload of all loaded libraries. This response can
# be used to restore loaded functions on any Valkey instance.
>>> await client.function_restore(payload, AllPrimaries())
"OK" # The serialized dump response was used to restore the libraries with the specified route.
>>> await client.function_restore(payload, FunctionRestorePolicy.FLUSH, AllPrimaries())
"OK" # The serialized dump response was used to restore the libraries with the specified route and policy.
Since: Redis 7.0.0.
"""
args: List[TEncodable] = [payload]
if policy is not None:
args.append(policy.value)

return cast(
TOK, await self._execute_command(RequestType.FunctionRestore, args, route)
)

async def time(
self, route: Optional[Route] = None
) -> TClusterResponse[List[bytes]]:
Expand Down
16 changes: 16 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,22 @@ class FlushMode(Enum):
SYNC = "SYNC"


class FunctionRestorePolicy(Enum):
"""
Options for the FUNCTION RESTORE command.
- APPEND: Appends the restored libraries to the existing libraries and aborts on collision. This is the
default policy.
- FLUSH: Deletes all existing libraries before restoring the payload.
- REPLACE: Appends the restored libraries to the existing libraries, replacing any existing ones in case
of name collisions. Note that this policy doesn't prevent function name collisions, only libraries.
"""

APPEND = "APPEND"
FLUSH = "FLUSH"
REPLACE = "REPLACE"


def _build_sort_args(
key: TEncodable,
by_pattern: Optional[TEncodable] = None,
Expand Down
53 changes: 53 additions & 0 deletions python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from glide.async_commands.core import (
CoreCommands,
FlushMode,
FunctionRestorePolicy,
InfoSection,
_build_sort_args,
)
Expand Down Expand Up @@ -361,6 +362,58 @@ async def function_delete(self, library_name: TEncodable) -> TOK:
),
)

async def function_dump(self) -> bytes:
"""
Returns the serialized payload of all loaded libraries.
See https://valkey.io/docs/latest/commands/function-dump/ for more details.
Returns:
bytes: The serialized payload of all loaded libraries.
Examples:
>>> payload = await client.function_dump()
# The serialized payload of all loaded libraries. This response can
# be used to restore loaded functions on any Valkey instance.
>>> await client.function_restore(payload)
"OK" # The serialized dump response was used to restore the libraries.
Since: Redis 7.0.0.
"""
return cast(bytes, await self._execute_command(RequestType.FunctionDump, []))

async def function_restore(
self, payload: TEncodable, policy: Optional[FunctionRestorePolicy] = None
) -> TOK:
"""
Restores libraries from the serialized payload returned by the `function_dump` command.
See https://valkey.io/docs/latest/commands/function-restore/ for more details.
Args:
payload (TEncodable): The serialized data from the `function_dump` command.
policy (Optional[FunctionRestorePolicy]): A policy for handling existing libraries.
Returns:
TOK: OK.
Examples:
>>> payload = await client.function_dump()
# The serialized payload of all loaded libraries. This response can
# be used to restore loaded functions on any Valkey instance.
>>> await client.function_restore(payload)
"OK" # The serialized dump response was used to restore the libraries.
>>> await client.function_restore(payload, FunctionRestorePolicy.FLUSH)
"OK" # The serialized dump response was used to restore the libraries with the specified policy.
Since: Redis 7.0.0.
"""
args: List[TEncodable] = [payload]
if policy is not None:
args.append(policy.value)

return cast(TOK, await self._execute_command(RequestType.FunctionRestore, args))

async def time(self) -> List[bytes]:
"""
Returns the server time.
Expand Down
166 changes: 166 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ExpiryType,
ExpiryTypeGetEx,
FlushMode,
FunctionRestorePolicy,
InfBound,
InfoSection,
InsertPosition,
Expand Down Expand Up @@ -8047,6 +8048,171 @@ async def test_fcall_readonly_function(self, redis_client: GlideClusterClient):
== 42
)

@pytest.mark.parametrize("cluster_mode", [False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_function_dump_restore_standalone(self, redis_client: GlideClient):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

assert await redis_client.function_flush(FlushMode.SYNC) is OK

# Dump an empty lib
emptyDump = await redis_client.function_dump()
assert emptyDump is not None and len(emptyDump) > 0

name1 = f"Foster{get_random_string(5)}"
name2 = f"Dogster{get_random_string(5)}"

# function name1 returns first argument; function name2 returns argument array len
code = generate_lua_lib_code(
name1, {name1: "return args[1]", name2: "return #args"}, False
)
assert await redis_client.function_load(code, True) == name1.encode()
flist = await redis_client.function_list(with_code=True)

dump = await redis_client.function_dump()
assert dump is not None

# restore without cleaning the lib and/or overwrite option causes an error
with pytest.raises(RequestError) as e:
assert await redis_client.function_restore(dump)
assert "already exists" in str(e)

# APPEND policy also fails for the same reason (name collision)
with pytest.raises(RequestError) as e:
assert await redis_client.function_restore(
dump, FunctionRestorePolicy.APPEND
)
assert "already exists" in str(e)

# REPLACE policy succeed
assert (
await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE)
is OK
)

# but nothing changed - all code overwritten
assert await redis_client.function_list(with_code=True) == flist

# create lib with another name, but with the same function names
assert await redis_client.function_flush(FlushMode.SYNC) is OK
code = generate_lua_lib_code(
name2, {name1: "return args[1]", name2: "return #args"}, False
)
assert await redis_client.function_load(code, True) == name2.encode()

# REPLACE policy now fails due to a name collision
with pytest.raises(RequestError) as e:
await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE)
assert "already exists" in str(e)

# FLUSH policy succeeds, but deletes the second lib
assert (
await redis_client.function_restore(dump, FunctionRestorePolicy.FLUSH) is OK
)
assert await redis_client.function_list(with_code=True) == flist

# call restored functions
assert (
await redis_client.fcall(name1, arguments=["meow", "woem"])
== "meow".encode()
)
assert await redis_client.fcall(name2, arguments=["meow", "woem"]) == 2

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_function_dump_restore_cluster(
self, redis_client: GlideClusterClient
):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

assert await redis_client.function_flush(FlushMode.SYNC) is OK

# Dump an empty lib
emptyDump = await redis_client.function_dump()
assert emptyDump is not None and len(emptyDump) > 0

name1 = f"Foster{get_random_string(5)}"
libname1 = f"FosterLib{get_random_string(5)}"
name2 = f"Dogster{get_random_string(5)}"
libname2 = f"DogsterLib{get_random_string(5)}"

# function name1 returns first argument; function name2 returns argument array len
code = generate_lua_lib_code(
libname1, {name1: "return args[1]", name2: "return #args"}, True
)
assert await redis_client.function_load(code, True) == libname1.encode()
flist = await redis_client.function_list(with_code=True)
dump = await redis_client.function_dump(RandomNode())
assert dump is not None and isinstance(dump, bytes)

# restore without cleaning the lib and/or overwrite option causes an error
with pytest.raises(RequestError) as e:
assert await redis_client.function_restore(dump)
assert "already exists" in str(e)

# APPEND policy also fails for the same reason (name collision)
with pytest.raises(RequestError) as e:
assert await redis_client.function_restore(
dump, FunctionRestorePolicy.APPEND
)
assert "already exists" in str(e)

# REPLACE policy succeed
assert (
await redis_client.function_restore(
dump, FunctionRestorePolicy.REPLACE, route=AllPrimaries()
)
is OK
)

# but nothing changed - all code overwritten
restoredFunctionList = await redis_client.function_list(with_code=True)
assert restoredFunctionList is not None
assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1
assert restoredFunctionList[0]["library_name".encode()] == libname1.encode()

# Note that function ordering may differ across nodes so we can't do a deep equals
assert len(restoredFunctionList[0]["functions".encode()]) == 2

# create lib with another name, but with the same function names
assert await redis_client.function_flush(FlushMode.SYNC) is OK
code = generate_lua_lib_code(
libname2, {name1: "return args[1]", name2: "return #args"}, True
)
assert await redis_client.function_load(code, True) == libname2.encode()
restoredFunctionList = await redis_client.function_list(with_code=True)
assert restoredFunctionList is not None
assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1
assert restoredFunctionList[0]["library_name".encode()] == libname2.encode()

# REPLACE policy now fails due to a name collision
with pytest.raises(RequestError) as e:
await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE)
assert "already exists" in str(e)

# FLUSH policy succeeds, but deletes the second lib
assert (
await redis_client.function_restore(dump, FunctionRestorePolicy.FLUSH) is OK
)
restoredFunctionList = await redis_client.function_list(with_code=True)
assert restoredFunctionList is not None
assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1
assert restoredFunctionList[0]["library_name".encode()] == libname1.encode()

# Note that function ordering may differ across nodes so we can't do a deep equals
assert len(restoredFunctionList[0]["functions".encode()]) == 2

# call restored functions
assert (
await redis_client.fcall_ro(name1, arguments=["meow", "woem"])
== "meow".encode()
)
assert await redis_client.fcall_ro(name2, arguments=["meow", "woem"]) == 2

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_srandmember(self, redis_client: TGlideClient):
Expand Down

0 comments on commit 755e63a

Please sign in to comment.