Skip to content

Commit

Permalink
Merge branch 'main' into python/dev_yipin_function_dump_restore
Browse files Browse the repository at this point in the history
  • Loading branch information
yipin-chen authored Jul 3, 2024
2 parents 7ab6264 + 402c258 commit 9f98e66
Show file tree
Hide file tree
Showing 44 changed files with 3,449 additions and 985 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
* Python: Added XINFO GROUPS and XINFO CONSUMERS commands ([#1753](https://github.com/aws/glide-for-redis/pull/1753))
* Python: Added LPOS command ([#1740](https://github.com/aws/glide-for-redis/pull/1740))
* Python: Added SCAN command ([#1623](https://github.com/aws/glide-for-redis/pull/1623))
* Python: Added DUMP and Restore commands ([#1733](https://github.com/aws/glide-for-redis/pull/1733))
* Java: Added SCAN command ([#1751](https://github.com/aws/glide-for-redis/pull/1751))
* Python: Added FUNCTION DUMP and FUNCTION RESTORE commands ([#1769](https://github.com/aws/glide-for-redis/pull/1769))

### Breaking Changes
Expand Down
2 changes: 1 addition & 1 deletion glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ enum RequestType {
XAutoClaim = 203;
XInfoGroups = 204;
XInfoConsumers = 205;
Scan = 206;
Wait = 208;
XClaim = 209;
Scan = 210;
}

message Command {
Expand Down
2 changes: 1 addition & 1 deletion glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ pub enum RequestType {
XAutoClaim = 203,
XInfoGroups = 204,
XInfoConsumers = 205,
Scan = 206,
Wait = 208,
XClaim = 209,
Scan = 210,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down
28 changes: 28 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,10 @@ protected Object[] handleArrayResponse(Response response) throws RedisException
return handleRedisResponse(Object[].class, EnumSet.of(ResponseFlags.ENCODING_UTF8), response);
}

protected Object[] handleArrayResponseBinary(Response response) throws RedisException {
return handleRedisResponse(Object[].class, EnumSet.noneOf(ResponseFlags.class), response);
}

protected Object[] handleArrayOrNullResponse(Response response) throws RedisException {
return handleRedisResponse(
Object[].class,
Expand Down Expand Up @@ -1133,6 +1137,12 @@ public CompletableFuture<String> hrandfield(@NonNull String key) {
HRandField, new String[] {key}, this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<GlideString> hrandfield(@NonNull GlideString key) {
return commandManager.submitNewCommand(
HRandField, new GlideString[] {key}, this::handleGlideStringOrNullResponse);
}

@Override
public CompletableFuture<String[]> hrandfieldWithCount(@NonNull String key, long count) {
return commandManager.submitNewCommand(
Expand All @@ -1141,6 +1151,15 @@ public CompletableFuture<String[]> hrandfieldWithCount(@NonNull String key, long
response -> castArray(handleArrayResponse(response), String.class));
}

@Override
public CompletableFuture<GlideString[]> hrandfieldWithCount(
@NonNull GlideString key, long count) {
return commandManager.submitNewCommand(
HRandField,
new GlideString[] {key, GlideString.of(count)},
response -> castArray(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<String[][]> hrandfieldWithCountWithValues(
@NonNull String key, long count) {
Expand All @@ -1150,6 +1169,15 @@ public CompletableFuture<String[][]> hrandfieldWithCountWithValues(
response -> castArrayofArrays(handleArrayResponse(response), String.class));
}

@Override
public CompletableFuture<GlideString[][]> hrandfieldWithCountWithValues(
@NonNull GlideString key, long count) {
return commandManager.submitNewCommand(
HRandField,
new GlideString[] {key, GlideString.of(count), GlideString.of(WITH_VALUES_REDIS_API)},
response -> castArrayofArrays(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<Long> lpush(@NonNull String key, @NonNull String[] elements) {
String[] arguments = ArrayUtils.addFirst(elements, key);
Expand Down
13 changes: 13 additions & 0 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Move;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.RandomKey;
import static redis_request.RedisRequestOuterClass.RequestType.Scan;
import static redis_request.RedisRequestOuterClass.RequestType.Select;
import static redis_request.RedisRequestOuterClass.RequestType.Sort;
import static redis_request.RedisRequestOuterClass.RequestType.SortReadOnly;
Expand All @@ -54,6 +55,7 @@
import glide.api.models.commands.SortOptions;
import glide.api.models.commands.SortOptionsBinary;
import glide.api.models.commands.function.FunctionRestorePolicy;
import glide.api.models.commands.scan.ScanOptions;
import glide.api.models.configuration.RedisClientConfiguration;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -486,4 +488,15 @@ public CompletableFuture<Long> sortStore(
concatenateArrays(new GlideString[] {key}, sortOptions.toGlideStringArgs(), storeArguments);
return commandManager.submitNewCommand(Sort, arguments, this::handleLongResponse);
}

@Override
public CompletableFuture<Object[]> scan(@NonNull String cursor) {
return commandManager.submitNewCommand(Scan, new String[] {cursor}, this::handleArrayResponse);
}

@Override
public CompletableFuture<Object[]> scan(@NonNull String cursor, @NonNull ScanOptions options) {
String[] arguments = ArrayUtils.addFirst(options.toArgs(), cursor);
return commandManager.submitNewCommand(Scan, arguments, this::handleArrayResponse);
}
}
78 changes: 78 additions & 0 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,21 @@
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
import glide.api.commands.ServerManagementClusterCommands;
import glide.api.commands.TransactionsClusterCommands;
import glide.api.logging.Logger;
import glide.api.models.ClusterTransaction;
import glide.api.models.ClusterValue;
import glide.api.models.GlideString;
import glide.api.models.commands.FlushMode;
import glide.api.models.commands.InfoOptions;
import glide.api.models.commands.SortClusterOptions;
import glide.api.models.commands.function.FunctionRestorePolicy;
import glide.api.models.commands.scan.ClusterScanCursor;
import glide.api.models.commands.scan.ScanOptions;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
import glide.ffi.resolvers.ClusterScanCursorResolver;
import glide.managers.CommandManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -908,6 +913,22 @@ public CompletableFuture<String> randomKey() {
RandomKey, new String[0], this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Object[]> scan(ClusterScanCursor cursor) {
return commandManager
.submitClusterScan(cursor, ScanOptions.builder().build(), this::handleArrayResponse)
.thenApply(
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options) {
return commandManager
.submitClusterScan(cursor, options, this::handleArrayResponse)
.thenApply(
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<String> spublish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
Expand Down Expand Up @@ -980,4 +1001,61 @@ public CompletableFuture<Long> sortStore(
new GlideString[] {key}, sortClusterOptions.toGlideStringArgs(), storeArguments);
return commandManager.submitNewCommand(Sort, arguments, this::handleLongResponse);
}

/** A {@link ClusterScanCursor} implementation for interacting with the Rust layer. */
private static final class NativeClusterScanCursor
implements CommandManager.ClusterScanCursorDetail {

private String cursorHandle;
private boolean isFinished;
private boolean isClosed = false;

// This is for internal use only.
public NativeClusterScanCursor(@NonNull String cursorHandle) {
this.cursorHandle = cursorHandle;
this.isFinished = ClusterScanCursorResolver.FINISHED_CURSOR_HANDLE.equals(cursorHandle);
}

@Override
public String getCursorHandle() {
return cursorHandle;
}

@Override
public boolean isFinished() {
return isFinished;
}

@Override
public void releaseCursorHandle() {
internalClose();
}

@Override
protected void finalize() throws Throwable {
try {
// Release the native cursor
this.internalClose();
} finally {
super.finalize();
}
}

private void internalClose() {
if (!isClosed) {
try {
ClusterScanCursorResolver.releaseNativeCursor(cursorHandle);
} catch (Exception ex) {
Logger.log(
Logger.Level.ERROR,
"ClusterScanCursor",
() -> "Error releasing cursor " + cursorHandle + ": " + ex.getMessage());
Logger.log(Logger.Level.ERROR, "ClusterScanCursor", ex);
} finally {
// Mark the cursor as closed to avoid double-free (if close() gets called more than once).
isClosed = true;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import glide.api.models.GlideString;
import glide.api.models.Transaction;
import glide.api.models.commands.SortClusterOptions;
import glide.api.models.commands.scan.ClusterScanCursor;
import glide.api.models.commands.scan.ScanOptions;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
Expand Down Expand Up @@ -152,6 +154,129 @@ public interface GenericClusterCommands {
*/
CompletableFuture<String> randomKey();

/**
* Incrementally iterates over the keys in the Cluster.
*
* <p>This command is similar to the <code>SCAN</code> command, but it is designed to work in a
* Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor}
* object to manage iterations. For more information about the Cluster Scan implementation, see <a
* href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
* Scan</a>.
*
* <p>As with the <code>SCAN</code> command, this command is a cursor-based iterator. This means
* that at every call of the command, the server returns an updated cursor ({@link
* ClusterScanCursor}) that the user needs to re-send as the <code>cursor</code> argument in the
* next call. The iteration terminates when the returned cursor {@link
* ClusterScanCursor#isFinished()} returns <code>true</code>.
*
* <p>This method guarantees that all keyslots available when the first SCAN is called will be
* scanned before the cursor is finished. Any keys added after the initial scan request is made
* are not guaranteed to be scanned.
*
* <p>Note that the same key may be returned in multiple scan iterations.
*
* <p>How to use the {@link ClusterScanCursor}: <br>
* For each iteration, the previous scan {@link ClusterScanCursor} object should be used to
* continue the <code>SCAN</code> by passing it in the <code>cursor</code> argument. Using the
* same cursor object for multiple iterations may result in the same keys returned or unexpected
* behavior.
*
* <p>When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to
* immediately free resources tied to the cursor. Note that this makes the cursor unusable in
* subsequent calls to <code>SCAN</code>.
*
* @see ClusterScanCursor for more details about how to use the cursor.
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @return An <code>Array</code> with two elements. The first element is always the {@link
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
* using the cursor in a call to this method. The cursor cannot be used in a scan again after
* {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
* <code>
* Array</code> of <code>String</code> elements each representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
* ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
* Object[] result;
* while (!cursor.isFinished()) {
* result = client.scan(cursor).get();
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] stringResults = (Object[]) result[1];
*
* System.out.println("\nSCAN iteration:");
* Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scan(ClusterScanCursor cursor);

/**
* Incrementally iterates over the keys in the Cluster.
*
* <p>This command is similar to the <code>SCAN</code> command, but it is designed to work in a
* Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor}
* object to manage iterations. For more information about the Cluster Scan implementation, see <a
* href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
* Scan</a>.
*
* <p>As with the <code>SCAN</code> command, this command is a cursor-based iterator. This means
* that at every call of the command, the server returns an updated cursor ({@link
* ClusterScanCursor}) that the user needs to re-send as the <code>cursor</code> argument in the
* next call. The iteration terminates when the returned cursor {@link
* ClusterScanCursor#isFinished()} returns <code>true</code>.
*
* <p>This method guarantees that all keyslots available when the first SCAN is called will be
* scanned before the cursor is finished. Any keys added after the initial scan request is made
* are not guaranteed to be scanned.
*
* <p>Note that the same key may be returned in multiple scan iterations.
*
* <p>How to use the {@link ClusterScanCursor}: <br>
* For each iteration, the previous scan {@link ClusterScanCursor} object should be used to
* continue the <code>SCAN</code> by passing it in the <code>cursor</code> argument. Using the
* same cursor object for multiple iterations may result in the same keys returned or unexpected
* behavior.
*
* <p>When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to
* immediately free resources tied to the cursor. Note that this makes the cursor unusable in
* subsequent calls to <code>SCAN</code>.
*
* @see ClusterScanCursor for more details about how to use the cursor.
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @param options The {@link ScanOptions}.
* @return An <code>Array</code> with two elements. The first element is always the {@link
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
* using the cursor in a call to this method. The cursor cannot be used in a scan again after
* {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
* <code>
* Array</code> of <code>String</code> elements each representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
* ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
* Object[] result;
* while (!cursor.isFinished()) {
* result = client.scan(cursor).get();
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] stringResults = (Object[]) result[1];
*
* System.out.println("\nSCAN iteration:");
* Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options);

/**
* Sorts the elements in the list, set, or sorted set at <code>key</code> and returns the result.
* <br>
Expand Down
Loading

0 comments on commit 9f98e66

Please sign in to comment.