Skip to content

Commit

Permalink
Java: Refactor CommandManager.java and remove Command.java (#916)
Browse files Browse the repository at this point in the history
* Java: Refactor command_manager and remove command

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>

* Update commandmanager to remove optional<route> argument

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>

* Updates for review comments

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>

---------

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
  • Loading branch information
acarbonetto authored Feb 8, 2024
1 parent e3f935a commit fa8403e
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 219 deletions.
28 changes: 14 additions & 14 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,6 @@ public abstract class BaseClient implements AutoCloseable {
protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
* appropriate response as an Object
*
* @param response Redis protobuf message
* @return Response Object
*/
protected Object handleObjectResponse(Response response) {
// convert protobuf response into Object and then Object into T
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response);
}

/**
* Async request for an async (non-blocking) Redis client.
*
Expand Down Expand Up @@ -74,8 +62,8 @@ protected static <T> CompletableFuture<T> CreateClient(
* Closes this resource, relinquishing any underlying resources. This method is invoked
* automatically on objects managed by the try-with-resources statement.
*
* <p>see: <a
* href="https://docs.oracle.com/javase/8/docs/api/java/lang/AutoCloseable.html#close--">AutoCloseable::close()</a>
* @see <a
* href="https://docs.oracle.com/javase/8/docs/api/java/lang/AutoCloseable.html#close--">AutoCloseable::close()</a>
*/
@Override
public void close() throws ExecutionException {
Expand All @@ -100,4 +88,16 @@ protected static ConnectionManager buildConnectionManager(ChannelHandler channel
protected static CommandManager buildCommandManager(ChannelHandler channelHandler) {
return new CommandManager(channelHandler);
}

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
* appropriate response as an <code>Object</code>.
*
* @param response Redis protobuf message
* @return Response <code>Object</code>
*/
protected Object handleObjectResponse(Response response) {
// convert protobuf response into Object
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response);
}
}
7 changes: 3 additions & 4 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api;

import static redis_request.RedisRequestOuterClass.RequestType.CustomCommand;

import glide.api.commands.BaseCommands;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -30,8 +31,6 @@ public static CompletableFuture<RedisClient> CreateClient(RedisClientConfigurati

@Override
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, this::handleObjectResponse);
return commandManager.submitNewCommand(CustomCommand, args, this::handleObjectResponse);
}
}
18 changes: 6 additions & 12 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api;

import static redis_request.RedisRequestOuterClass.RequestType.CustomCommand;

import glide.api.commands.ClusterBaseCommands;
import glide.api.models.ClusterValue;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand All @@ -34,25 +35,18 @@ public static CompletableFuture<RedisClusterClient> CreateClient(

@Override
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
// TODO if a command returns a map as a single value, ClusterValue misleads user
return commandManager.submitNewCommand(
command, response -> ClusterValue.of(handleObjectResponse(response)));
CustomCommand, args, response -> ClusterValue.of(handleObjectResponse(response)));
}

@Override
@SuppressWarnings("unchecked")
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route) {
Command command =
Command.builder()
.requestType(Command.RequestType.CUSTOM_COMMAND)
.arguments(args)
.route(route)
.build();

return commandManager.submitNewCommand(
command,
CustomCommand,
args,
route,
response ->
route.isSingleNodeRoute()
? ClusterValue.ofSingleValue(handleObjectResponse(response))
Expand Down
173 changes: 94 additions & 79 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import glide.api.models.exceptions.ClosingException;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.models.Command;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass;
import redis_request.RedisRequestOuterClass.Command;
import redis_request.RedisRequestOuterClass.Command.ArgsArray;
import redis_request.RedisRequestOuterClass.RedisRequest;
import redis_request.RedisRequestOuterClass.RequestType;
Expand All @@ -34,135 +33,151 @@ public class CommandManager {
/**
* Build a command and send.
*
* @param command The command to execute
* @param requestType Redis command type
* @param arguments Redis command arguments
* @param responseHandler The handler for the response object
* @return A result promise of type T
*/
public <T> CompletableFuture<T> submitNewCommand(
Command command, RedisExceptionCheckedFunction<Response, T> responseHandler) {
// write command request to channel
// when complete, convert the response to our expected type T using the given responseHandler
return channel
.write(
prepareRedisRequest(
command.getRequestType(),
command.getArguments(),
Optional.ofNullable(command.getRoute())),
true)
.exceptionally(this::exceptionHandler)
.thenApplyAsync(responseHandler::apply);
RedisRequestOuterClass.RequestType requestType,
String[] arguments,
RedisExceptionCheckedFunction<Response, T> responseHandler) {

RedisRequest.Builder command = prepareRedisRequest(requestType, arguments);
return submitCommandToChannel(command, responseHandler);
}

/**
* Exception handler for future pipeline.
* Build a command and send.
*
* @param e An exception thrown in the pipeline before
* @return Nothing, it rethrows the exception
* @param requestType Redis command type
* @param arguments Redis command arguments
* @param route Command routing parameters
* @param responseHandler The handler for the response object
* @return A result promise of type T
*/
private Response exceptionHandler(Throwable e) {
if (e instanceof ClosingException) {
channel.close();
}
if (e instanceof RuntimeException) {
// RedisException also goes here
throw (RuntimeException) e;
}
throw new RuntimeException(e);
public <T> CompletableFuture<T> submitNewCommand(
RequestType requestType,
String[] arguments,
Route route,
RedisExceptionCheckedFunction<Response, T> responseHandler) {

RedisRequest.Builder command = prepareRedisRequest(requestType, arguments, route);
return submitCommandToChannel(command, responseHandler);
}

/**
* Take a redis request and send to channel.
*
* @param command The Redis command request as a builder to execute
* @param responseHandler The handler for the response object
* @return A result promise of type T
*/
protected <T> CompletableFuture<T> submitCommandToChannel(
RedisRequest.Builder command, RedisExceptionCheckedFunction<Response, T> responseHandler) {
// write command request to channel
// when complete, convert the response to our expected type T using the given responseHandler
return channel
.write(command, true)
.exceptionally(this::exceptionHandler)
.thenApplyAsync(responseHandler::apply);
}

/**
* Build a protobuf command/transaction request object.<br>
* Used by {@link CommandManager}.
* Build a protobuf command request object with routing options.
*
* @param command - Redis command
* @param args - Redis command arguments as string array
* @return An uncompleted request. CallbackDispatcher is responsible to complete it by adding a
* callback id.
* @param requestType Redis command type
* @param arguments Redis command arguments
* @param route Command routing parameters
* @return An incomplete request. {@link CallbackDispatcher} is responsible to complete it by
* adding a callback id.
*/
private RedisRequestOuterClass.RedisRequest.Builder prepareRedisRequest(
Command.RequestType command, String[] args) {
RedisRequestOuterClass.Command.ArgsArray.Builder commandArgs =
RedisRequestOuterClass.Command.ArgsArray.newBuilder();
for (var arg : args) {
protected RedisRequest.Builder prepareRedisRequest(
RequestType requestType, String[] arguments, Route route) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(arg);
}

// TODO: set route properly when no RouteOptions given
return RedisRequestOuterClass.RedisRequest.newBuilder()
.setSingleCommand(
RedisRequestOuterClass.Command.newBuilder()
.setRequestType(mapRequestTypes(command))
.setArgsArray(commandArgs.build())
.build())
.setRoute(
RedisRequestOuterClass.Routes.newBuilder()
.setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes)
.build());
}
var builder =
RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());

private RequestType mapRequestTypes(Command.RequestType inType) {
switch (inType) {
case CUSTOM_COMMAND:
return RequestType.CustomCommand;
}
throw new RuntimeException("Unsupported request type");
return prepareRedisRequestRoute(builder, route);
}

/**
* Build a protobuf command/transaction request object with routing options.<br>
* Used by {@link CommandManager}.
* Build a protobuf command request object.
*
* @param command Redis command type
* @param args Redis command arguments
* @param route Command routing parameters
* @param requestType Redis command type
* @param arguments Redis command arguments
* @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by
* adding a callback id.
*/
private RedisRequest.Builder prepareRedisRequest(
Command.RequestType command, String[] args, Optional<Route> route) {
protected RedisRequest.Builder prepareRedisRequest(RequestType requestType, String[] arguments) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : args) {
for (var arg : arguments) {
commandArgs.addArgs(arg);
}

var builder =
RedisRequest.newBuilder()
.setSingleCommand(
RedisRequestOuterClass.Command.newBuilder()
.setRequestType(mapRequestTypes(command))
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());

if (route.isEmpty()) {
return builder;
}
return builder;
}

private RedisRequest.Builder prepareRedisRequestRoute(RedisRequest.Builder builder, Route route) {

if (route.get() instanceof SimpleRoute) {
if (route instanceof SimpleRoute) {
builder.setRoute(
Routes.newBuilder()
.setSimpleRoutes(SimpleRoutes.forNumber(((SimpleRoute) route.get()).ordinal()))
.setSimpleRoutes(SimpleRoutes.forNumber(((SimpleRoute) route).ordinal()))
.build());
} else if (route.get() instanceof SlotIdRoute) {
} else if (route instanceof SlotIdRoute) {
builder.setRoute(
Routes.newBuilder()
.setSlotIdRoute(
RedisRequestOuterClass.SlotIdRoute.newBuilder()
.setSlotId(((SlotIdRoute) route.get()).getSlotId())
.setSlotId(((SlotIdRoute) route).getSlotId())
.setSlotType(
SlotTypes.forNumber(
((SlotIdRoute) route.get()).getSlotType().ordinal()))));
} else if (route.get() instanceof SlotKeyRoute) {
SlotTypes.forNumber(((SlotIdRoute) route).getSlotType().ordinal()))));
} else if (route instanceof SlotKeyRoute) {
builder.setRoute(
Routes.newBuilder()
.setSlotKeyRoute(
RedisRequestOuterClass.SlotKeyRoute.newBuilder()
.setSlotKey(((SlotKeyRoute) route.get()).getSlotKey())
.setSlotKey(((SlotKeyRoute) route).getSlotKey())
.setSlotType(
SlotTypes.forNumber(
((SlotKeyRoute) route.get()).getSlotType().ordinal()))));
SlotTypes.forNumber(((SlotKeyRoute) route).getSlotType().ordinal()))));
} else {
throw new IllegalArgumentException("Unknown type of route");
}
return builder;
}

/**
* Exception handler for future pipeline.
*
* @param e An exception thrown in the pipeline before
* @return Nothing, it rethrows the exception
*/
private Response exceptionHandler(Throwable e) {
if (e instanceof ClosingException) {
channel.close();
}
if (e instanceof RuntimeException) {
// RedisException also goes here
throw (RuntimeException) e;
}
throw new RuntimeException(e);
}
}
29 changes: 0 additions & 29 deletions java/client/src/main/java/glide/managers/models/Command.java

This file was deleted.

Loading

0 comments on commit fa8403e

Please sign in to comment.