Skip to content

Commit

Permalink
Bump redis-rs + Route Function Stats to all nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Shoham Elias <shohame@amazon.com>
  • Loading branch information
shohamazon committed Aug 14, 2024
1 parent ce69945 commit cfad808
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 35 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@
* Node: Added PUBSUB * commands ([#2090](https://github.com/valkey-io/valkey-glide/pull/2090))
* Python: Added PUBSUB * commands ([#2043](https://github.com/valkey-io/valkey-glide/pull/2043))
* Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084))
* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077))
* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077)))
* Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
* Core: Change FUNCTION STATS command to return multi node response for standalone mode

#### Fixes
* Java: Add overloads for XADD to allow duplicate entry keys ([#1970](https://github.com/valkey-io/valkey-glide/pull/1970))
Expand Down
2 changes: 1 addition & 1 deletion glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl ReconnectingConnection {
create_connection(backend, connection_retry_strategy, push_sender).await
}

fn node_address(&self) -> String {
pub(crate) fn node_address(&self) -> String {
self.inner
.backend
.connection_info
Expand Down
18 changes: 17 additions & 1 deletion glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,23 @@ impl StandaloneClient {
Some(ResponsePolicy::CombineMaps) => future::try_join_all(requests)
.await
.and_then(cluster_routing::combine_map_results),
Some(ResponsePolicy::Special) | None => {
Some(ResponsePolicy::Special) => {
// Await all futures and collect results
let results = future::try_join_all(requests).await?;
let map_entries = self
.inner
.nodes
.iter()
.zip(results)
.map(|(node, result)| {
(redis::Value::BulkString(node.node_address().into()), result)
})
.collect();

Ok(Value::Map(map_entries))
}

None => {
// This is our assumption - if there's no coherent way to aggregate the responses, we just collect them in an array, and pass it to the user.
// TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes.
future::try_join_all(requests).await.map(Value::Array)
Expand Down
12 changes: 6 additions & 6 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
TClusterResponse,
TEncodable,
TFunctionListResponse,
TFunctionStatsResponse,
TFunctionStatsSingleNodeResponse,
TResult,
TSingleNodeRoute,
)
Expand Down Expand Up @@ -587,19 +587,19 @@ async def fcall_ro_route(

async def function_stats(
self, route: Optional[Route] = None
) -> TClusterResponse[TFunctionStatsResponse]:
) -> TClusterResponse[TFunctionStatsSingleNodeResponse]:
"""
Returns information about the function that's currently running and information about the
available execution engines.
See https://valkey.io/commands/function-stats/ for more details
Args:
route (Optional[Route]): Specifies the routing configuration for the command. The client
will route the command to the nodes defined by `route`.
route (Optional[Route]):The command will be routed automatically to all nodes, unless `route` is provided, in which
case the client will route the command to the nodes defined by `route`. Defaults to None.
Returns:
TClusterResponse[TFunctionStatsResponse]: A `Mapping` with two keys:
TClusterResponse[TFunctionStatsSingleNodeResponse]: A `Mapping` with two keys:
- `running_script` with information about the running script.
- `engines` with information about available engines and their stats.
See example for more details.
Expand All @@ -623,7 +623,7 @@ async def function_stats(
Since: Valkey version 7.0.0.
"""
return cast(
TClusterResponse[TFunctionStatsResponse],
TClusterResponse[TFunctionStatsSingleNodeResponse],
await self._execute_command(RequestType.FunctionStats, [], route),
)

Expand Down
25 changes: 19 additions & 6 deletions python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
TOK,
TEncodable,
TFunctionListResponse,
TFunctionStatsResponse,
TFunctionStatsFullResponse,
TResult,
)
from glide.protobuf.command_request_pb2 import RequestType
Expand Down Expand Up @@ -390,22 +390,22 @@ async def function_kill(self) -> TOK:
await self._execute_command(RequestType.FunctionKill, []),
)

async def function_stats(self) -> TFunctionStatsResponse:
async def function_stats(self) -> TFunctionStatsFullResponse:
"""
Returns information about the function that's currently running and information about the
available execution engines.
See https://valkey.io/commands/function-stats/ for more details
Returns:
TFunctionStatsResponse: A `Mapping` with two keys:
TFunctionStatsFullResponse: A Map where the key is the node adrress anf the value is a Map of two keys:
- `running_script` with information about the running script.
- `engines` with information about available engines and their stats.
See example for more details.
Examples:
>>> await client.function_stats()
{
{b"addr": {
'running_script': {
'name': 'foo',
'command': ['FCALL', 'foo', '0', 'hello'],
Expand All @@ -417,12 +417,25 @@ async def function_stats(self) -> TFunctionStatsResponse:
'functions_count': 1,
}
}
}
},
b"addr2": {
'running_script': {
'name': 'foo',
'command': ['FCALL', 'foo', '0', 'hello'],
'duration_ms': 7758
},
'engines': {
'LUA': {
'libraries_count': 1,
'functions_count': 1,
}
}
}}
Since: Valkey version 7.0.0.
"""
return cast(
TFunctionStatsResponse,
TFunctionStatsFullResponse,
await self._execute_command(RequestType.FunctionStats, []),
)

Expand Down
2 changes: 1 addition & 1 deletion python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2028,7 +2028,7 @@ def function_stats(self: TTransaction) -> TTransaction:
See https://valkey.io/commands/function-stats/ for more details
Command Response:
TFunctionStatsResponse: A `Mapping` with two keys:
TFunctionStatsSingleNodeResponse: A `Mapping` with two keys:
- `running_script` with information about the running script.
- `engines` with information about available engines and their stats.
See example for more details.
Expand Down
10 changes: 8 additions & 2 deletions python/python/glide/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,21 @@
Union[bytes, List[Mapping[bytes, Union[bytes, Set[bytes]]]]],
]
]
TFunctionStatsResponse = Mapping[
TFunctionStatsSingleNodeResponse = Mapping[
bytes,
Union[
None,
Mapping[
bytes, Union[Mapping[bytes, Mapping[bytes, int]], bytes, int, List[bytes]]
bytes,
Union[Mapping[bytes, Mapping[bytes, int]], bytes, int, List[bytes]],
],
],
]
TFunctionStatsFullResponse = Mapping[
bytes,
TFunctionStatsSingleNodeResponse,
]


TXInfoStreamResponse = Mapping[
bytes, Union[bytes, int, Mapping[bytes, Optional[List[List[bytes]]]]]
Expand Down
41 changes: 28 additions & 13 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
ProtocolVersion,
ServerCredentials,
)
from glide.constants import OK, TEncodable, TFunctionStatsResponse, TResult
from glide.constants import OK, TEncodable, TFunctionStatsSingleNodeResponse, TResult
from glide.exceptions import TimeoutError as GlideTimeoutError
from glide.glide_client import GlideClient, GlideClusterClient, TGlideClient
from glide.routes import (
Expand Down Expand Up @@ -7346,6 +7346,11 @@ async def test_bitop(self, glide_client: TGlideClient):
with pytest.raises(RequestError):
await glide_client.bitop(BitwiseOperation.AND, destination, [set_key])

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_shoham(self, glide_client: GlideClusterClient):
print(await glide_client.function_stats(RandomNode()))

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_bitfield(self, glide_client: TGlideClient):
Expand Down Expand Up @@ -8222,7 +8227,10 @@ async def test_function_stats(self, glide_client: GlideClient):
assert await glide_client.function_load(code, True) == lib_name.encode()

response = await glide_client.function_stats()
check_function_stats_response(response, [], 1, 1)
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsSingleNodeResponse, node_response), [], 1, 1
)

code = generate_lua_lib_code(
lib_name + "_2",
Expand All @@ -8234,12 +8242,18 @@ async def test_function_stats(self, glide_client: GlideClient):
)

response = await glide_client.function_stats()
check_function_stats_response(response, [], 2, 3)
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsSingleNodeResponse, node_response), [], 2, 3
)

assert await glide_client.function_flush(FlushMode.SYNC) == OK

response = await glide_client.function_stats()
check_function_stats_response(response, [], 0, 0)
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsSingleNodeResponse, node_response), [], 0, 0
)

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
Expand All @@ -8259,7 +8273,7 @@ async def test_function_stats_cluster(self, glide_client: GlideClusterClient):
response = await glide_client.function_stats()
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsResponse, node_response), [], 1, 1
cast(TFunctionStatsSingleNodeResponse, node_response), [], 1, 1
)

code = generate_lua_lib_code(
Expand All @@ -8274,15 +8288,15 @@ async def test_function_stats_cluster(self, glide_client: GlideClusterClient):
response = await glide_client.function_stats()
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsResponse, node_response), [], 2, 3
cast(TFunctionStatsSingleNodeResponse, node_response), [], 2, 3
)

assert await glide_client.function_flush(FlushMode.SYNC) == OK

response = await glide_client.function_stats()
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsResponse, node_response), [], 0, 0
cast(TFunctionStatsSingleNodeResponse, node_response), [], 0, 0
)

@pytest.mark.parametrize("cluster_mode", [True])
Expand Down Expand Up @@ -8311,12 +8325,12 @@ async def test_function_stats_with_routing(
response = await glide_client.function_stats(route)
if single_route:
check_function_stats_response(
cast(TFunctionStatsResponse, response), [], 1, 1
cast(TFunctionStatsSingleNodeResponse, response), [], 1, 1
)
else:
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsResponse, node_response), [], 1, 1
cast(TFunctionStatsSingleNodeResponse, node_response), [], 1, 1
)

code = generate_lua_lib_code(
Expand All @@ -8332,25 +8346,25 @@ async def test_function_stats_with_routing(
response = await glide_client.function_stats(route)
if single_route:
check_function_stats_response(
cast(TFunctionStatsResponse, response), [], 2, 3
cast(TFunctionStatsSingleNodeResponse, response), [], 2, 3
)
else:
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsResponse, node_response), [], 2, 3
cast(TFunctionStatsSingleNodeResponse, node_response), [], 2, 3
)

assert await glide_client.function_flush(FlushMode.SYNC, route) == OK

response = await glide_client.function_stats(route)
if single_route:
check_function_stats_response(
cast(TFunctionStatsResponse, response), [], 0, 0
cast(TFunctionStatsSingleNodeResponse, response), [], 0, 0
)
else:
for node_response in response.values():
check_function_stats_response(
cast(TFunctionStatsResponse, node_response), [], 0, 0
cast(TFunctionStatsSingleNodeResponse, node_response), [], 0, 0
)

@pytest.mark.parametrize("cluster_mode", [True, False])
Expand Down Expand Up @@ -10234,3 +10248,4 @@ async def test_script_large_keys_and_args(self, request, cluster_mode, protocol)
== key.encode()
)
await glide_client.close()
await glide_client.close()
6 changes: 3 additions & 3 deletions python/python/tests/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from glide.constants import (
TClusterResponse,
TFunctionListResponse,
TFunctionStatsResponse,
TFunctionStatsSingleNodeResponse,
TResult,
)
from glide.glide_client import TGlideClient
Expand Down Expand Up @@ -309,7 +309,7 @@ def check_function_list_response(


def check_function_stats_response(
response: TFunctionStatsResponse,
response: TFunctionStatsSingleNodeResponse,
running_function: List[bytes],
lib_count: int,
function_count: int,
Expand All @@ -318,7 +318,7 @@ def check_function_stats_response(
Validate whether `FUNCTION STATS` response contains required info.
Args:
response (TFunctionStatsResponse): The response from server.
response (TFunctionStatsSingleNodeResponse): The response from server.
running_function (List[bytes]): Command line of running function expected. Empty, if nothing expected.
lib_count (int): Expected libraries count.
function_count (int): Expected functions count.
Expand Down

0 comments on commit cfad808

Please sign in to comment.