Skip to content

Commit

Permalink
Bump redis-rs + Route Function Stats to all nodes (#2117)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Shoham Elias <shohame@amazon.com>
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
Signed-off-by: Shoham Elias <116083498+shohamazon@users.noreply.github.com>
Co-authored-by: Yury-Fridlyand <yury.fridlyand@improving.com>
Co-authored-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
  • Loading branch information
3 people authored Aug 20, 2024
1 parent d91fbab commit 45001ed
Show file tree
Hide file tree
Showing 25 changed files with 328 additions and 223 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@
* Node: Added PUBSUB * commands ([#2090](https://github.com/valkey-io/valkey-glide/pull/2090))
* Python: Added PUBSUB * commands ([#2043](https://github.com/valkey-io/valkey-glide/pull/2043))
* Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084))
* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077))
* Node: Added BZPOPMAX & BZPOPMIN command ([#2077](https://github.com/valkey-io/valkey-glide/pull/2077))
* Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088))
* Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107))
* Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
* Core: Change FUNCTION STATS command to return multi node response for standalone mode ([#2117](https://github.com/valkey-io/valkey-glide/pull/2117))

#### Fixes
* Java: Add overloads for XADD to allow duplicate entry keys ([#1970](https://github.com/valkey-io/valkey-glide/pull/1970))
Expand Down
2 changes: 1 addition & 1 deletion glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl ReconnectingConnection {
create_connection(backend, connection_retry_strategy, push_sender).await
}

fn node_address(&self) -> String {
pub(crate) fn node_address(&self) -> String {
self.inner
.backend
.connection_info
Expand Down
17 changes: 16 additions & 1 deletion glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,22 @@ impl StandaloneClient {
Some(ResponsePolicy::CombineMaps) => future::try_join_all(requests)
.await
.and_then(cluster_routing::combine_map_results),
Some(ResponsePolicy::Special) | None => {
Some(ResponsePolicy::Special) => {
// Await all futures and collect results
let results = future::try_join_all(requests).await?;
// Create key-value pairs where the key is the node address and the value is the corresponding result
let node_result_pairs = self
.inner
.nodes
.iter()
.zip(results)
.map(|(node, result)| (Value::BulkString(node.node_address().into()), result))
.collect();

Ok(Value::Map(node_result_pairs))
}

None => {
// This is our assumption - if there's no coherent way to aggregate the responses, we just collect them in an array, and pass it to the user.
// TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes.
future::try_join_all(requests).await.map(Value::Array)
Expand Down
35 changes: 33 additions & 2 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringBaseCommands;
import glide.api.commands.TransactionsBaseCommands;
import glide.api.models.ClusterValue;
import glide.api.models.GlideString;
import glide.api.models.PubSubMessage;
import glide.api.models.Script;
Expand Down Expand Up @@ -696,7 +697,7 @@ protected Map<GlideString, Object>[] handleFunctionListResponseBinary(Object[] r
return data;
}

/** Process a <code>FUNCTION STATS</code> standalone response. */
/** Process a <code>FUNCTION STATS</code> response from one node. */
protected Map<String, Map<String, Object>> handleFunctionStatsResponse(
Map<String, Map<String, Object>> response) {
Map<String, Object> runningScriptInfo = response.get("running_script");
Expand All @@ -707,7 +708,7 @@ protected Map<String, Map<String, Object>> handleFunctionStatsResponse(
return response;
}

/** Process a <code>FUNCTION STATS</code> standalone response. */
/** Process a <code>FUNCTION STATS</code> response from one node. */
protected Map<GlideString, Map<GlideString, Object>> handleFunctionStatsBinaryResponse(
Map<GlideString, Map<GlideString, Object>> response) {
Map<GlideString, Object> runningScriptInfo = response.get(gs("running_script"));
Expand All @@ -718,6 +719,36 @@ protected Map<GlideString, Map<GlideString, Object>> handleFunctionStatsBinaryRe
return response;
}

/** Process a <code>FUNCTION STATS</code> cluster response. */
protected ClusterValue<Map<String, Map<String, Object>>> handleFunctionStatsResponse(
Response response, boolean isSingleValue) {
if (isSingleValue) {
return ClusterValue.ofSingleValue(handleFunctionStatsResponse(handleMapResponse(response)));
} else {
Map<String, Map<String, Map<String, Object>>> data = handleMapResponse(response);
for (var nodeInfo : data.entrySet()) {
nodeInfo.setValue(handleFunctionStatsResponse(nodeInfo.getValue()));
}
return ClusterValue.ofMultiValue(data);
}
}

/** Process a <code>FUNCTION STATS</code> cluster response. */
protected ClusterValue<Map<GlideString, Map<GlideString, Object>>>
handleFunctionStatsBinaryResponse(Response response, boolean isSingleValue) {
if (isSingleValue) {
return ClusterValue.ofSingleValue(
handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response)));
} else {
Map<GlideString, Map<GlideString, Map<GlideString, Object>>> data =
handleBinaryStringMapResponse(response);
for (var nodeInfo : data.entrySet()) {
nodeInfo.setValue(handleFunctionStatsBinaryResponse(nodeInfo.getValue()));
}
return ClusterValue.ofMultiValueBinary(data);
}
}

/** Process a <code>LCS key1 key2 IDX</code> response */
protected Map<String, Object> handleLcsIdxResponse(Map<String, Object> response)
throws GlideException {
Expand Down
9 changes: 5 additions & 4 deletions java/client/src/main/java/glide/api/GlideClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,19 +444,20 @@ public CompletableFuture<String> functionKill() {
}

@Override
public CompletableFuture<Map<String, Map<String, Object>>> functionStats() {
public CompletableFuture<Map<String, Map<String, Map<String, Object>>>> functionStats() {
return commandManager.submitNewCommand(
FunctionStats,
new String[0],
response -> handleFunctionStatsResponse(handleMapResponse(response)));
response -> handleFunctionStatsResponse(response, false).getMultiValue());
}

@Override
public CompletableFuture<Map<GlideString, Map<GlideString, Object>>> functionStatsBinary() {
public CompletableFuture<Map<String, Map<GlideString, Map<GlideString, Object>>>>
functionStatsBinary() {
return commandManager.submitNewCommand(
FunctionStats,
new GlideString[0],
response -> handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response)));
response -> handleFunctionStatsBinaryResponse(response, false).getMultiValue());
}

@Override
Expand Down
30 changes: 0 additions & 30 deletions java/client/src/main/java/glide/api/GlideClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -946,36 +946,6 @@ public CompletableFuture<String> functionKill(@NonNull Route route) {
FunctionKill, new String[0], route, this::handleStringResponse);
}

/** Process a <code>FUNCTION STATS</code> cluster response. */
protected ClusterValue<Map<String, Map<String, Object>>> handleFunctionStatsResponse(
Response response, boolean isSingleValue) {
if (isSingleValue) {
return ClusterValue.ofSingleValue(handleFunctionStatsResponse(handleMapResponse(response)));
} else {
Map<String, Map<String, Map<String, Object>>> data = handleMapResponse(response);
for (var nodeInfo : data.entrySet()) {
nodeInfo.setValue(handleFunctionStatsResponse(nodeInfo.getValue()));
}
return ClusterValue.ofMultiValue(data);
}
}

/** Process a <code>FUNCTION STATS</code> cluster response. */
protected ClusterValue<Map<GlideString, Map<GlideString, Object>>>
handleFunctionStatsBinaryResponse(Response response, boolean isSingleValue) {
if (isSingleValue) {
return ClusterValue.ofSingleValue(
handleFunctionStatsBinaryResponse(handleBinaryStringMapResponse(response)));
} else {
Map<GlideString, Map<GlideString, Map<GlideString, Object>>> data =
handleBinaryStringMapResponse(response);
for (var nodeInfo : data.entrySet()) {
nodeInfo.setValue(handleFunctionStatsBinaryResponse(nodeInfo.getValue()));
}
return ClusterValue.ofMultiValueBinary(data);
}
}

@Override
public CompletableFuture<ClusterValue<Map<String, Map<String, Object>>>> functionStats() {
return commandManager.submitNewCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ CompletableFuture<ClusterValue<Object>> fcallReadOnly(
/**
* Kills a function that is currently executing.<br>
* <code>FUNCTION KILL</code> terminates read-only functions only.<br>
* The command will be routed to all primary nodes.
* The command will be routed to all nodes.
*
* @since Valkey 7.0 and above.
* @see <a href="https://valkey.io/commands/function-kill/">valkey.io</a> for details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ CompletableFuture<Map<GlideString, Object>[]> functionListBinary(

/**
* Kills a function that is currently executing.<br>
* <code>FUNCTION KILL</code> terminates read-only functions only.
* <code>FUNCTION KILL</code> terminates read-only functions only. <code>FUNCTION KILL</code> runs
* on all nodes of the server, including primary and replicas.
*
* @since Valkey 7.0 and above.
* @see <a href="https://valkey.io/commands/function-kill/">valkey.io</a> for details.
Expand All @@ -343,63 +344,73 @@ CompletableFuture<Map<GlideString, Object>[]> functionListBinary(

/**
* Returns information about the function that's currently running and information about the
* available execution engines.
* available execution engines.<br>
* <code>FUNCTION STATS</code> runs on all nodes of the server, including primary and replicas.
* The response includes a mapping from node address to the command response for that node.
*
* @since Valkey 7.0 and above.
* @see <a href="https://valkey.io/commands/function-stats/">valkey.io</a> for details.
* @return A <code>Map</code> with two keys:
* @return A <code>Map</code> from node address to the command response for that node, where the
* command contains a <code>Map</code> with two keys:
* <ul>
* <li><code>running_script</code> with information about the running script.
* <li><code>engines</code> with information about available engines and their stats.
* </ul>
* See example for more details.
* @example
* <pre>{@code
* Map<String, Map<String, Object>> response = client.functionStats().get();
* Map<String, Object> runningScriptInfo = response.get("running_script");
* if (runningScriptInfo != null) {
* String[] commandLine = (String[]) runningScriptInfo.get("command");
* System.out.printf("Server is currently running function '%s' with command line '%s', which has been running for %d ms%n",
* runningScriptInfo.get("name"), String.join(" ", commandLine), (long)runningScriptInfo.get("duration_ms"));
* }
* Map<String, Object> enginesInfo = response.get("engines");
* for (String engineName : enginesInfo.keySet()) {
* Map<String, Long> engine = (Map<String, Long>) enginesInfo.get(engineName);
* System.out.printf("Server supports engine '%s', which has %d libraries and %d functions in total%n",
* engineName, engine.get("libraries_count"), engine.get("functions_count"));
* Map<String, Map<String, Map<String, Object>>> response = client.functionStats().get();
* for (String node : response.keySet()) {
* Map<String, Object> runningScriptInfo = response.get(node).get("running_script");
* if (runningScriptInfo != null) {
* String[] commandLine = (String[]) runningScriptInfo.get("command");
* System.out.printf("Node '%s' is currently running function '%s' with command line '%s', which has been running for %d ms%n",
* node, runningScriptInfo.get("name"), String.join(" ", commandLine), (long)runningScriptInfo.get("duration_ms"));
* }
* Map<String, Object> enginesInfo = response.get(node).get("engines");
* for (String engineName : enginesInfo.keySet()) {
* Map<String, Long> engine = (Map<String, Long>) enginesInfo.get(engineName);
* System.out.printf("Node '%s' supports engine '%s', which has %d libraries and %d functions in total%n",
* node, engineName, engine.get("libraries_count"), engine.get("functions_count"));
* }
* }
* }</pre>
*/
CompletableFuture<Map<String, Map<String, Object>>> functionStats();
CompletableFuture<Map<String, Map<String, Map<String, Object>>>> functionStats();

/**
* Returns information about the function that's currently running and information about the
* available execution engines.
* available execution engines.<br>
* <code>FUNCTION STATS</code> runs on all nodes of the server, including primary and replicas.
* The response includes a mapping from node address to the command response for that node.
*
* @since Valkey 7.0 and above.
* @see <a href="https://valkey.io/commands/function-stats/">valkey.io</a> for details.
* @return A <code>Map</code> with two keys:
* @return A <code>Map</code> from node address to the command response for that node, where the
* command contains a <code>Map</code> with two keys:
* <ul>
* <li><code>running_script</code> with information about the running script.
* <li><code>engines</code> with information about available engines and their stats.
* </ul>
* See example for more details.
* @example
* <pre>{@code
* Map<GlideString, Map<GlideString, Object>> response = client.functionStats().get();
* Map<GlideString, Object> runningScriptInfo = response.get(gs("running_script"));
* if (runningScriptInfo != null) {
* GlideString[] commandLine = (GlideString[]) runningScriptInfo.get(gs("command"));
* System.out.printf("Server is currently running function '%s' with command line '%s', which has been running for %d ms%n",
* runningScriptInfo.get(gs("name")), String.join(" ", Arrays.toString(commandLine)), (long)runningScriptInfo.get(gs("duration_ms")));
* }
* Map<GlideString, Object> enginesInfo = response.get(gs("engines"));
* for (GlideString engineName : enginesInfo.keySet()) {
* Map<GlideString, Long> engine = (Map<GlideString, Long>) enginesInfo.get(gs(engineName));
* System.out.printf("Server supports engine '%s', which has %d libraries and %d functions in total%n",
* engineName, engine.get(gs("libraries_count")), engine.get(gs("functions_count")));
* Map<GlideString, Map<GlideString, Map<GlideString, Object>>> response = client.functionStats().get();
* for (String node : response.keySet()) {
* Map<GlideString, Object> runningScriptInfo = response.get(gs(node)).get(gs("running_script"));
* if (runningScriptInfo != null) {
* GlideString[] commandLine = (GlideString[]) runningScriptInfo.get(gs("command"));
* System.out.printf("Node '%s' is currently running function '%s' with command line '%s', which has been running for %d ms%n",
* node, runningScriptInfo.get(gs("name")), String.join(" ", Arrays.toString(commandLine)), (long)runningScriptInfo.get(gs("duration_ms")));
* }
* Map<GlideString, Object> enginesInfo = response.get(gs(node)).get(gs("engines"));
* for (GlideString engineName : enginesInfo.keySet()) {
* Map<GlideString, Long> engine = (Map<GlideString, Long>) enginesInfo.get(gs(engineName));
* System.out.printf("Node '%s' supports engine '%s', which has %d libraries and %d functions in total%n",
* node, engineName, engine.get(gs("libraries_count")), engine.get(gs("functions_count")));
* }
* }
* }</pre>
*/
CompletableFuture<Map<GlideString, Map<GlideString, Object>>> functionStatsBinary();
CompletableFuture<Map<String, Map<GlideString, Map<GlideString, Object>>>> functionStatsBinary();
}
24 changes: 14 additions & 10 deletions java/client/src/test/java/glide/api/GlideClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11485,18 +11485,21 @@ public void functionKill_returns_success() {
public void functionStats_returns_success() {
// setup
String[] args = new String[0];
Map<String, Map<String, Object>> value = Map.of("1", Map.of("2", 2));
CompletableFuture<Map<String, Map<String, Object>>> testResponse = new CompletableFuture<>();
Map<String, Map<String, Map<String, Object>>> value =
Map.of("::1", Map.of("1", Map.of("2", 2)));
CompletableFuture<Map<String, Map<String, Map<String, Object>>>> testResponse =
new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<Map<String, Map<String, Object>>>submitNewCommand(
when(commandManager.<Map<String, Map<String, Map<String, Object>>>>submitNewCommand(
eq(FunctionStats), eq(args), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<String, Map<String, Object>>> response = service.functionStats();
Map<String, Map<String, Object>> payload = response.get();
CompletableFuture<Map<String, Map<String, Map<String, Object>>>> response =
service.functionStats();
Map<String, Map<String, Map<String, Object>>> payload = response.get();

// verify
assertEquals(testResponse, response);
Expand All @@ -11508,20 +11511,21 @@ public void functionStats_returns_success() {
public void functionStatsBinary_returns_success() {
// setup
GlideString[] args = new GlideString[0];
Map<GlideString, Map<GlideString, Object>> value = Map.of(gs("1"), Map.of(gs("2"), 2));
CompletableFuture<Map<GlideString, Map<GlideString, Object>>> testResponse =
Map<String, Map<GlideString, Map<GlideString, Object>>> value =
Map.of("::1", Map.of(gs("1"), Map.of(gs("2"), 2)));
CompletableFuture<Map<String, Map<GlideString, Map<GlideString, Object>>>> testResponse =
new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<Map<GlideString, Map<GlideString, Object>>>submitNewCommand(
when(commandManager.<Map<String, Map<GlideString, Map<GlideString, Object>>>>submitNewCommand(
eq(FunctionStats), eq(args), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<GlideString, Map<GlideString, Object>>> response =
CompletableFuture<Map<String, Map<GlideString, Map<GlideString, Object>>>> response =
service.functionStatsBinary();
Map<GlideString, Map<GlideString, Object>> payload = response.get();
Map<String, Map<GlideString, Map<GlideString, Object>>> payload = response.get();

// verify
assertEquals(testResponse, response);
Expand Down
Loading

0 comments on commit 45001ed

Please sign in to comment.