Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: add scan (binary) for cluster commands #1837

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,14 @@ public CompletableFuture<Object[]> scan(ClusterScanCursor cursor) {
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<Object[]> scanBinary(ClusterScanCursor cursor) {
return commandManager
.submitClusterScan(cursor, ScanOptions.builder().build(), this::handleArrayResponseBinary)
.thenApply(
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options) {
return commandManager
Expand All @@ -1072,6 +1080,14 @@ public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions op
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<Object[]> scanBinary(ClusterScanCursor cursor, ScanOptions options) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I afraid you need new class ScanOptionsBinary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Binary options are useless and don't provide any additional benefit. They are phasing them out.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is String matchPattern unfortunately

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protobuf doesn't accept anything but strings, so we cannot sent binary scan options

return commandManager
.submitClusterScan(cursor, options, this::handleArrayResponseBinary)
.thenApply(
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine, but I'd be curious if the Rust layer can ever return non-UTF-8 binary data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the cursor...?

Copy link
Contributor Author

@acarbonetto acarbonetto Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I was thinking that too.
I've asked Avi. I'm pretty sure we're good (otherwise our String version would fail).

}

@Override
public CompletableFuture<String[]> sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,66 @@ public interface GenericClusterCommands {
*/
CompletableFuture<Object[]> scan(ClusterScanCursor cursor);

/**
* Incrementally iterates over the keys in the Cluster.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc is TLDR. I think you just copied it, but it is good to say what is the difference in the functions. Why/When a user should use this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They don't change the documentation between String and GlideString. I think they wanted to consolidate the two APIs at some point...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe...
But these 2 funcs have 1:1 docs and signatures - really confusing.
On another hand, doc can be changed later. Up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like them to decide on how to change these docs...

*
* <p>This command is similar to the <code>SCAN</code> command, but it is designed to work in a
* Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor}
* object to manage iterations. For more information about the Cluster Scan implementation, see <a
* href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc linked here has a TODO at the bottom still. Is that something we should address before release?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know. That's Avi's document? Might we worthwhile taking a look next week.

* Scan</a>.
*
* <p>As with the <code>SCAN</code> command, this command is a cursor-based iterator. This means
* that at every call of the command, the server returns an updated cursor ({@link
* ClusterScanCursor}) that the user needs to re-send as the <code>cursor</code> argument in the
* next call. The iteration terminates when the returned cursor {@link
* ClusterScanCursor#isFinished()} returns <code>true</code>.
*
* <p>This method guarantees that all keyslots available when the first SCAN is called will be
* scanned before the cursor is finished. Any keys added after the initial scan request is made
* are not guaranteed to be scanned.
*
* <p>Note that the same key may be returned in multiple scan iterations.
*
* <p>How to use the {@link ClusterScanCursor}: <br>
* For each iteration, the previous scan {@link ClusterScanCursor} object should be used to
* continue the <code>SCAN</code> by passing it in the <code>cursor</code> argument. Using the
* same cursor object for multiple iterations may result in the same keys returned or unexpected
* behavior.
*
* <p>When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to
* immediately free resources tied to the cursor. Note that this makes the cursor unusable in
* subsequent calls to <code>SCAN</code>.
*
* @see ClusterScanCursor for more details about how to use the cursor.
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @return An <code>Array</code> with two elements. The first element is always the {@link
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
* using the cursor in a call to this method. The cursor cannot be used in a scan again after
* {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
* <code>Array</code> of <code>GlideString</code> elements each representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
* ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
* Object[] result;
* while (!cursor.isFinished()) {
* result = client.scan(cursor).get();
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] glideStringResults = (Object[]) result[1];
*
* System.out.println("\nSCAN iteration:");
* Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scanBinary(ClusterScanCursor cursor);

/**
* Incrementally iterates over the keys in the Cluster.
*
Expand Down Expand Up @@ -293,8 +353,7 @@ public interface GenericClusterCommands {
* current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
* using the cursor in a call to this method. The cursor cannot be used in a scan again after
* {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
* <code>
* Array</code> of <code>String</code> elements each representing a key.
* <code>Array</code> of <code>String</code> elements each representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
Expand All @@ -313,6 +372,67 @@ public interface GenericClusterCommands {
*/
CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options);

/**
* Incrementally iterates over the keys in the Cluster.
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>This command is similar to the <code>SCAN</code> command, but it is designed to work in a
* Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor}
* object to manage iterations. For more information about the Cluster Scan implementation, see <a
* href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
* Scan</a>.
*
* <p>As with the <code>SCAN</code> command, this command is a cursor-based iterator. This means
* that at every call of the command, the server returns an updated cursor ({@link
* ClusterScanCursor}) that the user needs to re-send as the <code>cursor</code> argument in the
* next call. The iteration terminates when the returned cursor {@link
* ClusterScanCursor#isFinished()} returns <code>true</code>.
*
* <p>This method guarantees that all keyslots available when the first SCAN is called will be
* scanned before the cursor is finished. Any keys added after the initial scan request is made
* are not guaranteed to be scanned.
*
* <p>Note that the same key may be returned in multiple scan iterations.
*
* <p>How to use the {@link ClusterScanCursor}: <br>
* For each iteration, the previous scan {@link ClusterScanCursor} object should be used to
* continue the <code>SCAN</code> by passing it in the <code>cursor</code> argument. Using the
* same cursor object for multiple iterations may result in the same keys returned or unexpected
* behavior.
*
* <p>When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to
* immediately free resources tied to the cursor. Note that this makes the cursor unusable in
* subsequent calls to <code>SCAN</code>.
*
* @see ClusterScanCursor for more details about how to use the cursor.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix link

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meh - the link isn't necessary. I removed them. ClusterScanCursor is mentioned numerous times in the javadoc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please delete this one too

* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @param options The {@link ScanOptions}.
* @return An <code>Array</code> with two elements. The first element is always the {@link
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
* using the cursor in a call to this method. The cursor cannot be used in a scan again after
* {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
* <code>Array</code> of <code>GlideString</code> elements each representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
* ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
* Object[] result;
* while (!cursor.isFinished()) {
* result = client.scan(cursor).get();
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] glideStringResults = (Object[]) result[1];
*
* System.out.println("\nSCAN iteration:");
* Arrays.asList(glideStringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scanBinary(ClusterScanCursor cursor, ScanOptions options);

/**
* Sorts the elements in the list, set, or sorted set at <code>key</code> and returns the result.
* <br>
Expand Down
66 changes: 66 additions & 0 deletions java/client/src/test/java/glide/api/RedisClusterClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,32 @@ public void scan_existing_cursor() {
((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle());
}

@SneakyThrows
@Test
public void scan_binary_existing_cursor() {
CommandManager.ClusterScanCursorDetail mockCursor =
Mockito.mock(CommandManager.ClusterScanCursorDetail.class);
when(mockCursor.getCursorHandle()).thenReturn("1");

CommandManager.ClusterScanCursorDetail mockResultCursor =
Mockito.mock(CommandManager.ClusterScanCursorDetail.class);
when(mockResultCursor.getCursorHandle()).thenReturn("2");

final Object[] result =
new Object[] {mockResultCursor.getCursorHandle(), new Object[] {gs("foo")}};
final CompletableFuture<Object[]> testResponse = CompletableFuture.completedFuture(result);
when(commandManager.<Object[]>submitClusterScan(
eq(mockCursor), eq(ScanOptions.builder().build()), any()))
.thenReturn(testResponse);

CompletableFuture<Object[]> actualResponse = service.scan(mockCursor);
Object[] payload = actualResponse.get();
assertEquals(
mockResultCursor.getCursorHandle(),
((CommandManager.ClusterScanCursorDetail) payload[0]).getCursorHandle());
assertArrayEquals(new Object[] {gs("foo")}, (Object[]) payload[1]);
}

@SneakyThrows
@Test
public void scan_new_cursor_options() {
Expand Down Expand Up @@ -3144,4 +3170,44 @@ public void scan_existing_cursor_options() {
mockResultCursor.getCursorHandle(),
((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle());
}

@SneakyThrows
@Test
public void scan_binary_existing_cursor_options() {
CommandManager.ClusterScanCursorDetail mockCursor =
Mockito.mock(CommandManager.ClusterScanCursorDetail.class);
when(mockCursor.getCursorHandle()).thenReturn("1");

CommandManager.ClusterScanCursorDetail mockResultCursor =
Mockito.mock(CommandManager.ClusterScanCursorDetail.class);
when(mockResultCursor.getCursorHandle()).thenReturn("2");

final Object[] result =
new Object[] {mockResultCursor.getCursorHandle(), new Object[] {gs("foo")}};
final CompletableFuture<Object[]> testResponse = CompletableFuture.completedFuture(result);
when(commandManager.<Object[]>submitClusterScan(
eq(mockCursor),
eq(
ScanOptions.builder()
.matchPattern("key:*")
.count(10L)
.type(ScanOptions.ObjectType.STRING)
.build()),
any()))
.thenReturn(testResponse);

CompletableFuture<Object[]> actualResponse =
service.scan(
mockCursor,
ScanOptions.builder()
.matchPattern("key:*")
.count(10L)
.type(ScanOptions.ObjectType.STRING)
.build());
Object[] payload = actualResponse.get();
assertEquals(
mockResultCursor.getCursorHandle(),
((CommandManager.ClusterScanCursorDetail) payload[0]).getCursorHandle());
assertArrayEquals(new Object[] {gs("foo")}, (Object[]) payload[1]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,39 @@ public void test_cluster_scan_simple() {
}
}

@Test
@SneakyThrows
public void test_cluster_scan_binary_simple() {
try (RedisClusterClient client =
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) {
assertEquals(OK, client.flushall().get());

String key = "key:test_cluster_scan_simple" + UUID.randomUUID();
Map<String, String> expectedData = new LinkedHashMap<>();
for (int i = 0; i < 100; i++) {
expectedData.put(key + ":" + i, "value " + i);
}

assertEquals(OK, client.mset(expectedData).get());

Set<String> result = new LinkedHashSet<>();
ClusterScanCursor cursor = ClusterScanCursor.initalCursor();
while (!cursor.isFinished()) {
final Object[] response = client.scanBinary(cursor).get();
cursor.releaseCursorHandle();

cursor = (ClusterScanCursor) response[0];
final Object[] data = (Object[]) response[1];
for (Object datum : data) {
result.add(datum.toString());
}
}
cursor.releaseCursorHandle();

assertEquals(expectedData.keySet(), result);
}
}

@Test
@SneakyThrows
public void test_cluster_scan_with_object_type_and_pattern() {
Expand Down
Loading