-
Notifications
You must be signed in to change notification settings - Fork 67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Java: Refactor CommandManager.java and remove Command.java #916
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,12 @@ | ||
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ | ||
package glide.api; | ||
|
||
import static redis_request.RedisRequestOuterClass.RequestType.CustomCommand; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't heard of that in Java. I will look that one up. |
||
|
||
import glide.api.commands.BaseCommands; | ||
import glide.api.models.configuration.RedisClientConfiguration; | ||
import glide.managers.CommandManager; | ||
import glide.managers.ConnectionManager; | ||
import glide.managers.models.Command; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
/** | ||
|
@@ -30,8 +31,6 @@ public static CompletableFuture<RedisClient> CreateClient(RedisClientConfigurati | |
|
||
@Override | ||
public CompletableFuture<Object> customCommand(String[] args) { | ||
Command command = | ||
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build(); | ||
return commandManager.submitNewCommand(command, this::handleObjectResponse); | ||
return commandManager.submitNewCommand(CustomCommand, args, this::handleObjectResponse); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,14 @@ | ||
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ | ||
package glide.api; | ||
|
||
import static redis_request.RedisRequestOuterClass.RequestType.CustomCommand; | ||
acarbonetto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import glide.api.commands.ClusterBaseCommands; | ||
import glide.api.models.ClusterValue; | ||
import glide.api.models.configuration.RedisClusterClientConfiguration; | ||
import glide.api.models.configuration.RequestRoutingConfiguration.Route; | ||
import glide.managers.CommandManager; | ||
import glide.managers.ConnectionManager; | ||
import glide.managers.models.Command; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
|
@@ -34,25 +35,18 @@ public static CompletableFuture<RedisClusterClient> CreateClient( | |
|
||
@Override | ||
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Future PR to switch this to |
||
Command command = | ||
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build(); | ||
// TODO if a command returns a map as a single value, ClusterValue misleads user | ||
return commandManager.submitNewCommand( | ||
command, response -> ClusterValue.of(handleObjectResponse(response))); | ||
CustomCommand, args, response -> ClusterValue.of(handleObjectResponse(response))); | ||
} | ||
|
||
@Override | ||
@SuppressWarnings("unchecked") | ||
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route) { | ||
Command command = | ||
Command.builder() | ||
.requestType(Command.RequestType.CUSTOM_COMMAND) | ||
.arguments(args) | ||
.route(route) | ||
.build(); | ||
|
||
return commandManager.submitNewCommand( | ||
command, | ||
CustomCommand, | ||
args, | ||
route, | ||
response -> | ||
route.isSingleNodeRoute() | ||
? ClusterValue.ofSingleValue(handleObjectResponse(response)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,11 +8,11 @@ | |
import glide.api.models.exceptions.ClosingException; | ||
import glide.connectors.handlers.CallbackDispatcher; | ||
import glide.connectors.handlers.ChannelHandler; | ||
import glide.managers.models.Command; | ||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
import lombok.RequiredArgsConstructor; | ||
import redis_request.RedisRequestOuterClass; | ||
import redis_request.RedisRequestOuterClass.Command; | ||
import redis_request.RedisRequestOuterClass.Command.ArgsArray; | ||
import redis_request.RedisRequestOuterClass.RedisRequest; | ||
import redis_request.RedisRequestOuterClass.RequestType; | ||
|
@@ -34,105 +34,110 @@ public class CommandManager { | |
/** | ||
* Build a command and send. | ||
* | ||
* @param command The command to execute | ||
* @param requestType Redis command type | ||
* @param arguments Redis command arguments | ||
* @param responseHandler The handler for the response object | ||
* @return A result promise of type T | ||
*/ | ||
public <T> CompletableFuture<T> submitNewCommand( | ||
Command command, RedisExceptionCheckedFunction<Response, T> responseHandler) { | ||
// write command request to channel | ||
// when complete, convert the response to our expected type T using the given responseHandler | ||
return channel | ||
.write( | ||
prepareRedisRequest( | ||
command.getRequestType(), | ||
command.getArguments(), | ||
Optional.ofNullable(command.getRoute())), | ||
true) | ||
.exceptionally(this::exceptionHandler) | ||
.thenApplyAsync(responseHandler::apply); | ||
RedisRequestOuterClass.RequestType requestType, | ||
String[] arguments, | ||
RedisExceptionCheckedFunction<Response, T> responseHandler) { | ||
|
||
RedisRequest.Builder command = prepareRedisRequest(requestType, arguments); | ||
return submitCommandToChannel(command, responseHandler); | ||
} | ||
|
||
/** | ||
* Exception handler for future pipeline. | ||
* Build a command and send. | ||
* | ||
* @param e An exception thrown in the pipeline before | ||
* @return Nothing, it rethrows the exception | ||
* @param requestType Redis command type | ||
* @param arguments Redis command arguments | ||
* @param route Command routing parameters | ||
* @param responseHandler The handler for the response object | ||
* @return A result promise of type T | ||
*/ | ||
private Response exceptionHandler(Throwable e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved to the end of the file. |
||
if (e instanceof ClosingException) { | ||
channel.close(); | ||
} | ||
if (e instanceof RuntimeException) { | ||
// RedisException also goes here | ||
throw (RuntimeException) e; | ||
} | ||
throw new RuntimeException(e); | ||
public <T> CompletableFuture<T> submitNewCommand( | ||
RequestType requestType, | ||
String[] arguments, | ||
Route route, | ||
RedisExceptionCheckedFunction<Response, T> responseHandler) { | ||
|
||
RedisRequest.Builder command = prepareRedisRequest(requestType, arguments, route); | ||
return submitCommandToChannel(command, responseHandler); | ||
} | ||
|
||
/** | ||
* Take a redis request and send to channel. | ||
* | ||
* @param command The Redis command request as a builder to execute | ||
* @param responseHandler The handler for the response object | ||
* @return A result promise of type T | ||
*/ | ||
protected <T> CompletableFuture<T> submitCommandToChannel( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. renamed from |
||
RedisRequest.Builder command, RedisExceptionCheckedFunction<Response, T> responseHandler) { | ||
// write command request to channel | ||
// when complete, convert the response to our expected type T using the given responseHandler | ||
return channel | ||
.write(command, true) | ||
.exceptionally(this::exceptionHandler) | ||
.thenApplyAsync(responseHandler::apply); | ||
} | ||
|
||
/** | ||
* Build a protobuf command/transaction request object.<br> | ||
* Used by {@link CommandManager}. | ||
* Build a protobuf command request object with routing options. | ||
* | ||
* @param command - Redis command | ||
* @param args - Redis command arguments as string array | ||
* @return An uncompleted request. CallbackDispatcher is responsible to complete it by adding a | ||
* callback id. | ||
* @param requestType Redis command type | ||
* @param arguments Redis command arguments | ||
* @param route Command routing parameters | ||
* @return An incomplete request. {@link CallbackDispatcher} is responsible to complete it by | ||
* adding a callback id. | ||
*/ | ||
private RedisRequestOuterClass.RedisRequest.Builder prepareRedisRequest( | ||
Command.RequestType command, String[] args) { | ||
RedisRequestOuterClass.Command.ArgsArray.Builder commandArgs = | ||
RedisRequestOuterClass.Command.ArgsArray.newBuilder(); | ||
for (var arg : args) { | ||
protected RedisRequest.Builder prepareRedisRequest( | ||
RequestType requestType, String[] arguments, Route route) { | ||
ArgsArray.Builder commandArgs = ArgsArray.newBuilder(); | ||
for (var arg : arguments) { | ||
commandArgs.addArgs(arg); | ||
} | ||
|
||
// TODO: set route properly when no RouteOptions given | ||
return RedisRequestOuterClass.RedisRequest.newBuilder() | ||
.setSingleCommand( | ||
RedisRequestOuterClass.Command.newBuilder() | ||
.setRequestType(mapRequestTypes(command)) | ||
.setArgsArray(commandArgs.build()) | ||
.build()) | ||
.setRoute( | ||
RedisRequestOuterClass.Routes.newBuilder() | ||
.setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes) | ||
.build()); | ||
} | ||
var builder = | ||
RedisRequest.newBuilder() | ||
.setSingleCommand( | ||
Command.newBuilder() | ||
.setRequestType(requestType) | ||
.setArgsArray(commandArgs.build()) | ||
.build()); | ||
|
||
private RequestType mapRequestTypes(Command.RequestType inType) { | ||
switch (inType) { | ||
case CUSTOM_COMMAND: | ||
return RequestType.CustomCommand; | ||
} | ||
throw new RuntimeException("Unsupported request type"); | ||
return prepareRedisRequestRoute(builder, Optional.of(route)); | ||
} | ||
|
||
/** | ||
* Build a protobuf command/transaction request object with routing options.<br> | ||
* Used by {@link CommandManager}. | ||
* Build a protobuf command request object. | ||
* | ||
* @param command Redis command type | ||
* @param args Redis command arguments | ||
* @param route Command routing parameters | ||
* @param requestType Redis command type | ||
* @param arguments Redis command arguments | ||
* @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by | ||
* adding a callback id. | ||
*/ | ||
private RedisRequest.Builder prepareRedisRequest( | ||
Command.RequestType command, String[] args, Optional<Route> route) { | ||
protected RedisRequest.Builder prepareRedisRequest(RequestType requestType, String[] arguments) { | ||
ArgsArray.Builder commandArgs = ArgsArray.newBuilder(); | ||
for (var arg : args) { | ||
for (var arg : arguments) { | ||
commandArgs.addArgs(arg); | ||
} | ||
|
||
var builder = | ||
RedisRequest.newBuilder() | ||
.setSingleCommand( | ||
RedisRequestOuterClass.Command.newBuilder() | ||
.setRequestType(mapRequestTypes(command)) | ||
Command.newBuilder() | ||
.setRequestType(requestType) | ||
.setArgsArray(commandArgs.build()) | ||
.build()); | ||
|
||
return prepareRedisRequestRoute(builder, Optional.empty()); | ||
acarbonetto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
private RedisRequest.Builder prepareRedisRequestRoute( | ||
RedisRequest.Builder builder, Optional<Route> route) { | ||
if (route.isEmpty()) { | ||
acarbonetto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return builder; | ||
} | ||
|
@@ -165,4 +170,21 @@ private RedisRequest.Builder prepareRedisRequest( | |
} | ||
return builder; | ||
} | ||
|
||
/** | ||
* Exception handler for future pipeline. | ||
* | ||
* @param e An exception thrown in the pipeline before | ||
* @return Nothing, it rethrows the exception | ||
*/ | ||
private Response exceptionHandler(Throwable e) { | ||
if (e instanceof ClosingException) { | ||
channel.close(); | ||
} | ||
if (e instanceof RuntimeException) { | ||
// RedisException also goes here | ||
throw (RuntimeException) e; | ||
} | ||
throw new RuntimeException(e); | ||
} | ||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving this below. There should be about a dozen handleXResponse functions to be included in upcoming PRs and they should be below the CreateClient calls.