Skip to content

Commit

Permalink
Java: Allow cluster wide scan with not fully covered cluster
Browse files Browse the repository at this point in the history
Signed-off-by: James Xin <james.xin@improving.com>
  • Loading branch information
jamesx-improving committed Dec 18, 2024
1 parent 9c6b040 commit 978750a
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public String[] toArgs() {
return super.toArgs();
}

private final Boolean allowNonCoveredSlots;

/**
* @return the pattern used for the <code>MATCH</code> filter.
*/
Expand All @@ -86,4 +88,11 @@ public Long getCount() {
public ObjectType getType() {
return type;
}

/**
* @return whether allow non covered slots.
*/
public Boolean getAllowNonCoveredSlots() {
return allowNonCoveredSlots;
}
}
4 changes: 4 additions & 0 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ protected CommandRequest.Builder prepareCursorRequest(
clusterScanBuilder.setObjectType(options.getType().getNativeName());
}

if (options.getAllowNonCoveredSlots() != null) {
clusterScanBuilder.setAllowNonCoveredSlots(options.getAllowNonCoveredSlots());
}

return CommandRequest.newBuilder().setClusterScan(clusterScanBuilder.build());
}

Expand Down
39 changes: 39 additions & 0 deletions java/integTest/src/test/java/glide/TestUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -452,4 +452,43 @@ public static String getServerVersion(@NonNull final BaseClient client) {
}
return null;
}

/**
* Helper function to get a number of nodes, and ask the cluster till we get the number of nodes
*
* @param clusterClient Glide cluster client to be used for executing custom command
* @param count number of nodes expected
* @return true if we get the number of expected nodes
*/
@SneakyThrows
public static boolean waitForClusterReady(GlideClusterClient clusterClient, int count) {
long timeout = 20000; // 20 seconds
long startTime = System.currentTimeMillis();

while (true) {
if (System.currentTimeMillis() - startTime > timeout) {
return false;
}
ClusterValue<Object> clusterInfo =
clusterClient.customCommand(new String[] {"CLUSTER", "INFO"}).get();
if (clusterInfo != null && clusterInfo.hasSingleData()) {
String[] clusterInfoLines = ((String) clusterInfo.getSingleValue()).split("\n");
Map<String, String> clusterInfoMap =
Arrays.stream(clusterInfoLines)
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(s -> s.split(":", 2))
.collect(Collectors.toMap(parts -> parts[0].trim(), parts -> parts[1].trim()));
if ("ok".equals(clusterInfoMap.get("cluster_state"))
&& Integer.parseInt(clusterInfoMap.getOrDefault("cluster_known_nodes", "0")) == count) {
break;
}
}
Thread.sleep(2000);
}
// we need to make sure that the inner core refresh slots so we make sure we accumulate 60
// seconds
Thread.sleep(60000 - (System.currentTimeMillis() - startTime));
return true;
}
}
96 changes: 96 additions & 0 deletions java/integTest/src/test/java/glide/cluster/CommandTests.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.cluster;

import static glide.TestConfiguration.CLUSTER_HOSTS;
import static glide.TestConfiguration.SERVER_VERSION;
import static glide.TestUtilities.assertDeepEquals;
import static glide.TestUtilities.checkFunctionListResponse;
Expand All @@ -15,6 +16,7 @@
import static glide.TestUtilities.getFirstEntryFromMultiValue;
import static glide.TestUtilities.getValueFromInfo;
import static glide.TestUtilities.parseInfoResponseToMap;
import static glide.TestUtilities.waitForClusterReady;
import static glide.TestUtilities.waitForNotBusy;
import static glide.api.BaseClient.OK;
import static glide.api.models.GlideString.gs;
Expand Down Expand Up @@ -86,6 +88,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -3015,6 +3018,99 @@ public void test_cluster_scan_all_stream() {
assertEquals(streamData.keySet(), results);
}

@Test
@SneakyThrows
@Timeout(100)
public void test_cluster_scan_non_covered_slots() {
assertEquals(OK, clusterClient.flushall().get());
String key = UUID.randomUUID().toString();
for (int i = 0; i < 1000; i++) {
String result = clusterClient.set(key + ":" + i, "value").get();
assertEquals(OK, result);
}
ClusterScanCursor cursor = ClusterScanCursor.initalCursor();
Object[] response = clusterClient.scan(cursor).get();
cursor.releaseCursorHandle();
cursor = (ClusterScanCursor) response[0];
assertFalse(cursor.isFinished());
clusterClient.configSet(Map.of("cluster-require-full-coverage", "no"));
// forget one server
String addressToForget = CLUSTER_HOSTS[0];
String[] splitAddressToForget = addressToForget.split(":");
String[] allOtherAddresses = Arrays.copyOfRange(CLUSTER_HOSTS, 1, CLUSTER_HOSTS.length);
var idToForget =
clusterClient
.customCommand(
new String[] {"CLUSTER", "MYID"},
new ByAddressRoute(
splitAddressToForget[0], Integer.parseInt(splitAddressToForget[1])))
.get()
.getSingleValue();
for (String otherAddress : allOtherAddresses) {
String[] splitOtherAddress = otherAddress.split(":");
clusterClient.customCommand(
new String[] {"CLUSTER", "FORGET", (String) idToForget},
new ByAddressRoute(splitOtherAddress[0], Integer.parseInt(splitOtherAddress[1])));
}
// now we let it few seconds gossip to get the new cluster configuration
assertTrue(waitForClusterReady(clusterClient, allOtherAddresses.length));
// Iterate scan to get missing slots error
ExecutionException executionException =
assertThrows(
ExecutionException.class,
() -> {
ClusterScanCursor cursor2 = ClusterScanCursor.initalCursor();
while (!cursor2.isFinished()) {
Object[] scanResponse = clusterClient.scan(cursor2).get();
cursor2.releaseCursorHandle();
cursor2 = (ClusterScanCursor) scanResponse[0];
}
});
assertTrue(
executionException
.getMessage()
.contains("Could not find an address covering a slot, SCAN operation cannot continue"));

// Scan with allow_non_covered_slots=true
ClusterScanCursor cursor3 = ClusterScanCursor.initalCursor();
while (!cursor3.isFinished()) {
Object[] nonCoverScanResult =
clusterClient
.scan(cursor3, ScanOptions.builder().allowNonCoveredSlots(true).build())
.get();
cursor3.releaseCursorHandle();
cursor3 = (ClusterScanCursor) nonCoverScanResult[0];
}
assertTrue(cursor3.isFinished());
// Get keys using 'KEYS *' from the remaining nodes
List<Object> keys = new ArrayList<>();
for (String address : allOtherAddresses) {
String[] splitAddress = address.split(":");
ClusterValue<Object> keysResult =
clusterClient
.customCommand(
new String[] {"KEYS", "*"},
new ByAddressRoute(splitAddress[0], Integer.parseInt(splitAddress[1])))
.get();
assertNotNull(keysResult);
assertTrue(keysResult.hasSingleData());
keys.addAll(List.of((Object[]) keysResult.getSingleValue()));
}

ClusterScanCursor cursor4 = ClusterScanCursor.initalCursor();
List<Object> results = new ArrayList<>();
while (!cursor4.isFinished()) {
Object[] result =
clusterClient
.scan(cursor4, ScanOptions.builder().allowNonCoveredSlots(true).build())
.get();
results.addAll(List.of((Object[]) result[1]));
cursor4.releaseCursorHandle();
cursor4 = (ClusterScanCursor) result[0];
}
assertEquals(new HashSet<>(keys), new HashSet<>(results));
}

@SneakyThrows
@Test
public void invokeScript_test() {
Expand Down

0 comments on commit 978750a

Please sign in to comment.