Skip to content

Commit

Permalink
Java: add scan (binary) for cluster commands (valkey-io#1837)
Browse files Browse the repository at this point in the history
* Java: add scan (binary) for cluster commands

---------

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
  • Loading branch information
acarbonetto authored Jul 6, 2024
1 parent dfbe66c commit 95172c7
Show file tree
Hide file tree
Showing 5 changed files with 627 additions and 393 deletions.
16 changes: 16 additions & 0 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,14 @@ public CompletableFuture<Object[]> scan(ClusterScanCursor cursor) {
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<Object[]> scanBinary(ClusterScanCursor cursor) {
return commandManager
.submitClusterScan(cursor, ScanOptions.builder().build(), this::handleArrayResponseBinary)
.thenApply(
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options) {
return commandManager
Expand All @@ -1072,6 +1080,14 @@ public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions op
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<Object[]> scanBinary(ClusterScanCursor cursor, ScanOptions options) {
return commandManager
.submitClusterScan(cursor, options, this::handleArrayResponseBinary)
.thenApply(
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<String[]> sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ public interface GenericClusterCommands {
* immediately free resources tied to the cursor. Note that this makes the cursor unusable in
* subsequent calls to <code>SCAN</code>.
*
* @see ClusterScanCursor for more details about how to use the cursor.
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
Expand All @@ -243,7 +242,6 @@ public interface GenericClusterCommands {
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] stringResults = (Object[]) result[1];
*
* System.out.println("\nSCAN iteration:");
* Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
Expand Down Expand Up @@ -282,19 +280,16 @@ public interface GenericClusterCommands {
* immediately free resources tied to the cursor. Note that this makes the cursor unusable in
* subsequent calls to <code>SCAN</code>.
*
* @see ClusterScanCursor for more details about how to use the cursor.
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @param options The {@link ScanOptions}.
* @return An <code>Array</code> with two elements. The first element is always the {@link
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
* using the cursor in a call to this method. The cursor cannot be used in a scan again after
* {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
* <code>
* Array</code> of <code>String</code> elements each representing a key.
* <code>Array</code> of <code>GlideString</code> elements each representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
Expand All @@ -304,15 +299,136 @@ public interface GenericClusterCommands {
* result = client.scan(cursor).get();
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] stringResults = (Object[]) result[1];
* Object[] glideStringResults = (Object[]) result[1];
* System.out.println("\nSCAN iteration:");
* Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scanBinary(ClusterScanCursor cursor);

/**
* Incrementally iterates over the keys in the Cluster.
*
* <p>This command is similar to the <code>SCAN</code> command, but it is designed to work in a
* Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor}
* object to manage iterations. For more information about the Cluster Scan implementation, see <a
* href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
* Scan</a>.
*
* <p>As with the <code>SCAN</code> command, this command is a cursor-based iterator. This means
* that at every call of the command, the server returns an updated cursor ({@link
* ClusterScanCursor}) that the user needs to re-send as the <code>cursor</code> argument in the
* next call. The iteration terminates when the returned cursor {@link
* ClusterScanCursor#isFinished()} returns <code>true</code>.
*
* <p>This method guarantees that all keyslots available when the first SCAN is called will be
* scanned before the cursor is finished. Any keys added after the initial scan request is made
* are not guaranteed to be scanned.
*
* <p>Note that the same key may be returned in multiple scan iterations.
*
* <p>How to use the {@link ClusterScanCursor}: <br>
* For each iteration, the previous scan {@link ClusterScanCursor} object should be used to
* continue the <code>SCAN</code> by passing it in the <code>cursor</code> argument. Using the
* same cursor object for multiple iterations may result in the same keys returned or unexpected
* behavior.
*
* <p>When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to
* immediately free resources tied to the cursor. Note that this makes the cursor unusable in
* subsequent calls to <code>SCAN</code>.
*
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @param options The {@link ScanOptions}.
* @return An <code>Array</code> with two elements. The first element is always the {@link
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
* using the cursor in a call to this method. The cursor cannot be used in a scan again after
* {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
* <code>Array</code> of <code>String</code> elements each representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
* ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
* // Scan for keys with archived in the name
* ScanOptions options = ScanOptions.builder().matchPattern("*archived*").build();
* Object[] result;
* while (!cursor.isFinished()) {
* result = client.scan(cursor, options).get();
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] stringResults = (Object[]) result[1];
* System.out.println("\nSCAN iteration:");
* Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options);

/**
* Incrementally iterates over the keys in the Cluster.
*
* <p>This command is similar to the <code>SCAN</code> command, but it is designed to work in a
* Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor}
* object to manage iterations. For more information about the Cluster Scan implementation, see <a
* href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
* Scan</a>.
*
* <p>As with the <code>SCAN</code> command, this command is a cursor-based iterator. This means
* that at every call of the command, the server returns an updated cursor ({@link
* ClusterScanCursor}) that the user needs to re-send as the <code>cursor</code> argument in the
* next call. The iteration terminates when the returned cursor {@link
* ClusterScanCursor#isFinished()} returns <code>true</code>.
*
* <p>This method guarantees that all keyslots available when the first SCAN is called will be
* scanned before the cursor is finished. Any keys added after the initial scan request is made
* are not guaranteed to be scanned.
*
* <p>Note that the same key may be returned in multiple scan iterations.
*
* <p>How to use the {@link ClusterScanCursor}: <br>
* For each iteration, the previous scan {@link ClusterScanCursor} object should be used to
* continue the <code>SCAN</code> by passing it in the <code>cursor</code> argument. Using the
* same cursor object for multiple iterations may result in the same keys returned or unexpected
* behavior.
*
* <p>When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to
* immediately free resources tied to the cursor. Note that this makes the cursor unusable in
* subsequent calls to <code>SCAN</code>.
*
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @param options The {@link ScanOptions}.
* @return An <code>Array</code> with two elements. The first element is always the {@link
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
* using the cursor in a call to this method. The cursor cannot be used in a scan again after
* {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
* <code>Array</code> of <code>GlideString</code> elements each representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
* ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
* // Scan for keys with archived in the name
* ScanOptions options = ScanOptions.builder().matchPattern("*archived*").build();
* Object[] result;
* while (!cursor.isFinished()) {
* result = client.scan(cursor, options).get();
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] glideStringResults = (Object[]) result[1];
* System.out.println("\nSCAN iteration:");
* Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scanBinary(ClusterScanCursor cursor, ScanOptions options);

/**
* Sorts the elements in the list, set, or sorted set at <code>key</code> and returns the result.
* <br>
Expand Down
66 changes: 66 additions & 0 deletions java/client/src/test/java/glide/api/RedisClusterClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,32 @@ public void scan_existing_cursor() {
((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle());
}

@SneakyThrows
@Test
public void scan_binary_existing_cursor() {
CommandManager.ClusterScanCursorDetail mockCursor =
Mockito.mock(CommandManager.ClusterScanCursorDetail.class);
when(mockCursor.getCursorHandle()).thenReturn("1");

CommandManager.ClusterScanCursorDetail mockResultCursor =
Mockito.mock(CommandManager.ClusterScanCursorDetail.class);
when(mockResultCursor.getCursorHandle()).thenReturn("2");

final Object[] result =
new Object[] {mockResultCursor.getCursorHandle(), new Object[] {gs("foo")}};
final CompletableFuture<Object[]> testResponse = CompletableFuture.completedFuture(result);
when(commandManager.<Object[]>submitClusterScan(
eq(mockCursor), eq(ScanOptions.builder().build()), any()))
.thenReturn(testResponse);

CompletableFuture<Object[]> actualResponse = service.scan(mockCursor);
Object[] payload = actualResponse.get();
assertEquals(
mockResultCursor.getCursorHandle(),
((CommandManager.ClusterScanCursorDetail) payload[0]).getCursorHandle());
assertArrayEquals(new Object[] {gs("foo")}, (Object[]) payload[1]);
}

@SneakyThrows
@Test
public void scan_new_cursor_options() {
Expand Down Expand Up @@ -3144,4 +3170,44 @@ public void scan_existing_cursor_options() {
mockResultCursor.getCursorHandle(),
((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle());
}

@SneakyThrows
@Test
public void scan_binary_existing_cursor_options() {
CommandManager.ClusterScanCursorDetail mockCursor =
Mockito.mock(CommandManager.ClusterScanCursorDetail.class);
when(mockCursor.getCursorHandle()).thenReturn("1");

CommandManager.ClusterScanCursorDetail mockResultCursor =
Mockito.mock(CommandManager.ClusterScanCursorDetail.class);
when(mockResultCursor.getCursorHandle()).thenReturn("2");

final Object[] result =
new Object[] {mockResultCursor.getCursorHandle(), new Object[] {gs("foo")}};
final CompletableFuture<Object[]> testResponse = CompletableFuture.completedFuture(result);
when(commandManager.<Object[]>submitClusterScan(
eq(mockCursor),
eq(
ScanOptions.builder()
.matchPattern("key:*")
.count(10L)
.type(ScanOptions.ObjectType.STRING)
.build()),
any()))
.thenReturn(testResponse);

CompletableFuture<Object[]> actualResponse =
service.scan(
mockCursor,
ScanOptions.builder()
.matchPattern("key:*")
.count(10L)
.type(ScanOptions.ObjectType.STRING)
.build());
Object[] payload = actualResponse.get();
assertEquals(
mockResultCursor.getCursorHandle(),
((CommandManager.ClusterScanCursorDetail) payload[0]).getCursorHandle());
assertArrayEquals(new Object[] {gs("foo")}, (Object[]) payload[1]);
}
}
Loading

0 comments on commit 95172c7

Please sign in to comment.