From 97a536bcd1c2652ca4319e2d669daf831a30ae79 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 15:39:30 -0700 Subject: [PATCH 1/9] Java: add scan (binary) for cluster commands Signed-off-by: Andrew Carbonetto --- .../java/glide/api/RedisClusterClient.java | 16 +++ .../api/commands/GenericClusterCommands.java | 124 +++++++++++++++++- .../glide/api/RedisClusterClientTest.java | 66 ++++++++++ .../glide/cluster/ClusterClientTests.java | 33 +++++ 4 files changed, 237 insertions(+), 2 deletions(-) diff --git a/java/client/src/main/java/glide/api/RedisClusterClient.java b/java/client/src/main/java/glide/api/RedisClusterClient.java index b848b3e5b2..50e007f977 100644 --- a/java/client/src/main/java/glide/api/RedisClusterClient.java +++ b/java/client/src/main/java/glide/api/RedisClusterClient.java @@ -1064,6 +1064,14 @@ public CompletableFuture scan(ClusterScanCursor cursor) { result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]}); } + @Override + public CompletableFuture scanBinary(ClusterScanCursor cursor) { + return commandManager + .submitClusterScan(cursor, ScanOptions.builder().build(), this::handleArrayResponseBinary) + .thenApply( + result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]}); + } + @Override public CompletableFuture scan(ClusterScanCursor cursor, ScanOptions options) { return commandManager @@ -1072,6 +1080,14 @@ public CompletableFuture scan(ClusterScanCursor cursor, ScanOptions op result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]}); } + @Override + public CompletableFuture scanBinary(ClusterScanCursor cursor, ScanOptions options) { + return commandManager + .submitClusterScan(cursor, options, this::handleArrayResponseBinary) + .thenApply( + result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]}); + } + @Override public CompletableFuture sort( @NonNull String key, @NonNull SortClusterOptions sortClusterOptions) { diff --git a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java index ac09255318..d999b7676e 100644 --- a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java @@ -251,6 +251,66 @@ public interface GenericClusterCommands { */ CompletableFuture scan(ClusterScanCursor cursor); + /** + * Incrementally iterates over the keys in the Cluster. + * + *

This command is similar to the SCAN command, but it is designed to work in a + * Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor} + * object to manage iterations. For more information about the Cluster Scan implementation, see Cluster + * Scan. + * + *

As with the SCAN command, this command is a cursor-based iterator. This means + * that at every call of the command, the server returns an updated cursor ({@link + * ClusterScanCursor}) that the user needs to re-send as the cursor argument in the + * next call. The iteration terminates when the returned cursor {@link + * ClusterScanCursor#isFinished()} returns true. + * + *

This method guarantees that all keyslots available when the first SCAN is called will be + * scanned before the cursor is finished. Any keys added after the initial scan request is made + * are not guaranteed to be scanned. + * + *

Note that the same key may be returned in multiple scan iterations. + * + *

How to use the {@link ClusterScanCursor}:
+ * For each iteration, the previous scan {@link ClusterScanCursor} object should be used to + * continue the SCAN by passing it in the cursor argument. Using the + * same cursor object for multiple iterations may result in the same keys returned or unexpected + * behavior. + * + *

When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to + * immediately free resources tied to the cursor. Note that this makes the cursor unusable in + * subsequent calls to SCAN. + * + * @see ClusterScanCursor for more details about how to use the cursor. + * @see valkey.io for details. + * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new + * scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}. + * @return An Array with two elements. The first element is always the {@link + * ClusterScanCursor} for the next iteration of results. To see if there is more data on the + * given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the + * current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after + * using the cursor in a call to this method. The cursor cannot be used in a scan again after + * {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an + * Array of GlideString elements each representing a key. + * @example + *

{@code
+     * // Assume key contains a set with 200 keys
+     * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
+     * Object[] result;
+     * while (!cursor.isFinished()) {
+     *   result = client.scan(cursor).get();
+     *   cursor.releaseCursorHandle();
+     *   cursor = (ClusterScanCursor) result[0];
+     *   Object[] glideStringResults = (Object[]) result[1];
+     *
+     *   System.out.println("\nSCAN iteration:");
+     *   Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", "));
+     * }
+     * }
+ */ + CompletableFuture scanBinary(ClusterScanCursor cursor); + /** * Incrementally iterates over the keys in the Cluster. * @@ -293,8 +353,7 @@ public interface GenericClusterCommands { * current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after * using the cursor in a call to this method. The cursor cannot be used in a scan again after * {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an - * - * Array of String elements each representing a key. + * Array of String elements each representing a key. * @example *
{@code
      * // Assume key contains a set with 200 keys
@@ -313,6 +372,67 @@ public interface GenericClusterCommands {
      */
     CompletableFuture scan(ClusterScanCursor cursor, ScanOptions options);
 
+    /**
+     * Incrementally iterates over the keys in the Cluster.
+     *
+     * 

This command is similar to the SCAN command, but it is designed to work in a + * Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor} + * object to manage iterations. For more information about the Cluster Scan implementation, see Cluster + * Scan. + * + *

As with the SCAN command, this command is a cursor-based iterator. This means + * that at every call of the command, the server returns an updated cursor ({@link + * ClusterScanCursor}) that the user needs to re-send as the cursor argument in the + * next call. The iteration terminates when the returned cursor {@link + * ClusterScanCursor#isFinished()} returns true. + * + *

This method guarantees that all keyslots available when the first SCAN is called will be + * scanned before the cursor is finished. Any keys added after the initial scan request is made + * are not guaranteed to be scanned. + * + *

Note that the same key may be returned in multiple scan iterations. + * + *

How to use the {@link ClusterScanCursor}:
+ * For each iteration, the previous scan {@link ClusterScanCursor} object should be used to + * continue the SCAN by passing it in the cursor argument. Using the + * same cursor object for multiple iterations may result in the same keys returned or unexpected + * behavior. + * + *

When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to + * immediately free resources tied to the cursor. Note that this makes the cursor unusable in + * subsequent calls to SCAN. + * + * @see ClusterScanCursor for more details about how to use the cursor. + * @see valkey.io for details. + * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new + * scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}. + * @param options The {@link ScanOptions}. + * @return An Array with two elements. The first element is always the {@link + * ClusterScanCursor} for the next iteration of results. To see if there is more data on the + * given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the + * current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after + * using the cursor in a call to this method. The cursor cannot be used in a scan again after + * {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an + * Array of GlideString elements each representing a key. + * @example + *

{@code
+     * // Assume key contains a set with 200 keys
+     * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
+     * Object[] result;
+     * while (!cursor.isFinished()) {
+     *   result = client.scan(cursor).get();
+     *   cursor.releaseCursorHandle();
+     *   cursor = (ClusterScanCursor) result[0];
+     *   Object[] glideStringResults = (Object[]) result[1];
+     *
+     *   System.out.println("\nSCAN iteration:");
+     *   Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", "));
+     * }
+     * }
+ */ + CompletableFuture scanBinary(ClusterScanCursor cursor, ScanOptions options); + /** * Sorts the elements in the list, set, or sorted set at key and returns the result. *
diff --git a/java/client/src/test/java/glide/api/RedisClusterClientTest.java b/java/client/src/test/java/glide/api/RedisClusterClientTest.java index ff89d91b33..228cfc63f4 100644 --- a/java/client/src/test/java/glide/api/RedisClusterClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClusterClientTest.java @@ -3074,6 +3074,32 @@ public void scan_existing_cursor() { ((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle()); } + @SneakyThrows + @Test + public void scan_binary_existing_cursor() { + CommandManager.ClusterScanCursorDetail mockCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockCursor.getCursorHandle()).thenReturn("1"); + + CommandManager.ClusterScanCursorDetail mockResultCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockResultCursor.getCursorHandle()).thenReturn("2"); + + final Object[] result = + new Object[] {mockResultCursor.getCursorHandle(), new Object[] {gs("foo")}}; + final CompletableFuture testResponse = CompletableFuture.completedFuture(result); + when(commandManager.submitClusterScan( + eq(mockCursor), eq(ScanOptions.builder().build()), any())) + .thenReturn(testResponse); + + CompletableFuture actualResponse = service.scan(mockCursor); + Object[] payload = actualResponse.get(); + assertEquals( + mockResultCursor.getCursorHandle(), + ((CommandManager.ClusterScanCursorDetail) payload[0]).getCursorHandle()); + assertArrayEquals(new Object[] {gs("foo")}, (Object[]) payload[1]); + } + @SneakyThrows @Test public void scan_new_cursor_options() { @@ -3144,4 +3170,44 @@ public void scan_existing_cursor_options() { mockResultCursor.getCursorHandle(), ((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle()); } + + @SneakyThrows + @Test + public void scan_binary_existing_cursor_options() { + CommandManager.ClusterScanCursorDetail mockCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockCursor.getCursorHandle()).thenReturn("1"); + + CommandManager.ClusterScanCursorDetail mockResultCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockResultCursor.getCursorHandle()).thenReturn("2"); + + final Object[] result = + new Object[] {mockResultCursor.getCursorHandle(), new Object[] {gs("foo")}}; + final CompletableFuture testResponse = CompletableFuture.completedFuture(result); + when(commandManager.submitClusterScan( + eq(mockCursor), + eq( + ScanOptions.builder() + .matchPattern("key:*") + .count(10L) + .type(ScanOptions.ObjectType.STRING) + .build()), + any())) + .thenReturn(testResponse); + + CompletableFuture actualResponse = + service.scan( + mockCursor, + ScanOptions.builder() + .matchPattern("key:*") + .count(10L) + .type(ScanOptions.ObjectType.STRING) + .build()); + Object[] payload = actualResponse.get(); + assertEquals( + mockResultCursor.getCursorHandle(), + ((CommandManager.ClusterScanCursorDetail) payload[0]).getCursorHandle()); + assertArrayEquals(new Object[] {gs("foo")}, (Object[]) payload[1]); + } } diff --git a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java index 063dbb3a8d..ab79b22701 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java @@ -205,6 +205,39 @@ public void test_cluster_scan_simple() { } } + @Test + @SneakyThrows + public void test_cluster_scan_binary_simple() { + try (RedisClusterClient client = + RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + assertEquals(OK, client.flushall().get()); + + String key = "key:test_cluster_scan_simple" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + for (int i = 0; i < 100; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + + assertEquals(OK, client.mset(expectedData).get()); + + Set result = new LinkedHashSet<>(); + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + while (!cursor.isFinished()) { + final Object[] response = client.scanBinary(cursor).get(); + cursor.releaseCursorHandle(); + + cursor = (ClusterScanCursor) response[0]; + final Object[] data = (Object[]) response[1]; + for (Object datum : data) { + result.add(datum.toString()); + } + } + cursor.releaseCursorHandle(); + + assertEquals(expectedData.keySet(), result); + } + } + @Test @SneakyThrows public void test_cluster_scan_with_object_type_and_pattern() { From ea49a708ea4718f4e43e1429227ec7ffc1b39959 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 16:04:42 -0700 Subject: [PATCH 2/9] SPOTELSS Signed-off-by: Andrew Carbonetto --- .../src/test/java/glide/cluster/ClusterClientTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java index ab79b22701..d0783e9301 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java @@ -209,7 +209,7 @@ public void test_cluster_scan_simple() { @SneakyThrows public void test_cluster_scan_binary_simple() { try (RedisClusterClient client = - RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { assertEquals(OK, client.flushall().get()); String key = "key:test_cluster_scan_simple" + UUID.randomUUID(); From d85a6c9e7fb74a7d549bdcb2e95441cdbfd9daee Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 16:43:33 -0700 Subject: [PATCH 3/9] fix documentation Signed-off-by: Andrew Carbonetto --- .../main/java/glide/api/commands/GenericClusterCommands.java | 2 -- .../src/test/java/glide/cluster/ClusterClientTests.java | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java index d999b7676e..2170a8d70e 100644 --- a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java @@ -221,7 +221,6 @@ public interface GenericClusterCommands { * immediately free resources tied to the cursor. Note that this makes the cursor unusable in * subsequent calls to SCAN. * - * @see ClusterScanCursor for more details about how to use the cursor. * @see valkey.io for details. * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new * scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}. @@ -282,7 +281,6 @@ public interface GenericClusterCommands { * immediately free resources tied to the cursor. Note that this makes the cursor unusable in * subsequent calls to SCAN. * - * @see ClusterScanCursor for more details about how to use the cursor. * @see valkey.io for details. * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new * scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}. diff --git a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java index d0783e9301..56973548d6 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java @@ -414,6 +414,7 @@ public void test_cluster_scan_cleaning_cursor() { } } + @Timeout(20) @Test @SneakyThrows public void test_cluster_scan_all_types() { From 31e1be28b20141b02d49900cf09be82535165531 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 17:03:40 -0700 Subject: [PATCH 4/9] Moving scan IT tests Signed-off-by: Andrew Carbonetto --- .../glide/cluster/ClusterClientTests.java | 420 ------------------ .../test/java/glide/cluster/CommandTests.java | 380 ++++++++++++++++ 2 files changed, 380 insertions(+), 420 deletions(-) diff --git a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java index 56973548d6..65a4f40572 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java @@ -6,28 +6,15 @@ import static glide.TestUtilities.getRandomString; import static glide.api.BaseClient.OK; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; import glide.api.RedisClusterClient; -import glide.api.models.commands.scan.ClusterScanCursor; -import glide.api.models.commands.scan.ScanOptions; import glide.api.models.configuration.RedisCredentials; import glide.api.models.exceptions.ClosingException; import glide.api.models.exceptions.RequestException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -171,411 +158,4 @@ public void closed_client_throws_ExecutionException_with_ClosingException_as_cau assertThrows(ExecutionException.class, () -> client.set("foo", "bar").get()); assertTrue(executionException.getCause() instanceof ClosingException); } - - @Test - @SneakyThrows - public void test_cluster_scan_simple() { - try (RedisClusterClient client = - RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { - assertEquals(OK, client.flushall().get()); - - String key = "key:test_cluster_scan_simple" + UUID.randomUUID(); - Map expectedData = new LinkedHashMap<>(); - for (int i = 0; i < 100; i++) { - expectedData.put(key + ":" + i, "value " + i); - } - - assertEquals(OK, client.mset(expectedData).get()); - - Set result = new LinkedHashSet<>(); - ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); - while (!cursor.isFinished()) { - final Object[] response = client.scan(cursor).get(); - cursor.releaseCursorHandle(); - - cursor = (ClusterScanCursor) response[0]; - final Object[] data = (Object[]) response[1]; - for (Object datum : data) { - result.add(datum.toString()); - } - } - cursor.releaseCursorHandle(); - - assertEquals(expectedData.keySet(), result); - } - } - - @Test - @SneakyThrows - public void test_cluster_scan_binary_simple() { - try (RedisClusterClient client = - RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { - assertEquals(OK, client.flushall().get()); - - String key = "key:test_cluster_scan_simple" + UUID.randomUUID(); - Map expectedData = new LinkedHashMap<>(); - for (int i = 0; i < 100; i++) { - expectedData.put(key + ":" + i, "value " + i); - } - - assertEquals(OK, client.mset(expectedData).get()); - - Set result = new LinkedHashSet<>(); - ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); - while (!cursor.isFinished()) { - final Object[] response = client.scanBinary(cursor).get(); - cursor.releaseCursorHandle(); - - cursor = (ClusterScanCursor) response[0]; - final Object[] data = (Object[]) response[1]; - for (Object datum : data) { - result.add(datum.toString()); - } - } - cursor.releaseCursorHandle(); - - assertEquals(expectedData.keySet(), result); - } - } - - @Test - @SneakyThrows - public void test_cluster_scan_with_object_type_and_pattern() { - try (RedisClusterClient client = - RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { - - assertEquals(OK, client.flushall().get()); - String key = "key:" + UUID.randomUUID(); - Map expectedData = new LinkedHashMap<>(); - final int baseNumberOfEntries = 100; - for (int i = 0; i < baseNumberOfEntries; i++) { - expectedData.put(key + ":" + i, "value " + i); - } - - assertEquals(OK, client.mset(expectedData).get()); - - ArrayList unexpectedTypeKeys = new ArrayList<>(); - for (int i = baseNumberOfEntries; i < baseNumberOfEntries + 100; i++) { - unexpectedTypeKeys.add(key + ":" + i); - } - - for (String keyStr : unexpectedTypeKeys) { - assertEquals(1L, client.sadd(keyStr, new String[] {"value"}).get()); - } - - Map unexpectedPatterns = new LinkedHashMap<>(); - for (int i = baseNumberOfEntries + 100; i < baseNumberOfEntries + 200; i++) { - unexpectedPatterns.put("foo:" + i, "value " + i); - } - assertEquals(OK, client.mset(unexpectedPatterns).get()); - - Set result = new LinkedHashSet<>(); - ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); - while (!cursor.isFinished()) { - final Object[] response = - client - .scan( - cursor, - ScanOptions.builder() - .matchPattern("key:*") - .type(ScanOptions.ObjectType.STRING) - .build()) - .get(); - cursor.releaseCursorHandle(); - - cursor = (ClusterScanCursor) response[0]; - final Object[] data = (Object[]) response[1]; - for (Object datum : data) { - result.add(datum.toString()); - } - } - cursor.releaseCursorHandle(); - assertEquals(expectedData.keySet(), result); - - // Ensure that no unexpected types were in the result. - assertFalse(new LinkedHashSet<>(result).removeAll(new LinkedHashSet<>(unexpectedTypeKeys))); - assertFalse(new LinkedHashSet<>(result).removeAll(unexpectedPatterns.keySet())); - } - } - - @Test - @SneakyThrows - public void test_cluster_scan_with_count() { - try (RedisClusterClient client = - RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { - - assertEquals(OK, client.flushall().get()); - String key = "key:" + UUID.randomUUID(); - Map expectedData = new LinkedHashMap<>(); - final int baseNumberOfEntries = 2000; - for (int i = 0; i < baseNumberOfEntries; i++) { - expectedData.put(key + ":" + i, "value " + i); - } - - assertEquals(OK, client.mset(expectedData).get()); - - ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); - Set keys = new LinkedHashSet<>(); - int successfulComparedScans = 0; - while (!cursor.isFinished()) { - Object[] resultOf1 = client.scan(cursor, ScanOptions.builder().count(1L).build()).get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) resultOf1[0]; - keys.addAll( - Arrays.stream((Object[]) resultOf1[1]) - .map(Object::toString) - .collect(Collectors.toList())); - if (cursor.isFinished()) { - break; - } - - Object[] resultOf100 = client.scan(cursor, ScanOptions.builder().count(100L).build()).get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) resultOf100[0]; - keys.addAll( - Arrays.stream((Object[]) resultOf100[1]) - .map(Object::toString) - .collect(Collectors.toList())); - - // Note: count is only an optimization hint. It does not have to return the size specified. - if (resultOf1.length <= resultOf100.length) { - successfulComparedScans++; - } - } - cursor.releaseCursorHandle(); - assertTrue(successfulComparedScans > 0); - assertEquals(expectedData.keySet(), keys); - } - } - - @Test - @SneakyThrows - public void test_cluster_scan_with_match() { - try (RedisClusterClient client = - RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { - - assertEquals(OK, client.flushall().get()); - String key = "key:" + UUID.randomUUID(); - Map expectedData = new LinkedHashMap<>(); - final int baseNumberOfEntries = 2000; - for (int i = 0; i < baseNumberOfEntries; i++) { - expectedData.put(key + ":" + i, "value " + i); - } - assertEquals(OK, client.mset(expectedData).get()); - - Map unexpectedPatterns = new LinkedHashMap<>(); - for (int i = baseNumberOfEntries + 100; i < baseNumberOfEntries + 200; i++) { - unexpectedPatterns.put("foo:" + i, "value " + i); - } - assertEquals(OK, client.mset(unexpectedPatterns).get()); - - ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); - Set keys = new LinkedHashSet<>(); - while (!cursor.isFinished()) { - Object[] result = - client.scan(cursor, ScanOptions.builder().matchPattern("key:*").build()).get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) result[0]; - keys.addAll( - Arrays.stream((Object[]) result[1]).map(Object::toString).collect(Collectors.toList())); - } - cursor.releaseCursorHandle(); - assertEquals(expectedData.keySet(), keys); - assertFalse(new LinkedHashSet<>(keys).removeAll(unexpectedPatterns.keySet())); - } - } - - @Test - @SneakyThrows - public void test_cluster_scan_cleaning_cursor() { - // We test whether the cursor is cleaned up after it is deleted, which we expect to happen when - // th GC is called. - try (RedisClusterClient client = - RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { - assertEquals(OK, client.flushall().get()); - - String key = "key:" + UUID.randomUUID(); - Map expectedData = new LinkedHashMap<>(); - final int baseNumberOfEntries = 100; - for (int i = 0; i < baseNumberOfEntries; i++) { - expectedData.put(key + ":" + i, "value " + i); - } - assertEquals(OK, client.mset(expectedData).get()); - - ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); - final Object[] response = client.scan(cursor).get(); - cursor = (ClusterScanCursor) (response[0]); - cursor.releaseCursorHandle(); - final ClusterScanCursor brokenCursor = cursor; - ExecutionException exception = - assertThrows(ExecutionException.class, () -> client.scan(brokenCursor).get()); - assertInstanceOf(RequestException.class, exception.getCause()); - assertTrue(exception.getCause().getMessage().contains("Invalid scan_state_cursor id")); - } - } - - @Timeout(20) - @Test - @SneakyThrows - public void test_cluster_scan_all_types() { - try (RedisClusterClient client = - RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { - assertEquals(OK, client.flushall().get()); - - String key = "key:" + UUID.randomUUID(); - Map stringData = new LinkedHashMap<>(); - final int baseNumberOfEntries = 100; - for (int i = 0; i < baseNumberOfEntries; i++) { - stringData.put(key + ":" + i, "value " + i); - } - assertEquals(OK, client.mset(stringData).get()); - - String setKey = "setKey:" + UUID.randomUUID(); - Map setData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - setData.put(setKey + ":" + i, "value " + i); - } - for (String k : setData.keySet()) { - assertEquals(1L, client.sadd(k, new String[] {"value" + k}).get()); - } - - String hashKey = "hashKey:" + UUID.randomUUID(); - Map hashData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - hashData.put(hashKey + ":" + i, "value " + i); - } - for (String k : hashData.keySet()) { - assertEquals(1L, client.hset(k, Map.of("field" + k, "value" + k)).get()); - } - - String listKey = "listKey:" + UUID.randomUUID(); - Map listData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - listData.put(listKey + ":" + i, "value " + i); - } - for (String k : listData.keySet()) { - assertEquals(1L, client.lpush(k, new String[] {"value" + k}).get()); - } - - String zSetKey = "zSetKey:" + UUID.randomUUID(); - Map zSetData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - zSetData.put(zSetKey + ":" + i, "value " + i); - } - for (String k : zSetData.keySet()) { - assertEquals(1L, client.zadd(k, Map.of(k, 1.0)).get()); - } - - String streamKey = "streamKey:" + UUID.randomUUID(); - Map streamData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - streamData.put(streamKey + ":" + i, "value " + i); - } - for (String k : streamData.keySet()) { - assertNotNull(client.xadd(k, Map.of(k, "value " + k)).get()); - } - - ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); - Set results = new LinkedHashSet<>(); - while (!cursor.isFinished()) { - Object[] response = - client - .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.STRING).build()) - .get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) response[0]; - results.addAll( - Arrays.stream((Object[]) response[1]) - .map(Object::toString) - .collect(Collectors.toSet())); - } - cursor.releaseCursorHandle(); - assertEquals(stringData.keySet(), results); - - cursor = ClusterScanCursor.initalCursor(); - results.clear(); - while (!cursor.isFinished()) { - Object[] response = - client - .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.SET).build()) - .get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) response[0]; - results.addAll( - Arrays.stream((Object[]) response[1]) - .map(Object::toString) - .collect(Collectors.toSet())); - } - cursor.releaseCursorHandle(); - assertEquals(setData.keySet(), results); - - cursor = ClusterScanCursor.initalCursor(); - results.clear(); - while (!cursor.isFinished()) { - Object[] response = - client - .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.HASH).build()) - .get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) response[0]; - results.addAll( - Arrays.stream((Object[]) response[1]) - .map(Object::toString) - .collect(Collectors.toSet())); - } - cursor.releaseCursorHandle(); - assertEquals(hashData.keySet(), results); - - cursor = ClusterScanCursor.initalCursor(); - results.clear(); - while (!cursor.isFinished()) { - Object[] response = - client - .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.LIST).build()) - .get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) response[0]; - results.addAll( - Arrays.stream((Object[]) response[1]) - .map(Object::toString) - .collect(Collectors.toSet())); - } - cursor.releaseCursorHandle(); - assertEquals(listData.keySet(), results); - - cursor = ClusterScanCursor.initalCursor(); - results.clear(); - while (!cursor.isFinished()) { - Object[] response = - client - .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.ZSET).build()) - .get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) response[0]; - results.addAll( - Arrays.stream((Object[]) response[1]) - .map(Object::toString) - .collect(Collectors.toSet())); - } - cursor.releaseCursorHandle(); - assertEquals(zSetData.keySet(), results); - - cursor = ClusterScanCursor.initalCursor(); - results.clear(); - while (!cursor.isFinished()) { - Object[] response = - client - .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.STREAM).build()) - .get(); - cursor.releaseCursorHandle(); - cursor = (ClusterScanCursor) response[0]; - results.addAll( - Arrays.stream((Object[]) response[1]) - .map(Object::toString) - .collect(Collectors.toSet())); - } - cursor.releaseCursorHandle(); - assertEquals(streamData.keySet(), results); - } - } } diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java index 314ace01d9..13a222c8b9 100644 --- a/java/integTest/src/test/java/glide/cluster/CommandTests.java +++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java @@ -42,6 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -65,6 +66,8 @@ import glide.api.models.commands.geospatial.GeoSearchShape; import glide.api.models.commands.geospatial.GeoSearchStoreOptions; import glide.api.models.commands.geospatial.GeoUnit; +import glide.api.models.commands.scan.ClusterScanCursor; +import glide.api.models.commands.scan.ScanOptions; import glide.api.models.configuration.RequestRoutingConfiguration.ByAddressRoute; import glide.api.models.configuration.RequestRoutingConfiguration.Route; import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute; @@ -73,8 +76,11 @@ import glide.api.models.exceptions.RequestException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -82,6 +88,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.SneakyThrows; import org.junit.jupiter.api.AfterAll; @@ -2552,4 +2559,377 @@ public void sort_binary() { assertArrayEquals( key2DescendingListSubset_strings, clusterClient.lrange(key3.toString(), 0, -1).get()); } + + @Test + @SneakyThrows + public void test_cluster_scan_simple() { + assertEquals(OK, clusterClient.flushall().get()); + + String key = "key:test_cluster_scan_simple" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + for (int i = 0; i < 100; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + + assertEquals(OK, clusterClient.mset(expectedData).get()); + + Set result = new LinkedHashSet<>(); + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + while (!cursor.isFinished()) { + final Object[] response = clusterClient.scan(cursor).get(); + cursor.releaseCursorHandle(); + + cursor = (ClusterScanCursor) response[0]; + final Object[] data = (Object[]) response[1]; + for (Object datum : data) { + result.add(datum.toString()); + } + } + cursor.releaseCursorHandle(); + + assertEquals(expectedData.keySet(), result); + } + + @Test + @SneakyThrows + public void test_cluster_scan_binary_simple() { + assertEquals(OK, clusterClient.flushall().get()); + + String key = "key:test_cluster_scan_simple" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + for (int i = 0; i < 100; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + + assertEquals(OK, clusterClient.mset(expectedData).get()); + + Set result = new LinkedHashSet<>(); + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + while (!cursor.isFinished()) { + final Object[] response = clusterClient.scanBinary(cursor).get(); + cursor.releaseCursorHandle(); + + cursor = (ClusterScanCursor) response[0]; + final Object[] data = (Object[]) response[1]; + for (Object datum : data) { + result.add(datum.toString()); + } + } + cursor.releaseCursorHandle(); + + assertEquals(expectedData.keySet(), result); + } + + @Test + @SneakyThrows + public void test_cluster_scan_with_object_type_and_pattern() { + assertEquals(OK, clusterClient.flushall().get()); + String key = "key:" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 100; + for (int i = 0; i < baseNumberOfEntries; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + + assertEquals(OK, clusterClient.mset(expectedData).get()); + + ArrayList unexpectedTypeKeys = new ArrayList<>(); + for (int i = baseNumberOfEntries; i < baseNumberOfEntries + 100; i++) { + unexpectedTypeKeys.add(key + ":" + i); + } + + for (String keyStr : unexpectedTypeKeys) { + assertEquals(1L, clusterClient.sadd(keyStr, new String[] {"value"}).get()); + } + + Map unexpectedPatterns = new LinkedHashMap<>(); + for (int i = baseNumberOfEntries + 100; i < baseNumberOfEntries + 200; i++) { + unexpectedPatterns.put("foo:" + i, "value " + i); + } + assertEquals(OK, clusterClient.mset(unexpectedPatterns).get()); + + Set result = new LinkedHashSet<>(); + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + while (!cursor.isFinished()) { + final Object[] response = + clusterClient + .scan( + cursor, + ScanOptions.builder() + .matchPattern("key:*") + .type(ScanOptions.ObjectType.STRING) + .build()) + .get(); + cursor.releaseCursorHandle(); + + cursor = (ClusterScanCursor) response[0]; + final Object[] data = (Object[]) response[1]; + for (Object datum : data) { + result.add(datum.toString()); + } + } + cursor.releaseCursorHandle(); + assertEquals(expectedData.keySet(), result); + + // Ensure that no unexpected types were in the result. + assertFalse(new LinkedHashSet<>(result).removeAll(new LinkedHashSet<>(unexpectedTypeKeys))); + assertFalse(new LinkedHashSet<>(result).removeAll(unexpectedPatterns.keySet())); + } + + @Test + @SneakyThrows + public void test_cluster_scan_with_count() { + assertEquals(OK, clusterClient.flushall().get()); + String key = "key:" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 2000; + for (int i = 0; i < baseNumberOfEntries; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + + assertEquals(OK, clusterClient.mset(expectedData).get()); + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set keys = new LinkedHashSet<>(); + int successfulComparedScans = 0; + while (!cursor.isFinished()) { + Object[] resultOf1 = + clusterClient.scan(cursor, ScanOptions.builder().count(1L).build()).get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) resultOf1[0]; + keys.addAll( + Arrays.stream((Object[]) resultOf1[1]) + .map(Object::toString) + .collect(Collectors.toList())); + if (cursor.isFinished()) { + break; + } + + Object[] resultOf100 = + clusterClient.scan(cursor, ScanOptions.builder().count(100L).build()).get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) resultOf100[0]; + keys.addAll( + Arrays.stream((Object[]) resultOf100[1]) + .map(Object::toString) + .collect(Collectors.toList())); + + // Note: count is only an optimization hint. It does not have to return the size specified. + if (resultOf1.length <= resultOf100.length) { + successfulComparedScans++; + } + } + cursor.releaseCursorHandle(); + assertTrue(successfulComparedScans > 0); + assertEquals(expectedData.keySet(), keys); + } + + @Test + @SneakyThrows + public void test_cluster_scan_with_match() { + assertEquals(OK, clusterClient.flushall().get()); + String key = "key:" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 2000; + for (int i = 0; i < baseNumberOfEntries; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + assertEquals(OK, clusterClient.mset(expectedData).get()); + + Map unexpectedPatterns = new LinkedHashMap<>(); + for (int i = baseNumberOfEntries + 100; i < baseNumberOfEntries + 200; i++) { + unexpectedPatterns.put("foo:" + i, "value " + i); + } + assertEquals(OK, clusterClient.mset(unexpectedPatterns).get()); + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set keys = new LinkedHashSet<>(); + while (!cursor.isFinished()) { + Object[] result = + clusterClient.scan(cursor, ScanOptions.builder().matchPattern("key:*").build()).get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) result[0]; + keys.addAll( + Arrays.stream((Object[]) result[1]).map(Object::toString).collect(Collectors.toList())); + } + cursor.releaseCursorHandle(); + assertEquals(expectedData.keySet(), keys); + assertFalse(new LinkedHashSet<>(keys).removeAll(unexpectedPatterns.keySet())); + } + + @Test + @SneakyThrows + public void test_cluster_scan_cleaning_cursor() { + // We test whether the cursor is cleaned up after it is deleted, which we expect to happen when + // th GC is called. + assertEquals(OK, clusterClient.flushall().get()); + + String key = "key:" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 100; + for (int i = 0; i < baseNumberOfEntries; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + assertEquals(OK, clusterClient.mset(expectedData).get()); + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + final Object[] response = clusterClient.scan(cursor).get(); + cursor = (ClusterScanCursor) (response[0]); + cursor.releaseCursorHandle(); + final ClusterScanCursor brokenCursor = cursor; + ExecutionException exception = + assertThrows(ExecutionException.class, () -> clusterClient.scan(brokenCursor).get()); + assertInstanceOf(RequestException.class, exception.getCause()); + assertTrue(exception.getCause().getMessage().contains("Invalid scan_state_cursor id")); + } + + @Timeout(20) + @Test + @SneakyThrows + public void test_cluster_scan_all_types() { + assertEquals(OK, clusterClient.flushall().get()); + + String key = "key:" + UUID.randomUUID(); + Map stringData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 100; + for (int i = 0; i < baseNumberOfEntries; i++) { + stringData.put(key + ":" + i, "value " + i); + } + assertEquals(OK, clusterClient.mset(stringData).get()); + + String setKey = "setKey:" + UUID.randomUUID(); + Map setData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + setData.put(setKey + ":" + i, "value " + i); + } + for (String k : setData.keySet()) { + assertEquals(1L, clusterClient.sadd(k, new String[] {"value" + k}).get()); + } + + String hashKey = "hashKey:" + UUID.randomUUID(); + Map hashData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + hashData.put(hashKey + ":" + i, "value " + i); + } + for (String k : hashData.keySet()) { + assertEquals(1L, clusterClient.hset(k, Map.of("field" + k, "value" + k)).get()); + } + + String listKey = "listKey:" + UUID.randomUUID(); + Map listData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + listData.put(listKey + ":" + i, "value " + i); + } + for (String k : listData.keySet()) { + assertEquals(1L, clusterClient.lpush(k, new String[] {"value" + k}).get()); + } + + String zSetKey = "zSetKey:" + UUID.randomUUID(); + Map zSetData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + zSetData.put(zSetKey + ":" + i, "value " + i); + } + for (String k : zSetData.keySet()) { + assertEquals(1L, clusterClient.zadd(k, Map.of(k, 1.0)).get()); + } + + String streamKey = "streamKey:" + UUID.randomUUID(); + Map streamData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + streamData.put(streamKey + ":" + i, "value " + i); + } + for (String k : streamData.keySet()) { + assertNotNull(clusterClient.xadd(k, Map.of(k, "value " + k)).get()); + } + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set results = new LinkedHashSet<>(); + while (!cursor.isFinished()) { + Object[] response = + clusterClient + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.STRING).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]).map(Object::toString).collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(stringData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + clusterClient + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.SET).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]).map(Object::toString).collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(setData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + clusterClient + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.HASH).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]).map(Object::toString).collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(hashData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + clusterClient + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.LIST).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]).map(Object::toString).collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(listData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + clusterClient + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.ZSET).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]).map(Object::toString).collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(zSetData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + clusterClient + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.STREAM).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]).map(Object::toString).collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(streamData.keySet(), results); + } } From a52ed9fa7da2aec1b28a44b4339fb7dde9b44a92 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 17:10:37 -0700 Subject: [PATCH 5/9] Doc fix Signed-off-by: Andrew Carbonetto --- java/integTest/src/test/java/glide/cluster/CommandTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java index 13a222c8b9..5caaf5bb31 100644 --- a/java/integTest/src/test/java/glide/cluster/CommandTests.java +++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java @@ -2761,7 +2761,7 @@ public void test_cluster_scan_with_match() { @SneakyThrows public void test_cluster_scan_cleaning_cursor() { // We test whether the cursor is cleaned up after it is deleted, which we expect to happen when - // th GC is called. + // the GC is called. assertEquals(OK, clusterClient.flushall().get()); String key = "key:" + UUID.randomUUID(); From d1ae1fc7e019da1e0b8ae23d275f071186e82342 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 17:28:14 -0700 Subject: [PATCH 6/9] Increase timeout on scan tests Signed-off-by: Andrew Carbonetto --- .../api/commands/GenericClusterCommands.java | 38 +++++++++---------- .../test/java/glide/cluster/CommandTests.java | 6 +++ 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java index 2170a8d70e..fcba7ae42d 100644 --- a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java @@ -242,11 +242,10 @@ public interface GenericClusterCommands { * cursor.releaseCursorHandle(); * cursor = (ClusterScanCursor) result[0]; * Object[] stringResults = (Object[]) result[1]; - * - * System.out.println("\nSCAN iteration:"); - * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); * } - * }
+ * System.out.println("\nSCAN iteration:"); + * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); + * */ CompletableFuture scan(ClusterScanCursor cursor); @@ -301,11 +300,10 @@ public interface GenericClusterCommands { * cursor.releaseCursorHandle(); * cursor = (ClusterScanCursor) result[0]; * Object[] glideStringResults = (Object[]) result[1]; - * - * System.out.println("\nSCAN iteration:"); - * Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", ")); * } - * } + * System.out.println("\nSCAN iteration:"); + * Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", ")); + * */ CompletableFuture scanBinary(ClusterScanCursor cursor); @@ -340,7 +338,6 @@ public interface GenericClusterCommands { * immediately free resources tied to the cursor. Note that this makes the cursor unusable in * subsequent calls to SCAN. * - * @see ClusterScanCursor for more details about how to use the cursor. * @see valkey.io for details. * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new * scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}. @@ -356,17 +353,18 @@ public interface GenericClusterCommands { *
{@code
      * // Assume key contains a set with 200 keys
      * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
+     * // Scan for keys with archived in the name
+     * ScanOptions options = ScanOptions.builder().matchPattern("*archived*").build();
      * Object[] result;
      * while (!cursor.isFinished()) {
-     *   result = client.scan(cursor).get();
+     *   result = client.scan(cursor, options).get();
      *   cursor.releaseCursorHandle();
      *   cursor = (ClusterScanCursor) result[0];
      *   Object[] stringResults = (Object[]) result[1];
-     *
-     *   System.out.println("\nSCAN iteration:");
-     *   Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
      * }
-     * }
+ * System.out.println("\nSCAN iteration:"); + * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); + * */ CompletableFuture scan(ClusterScanCursor cursor, ScanOptions options); @@ -401,7 +399,6 @@ public interface GenericClusterCommands { * immediately free resources tied to the cursor. Note that this makes the cursor unusable in * subsequent calls to SCAN. * - * @see ClusterScanCursor for more details about how to use the cursor. * @see valkey.io for details. * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new * scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}. @@ -417,17 +414,18 @@ public interface GenericClusterCommands { *
{@code
      * // Assume key contains a set with 200 keys
      * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
+     * // Scan for keys with archived in the name
+     * ScanOptions options = ScanOptions.builder().matchPattern("*archived*").build();
      * Object[] result;
      * while (!cursor.isFinished()) {
-     *   result = client.scan(cursor).get();
+     *   result = client.scan(cursor, options).get();
      *   cursor.releaseCursorHandle();
      *   cursor = (ClusterScanCursor) result[0];
      *   Object[] glideStringResults = (Object[]) result[1];
-     *
-     *   System.out.println("\nSCAN iteration:");
-     *   Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", "));
      * }
-     * }
+ * System.out.println("\nSCAN iteration:"); + * Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", ")); + * */ CompletableFuture scanBinary(ClusterScanCursor cursor, ScanOptions options); diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java index 0924f59701..f3efe32d63 100644 --- a/java/integTest/src/test/java/glide/cluster/CommandTests.java +++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java @@ -2559,6 +2559,7 @@ public void sort_binary() { key2DescendingListSubset_strings, clusterClient.lrange(key3.toString(), 0, -1).get()); } + @Timeout(20) @Test @SneakyThrows public void test_cluster_scan_simple() { @@ -2589,6 +2590,7 @@ public void test_cluster_scan_simple() { assertEquals(expectedData.keySet(), result); } + @Timeout(20) @Test @SneakyThrows public void test_cluster_scan_binary_simple() { @@ -2619,6 +2621,7 @@ public void test_cluster_scan_binary_simple() { assertEquals(expectedData.keySet(), result); } + @Timeout(20) @Test @SneakyThrows public void test_cluster_scan_with_object_type_and_pattern() { @@ -2675,6 +2678,7 @@ public void test_cluster_scan_with_object_type_and_pattern() { assertFalse(new LinkedHashSet<>(result).removeAll(unexpectedPatterns.keySet())); } + @Timeout(20) @Test @SneakyThrows public void test_cluster_scan_with_count() { @@ -2723,6 +2727,7 @@ public void test_cluster_scan_with_count() { assertEquals(expectedData.keySet(), keys); } + @Timeout(20) @Test @SneakyThrows public void test_cluster_scan_with_match() { @@ -2756,6 +2761,7 @@ public void test_cluster_scan_with_match() { assertFalse(new LinkedHashSet<>(keys).removeAll(unexpectedPatterns.keySet())); } + @Timeout(20) @Test @SneakyThrows public void test_cluster_scan_cleaning_cursor() { From f7ae2e8a50f6d75236025b4f2b223a4d09f46dc8 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 17:49:25 -0700 Subject: [PATCH 7/9] doc fix Signed-off-by: Andrew Carbonetto --- .../java/glide/api/commands/GenericClusterCommands.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java index fcba7ae42d..5c64ae142d 100644 --- a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java @@ -245,7 +245,7 @@ public interface GenericClusterCommands { * } * System.out.println("\nSCAN iteration:"); * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); - * + * } */ CompletableFuture scan(ClusterScanCursor cursor); @@ -303,7 +303,7 @@ public interface GenericClusterCommands { * } * System.out.println("\nSCAN iteration:"); * Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", ")); - * + * } */ CompletableFuture scanBinary(ClusterScanCursor cursor); @@ -364,7 +364,7 @@ public interface GenericClusterCommands { * } * System.out.println("\nSCAN iteration:"); * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); - * + * } */ CompletableFuture scan(ClusterScanCursor cursor, ScanOptions options); @@ -425,7 +425,7 @@ public interface GenericClusterCommands { * } * System.out.println("\nSCAN iteration:"); * Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", ")); - * + * } */ CompletableFuture scanBinary(ClusterScanCursor cursor, ScanOptions options); From 8739a370fe1b28a83123dbf139141cc5a733c527 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 17:53:52 -0700 Subject: [PATCH 8/9] doc fix Signed-off-by: Andrew Carbonetto --- .../api/commands/GenericClusterCommands.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java index 5c64ae142d..be67d53ba5 100644 --- a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java @@ -242,9 +242,9 @@ public interface GenericClusterCommands { * cursor.releaseCursorHandle(); * cursor = (ClusterScanCursor) result[0]; * Object[] stringResults = (Object[]) result[1]; + * System.out.println("\nSCAN iteration:"); + * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); * } - * System.out.println("\nSCAN iteration:"); - * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); * } */ CompletableFuture scan(ClusterScanCursor cursor); @@ -300,9 +300,9 @@ public interface GenericClusterCommands { * cursor.releaseCursorHandle(); * cursor = (ClusterScanCursor) result[0]; * Object[] glideStringResults = (Object[]) result[1]; + * System.out.println("\nSCAN iteration:"); + * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); * } - * System.out.println("\nSCAN iteration:"); - * Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", ")); * } */ CompletableFuture scanBinary(ClusterScanCursor cursor); @@ -361,9 +361,9 @@ public interface GenericClusterCommands { * cursor.releaseCursorHandle(); * cursor = (ClusterScanCursor) result[0]; * Object[] stringResults = (Object[]) result[1]; + * System.out.println("\nSCAN iteration:"); + * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); * } - * System.out.println("\nSCAN iteration:"); - * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); * } */ CompletableFuture scan(ClusterScanCursor cursor, ScanOptions options); @@ -422,9 +422,9 @@ public interface GenericClusterCommands { * cursor.releaseCursorHandle(); * cursor = (ClusterScanCursor) result[0]; * Object[] glideStringResults = (Object[]) result[1]; + * System.out.println("\nSCAN iteration:"); + * Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", ")); * } - * System.out.println("\nSCAN iteration:"); - * Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", ")); * } */ CompletableFuture scanBinary(ClusterScanCursor cursor, ScanOptions options); From 8534e8811565d06322eca43e9f2f7054b815c1f3 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Fri, 5 Jul 2024 18:26:50 -0700 Subject: [PATCH 9/9] Split IT tests to avoid timeout Signed-off-by: Andrew Carbonetto --- .../test/java/glide/cluster/CommandTests.java | 152 +++++++++++------- 1 file changed, 94 insertions(+), 58 deletions(-) diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java index f3efe32d63..f43027f62b 100644 --- a/java/integTest/src/test/java/glide/cluster/CommandTests.java +++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java @@ -2788,65 +2788,19 @@ public void test_cluster_scan_cleaning_cursor() { assertTrue(exception.getCause().getMessage().contains("Invalid scan_state_cursor id")); } - @Timeout(20) @Test @SneakyThrows - public void test_cluster_scan_all_types() { + public void test_cluster_scan_all_strings() { assertEquals(OK, clusterClient.flushall().get()); String key = "key:" + UUID.randomUUID(); Map stringData = new LinkedHashMap<>(); - final int baseNumberOfEntries = 100; + final int baseNumberOfEntries = 5; for (int i = 0; i < baseNumberOfEntries; i++) { stringData.put(key + ":" + i, "value " + i); } assertEquals(OK, clusterClient.mset(stringData).get()); - String setKey = "setKey:" + UUID.randomUUID(); - Map setData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - setData.put(setKey + ":" + i, "value " + i); - } - for (String k : setData.keySet()) { - assertEquals(1L, clusterClient.sadd(k, new String[] {"value" + k}).get()); - } - - String hashKey = "hashKey:" + UUID.randomUUID(); - Map hashData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - hashData.put(hashKey + ":" + i, "value " + i); - } - for (String k : hashData.keySet()) { - assertEquals(1L, clusterClient.hset(k, Map.of("field" + k, "value" + k)).get()); - } - - String listKey = "listKey:" + UUID.randomUUID(); - Map listData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - listData.put(listKey + ":" + i, "value " + i); - } - for (String k : listData.keySet()) { - assertEquals(1L, clusterClient.lpush(k, new String[] {"value" + k}).get()); - } - - String zSetKey = "zSetKey:" + UUID.randomUUID(); - Map zSetData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - zSetData.put(zSetKey + ":" + i, "value " + i); - } - for (String k : zSetData.keySet()) { - assertEquals(1L, clusterClient.zadd(k, Map.of(k, 1.0)).get()); - } - - String streamKey = "streamKey:" + UUID.randomUUID(); - Map streamData = new LinkedHashMap<>(); - for (int i = 0; i < baseNumberOfEntries; i++) { - streamData.put(streamKey + ":" + i, "value " + i); - } - for (String k : streamData.keySet()) { - assertNotNull(clusterClient.xadd(k, Map.of(k, "value " + k)).get()); - } - ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); Set results = new LinkedHashSet<>(); while (!cursor.isFinished()) { @@ -2861,9 +2815,25 @@ public void test_cluster_scan_all_types() { } cursor.releaseCursorHandle(); assertEquals(stringData.keySet(), results); + } + + @Test + @SneakyThrows + public void test_cluster_scan_all_set() { + assertEquals(OK, clusterClient.flushall().get()); + final int baseNumberOfEntries = 5; + + String setKey = "setKey:" + UUID.randomUUID(); + Map setData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + setData.put(setKey + ":" + i, "value " + i); + } + for (String k : setData.keySet()) { + assertEquals(1L, clusterClient.sadd(k, new String[] {"value" + k}).get()); + } - cursor = ClusterScanCursor.initalCursor(); - results.clear(); + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set results = new LinkedHashSet<>(); while (!cursor.isFinished()) { Object[] response = clusterClient @@ -2876,9 +2846,25 @@ public void test_cluster_scan_all_types() { } cursor.releaseCursorHandle(); assertEquals(setData.keySet(), results); + } - cursor = ClusterScanCursor.initalCursor(); - results.clear(); + @Test + @SneakyThrows + public void test_cluster_scan_all_hash() { + assertEquals(OK, clusterClient.flushall().get()); + final int baseNumberOfEntries = 5; + + String hashKey = "hashKey:" + UUID.randomUUID(); + Map hashData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + hashData.put(hashKey + ":" + i, "value " + i); + } + for (String k : hashData.keySet()) { + assertEquals(1L, clusterClient.hset(k, Map.of("field" + k, "value" + k)).get()); + } + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set results = new LinkedHashSet<>(); while (!cursor.isFinished()) { Object[] response = clusterClient @@ -2891,9 +2877,25 @@ public void test_cluster_scan_all_types() { } cursor.releaseCursorHandle(); assertEquals(hashData.keySet(), results); + } + + @Test + @SneakyThrows + public void test_cluster_scan_all_list() { + assertEquals(OK, clusterClient.flushall().get()); + final int baseNumberOfEntries = 5; + + String listKey = "listKey:" + UUID.randomUUID(); + Map listData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + listData.put(listKey + ":" + i, "value " + i); + } + for (String k : listData.keySet()) { + assertEquals(1L, clusterClient.lpush(k, new String[] {"value" + k}).get()); + } - cursor = ClusterScanCursor.initalCursor(); - results.clear(); + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set results = new LinkedHashSet<>(); while (!cursor.isFinished()) { Object[] response = clusterClient @@ -2906,9 +2908,26 @@ public void test_cluster_scan_all_types() { } cursor.releaseCursorHandle(); assertEquals(listData.keySet(), results); + } + + @Test + @SneakyThrows + public void test_cluster_scan_all_sorted_set() { + assertEquals(OK, clusterClient.flushall().get()); + final int baseNumberOfEntries = 5; + + String zSetKey = "zSetKey:" + UUID.randomUUID(); + Map zSetData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + zSetData.put(zSetKey + ":" + i, "value " + i); + } + for (String k : zSetData.keySet()) { + assertEquals(1L, clusterClient.zadd(k, Map.of(k, 1.0)).get()); + } + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set results = new LinkedHashSet<>(); - cursor = ClusterScanCursor.initalCursor(); - results.clear(); while (!cursor.isFinished()) { Object[] response = clusterClient @@ -2921,9 +2940,26 @@ public void test_cluster_scan_all_types() { } cursor.releaseCursorHandle(); assertEquals(zSetData.keySet(), results); + } + + @Test + @SneakyThrows + public void test_cluster_scan_all_stream() { + assertEquals(OK, clusterClient.flushall().get()); + final int baseNumberOfEntries = 5; + + String streamKey = "streamKey:" + UUID.randomUUID(); + Map streamData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + streamData.put(streamKey + ":" + i, "value " + i); + } + for (String k : streamData.keySet()) { + assertNotNull(clusterClient.xadd(k, Map.of(k, "value " + k)).get()); + } + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set results = new LinkedHashSet<>(); - cursor = ClusterScanCursor.initalCursor(); - results.clear(); while (!cursor.isFinished()) { Object[] response = clusterClient