Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Changed handling of large requests to transfer them as leaked pointers #1708

Merged
merged 23 commits into from
Jun 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ logger_core = {path = "../logger_core"}
tracing-subscriber = "0.3.16"
jni = "0.21.1"
log = "0.4.20"
bytes = { version = "1.6.0" }

[profile.release]
lto = true
Expand Down
437 changes: 218 additions & 219 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.apache.commons.lang3.ArrayUtils;
import redis_request.RedisRequestOuterClass;

/**
* Extends BaseTransaction class for cluster mode commands. Transactions allow the execution of a
Expand Down Expand Up @@ -50,9 +49,8 @@ protected ClusterTransaction getThis() {
*/
public ClusterTransaction sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
RedisRequestOuterClass.Command.ArgsArray commandArgs =
buildArgs(ArrayUtils.addFirst(sortClusterOptions.toArgs(), key));
protobufTransaction.addCommands(buildCommand(Sort, commandArgs));
protobufTransaction.addCommands(
buildCommand(Sort, ArrayUtils.addFirst(sortClusterOptions.toArgs(), key)));
return this;
}

Expand All @@ -69,9 +67,8 @@ public ClusterTransaction sort(
*/
public ClusterTransaction sortReadOnly(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
RedisRequestOuterClass.Command.ArgsArray commandArgs =
buildArgs(ArrayUtils.addFirst(sortClusterOptions.toArgs(), key));
protobufTransaction.addCommands(buildCommand(SortReadOnly, commandArgs));
protobufTransaction.addCommands(
buildCommand(SortReadOnly, ArrayUtils.addFirst(sortClusterOptions.toArgs(), key)));
return this;
}

Expand All @@ -94,10 +91,10 @@ public ClusterTransaction sortStore(
@NonNull String destination,
@NonNull SortClusterOptions sortClusterOptions) {
String[] storeArguments = new String[] {STORE_COMMAND_STRING, destination};
RedisRequestOuterClass.Command.ArgsArray commandArgs =
buildArgs(
concatenateArrays(new String[] {key}, sortClusterOptions.toArgs(), storeArguments));
protobufTransaction.addCommands(buildCommand(Sort, commandArgs));
protobufTransaction.addCommands(
buildCommand(
Sort,
concatenateArrays(new String[] {key}, sortClusterOptions.toArgs(), storeArguments)));
return this;
}
}
25 changes: 10 additions & 15 deletions java/client/src/main/java/glide/api/models/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.apache.commons.lang3.ArrayUtils;
import redis_request.RedisRequestOuterClass.Command.ArgsArray;

/**
* Extends BaseTransaction class for Redis standalone commands. Transactions allow the execution of
Expand Down Expand Up @@ -53,9 +52,7 @@ protected Transaction getThis() {
* @return Command Response - A simple <code>OK</code> response.
*/
public Transaction select(long index) {
ArgsArray commandArgs = buildArgs(Long.toString(index));

protobufTransaction.addCommands(buildCommand(Select, commandArgs));
protobufTransaction.addCommands(buildCommand(Select, Long.toString(index)));
return this;
}

Expand All @@ -71,8 +68,7 @@ public Transaction select(long index) {
* exist in the source database.
*/
public Transaction move(String key, long dbIndex) {
ArgsArray commandArgs = buildArgs(key, Long.toString(dbIndex));
protobufTransaction.addCommands(buildCommand(Move, commandArgs));
protobufTransaction.addCommands(buildCommand(Move, key, Long.toString(dbIndex)));
return this;
}

Expand Down Expand Up @@ -113,8 +109,7 @@ public Transaction copy(
if (replace) {
args = ArrayUtils.add(args, REPLACE_REDIS_API);
}
ArgsArray commandArgs = buildArgs(args);
protobufTransaction.addCommands(buildCommand(Copy, commandArgs));
protobufTransaction.addCommands(buildCommand(Copy, args));
return this;
}

Expand All @@ -129,8 +124,8 @@ public Transaction copy(
* @return Command Response - An <code>Array</code> of sorted elements.
*/
public Transaction sort(@NonNull String key, @NonNull SortOptions sortOptions) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(sortOptions.toArgs(), key));
protobufTransaction.addCommands(buildCommand(Sort, commandArgs));
protobufTransaction.addCommands(
buildCommand(Sort, ArrayUtils.addFirst(sortOptions.toArgs(), key)));
return this;
}

Expand All @@ -145,8 +140,8 @@ public Transaction sort(@NonNull String key, @NonNull SortOptions sortOptions) {
* @return Command Response - An <code>Array</code> of sorted elements.
*/
public Transaction sortReadOnly(@NonNull String key, @NonNull SortOptions sortOptions) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(sortOptions.toArgs(), key));
protobufTransaction.addCommands(buildCommand(SortReadOnly, commandArgs));
protobufTransaction.addCommands(
buildCommand(SortReadOnly, ArrayUtils.addFirst(sortOptions.toArgs(), key)));
return this;
}

Expand All @@ -166,9 +161,9 @@ public Transaction sortReadOnly(@NonNull String key, @NonNull SortOptions sortOp
public Transaction sortStore(
@NonNull String key, @NonNull String destination, @NonNull SortOptions sortOptions) {
String[] storeArguments = new String[] {STORE_COMMAND_STRING, destination};
ArgsArray arguments =
buildArgs(concatenateArrays(new String[] {key}, sortOptions.toArgs(), storeArguments));
protobufTransaction.addCommands(buildCommand(Sort, arguments));
protobufTransaction.addCommands(
buildCommand(
Sort, concatenateArrays(new String[] {key}, sortOptions.toArgs(), storeArguments)));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
import response.ResponseOuterClass.Response;

public class RedisValueResolver {
public static final long MAX_REQUEST_ARGS_LENGTH_IN_BYTES;

// TODO: consider lazy loading the glide_rs library
static {
NativeUtils.loadGlideLib();

// Note: This is derived from a native call instead of hard-coded to ensure consistency
// between Java and native clients.
MAX_REQUEST_ARGS_LENGTH_IN_BYTES = getMaxRequestArgsLengthInBytes();
}

/**
Expand All @@ -26,4 +31,20 @@ public class RedisValueResolver {
* @return A RESP3 value
*/
public static native Object valueFromPointerBinary(long pointer);

/**
* Copy the given array of byte arrays to a native series of byte arrays and return a C-style
* pointer.
*
* @param args The arguments to copy.
* @return A C-style pointer to a native representation of the arguments.
*/
public static native long createLeakedBytesVec(byte[][] args);

/**
* Get the maximum length in bytes of all request arguments.
*
* @return The maximum length in bytes of all request arguments.
*/
private static native long getMaxRequestArgsLengthInBytes();
}
104 changes: 68 additions & 36 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
import glide.api.models.exceptions.RequestException;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.ffi.resolvers.RedisValueResolver;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass;
import redis_request.RedisRequestOuterClass.Command;
Expand Down Expand Up @@ -198,18 +202,12 @@ protected <T> CompletableFuture<T> submitCommandToChannel(
*/
protected RedisRequest.Builder prepareRedisRequest(
RequestType requestType, String[] arguments, Route route) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(ByteString.copyFromUtf8(arg));
}
final Command.Builder commandBuilder = Command.newBuilder();
populateCommandWithArgs(arguments, commandBuilder);

var builder =
RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());
.setSingleCommand(commandBuilder.setRequestType(requestType).build());

return prepareRedisRequestRoute(builder, route);
}
Expand All @@ -225,18 +223,12 @@ protected RedisRequest.Builder prepareRedisRequest(
*/
protected RedisRequest.Builder prepareRedisRequest(
RequestType requestType, GlideString[] arguments, Route route) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(ByteString.copyFrom(arg.getBytes()));
}
final Command.Builder commandBuilder = Command.newBuilder();
populateCommandWithArgs(arguments, commandBuilder);

var builder =
RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());
.setSingleCommand(commandBuilder.setRequestType(requestType).build());

return prepareRedisRequestRoute(builder, route);
}
Expand Down Expand Up @@ -298,17 +290,11 @@ protected RedisRequest.Builder prepareRedisRequest(
* adding a callback id.
*/
protected RedisRequest.Builder prepareRedisRequest(RequestType requestType, String[] arguments) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(ByteString.copyFromUtf8(arg));
}
final Command.Builder commandBuilder = Command.newBuilder();
populateCommandWithArgs(arguments, commandBuilder);

return RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());
.setSingleCommand(commandBuilder.setRequestType(requestType).build());
}

/**
Expand All @@ -321,17 +307,11 @@ protected RedisRequest.Builder prepareRedisRequest(RequestType requestType, Stri
*/
protected RedisRequest.Builder prepareRedisRequest(
RequestType requestType, GlideString[] arguments) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : arguments) {
commandArgs.addArgs(ByteString.copyFrom(arg.getBytes()));
}
final Command.Builder commandBuilder = Command.newBuilder();
populateCommandWithArgs(arguments, commandBuilder);

return RedisRequest.newBuilder()
.setSingleCommand(
Command.newBuilder()
.setRequestType(requestType)
.setArgsArray(commandArgs.build())
.build());
.setSingleCommand(commandBuilder.setRequestType(requestType).build());
}

private RedisRequest.Builder prepareRedisRequestRoute(RedisRequest.Builder builder, Route route) {
Expand Down Expand Up @@ -392,4 +372,56 @@ private Response exceptionHandler(Throwable e) {
}
throw new RuntimeException(e);
}

/**
* Add the given set of arguments to the output Command.Builder.
*
* @param arguments The arguments to add to the builder.
* @param outputBuilder The builder to populate with arguments.
*/
public static void populateCommandWithArgs(String[] arguments, Command.Builder outputBuilder) {
populateCommandWithArgs(
Arrays.stream(arguments)
.map(value -> value.getBytes(StandardCharsets.UTF_8))
.collect(Collectors.toList()),
outputBuilder);
}

/**
* Add the given set of arguments to the output Command.Builder.
*
* @param arguments The arguments to add to the builder.
* @param outputBuilder The builder to populate with arguments.
*/
private static void populateCommandWithArgs(
GlideString[] arguments, Command.Builder outputBuilder) {
populateCommandWithArgs(
Arrays.stream(arguments).map(GlideString::getBytes).collect(Collectors.toList()),
outputBuilder);
}

/**
* Add the given set of arguments to the output Command.Builder.
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>Implementation note: When the length in bytes of all arguments supplied to the given command
* exceed {@link RedisValueResolver#MAX_REQUEST_ARGS_LENGTH_IN_BYTES}, the Command will hold a
* handle to leaked vector of byte arrays in the native layer in the <code>ArgsVecPointer</code>
* field. In the normal case where the command arguments are small, they'll be serialized as to an
* {@link ArgsArray} message.
*
* @param arguments The arguments to add to the builder.
* @param outputBuilder The builder to populate with arguments.
*/
private static void populateCommandWithArgs(
List<byte[]> arguments, Command.Builder outputBuilder) {
final long totalArgSize = arguments.stream().mapToLong(arg -> arg.length).sum();
if (totalArgSize < RedisValueResolver.MAX_REQUEST_ARGS_LENGTH_IN_BYTES) {
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
arguments.forEach(arg -> commandArgs.addArgs(ByteString.copyFrom(arg)));
outputBuilder.setArgsArray(commandArgs);
} else {
outputBuilder.setArgsVecPointer(
RedisValueResolver.createLeakedBytesVec(arguments.toArray(new byte[][] {})));
}
}
}
6 changes: 3 additions & 3 deletions java/integTest/src/test/java/glide/SharedClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public static void teardown() {
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void send_and_receive_large_values(BaseClient client) {
int length = 1 << 16;
String key = getRandomString(length);
String value = getRandomString(length);
int length = 1 << 25; // 33mb
String key = "0".repeat(length);
String value = "0".repeat(length);

assertEquals(length, key.length());
assertEquals(length, value.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,27 @@ public void keyless_transactions_with_group_of_commands(
assertDeepEquals(expectedResult, results);
}

@SneakyThrows
@Test
public void test_transaction_large_values() {
int length = 1 << 25; // 33mb
String key = "0".repeat(length);
String value = "0".repeat(length);

ClusterTransaction transaction = new ClusterTransaction();
transaction.set(key, value);
transaction.get(key);

Object[] expectedResult =
new Object[] {
OK, // transaction.set(key, value);
value, // transaction.get(key);
};

Object[] result = clusterClient.exec(transaction).get();
assertArrayEquals(expectedResult, result);
}

@Test
@SneakyThrows
public void lastsave() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ public void keyless_transactions_with_group_of_commands(
assertDeepEquals(expectedResult, results);
}

@SneakyThrows
@Test
public void test_transaction_large_values() {
int length = 1 << 25; // 33mb
String key = "0".repeat(length);
String value = "0".repeat(length);

Transaction transaction = new Transaction();
transaction.set(key, value);
transaction.get(key);

Object[] expectedResult =
new Object[] {
OK, // transaction.set(key, value);
value, // transaction.get(key);
};

Object[] result = client.exec(transaction).get();
assertArrayEquals(expectedResult, result);
}

@SneakyThrows
@Test
public void test_standalone_transaction() {
Expand Down
2 changes: 1 addition & 1 deletion java/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
* Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
*/
use jni::{errors::Error as JNIError, JNIEnv};
use log::error;
Expand Down
Loading
Loading