Skip to content

Commit

Permalink
Java: Add PUBSUB SHARDCHANNELS command (valkey-io#2265)
Browse files Browse the repository at this point in the history
* Add PUBSUB SHARDCHANNELS command in Java

---------

Signed-off-by: James Xin <james.xin@improving.com>
  • Loading branch information
jamesx-improving authored Sep 12, 2024
1 parent cf5483f commit e997eea
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Node: Added XREADGROUP command ([#2124](https://github.com/valkey-io/valkey-glide/pull/2124))
* Node: Added XINFO GROUPS command ([#2122](https://github.com/valkey-io/valkey-glide/pull/2122))
* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105))
* Java: Added PUBSUB SHARDCHANNELS command ([#2265](https://github.com/valkey-io/valkey-glide/pull/2265))
* Java: Added binary support for custom command ([#2109](https://github.com/valkey-io/valkey-glide/pull/2109))
* Node: Added SSCAN command ([#2132](https://github.com/valkey-io/valkey-glide/pull/2132))
* Node: Added HKEYS command ([#2136](https://github.com/valkey-io/valkey-glide/pull/2136))
Expand Down
33 changes: 33 additions & 0 deletions java/client/src/main/java/glide/api/GlideClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static command_request.CommandRequestOuterClass.RequestType.LastSave;
import static command_request.CommandRequestOuterClass.RequestType.Lolwut;
import static command_request.CommandRequestOuterClass.RequestType.Ping;
import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels;
import static command_request.CommandRequestOuterClass.RequestType.RandomKey;
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
import static command_request.CommandRequestOuterClass.RequestType.Sort;
Expand Down Expand Up @@ -1017,6 +1018,38 @@ public CompletableFuture<String> publish(
});
}

@Override
public CompletableFuture<String[]> pubsubShardChannels() {
return commandManager.submitNewCommand(
PubSubSChannels,
new String[0],
response -> castArray(handleArrayResponse(response), String.class));
}

@Override
public CompletableFuture<GlideString[]> pubsubShardChannelsBinary() {
return commandManager.submitNewCommand(
PubSubSChannels,
new GlideString[0],
response -> castArray(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<String[]> pubsubShardChannels(@NonNull String pattern) {
return commandManager.submitNewCommand(
PubSubSChannels,
new String[] {pattern},
response -> castArray(handleArrayResponse(response), String.class));
}

@Override
public CompletableFuture<GlideString[]> pubsubShardChannels(@NonNull GlideString pattern) {
return commandManager.submitNewCommand(
PubSubSChannels,
new GlideString[] {pattern},
response -> castArray(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<String> unwatch(@NonNull Route route) {
return commandManager.submitNewCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,58 @@ public interface PubSubClusterCommands {
* }</pre>
*/
CompletableFuture<String> publish(GlideString message, GlideString channel, boolean sharded);

/**
* Lists the currently active shard channels.
*
* @see <a href="https://valkey.io/commands/pubsub-shardchannels/">valkey.io</a> for details.
* @return An <code>array</code> of all active shard channels.
* @example
* <pre>{@code
* String[] result = client.pubsubShardChannels().get();
* assert Arrays.equals(result, new String[] { "channel1", "channel2" });
* }</pre>
*/
CompletableFuture<String[]> pubsubShardChannels();

/**
* Lists the currently active shard channels.
*
* @see <a href="https://valkey.io/commands/pubsub-shardchannels/">valkey.io</a> for details.
* @return An <code>array</code> of all active shard channels.
* @example
* <pre>{@code
* GlideString[] result = client.pubsubShardChannelsBinary().get();
* assert Arrays.equals(result, new GlideString[] { gs("channel1"), gs("channel2") });
* }</pre>
*/
CompletableFuture<GlideString[]> pubsubShardChannelsBinary();

/**
* Lists the currently active shard channels.
*
* @see <a href="https://valkey.io/commands/pubsub-shardchannels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active shard channels.
* @return An <code>array</code> of currently active shard channels matching the given pattern.
* @example
* <pre>{@code
* String[] result = client.pubsubShardChannels("channel*").get();
* assert Arrays.equals(result, new String[] { "channel1", "channel2" });
* }</pre>
*/
CompletableFuture<String[]> pubsubShardChannels(String pattern);

/**
* Lists the currently active shard channels.
*
* @see <a href="https://valkey.io/commands/pubsub-shardchannels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active shard channels.
* @return An <code>array</code> of currently active shard channels matching the given pattern.
* @example
* <pre>{@code
* GlideString[] result = client.pubsubShardChannels(gs.("channel*")).get();
* assert Arrays.equals(result, new GlideString[] { gs("channel1"), gs("channel2") });
* }</pre>
*/
CompletableFuture<GlideString[]> pubsubShardChannels(GlideString pattern);
}
27 changes: 27 additions & 0 deletions java/client/src/main/java/glide/api/models/ClusterTransaction.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models;

import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels;
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
import static command_request.CommandRequestOuterClass.RequestType.Sort;
import static command_request.CommandRequestOuterClass.RequestType.SortReadOnly;
Expand Down Expand Up @@ -136,4 +137,30 @@ public <ArgType> ClusterTransaction sortStore(
.add(destination)));
return this;
}

/**
* Lists the currently active shard channels.
*
* @see <a href="https://valkey.io/commands/pubsub-shardchannels/">valkey.io</a> for details.
* @return Command response - An <code>Array</code> of all active shard channels.
*/
public ClusterTransaction pubsubShardChannels() {
protobufTransaction.addCommands(buildCommand(PubSubSChannels));
return getThis();
}

/**
* Lists the currently active shard channels.
*
* @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type *
* will throw {@link IllegalArgumentException}.
* @see <a href="https://valkey.io/commands/pubsub-shardchannels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active shard channels.
* @return Command response - An <code>Array</code> of all active shard channels.
*/
public <ArgType> ClusterTransaction pubsubShardChannels(@NonNull ArgType pattern) {
checkTypeOrThrow(pattern);
protobufTransaction.addCommands(buildCommand(PubSubSChannels, newArgsBuilder().add(pattern)));
return getThis();
}
}
95 changes: 95 additions & 0 deletions java/client/src/test/java/glide/api/GlideClusterClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static command_request.CommandRequestOuterClass.RequestType.LastSave;
import static command_request.CommandRequestOuterClass.RequestType.Lolwut;
import static command_request.CommandRequestOuterClass.RequestType.Ping;
import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels;
import static command_request.CommandRequestOuterClass.RequestType.RandomKey;
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
import static command_request.CommandRequestOuterClass.RequestType.Sort;
Expand Down Expand Up @@ -2717,6 +2718,100 @@ public void spublish_returns_success() {
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void pubsubShardChannels_returns_success() {
// setup
String[] arguments = new String[0];
String[] value = new String[] {"ch1", "ch2"};

CompletableFuture<String[]> testResponse = new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<String[]>submitNewCommand(eq(PubSubSChannels), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String[]> response = service.pubsubShardChannels();
String[] payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void pubsubShardChannelsBinary_returns_success() {
// setup
GlideString[] arguments = new GlideString[0];
GlideString[] value = new GlideString[] {gs("ch1"), gs("ch2")};

CompletableFuture<GlideString[]> testResponse = new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<GlideString[]>submitNewCommand(eq(PubSubSChannels), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<GlideString[]> response = service.pubsubShardChannelsBinary();
GlideString[] payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void pubsubShardChannels_with_pattern_returns_success() {
// setup
String pattern = "ch*";
String[] arguments = new String[] {pattern};
String[] value = new String[] {"ch1", "ch2"};

CompletableFuture<String[]> testResponse = new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<String[]>submitNewCommand(eq(PubSubSChannels), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String[]> response = service.pubsubShardChannels(pattern);
String[] payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void pubsubShardChannelsBinary_with_pattern_returns_success() {
// setup
GlideString pattern = gs("ch*");
GlideString[] arguments = new GlideString[] {pattern};
GlideString[] value = new GlideString[] {gs("ch1"), gs("ch2")};

CompletableFuture<GlideString[]> testResponse = new CompletableFuture<>();
testResponse.complete(value);

// match on protobuf request
when(commandManager.<GlideString[]>submitNewCommand(eq(PubSubSChannels), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<GlideString[]> response = service.pubsubShardChannels(pattern);
GlideString[] payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(value, payload);
}

@SneakyThrows
@Test
public void sort_returns_success() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models;

import static command_request.CommandRequestOuterClass.RequestType.PubSubSChannels;
import static command_request.CommandRequestOuterClass.RequestType.SPublish;
import static command_request.CommandRequestOuterClass.RequestType.Sort;
import static command_request.CommandRequestOuterClass.RequestType.SortReadOnly;
Expand Down Expand Up @@ -31,6 +32,12 @@ public void cluster_transaction_builds_protobuf_request() {
transaction.publish("msg", "ch1", true);
results.add(Pair.of(SPublish, buildArgs("ch1", "msg")));

transaction.pubsubShardChannels();
results.add(Pair.of(PubSubSChannels, buildArgs()));

transaction.pubsubShardChannels("test*");
results.add(Pair.of(PubSubSChannels, buildArgs("test*")));

transaction.sortReadOnly(
"key1",
SortClusterOptions.builder()
Expand Down
55 changes: 55 additions & 0 deletions java/integTest/src/test/java/glide/PubSubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1468,4 +1468,59 @@ public void pubsub_channels_and_numpat_and_numsub_in_transaction(boolean standal
},
result);
}

@SneakyThrows
@Test
public void pubsub_shard_channels() {
assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7");

// no channels exists yet
GlideClusterClient client = (GlideClusterClient) createClient(false);
assertEquals(0, client.pubsubShardChannels().get().length);
assertEquals(0, client.pubsubShardChannelsBinary().get().length);
assertEquals(0, client.pubsubShardChannels("*").get().length);
assertEquals(0, client.pubsubShardChannels(gs("*")).get().length);

var channels = Set.of("test_shardchannel1", "test_shardchannel2", "some_shardchannel3");
String pattern = "test_*";

Map<? extends ChannelMode, Set<GlideString>> subscriptions =
Map.of(
PubSubClusterChannelMode.SHARDED,
channels.stream().map(GlideString::gs).collect(Collectors.toSet()));

GlideClusterClient listener =
(GlideClusterClient) createClientWithSubscriptions(false, subscriptions);
clients.addAll(List.of(client, listener));

// test without pattern
assertEquals(channels, Set.of(client.pubsubShardChannels().get()));
assertEquals(channels, Set.of(listener.pubsubShardChannels().get()));
assertEquals(
channels.stream().map(GlideString::gs).collect(Collectors.toSet()),
Set.of(client.pubsubShardChannelsBinary().get()));
assertEquals(
channels.stream().map(GlideString::gs).collect(Collectors.toSet()),
Set.of(listener.pubsubShardChannelsBinary().get()));

// test with pattern
assertEquals(
Set.of("test_shardchannel1", "test_shardchannel2"),
Set.of(client.pubsubShardChannels(pattern).get()));
assertEquals(
Set.of(gs("test_shardchannel1"), gs("test_shardchannel2")),
Set.of(client.pubsubShardChannels(gs(pattern)).get()));
assertEquals(
Set.of("test_shardchannel1", "test_shardchannel2"),
Set.of(listener.pubsubShardChannels(pattern).get()));
assertEquals(
Set.of(gs("test_shardchannel1"), gs("test_shardchannel2")),
Set.of(listener.pubsubShardChannels(gs(pattern)).get()));

// test with non-matching pattern
assertEquals(0, client.pubsubShardChannels("non_matching_*").get().length);
assertEquals(0, client.pubsubShardChannels(gs("non_matching_*")).get().length);
assertEquals(0, listener.pubsubShardChannels("non_matching_*").get().length);
assertEquals(0, listener.pubsubShardChannels(gs("non_matching_*")).get().length);
}
}

0 comments on commit e997eea

Please sign in to comment.