Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: add FUNCTION DUMP and FUNCTION RESTORE commands #1769

Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
* Python: Added DUMP and Restore commands ([#1733](https://github.com/aws/glide-for-redis/pull/1733))
* Java: Added SCAN command ([#1751](https://github.com/aws/glide-for-redis/pull/1751))
* Python: Type migration for entries_read ([#1768](https://github.com/aws/glide-for-redis/pull/1768))

* Python: Added FUNCTION DUMP and FUNCTION RESTORE commands ([#1769](https://github.com/aws/glide-for-redis/pull/1769))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ExpiryType,
ExpiryTypeGetEx,
FlushMode,
FunctionRestorePolicy,
InfoSection,
InsertPosition,
UpdateOptions,
Expand Down Expand Up @@ -144,6 +145,7 @@
"ExpiryType",
"ExpiryTypeGetEx",
"FlushMode",
"FunctionRestorePolicy",
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
"GeoSearchByBox",
"GeoSearchByRadius",
"GeoSearchCount",
Expand Down
69 changes: 69 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from glide.async_commands.core import (
CoreCommands,
FlushMode,
FunctionRestorePolicy,
InfoSection,
_build_sort_args,
)
Expand Down Expand Up @@ -554,6 +555,74 @@ async def fcall_ro_route(
await self._execute_command(RequestType.FCallReadOnly, args, route),
)

async def function_dump(
self, route: Optional[Route] = None
) -> TClusterResponse[bytes]:
"""
Returns the serialized payload of all loaded libraries.

See https://valkey.io/docs/latest/commands/function-dump/ for more details.

Args:
route (Optional[Route]): The command will be routed to a random node, unless
`route` is provided, in which case the client will route the command to the
nodes defined by `route`.

Returns:
TClusterResponse[bytes]: The serialized payload of all loaded libraries.

Examples:
>>> await client.function_dump()
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
<binary data>
# The serialized payload of all loaded libraries. This response can
# be used to restore loaded functions on any Valkey instance.
>>> await client.function_restore(<binary data>)
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
"OK" # The serialized dump response was used to restore the libraries.

Since: Redis 7.0.0.
"""
return cast(
TClusterResponse[bytes],
await self._execute_command(RequestType.FunctionDump, [], route),
)

async def function_restore(
self,
payload: TEncodable,
policy: Optional[FunctionRestorePolicy] = None,
route: Optional[Route] = None,
) -> TOK:
"""
Restores libraries from the serialized payload returned by the `function_dump` command.

See https://valkey.io/docs/latest/commands/function-restore/ for more details.

Args:
payload (bytes): The serialized data from the `function_dump` command.
policy (Optional[FunctionRestorePolicy]): A policy for handling existing libraries.
route (Optional[Route]): The command will be sent to all primaries, unless
`route` is provided, in which case the client will route the command to the
nodes defined by `route`.

Returns:
TOK: OK.

Examples:
>>> await client.function_restore(data, AllPrimaries())
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
"OK"
>>> await client.function_restore(data, FunctionRestorePolicy.FLUSH, AllPrimaries())
"OK"

Since: Redis 7.0.0.
"""
args: List[TEncodable] = [payload]
if policy is not None:
args.append(policy.value)

return cast(
TOK, await self._execute_command(RequestType.FunctionRestore, args, route)
)

async def time(
self, route: Optional[Route] = None
) -> TClusterResponse[List[bytes]]:
Expand Down
16 changes: 16 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,22 @@ class FlushMode(Enum):
SYNC = "SYNC"


class FunctionRestorePolicy(Enum):
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
"""
Options for the FUNCTION RESTORE command.
- APPEND: Appends the restored libraries to the existing libraries and aborts on collision. This is the
default policy.
- FLUSH: Deletes all existing libraries before restoring the payload.
- REPLACE: Appends the restored libraries to the existing libraries, replacing any existing ones in case
of name collisions. Note that this policy doesn't prevent function name collisions, only libraries.
"""

APPEND = "APPEND"
FLUSH = "FLUSH"
REPLACE = "REPLACE"


def _build_sort_args(
key: TEncodable,
by_pattern: Optional[TEncodable] = None,
Expand Down
51 changes: 51 additions & 0 deletions python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from glide.async_commands.core import (
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
CoreCommands,
FlushMode,
FunctionRestorePolicy,
InfoSection,
_build_sort_args,
)
Expand Down Expand Up @@ -361,6 +362,56 @@ async def function_delete(self, library_name: TEncodable) -> TOK:
),
)

async def function_dump(self) -> bytes:
"""
Returns the serialized payload of all loaded libraries.

See https://valkey.io/docs/latest/commands/function-dump/ for more details.

Returns:
bytes: The serialized payload of all loaded libraries.

Examples:
>>> await client.function_dump()
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
<binary data>
# The serialized payload of all loaded libraries. This response can
# be used to restore loaded functions on any Valkey instance.
>>> await client.function_restore(<binary data>)
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
"OK" # The serialized dump response was used to restore the libraries.

Since: Redis 7.0.0.
"""
return cast(bytes, await self._execute_command(RequestType.FunctionDump, []))

async def function_restore(
self, payload: TEncodable, policy: Optional[FunctionRestorePolicy] = None
) -> TOK:
"""
Restores libraries from the serialized payload returned by the `function_dump` command.

See https://valkey.io/docs/latest/commands/function-restore/ for more details.

Args:
payload (TEncodable): The serialized data from the `function_dump` command.
policy (Optional[FunctionRestorePolicy]): A policy for handling existing libraries.

Returns:
TOK: OK.

Examples:
>>> await client.function_restore(data)
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
"OK"
>>> await client.function_restore(data, FunctionRestorePolicy.FLUSH)
"OK"

Since: Redis 7.0.0.
"""
args: List[TEncodable] = [payload]
if policy is not None:
args.append(policy.value)

return cast(TOK, await self._execute_command(RequestType.FunctionRestore, args))

async def time(self) -> List[bytes]:
"""
Returns the server time.
Expand Down
166 changes: 166 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ExpiryType,
ExpiryTypeGetEx,
FlushMode,
FunctionRestorePolicy,
InfBound,
InfoSection,
InsertPosition,
Expand Down Expand Up @@ -8047,6 +8048,171 @@ async def test_fcall_readonly_function(self, redis_client: GlideClusterClient):
== 42
)

@pytest.mark.parametrize("cluster_mode", [False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_function_dump_restore_standalone(self, redis_client: GlideClient):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

assert await redis_client.function_flush(FlushMode.SYNC) is OK

# Dump an empty lib
emptyDump = await redis_client.function_dump()
assert emptyDump is not None and len(emptyDump) > 0

name1 = f"Foster{get_random_string(5)}"
name2 = f"Dogster{get_random_string(5)}"

# function name1 returns first argument; function name2 returns argument array len
code = generate_lua_lib_code(
name1, {name1: "return args[1]", name2: "return #args"}, False
)
assert await redis_client.function_load(code, True) == name1.encode()
flist = await redis_client.function_list(with_code=True)

dump = await redis_client.function_dump()
assert dump is not None

# restore without cleaning the lib and/or overwrite option causes an error
with pytest.raises(RequestError) as e:
assert await redis_client.function_restore(dump)
assert "already exists" in str(e)

# APPEND policy also fails for the same reason (name collision)
with pytest.raises(RequestError) as e:
assert await redis_client.function_restore(
dump, FunctionRestorePolicy.APPEND
)
assert "already exists" in str(e)

# REPLACE policy succeed
assert (
await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE)
is OK
)

# but nothing changed - all code overwritten
assert await redis_client.function_list(with_code=True) == flist

# create lib with another name, but with the same function names
assert await redis_client.function_flush(FlushMode.SYNC) is OK
code = generate_lua_lib_code(
name2, {name1: "return args[1]", name2: "return #args"}, False
)
assert await redis_client.function_load(code, True) == name2.encode()

# REPLACE policy now fails due to a name collision
with pytest.raises(RequestError) as e:
await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE)
assert "already exists" in str(e)

# FLUSH policy succeeds, but deletes the second lib
assert (
await redis_client.function_restore(dump, FunctionRestorePolicy.FLUSH) is OK
)
assert await redis_client.function_list(with_code=True) == flist

# call restored functions
assert (
await redis_client.fcall(name1, arguments=["meow", "woem"])
== "meow".encode()
)
assert await redis_client.fcall(name2, arguments=["meow", "woem"]) == 2

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_function_dump_restore_cluster(
self, redis_client: GlideClusterClient
):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

assert await redis_client.function_flush(FlushMode.SYNC) is OK

# Dump an empty lib
emptyDump = await redis_client.function_dump()
yipin-chen marked this conversation as resolved.
Show resolved Hide resolved
assert emptyDump is not None and len(emptyDump) > 0

name1 = f"Foster{get_random_string(5)}"
libname1 = f"FosterLib{get_random_string(5)}"
name2 = f"Dogster{get_random_string(5)}"
libname2 = f"DogsterLib{get_random_string(5)}"

# function name1 returns first argument; function name2 returns argument array len
code = generate_lua_lib_code(
libname1, {name1: "return args[1]", name2: "return #args"}, True
)
assert await redis_client.function_load(code, True) == libname1.encode()
flist = await redis_client.function_list(with_code=True)
dump = await redis_client.function_dump(RandomNode())
assert dump is not None and isinstance(dump, bytes)

# restore without cleaning the lib and/or overwrite option causes an error
with pytest.raises(RequestError) as e:
assert await redis_client.function_restore(dump)
assert "already exists" in str(e)

# APPEND policy also fails for the same reason (name collision)
with pytest.raises(RequestError) as e:
assert await redis_client.function_restore(
dump, FunctionRestorePolicy.APPEND
)
assert "already exists" in str(e)

# REPLACE policy succeed
assert (
await redis_client.function_restore(
dump, FunctionRestorePolicy.REPLACE, route=AllPrimaries()
)
is OK
)

# but nothing changed - all code overwritten
restoredFunctionList = await redis_client.function_list(with_code=True)
assert restoredFunctionList is not None
assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1
assert restoredFunctionList[0]["library_name".encode()] == libname1.encode()

# Note that function ordering may differ across nodes so we can't do a deep equals
assert len(restoredFunctionList[0]["functions".encode()]) == 2

# create lib with another name, but with the same function names
assert await redis_client.function_flush(FlushMode.SYNC) is OK
code = generate_lua_lib_code(
libname2, {name1: "return args[1]", name2: "return #args"}, True
)
assert await redis_client.function_load(code, True) == libname2.encode()
restoredFunctionList = await redis_client.function_list(with_code=True)
assert restoredFunctionList is not None
assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1
assert restoredFunctionList[0]["library_name".encode()] == libname2.encode()

# REPLACE policy now fails due to a name collision
with pytest.raises(RequestError) as e:
await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE)
assert "already exists" in str(e)

# FLUSH policy succeeds, but deletes the second lib
assert (
await redis_client.function_restore(dump, FunctionRestorePolicy.FLUSH) is OK
)
restoredFunctionList = await redis_client.function_list(with_code=True)
assert restoredFunctionList is not None
assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1
assert restoredFunctionList[0]["library_name".encode()] == libname1.encode()

# Note that function ordering may differ across nodes so we can't do a deep equals
assert len(restoredFunctionList[0]["functions".encode()]) == 2

# call restored functions
assert (
await redis_client.fcall_ro(name1, arguments=["meow", "woem"])
== "meow".encode()
)
assert await redis_client.fcall_ro(name2, arguments=["meow", "woem"]) == 2

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_srandmember(self, redis_client: TGlideClient):
Expand Down
Loading