diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index ab3e4b18da..2d567b0b75 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -99,6 +99,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.PfAdd; import static redis_request.RedisRequestOuterClass.RequestType.PfCount; import static redis_request.RedisRequestOuterClass.RequestType.PfMerge; +import static redis_request.RedisRequestOuterClass.RequestType.Publish; import static redis_request.RedisRequestOuterClass.RequestType.RPop; import static redis_request.RedisRequestOuterClass.RequestType.RPush; import static redis_request.RedisRequestOuterClass.RequestType.RPushX; @@ -183,6 +184,7 @@ import glide.api.commands.HashBaseCommands; import glide.api.commands.HyperLogLogBaseCommands; import glide.api.commands.ListBaseCommands; +import glide.api.commands.PubSubBaseCommands; import glide.api.commands.ScriptingAndFunctionsBaseCommands; import glide.api.commands.SetBaseCommands; import glide.api.commands.SortedSetBaseCommands; @@ -190,6 +192,7 @@ import glide.api.commands.StringBaseCommands; import glide.api.commands.TransactionsBaseCommands; import glide.api.models.GlideString; +import glide.api.models.PubSubMessage; import glide.api.models.Script; import glide.api.models.commands.ExpireOptions; import glide.api.models.commands.GetExOptions; @@ -230,33 +233,38 @@ import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.configuration.BaseClientConfiguration; +import glide.api.models.configuration.BaseSubscriptionConfiguration; +import glide.api.models.exceptions.ConfigurationError; import glide.api.models.exceptions.RedisException; import glide.connectors.handlers.CallbackDispatcher; import glide.connectors.handlers.ChannelHandler; +import glide.connectors.handlers.MessageHandler; import glide.connectors.resources.Platform; import glide.connectors.resources.ThreadPoolResource; import glide.connectors.resources.ThreadPoolResourceAllocator; import glide.ffi.resolvers.RedisValueResolver; -import glide.managers.BaseCommandResponseResolver; +import glide.managers.BaseResponseResolver; import glide.managers.CommandManager; import glide.managers.ConnectionManager; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; -import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.NotImplementedException; import response.ResponseOuterClass.ConstantResponse; import response.ResponseOuterClass.Response; /** Base Client class for Redis */ -@AllArgsConstructor public abstract class BaseClient implements AutoCloseable, BitmapBaseCommands, @@ -270,13 +278,41 @@ public abstract class BaseClient HyperLogLogBaseCommands, GeospatialIndicesBaseCommands, ScriptingAndFunctionsBaseCommands, - TransactionsBaseCommands { + TransactionsBaseCommands, + PubSubBaseCommands { /** Redis simple string response with "OK" */ public static final String OK = ConstantResponse.OK.toString(); - protected final ConnectionManager connectionManager; protected final CommandManager commandManager; + protected final ConnectionManager connectionManager; + protected final ConcurrentLinkedDeque messageQueue; + protected final Optional subscriptionConfiguration; + + /** Helper which extracts data from received {@link Response}s from GLIDE. */ + private static final BaseResponseResolver responseResolver = + new BaseResponseResolver(RedisValueResolver::valueFromPointer); + + /** Helper which extracts data with binary strings from received {@link Response}s from GLIDE. */ + private static final BaseResponseResolver binaryResponseResolver = + new BaseResponseResolver(RedisValueResolver::valueFromPointerBinary); + + /** A constructor. */ + protected BaseClient(ClientBuilder builder) { + this.connectionManager = builder.connectionManager; + this.commandManager = builder.commandManager; + this.messageQueue = builder.messageQueue; + this.subscriptionConfiguration = builder.subscriptionConfiguration; + } + + /** Auxiliary builder which wraps all fields to be initialized in the constructor. */ + @RequiredArgsConstructor + protected static class ClientBuilder { + private final ConnectionManager connectionManager; + private final CommandManager commandManager; + private final ConcurrentLinkedDeque messageQueue; + private final Optional subscriptionConfiguration; + } /** * Async request for an async (non-blocking) Redis client. @@ -286,31 +322,81 @@ public abstract class BaseClient * @param Client type. * @return a Future to connect and return a RedisClient. */ - protected static CompletableFuture CreateClient( - BaseClientConfiguration config, - BiFunction constructor) { + protected static CompletableFuture CreateClient( + @NonNull BaseClientConfiguration config, Function constructor) { try { ThreadPoolResource threadPoolResource = config.getThreadPoolResource(); if (threadPoolResource == null) { threadPoolResource = ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier()); } - ChannelHandler channelHandler = buildChannelHandler(threadPoolResource); + MessageHandler messageHandler = buildMessageHandler(config); + ChannelHandler channelHandler = buildChannelHandler(threadPoolResource, messageHandler); ConnectionManager connectionManager = buildConnectionManager(channelHandler); CommandManager commandManager = buildCommandManager(channelHandler); // TODO: Support exception throwing, including interrupted exceptions return connectionManager .connectToRedis(config) - .thenApply(ignore -> constructor.apply(connectionManager, commandManager)); + .thenApply( + ignored -> + constructor.apply( + new ClientBuilder( + connectionManager, + commandManager, + messageHandler.getQueue(), + Optional.ofNullable(config.getSubscriptionConfiguration())))); } catch (InterruptedException e) { - // Something bad happened while we were establishing netty connection to - // UDS + // Something bad happened while we were establishing netty connection to UDS var future = new CompletableFuture(); future.completeExceptionally(e); return future; } } + /** + * Tries to return a next pubsub message. + * + * @throws ConfigurationError If client is not subscribed to any channel or if client configured + * with a callback. + * @return A message if any or null if there are no unread messages. + */ + public PubSubMessage tryGetPubSubMessage() { + if (subscriptionConfiguration.isEmpty()) { + throw new ConfigurationError( + "The operation will never complete since there was no pubsub subscriptions applied to the" + + " client."); + } + if (subscriptionConfiguration.get().getCallback().isPresent()) { + throw new ConfigurationError( + "The operation will never complete since messages will be passed to the configured" + + " callback."); + } + return messageQueue.poll(); + } + + /** + * Returns a promise for a next pubsub message. + * + * @apiNote Not implemented! + * @throws ConfigurationError If client is not subscribed to any channel or if client configured + * with a callback. + * @return A Future which resolved with the next incoming message. + */ + public CompletableFuture getPubSubMessage() { + if (subscriptionConfiguration.isEmpty()) { + throw new ConfigurationError( + "The operation will never complete since there was no pubsub subscriptions applied to the" + + " client."); + } + if (subscriptionConfiguration.get().getCallback().isPresent()) { + throw new ConfigurationError( + "The operation will never complete since messages will be passed to the configured" + + " callback."); + } + throw new NotImplementedException( + "This feature will be supported in a future release of the GLIDE java client"); + } + /** * Closes this resource, relinquishing any underlying resources. This method is invoked * automatically on objects managed by the try-with-resources statement. @@ -323,15 +409,25 @@ public void close() throws ExecutionException { try { connectionManager.closeConnection().get(); } catch (InterruptedException e) { - // suppressing the interrupted exception - it is already suppressed in the - // future + // suppressing the interrupted exception - it is already suppressed in the future throw new RuntimeException(e); } } - protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource) + protected static MessageHandler buildMessageHandler(BaseClientConfiguration config) { + if (config.getSubscriptionConfiguration() == null) { + return new MessageHandler(Optional.empty(), Optional.empty(), responseResolver); + } + return new MessageHandler( + config.getSubscriptionConfiguration().getCallback(), + config.getSubscriptionConfiguration().getContext(), + responseResolver); + } + + protected static ChannelHandler buildChannelHandler( + ThreadPoolResource threadPoolResource, MessageHandler messageHandler) throws InterruptedException { - CallbackDispatcher callbackDispatcher = new CallbackDispatcher(); + CallbackDispatcher callbackDispatcher = new CallbackDispatcher(messageHandler); return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource); } @@ -367,10 +463,7 @@ protected T handleRedisResponse( boolean encodingUtf8 = flags.contains(ResponseFlags.ENCODING_UTF8); boolean isNullable = flags.contains(ResponseFlags.IS_NULLABLE); Object value = - encodingUtf8 - ? new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response) - : new BaseCommandResponseResolver(RedisValueResolver::valueFromPointerBinary) - .apply(response); + encodingUtf8 ? responseResolver.apply(response) : binaryResponseResolver.apply(response); if (isNullable && (value == null)) { return null; } @@ -2821,6 +2914,18 @@ public CompletableFuture> lcsIdxWithMatchLen( return commandManager.submitNewCommand(LCS, arguments, this::handleMapResponse); } + @Override + public CompletableFuture publish(@NonNull String channel, @NonNull String message) { + return commandManager.submitNewCommand( + Publish, + new String[] {channel, message}, + response -> { + // Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO + handleLongResponse(response); + return OK; + }); + } + @Override public CompletableFuture watch(@NonNull String[] keys) { return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse); diff --git a/java/client/src/main/java/glide/api/RedisClient.java b/java/client/src/main/java/glide/api/RedisClient.java index 0f8607afb7..096adfe0f2 100644 --- a/java/client/src/main/java/glide/api/RedisClient.java +++ b/java/client/src/main/java/glide/api/RedisClient.java @@ -54,8 +54,6 @@ import glide.api.models.commands.SortOptions; import glide.api.models.commands.function.FunctionRestorePolicy; import glide.api.models.configuration.RedisClientConfiguration; -import glide.managers.CommandManager; -import glide.managers.ConnectionManager; import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -73,15 +71,18 @@ public class RedisClient extends BaseClient ScriptingAndFunctionsCommands, TransactionsCommands { - protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) { - super(connectionManager, commandManager); + /** + * A constructor. Use {@link #CreateClient} to get a client. Made protected to simplify testing. + */ + protected RedisClient(ClientBuilder builder) { + super(builder); } /** * Async request for an async (non-blocking) Redis client in Standalone mode. * - * @param config Redis client Configuration - * @return A Future to connect and return a RedisClient + * @param config Redis client Configuration. + * @return A Future to connect and return a RedisClient. */ public static CompletableFuture CreateClient( @NonNull RedisClientConfiguration config) { diff --git a/java/client/src/main/java/glide/api/RedisClusterClient.java b/java/client/src/main/java/glide/api/RedisClusterClient.java index 26fc3e86e7..92c5d3bbdd 100644 --- a/java/client/src/main/java/glide/api/RedisClusterClient.java +++ b/java/client/src/main/java/glide/api/RedisClusterClient.java @@ -37,6 +37,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Lolwut; import static redis_request.RedisRequestOuterClass.RequestType.Ping; import static redis_request.RedisRequestOuterClass.RequestType.RandomKey; +import static redis_request.RedisRequestOuterClass.RequestType.SPublish; import static redis_request.RedisRequestOuterClass.RequestType.Sort; import static redis_request.RedisRequestOuterClass.RequestType.SortReadOnly; import static redis_request.RedisRequestOuterClass.RequestType.Time; @@ -44,6 +45,7 @@ import glide.api.commands.ConnectionManagementClusterCommands; import glide.api.commands.GenericClusterCommands; +import glide.api.commands.PubSubClusterCommands; import glide.api.commands.ScriptingAndFunctionsClusterCommands; import glide.api.commands.ServerManagementClusterCommands; import glide.api.commands.TransactionsClusterCommands; @@ -57,8 +59,6 @@ import glide.api.models.configuration.RedisClusterClientConfiguration; import glide.api.models.configuration.RequestRoutingConfiguration.Route; import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute; -import glide.managers.CommandManager; -import glide.managers.ConnectionManager; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -77,17 +77,19 @@ public class RedisClusterClient extends BaseClient GenericClusterCommands, ServerManagementClusterCommands, ScriptingAndFunctionsClusterCommands, - TransactionsClusterCommands { + TransactionsClusterCommands, + PubSubClusterCommands { - protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) { - super(connectionManager, commandManager); + /** A private constructor. Use {@link #CreateClient} to get a client. */ + RedisClusterClient(ClientBuilder builder) { + super(builder); } /** * Async request for an async (non-blocking) Redis client in Cluster mode. * - * @param config Redis cluster client Configuration - * @return A Future to connect and return a RedisClusterClient + * @param config Redis cluster client Configuration. + * @return A Future to connect and return a RedisClusterClient. */ public static CompletableFuture CreateClient( @NonNull RedisClusterClientConfiguration config) { @@ -780,6 +782,18 @@ public CompletableFuture randomKey() { RandomKey, new String[0], this::handleStringOrNullResponse); } + @Override + public CompletableFuture spublish(@NonNull String channel, @NonNull String message) { + return commandManager.submitNewCommand( + SPublish, + new String[] {channel, message}, + response -> { + // Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO + handleLongResponse(response); + return OK; + }); + } + @Override public CompletableFuture sort( @NonNull String key, @NonNull SortClusterOptions sortClusterOptions) { diff --git a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java new file mode 100644 index 0000000000..d84aa66b6e --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java @@ -0,0 +1,27 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.commands; + +import java.util.concurrent.CompletableFuture; + +/** + * Supports commands for the "Pub/Sub" group for standalone and cluster clients. + * + * @see Pub/Sub Commands + */ +public interface PubSubBaseCommands { + + /** + * Publishes message on pubsub channel. + * + * @see redis.io for details. + * @param channel The channel to publish the message on. + * @param message The message to publish. + * @return OK. + * @example + *
{@code
+     * String response = client.publish("announcements", "The cat said 'meow'!").get();
+     * assert response.equals("OK");
+     * }
+ */ + CompletableFuture publish(String channel, String message); +} diff --git a/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java b/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java new file mode 100644 index 0000000000..a9dd7403a1 --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/PubSubClusterCommands.java @@ -0,0 +1,28 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.commands; + +import java.util.concurrent.CompletableFuture; + +/** + * Supports commands for the "Pub/Sub" group for a cluster client. + * + * @see Pub/Sub Commands + */ +public interface PubSubClusterCommands { + + /** + * Publishes message on pubsub channel in sharded mode. + * + * @since Redis 7.0 and above. + * @see redis.io for details. + * @param channel The channel to publish the message on. + * @param message The message to publish. + * @return OK. + * @example + *
{@code
+     * String response = client.spublish("announcements", "The cat said 'meow'!").get();
+     * assert response.equals("OK");
+     * }
+ */ + CompletableFuture spublish(String channel, String message); +} diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index b9be20f720..593cda2bba 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -125,6 +125,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.PfCount; import static redis_request.RedisRequestOuterClass.RequestType.PfMerge; import static redis_request.RedisRequestOuterClass.RequestType.Ping; +import static redis_request.RedisRequestOuterClass.RequestType.Publish; import static redis_request.RedisRequestOuterClass.RequestType.RPop; import static redis_request.RedisRequestOuterClass.RequestType.RPush; import static redis_request.RedisRequestOuterClass.RequestType.RPushX; @@ -4915,6 +4916,19 @@ public T lcsLen(@NonNull String key1, @NonNull String key2) { return getThis(); } + /** + * Publishes message on pubsub channel. + * + * @see redis.io for details. + * @param channel The channel to publish the message on. + * @param message The message to publish. + * @return Command response - The number of clients that received the message. + */ + public T publish(@NonNull String channel, @NonNull String message) { + protobufTransaction.addCommands(buildCommand(Publish, buildArgs(channel, message))); + return getThis(); + } + /** * Gets the union of all the given sets. * diff --git a/java/client/src/main/java/glide/api/models/ClusterTransaction.java b/java/client/src/main/java/glide/api/models/ClusterTransaction.java index 5c220f59a7..42354669f1 100644 --- a/java/client/src/main/java/glide/api/models/ClusterTransaction.java +++ b/java/client/src/main/java/glide/api/models/ClusterTransaction.java @@ -3,11 +3,11 @@ import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING; import static glide.utils.ArrayTransformUtils.concatenateArrays; +import static redis_request.RedisRequestOuterClass.RequestType.SPublish; import static redis_request.RedisRequestOuterClass.RequestType.Sort; import static redis_request.RedisRequestOuterClass.RequestType.SortReadOnly; import glide.api.models.commands.SortClusterOptions; -import lombok.AllArgsConstructor; import lombok.NonNull; import org.apache.commons.lang3.ArrayUtils; @@ -29,13 +29,26 @@ * // result contains: OK and "value" * */ -@AllArgsConstructor public class ClusterTransaction extends BaseTransaction { @Override protected ClusterTransaction getThis() { return this; } + /** + * Publishes message on pubsub channel in sharded mode. + * + * @since Redis 7.0 and above. + * @see redis.io for details. + * @param channel The channel to publish the message on. + * @param message The message to publish. + * @return Command response - The number of clients that received the message. + */ + public ClusterTransaction spublish(@NonNull String channel, @NonNull String message) { + protobufTransaction.addCommands(buildCommand(SPublish, channel, message)); + return getThis(); + } + /** * Sorts the elements in the list, set, or sorted set at key and returns the result. *
diff --git a/java/client/src/main/java/glide/api/models/PubSubMessage.java b/java/client/src/main/java/glide/api/models/PubSubMessage.java new file mode 100644 index 0000000000..c3109956f1 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/PubSubMessage.java @@ -0,0 +1,41 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models; + +import java.util.Optional; +import lombok.EqualsAndHashCode; +import lombok.Getter; + +/** PubSub message received by the client. */ +@Getter +@EqualsAndHashCode +public class PubSubMessage { + /** An incoming message received. */ + private final String message; + + /** A name of the originating channel. */ + private final String channel; + + /** A pattern matched to the channel name. */ + private final Optional pattern; + + public PubSubMessage(String message, String channel, String pattern) { + this.message = message; + this.channel = channel; + this.pattern = Optional.ofNullable(pattern); + } + + public PubSubMessage(String message, String channel) { + this.message = message; + this.channel = channel; + this.pattern = Optional.empty(); + } + + @Override + public String toString() { + String res = String.format("%s, channel = %s", message, channel); + if (pattern.isPresent()) { + res += ", pattern = " + pattern.get(); + } + return res; + } +} diff --git a/java/client/src/main/java/glide/api/models/configuration/BackoffStrategy.java b/java/client/src/main/java/glide/api/models/configuration/BackoffStrategy.java index 45f04e986d..c2e8d0f0c1 100644 --- a/java/client/src/main/java/glide/api/models/configuration/BackoffStrategy.java +++ b/java/client/src/main/java/glide/api/models/configuration/BackoffStrategy.java @@ -12,6 +12,15 @@ * *

Once the maximum value is reached, that will remain the time between retry attempts until a * reconnect attempt is successful. The client will attempt to reconnect indefinitely. + * + * @example + *

{@code
+ * BackoffStrategy reconnectionConfiguration = BackoffStrategy.builder()
+ *     .numOfRetries(5)
+ *     .exponentBase(2)
+ *     .factor(3)
+ *     .build()
+ * }
*/ @Getter @Builder diff --git a/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java index f1fe8319a4..1fbca52fe9 100644 --- a/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java @@ -64,4 +64,6 @@ public abstract class BaseClientConfiguration { * loop group. If set, users are responsible for shutting the resource down when no longer in use. */ private final ThreadPoolResource threadPoolResource; + + public abstract BaseSubscriptionConfiguration getSubscriptionConfiguration(); } diff --git a/java/client/src/main/java/glide/api/models/configuration/BaseSubscriptionConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/BaseSubscriptionConfiguration.java new file mode 100644 index 0000000000..12548cb9f0 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/BaseSubscriptionConfiguration.java @@ -0,0 +1,106 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import glide.api.BaseClient; +import glide.api.models.PubSubMessage; +import glide.api.models.configuration.ClusterSubscriptionConfiguration.ClusterSubscriptionConfigurationBuilder; +import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode; +import glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode; +import glide.api.models.configuration.StandaloneSubscriptionConfiguration.StandaloneSubscriptionConfigurationBuilder; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiConsumer; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Client subscription configuration. Could be either {@link StandaloneSubscriptionConfiguration} or + * {@link ClusterSubscriptionConfiguration}. + */ +@Getter +@RequiredArgsConstructor +public abstract class BaseSubscriptionConfiguration { + + /** + * A channel subscription mode. Could be either {@link PubSubChannelMode} or {@link + * PubSubClusterChannelMode}. + */ + public interface ChannelMode {} + + /** + * Callback called for every incoming message. It should be a fast, non-blocking operation to + * avoid issues. A next call could happen even before then the previous call complete.
+ * The callback arguments are: + * + *
    + *
  1. A received {@link PubSubMessage}. + *
  2. A user-defined {@link #context} or null if not configured. + *
+ */ + public interface MessageCallback extends BiConsumer {} + + /** + * Optional callback to accept the incoming messages. See {@link MessageCallback}.
+ * If not set, messages will be available via {@link BaseClient#tryGetPubSubMessage()} or {@link + * BaseClient#getPubSubMessage()}. + */ + protected final Optional callback; + + /** + * Optional arbitrary context, which will be passed to callback along with all received messages. + *
+ * Could be used to distinguish clients if multiple clients use a shared callback. + */ + protected final Optional context; + + // All code below is a custom implementation of `SuperBuilder`, because we provide + // custom user-friendly API `callback` and `subscription`. + /** + * Superclass for {@link ClusterSubscriptionConfigurationBuilder} and for {@link + * StandaloneSubscriptionConfigurationBuilder}. + */ + public abstract static class BaseSubscriptionConfigurationBuilder< + B extends BaseSubscriptionConfigurationBuilder, + C extends BaseSubscriptionConfiguration> { + + protected Optional callback = Optional.empty(); + protected Optional context = Optional.empty(); + + protected void addSubscription( + Map> subscriptions, M mode, String channelOrPattern) { + if (!subscriptions.containsKey(mode)) { + subscriptions.put(mode, new HashSet<>()); + } + subscriptions.get(mode).add(channelOrPattern); + } + + protected abstract B self(); + + protected abstract C build(); + + /** + * Set a callback and a context. + * + * @param callback The {@link #callback}. + * @param context The {@link #context}. + */ + public B callback(MessageCallback callback, Object context) { + this.callback = Optional.ofNullable(callback); + this.context = Optional.ofNullable(context); + return self(); + } + + /** + * Set a callback without context. null will be supplied to all callback calls as a + * context. + * + * @param callback The {@link #callback}. + */ + public B callback(MessageCallback callback) { + this.callback = Optional.ofNullable(callback); + return self(); + } + } +} diff --git a/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java new file mode 100644 index 0000000000..8a0b031a06 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java @@ -0,0 +1,124 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import glide.api.RedisClusterClient; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import lombok.Getter; + +/** + * Subscription configuration for {@link RedisClusterClient}. + * + * @example + *
{@code
+ * // Configuration with 3 subscriptions and a callback:
+ * ClusterSubscriptionConfiguration subscriptionConfiguration =
+ *     ClusterSubscriptionConfiguration.builder()
+ *         .subscription(EXACT, "notifications")
+ *         .subscription(EXACT, "news")
+ *         .subscription(SHARDED, "data")
+ *         .callback(callback)
+ *         .build();
+ * // Now it could be supplied to `RedisClusterClientConfiguration`:
+ * RedisClusterClientConfiguration clientConfiguration =
+ *     RedisClusterClientConfiguration.builder()
+ *         .address(NodeAddress.builder().port(6379).build())
+ *         .subscriptionConfiguration(subscriptionConfiguration)
+ *         .build();
+ * }
+ */ +@Getter +public final class ClusterSubscriptionConfiguration extends BaseSubscriptionConfiguration { + + /** + * Describes subscription modes for cluster client. + * + * @see redis.io for details. + */ + public enum PubSubClusterChannelMode implements ChannelMode { + /** Use exact channel names. */ + EXACT, + /** Use glob-style channel name patterns. */ + PATTERN, + /** + * Use sharded pubsub. + * + * @since Redis 7.0 and above. + */ + SHARDED, + } + + /** + * PubSub subscriptions to be used for the client.
+ * Will be applied via SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE + * commands during connection establishment. + */ + private final Map> subscriptions; + + // All code below is a custom implementation of `SuperBuilder` + private ClusterSubscriptionConfiguration( + Optional callback, + Optional context, + Map> subscriptions) { + super(callback, context); + this.subscriptions = subscriptions; + } + + public static ClusterSubscriptionConfigurationBuilder builder() { + return new ClusterSubscriptionConfigurationBuilder(); + } + + /** Builder for {@link ClusterSubscriptionConfiguration}. */ + public static final class ClusterSubscriptionConfigurationBuilder + extends BaseSubscriptionConfigurationBuilder< + ClusterSubscriptionConfigurationBuilder, ClusterSubscriptionConfiguration> { + + private ClusterSubscriptionConfigurationBuilder() {} + + private Map> subscriptions = new HashMap<>(3); + + /** + * Add a subscription to a channel or to multiple channels if {@link + * PubSubClusterChannelMode#PATTERN} is used.
+ * See {@link ClusterSubscriptionConfiguration#subscriptions}. + */ + public ClusterSubscriptionConfigurationBuilder subscription( + PubSubClusterChannelMode mode, String channelOrPattern) { + addSubscription(subscriptions, mode, channelOrPattern); + return this; + } + + /** + * Set all subscriptions in a bulk. Rewrites previously stored configurations.
+ * See {@link ClusterSubscriptionConfiguration#subscriptions}. + */ + public ClusterSubscriptionConfigurationBuilder subscriptions( + Map> subscriptions) { + this.subscriptions = subscriptions; + return this; + } + + /** + * Set subscriptions in a bulk for the given mode. Rewrites previously stored configurations for + * that mode.
+ * See {@link ClusterSubscriptionConfiguration#subscriptions}. + */ + public ClusterSubscriptionConfigurationBuilder subscriptions( + PubSubClusterChannelMode mode, Set subscriptions) { + this.subscriptions.put(mode, subscriptions); + return this; + } + + @Override + protected ClusterSubscriptionConfigurationBuilder self() { + return this; + } + + @Override + public ClusterSubscriptionConfiguration build() { + return new ClusterSubscriptionConfiguration(callback, context, subscriptions); + } + } +} diff --git a/java/client/src/main/java/glide/api/models/configuration/NodeAddress.java b/java/client/src/main/java/glide/api/models/configuration/NodeAddress.java index 97b90c8d01..c5132b8f1a 100644 --- a/java/client/src/main/java/glide/api/models/configuration/NodeAddress.java +++ b/java/client/src/main/java/glide/api/models/configuration/NodeAddress.java @@ -5,7 +5,16 @@ import lombok.Getter; import lombok.NonNull; -/** Represents the address and port of a node in the cluster. */ +/** + * Represents the address and port of a node in the cluster or in standalone installation. + * + * @example + *
{@code
+ * NodeAddress address1 = NodeAddress.builder().build(); // default parameters: localhost:6379
+ * NodeAddress address2 = NodeAddress.builder().port(6380).build(); // localhost:6380
+ * NodeAddress address2 = NodeAddress.builder().address("my.cloud.com").port(12345).build(); // custom address
+ * }
+ */ @Getter @Builder public class NodeAddress { diff --git a/java/client/src/main/java/glide/api/models/configuration/RedisClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/RedisClientConfiguration.java index cd25d262f9..2714c62216 100644 --- a/java/client/src/main/java/glide/api/models/configuration/RedisClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/RedisClientConfiguration.java @@ -1,10 +1,30 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.models.configuration; +import glide.api.RedisClient; import lombok.Getter; import lombok.experimental.SuperBuilder; -/** Represents the configuration settings for a Standalone Redis client. */ +/** + * Represents the configuration settings for a Standalone {@link RedisClient}. + * + * @example + *
{@code
+ * RedisClientConfiguration redisClientConfiguration =
+ *     RedisClientConfiguration.builder()
+ *         .address(node1address)
+ *         .address(node2address)
+ *         .useTLS(true)
+ *         .readFrom(ReadFrom.PREFER_REPLICA)
+ *         .credentials(credentialsConfiguration)
+ *         .requestTimeout(2000)
+ *         .reconnectStrategy(reconnectionConfiguration)
+ *         .databaseId(1)
+ *         .clientName("GLIDE")
+ *         .subscriptionConfiguration(subscriptionConfiguration)
+ *         .build();
+ * }
+ */ @Getter @SuperBuilder public class RedisClientConfiguration extends BaseClientConfiguration { @@ -13,4 +33,7 @@ public class RedisClientConfiguration extends BaseClientConfiguration { /** Index of the logical database to connect to. */ private final Integer databaseId; + + /** Subscription configuration for the current client. */ + private final StandaloneSubscriptionConfiguration subscriptionConfiguration; } diff --git a/java/client/src/main/java/glide/api/models/configuration/RedisClusterClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/RedisClusterClientConfiguration.java index 3b36709f11..f5a81346ac 100644 --- a/java/client/src/main/java/glide/api/models/configuration/RedisClusterClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/RedisClusterClientConfiguration.java @@ -1,12 +1,34 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.models.configuration; +import glide.api.RedisClusterClient; +import lombok.Getter; import lombok.experimental.SuperBuilder; /** - * Represents the configuration settings for a Cluster Redis client. Notes: Currently, the - * reconnection strategy in cluster mode is not configurable, and exponential backoff with fixed - * values is used. + * Represents the configuration settings for a Cluster Redis client {@link RedisClusterClient}. + * + * @apiNote Currently, the reconnection strategy in cluster mode is not configurable, and + * exponential backoff with fixed values is used. + * @example + *
{@code
+ * RedisClientConfiguration redisClientConfiguration =
+ *     RedisClientConfiguration.builder()
+ *         .address(node1address)
+ *         .address(node2address)
+ *         .useTLS(true)
+ *         .readFrom(ReadFrom.PREFER_REPLICA)
+ *         .credentials(credentialsConfiguration)
+ *         .requestTimeout(2000)
+ *         .clientName("GLIDE")
+ *         .subscriptionConfiguration(subscriptionConfiguration)
+ *         .build();
+ * }
*/ @SuperBuilder -public class RedisClusterClientConfiguration extends BaseClientConfiguration {} +@Getter +public class RedisClusterClientConfiguration extends BaseClientConfiguration { + + /** Subscription configuration for the current client. */ + private final ClusterSubscriptionConfiguration subscriptionConfiguration; +} diff --git a/java/client/src/main/java/glide/api/models/configuration/RedisCredentials.java b/java/client/src/main/java/glide/api/models/configuration/RedisCredentials.java index c6272dfcde..e6eb976ca6 100644 --- a/java/client/src/main/java/glide/api/models/configuration/RedisCredentials.java +++ b/java/client/src/main/java/glide/api/models/configuration/RedisCredentials.java @@ -5,7 +5,22 @@ import lombok.Getter; import lombok.NonNull; -/** Represents the credentials for connecting to a Redis server. */ +/** + * Represents the credentials for connecting to a Redis server. + * + * @example + *
{@code
+ * // credentials with username:
+ * RedisCredentials credentials1 = RedisCredentials.builder()
+ *     .username("GLIDE")
+ *     .build();
+ * // credentials with username and password:
+ * RedisCredentials credentials2 = RedisCredentials.builder()
+ *     .username("GLIDE")
+ *     .password(pwd)
+ *     .build();
+ * }
+ */ @Getter @Builder public class RedisCredentials { diff --git a/java/client/src/main/java/glide/api/models/configuration/StandaloneSubscriptionConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/StandaloneSubscriptionConfiguration.java new file mode 100644 index 0000000000..7e8070356b --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/StandaloneSubscriptionConfiguration.java @@ -0,0 +1,117 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import glide.api.RedisClient; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import lombok.Getter; + +/** + * Subscription configuration for {@link RedisClient}. + * + * @example + *
{@code
+ * // Configuration with 2 subscriptions, a callback, and a context:
+ * StandaloneSubscriptionConfiguration subscriptionConfiguration =
+ *     StandaloneSubscriptionConfiguration.builder()
+ *         .subscription(EXACT, "notifications")
+ *         .subscription(PATTERN, "news.*")
+ *         .callback(callback, messageConsumer)
+ *         .build();
+ * // Now it could be supplied to `RedisClientConfiguration`:
+ * RedisClientConfiguration clientConfiguration =
+ *     RedisClientConfiguration.builder()
+ *         .address(NodeAddress.builder().port(6379).build())
+ *         .subscriptionConfiguration(subscriptionConfiguration)
+ *         .build();
+ * }
+ */ +@Getter +public final class StandaloneSubscriptionConfiguration extends BaseSubscriptionConfiguration { + + /** + * Describes subscription modes for standalone client. + * + * @see redis.io for details. + */ + public enum PubSubChannelMode implements ChannelMode { + /** Use exact channel names. */ + EXACT, + /** Use glob-style channel name patterns. */ + PATTERN, + } + + /** + * PubSub subscriptions to be used for the client.
+ * Will be applied via SUBSCRIBE/PSUBSCRIBE commands during connection + * establishment. + */ + private final Map> subscriptions; + + // All code below is a custom implementation of `SuperBuilder` + public StandaloneSubscriptionConfiguration( + Optional callback, + Optional context, + Map> subscriptions) { + super(callback, context); + this.subscriptions = subscriptions; + } + + public static StandaloneSubscriptionConfigurationBuilder builder() { + return new StandaloneSubscriptionConfigurationBuilder(); + } + + /** Builder for {@link StandaloneSubscriptionConfiguration}. */ + public static final class StandaloneSubscriptionConfigurationBuilder + extends BaseSubscriptionConfigurationBuilder< + StandaloneSubscriptionConfigurationBuilder, StandaloneSubscriptionConfiguration> { + + private StandaloneSubscriptionConfigurationBuilder() {} + + private Map> subscriptions = new HashMap<>(2); + + /** + * Add a subscription to a channel or to multiple channels if {@link PubSubChannelMode#PATTERN} + * is used.
+ * See {@link StandaloneSubscriptionConfiguration#subscriptions}. + */ + public StandaloneSubscriptionConfigurationBuilder subscription( + PubSubChannelMode mode, String channelOrPattern) { + addSubscription(subscriptions, mode, channelOrPattern); + return self(); + } + + /** + * Set all subscriptions in a bulk. Rewrites previously stored configurations.
+ * See {@link StandaloneSubscriptionConfiguration#subscriptions}. + */ + public StandaloneSubscriptionConfigurationBuilder subscriptions( + Map> subscriptions) { + this.subscriptions = subscriptions; + return this; + } + + /** + * Set subscriptions in a bulk for the given mode. Rewrites previously stored configurations for + * that mode.
+ * See {@link StandaloneSubscriptionConfiguration#subscriptions}. + */ + public StandaloneSubscriptionConfigurationBuilder subscriptions( + PubSubChannelMode mode, Set subscriptions) { + this.subscriptions.put(mode, subscriptions); + return this; + } + + @Override + protected StandaloneSubscriptionConfigurationBuilder self() { + return this; + } + + @Override + public StandaloneSubscriptionConfiguration build() { + return new StandaloneSubscriptionConfiguration(callback, context, subscriptions); + } + } +} diff --git a/java/client/src/main/java/glide/api/models/exceptions/ConfigurationError.java b/java/client/src/main/java/glide/api/models/exceptions/ConfigurationError.java new file mode 100644 index 0000000000..46b80fcef6 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/exceptions/ConfigurationError.java @@ -0,0 +1,9 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.exceptions; + +/** Errors that are thrown when a request cannot be completed in current configuration settings. */ +public class ConfigurationError extends RedisException { + public ConfigurationError(String message) { + super(message); + } +} diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index 1ba510a285..03b499dd66 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -21,6 +21,9 @@ @RequiredArgsConstructor public class CallbackDispatcher { + /** A message handler instance. */ + protected final MessageHandler messageHandler; + /** Unique request ID (callback ID). Thread-safe and overflow-safe. */ protected final AtomicInteger nextAvailableRequestId = new AtomicInteger(0); @@ -78,6 +81,11 @@ public void completeRequest(Response response) { distributeClosingException(response.getClosingError()); return; } + // pass pushes to the message handler and stop processing them + if (response.getIsPush()) { + messageHandler.handle(response); + return; + } // Complete and return the response at callbackId // free up the callback ID in the freeRequestIds list int callbackId = response.getCallbackIdx(); diff --git a/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java new file mode 100644 index 0000000000..74de6ddeb0 --- /dev/null +++ b/java/client/src/main/java/glide/connectors/handlers/MessageHandler.java @@ -0,0 +1,129 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.connectors.handlers; + +import glide.api.models.PubSubMessage; +import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback; +import glide.api.models.exceptions.RedisException; +import glide.managers.BaseResponseResolver; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import response.ResponseOuterClass.Response; + +/** Handler for incoming push messages (subscriptions). */ +@Getter +@RequiredArgsConstructor +public class MessageHandler { + + // TODO maybe store `BaseSubscriptionConfiguration` as is? + /** + * A user callback to call for every incoming message, if given. If missing, messages are pushed + * into the {@link #queue}. + */ + private final Optional callback; + + /** An arbitrary user object to be passed to callback. */ + private final Optional context; + + /** Helper which extracts data from received {@link Response}s from GLIDE. */ + private final BaseResponseResolver responseResolver; + + /** A message queue wrapper. */ + private final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); + + /** Process a push (PUBSUB) message received as a part of {@link Response} from GLIDE. */ + public void handle(Response response) { + Object data = responseResolver.apply(response); + if (!(data instanceof Map)) { + // TODO log thru logger https://github.com/aws/glide-for-redis/pull/1422 + System.err.println("Received invalid push: empty or in incorrect format."); + throw new RedisException("Received invalid push: empty or in incorrect format."); + } + @SuppressWarnings("unchecked") + Map push = (Map) data; + PushKind pushType = Enum.valueOf(PushKind.class, (String) push.get("kind")); + Object[] values = (Object[]) push.get("values"); + + switch (pushType) { + case Disconnection: + // TODO log thru logger https://github.com/aws/glide-for-redis/pull/1422 + // ClientLogger.log( + // LogLevel.WARN, + // "disconnect notification", + // "Transport disconnected, messages might be lost", + break; + case PMessage: + handle(new PubSubMessage((String) values[2], (String) values[1], (String) values[0])); + return; + case Message: + case SMessage: + handle(new PubSubMessage((String) values[1], (String) values[0])); + return; + case Subscribe: + case PSubscribe: + case SSubscribe: + case Unsubscribe: + case PUnsubscribe: + case SUnsubscribe: + // ignore for now + // TODO log thru logger https://github.com/aws/glide-for-redis/pull/1422 + System.out.printf( + "Received push notification of type '%s': %s\n", + pushType, + Arrays.stream(values) + .map(v -> v instanceof Number ? v.toString() : String.format("'%s'", v)) + .collect(Collectors.joining(" "))); + break; + default: + // TODO log thru logger https://github.com/aws/glide-for-redis/pull/1422 + System.err.printf("Received push with unsupported type: %s.\n", pushType); + // ClientLogger.log( + // LogLevel.WARN, + // "unknown notification", + // f"Unknown notification message: '{message_kind}'", + } + } + + /** Process a {@link PubSubMessage} received. */ + private void handle(PubSubMessage message) { + if (callback.isPresent()) { + callback.get().accept(message, context.orElse(null)); + } else { + queue.push(message); + } + } + + /** Push type enum copy-pasted 1:1 from `redis-rs`. */ + enum PushKind { + /// `Disconnection` is sent from the **library** when connection is closed. + Disconnection, + /// Other kind to catch future kinds. + Other, + /// `invalidate` is received when a key is changed/deleted. + Invalidate, + /// `message` is received when pubsub message published by another client. + Message, + /// `pmessage` is received when pubsub message published by another client and client subscribed + // to topic via pattern. + PMessage, + /// `smessage` is received when pubsub message published by another client and client subscribed + // to it with sharding. + SMessage, + /// `unsubscribe` is received when client unsubscribed from a channel. + Unsubscribe, + /// `punsubscribe` is received when client unsubscribed from a pattern. + PUnsubscribe, + /// `sunsubscribe` is received when client unsubscribed from a shard channel. + SUnsubscribe, + /// `subscribe` is received when client subscribed to a channel. + Subscribe, + /// `psubscribe` is received when client subscribed to a pattern. + PSubscribe, + /// `ssubscribe` is received when client subscribed to a shard channel. + SSubscribe, + } +} diff --git a/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java b/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java index 95f4a2a745..ca801a6094 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ReadHandler.java @@ -31,6 +31,7 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO: log thru logger System.out.printf("=== exceptionCaught %s %s %n", ctx, cause); + cause.printStackTrace(); callbackDispatcher.distributeClosingException( "An unhandled error while reading from UDS channel: " + cause); diff --git a/java/client/src/main/java/glide/managers/BaseCommandResponseResolver.java b/java/client/src/main/java/glide/managers/BaseResponseResolver.java similarity index 92% rename from java/client/src/main/java/glide/managers/BaseCommandResponseResolver.java rename to java/client/src/main/java/glide/managers/BaseResponseResolver.java index 7439a0bc7d..458ad0515a 100644 --- a/java/client/src/main/java/glide/managers/BaseCommandResponseResolver.java +++ b/java/client/src/main/java/glide/managers/BaseResponseResolver.java @@ -11,8 +11,7 @@ * Response resolver responsible for evaluating the Redis response object with a success or failure. */ @AllArgsConstructor -public class BaseCommandResponseResolver - implements RedisExceptionCheckedFunction { +public class BaseResponseResolver implements RedisExceptionCheckedFunction { private RedisExceptionCheckedFunction respPointerResolver; diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 79e3252884..68648577b1 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -1,9 +1,12 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.managers; +import com.google.protobuf.ByteString; import connection_request.ConnectionRequestOuterClass; import connection_request.ConnectionRequestOuterClass.AuthenticationInfo; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import connection_request.ConnectionRequestOuterClass.PubSubChannelsOrPatterns; +import connection_request.ConnectionRequestOuterClass.PubSubSubscriptions; import connection_request.ConnectionRequestOuterClass.TlsMode; import glide.api.models.configuration.BaseClientConfiguration; import glide.api.models.configuration.NodeAddress; @@ -141,6 +144,20 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClient( connectionRequestBuilder.setDatabaseId(configuration.getDatabaseId()); } + if (configuration.getSubscriptionConfiguration() != null) { + // TODO throw ConfigurationError if RESP2 + var subscriptionsBuilder = PubSubSubscriptions.newBuilder(); + for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) { + var channelsBuilder = PubSubChannelsOrPatterns.newBuilder(); + for (var channel : entry.getValue()) { + channelsBuilder.addChannelsOrPatterns(ByteString.copyFromUtf8(channel)); + } + subscriptionsBuilder.putChannelsOrPatternsByType( + entry.getKey().ordinal(), channelsBuilder.build()); + } + connectionRequestBuilder.setPubsubSubscriptions(subscriptionsBuilder.build()); + } + return connectionRequestBuilder; } @@ -149,13 +166,26 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClient( * * @param configuration */ - private ConnectionRequestOuterClass.ConnectionRequest.Builder - setupConnectionRequestBuilderRedisClusterClient( - RedisClusterClientConfiguration configuration) { + private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClusterClient( + RedisClusterClientConfiguration configuration) { ConnectionRequest.Builder connectionRequestBuilder = setupConnectionRequestBuilderBaseConfiguration(configuration); connectionRequestBuilder.setClusterModeEnabled(true); + if (configuration.getSubscriptionConfiguration() != null) { + // TODO throw ConfigurationError if RESP2 + var subscriptionsBuilder = PubSubSubscriptions.newBuilder(); + for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) { + var channelsBuilder = PubSubChannelsOrPatterns.newBuilder(); + for (var channel : entry.getValue()) { + channelsBuilder.addChannelsOrPatterns(ByteString.copyFromUtf8(channel)); + } + subscriptionsBuilder.putChannelsOrPatternsByType( + entry.getKey().ordinal(), channelsBuilder.build()); + } + connectionRequestBuilder.setPubsubSubscriptions(subscriptionsBuilder.build()); + } + return connectionRequestBuilder; } diff --git a/java/client/src/test/java/glide/ExceptionHandlingTests.java b/java/client/src/test/java/glide/ExceptionHandlingTests.java index eb7232ae3f..f59b50af4d 100644 --- a/java/client/src/test/java/glide/ExceptionHandlingTests.java +++ b/java/client/src/test/java/glide/ExceptionHandlingTests.java @@ -24,7 +24,7 @@ import glide.connectors.handlers.ChannelHandler; import glide.connectors.resources.Platform; import glide.connectors.resources.ThreadPoolResourceAllocator; -import glide.managers.BaseCommandResponseResolver; +import glide.managers.BaseResponseResolver; import glide.managers.CommandManager; import glide.managers.ConnectionManager; import io.netty.channel.ChannelFuture; @@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Stream; -import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -163,7 +162,7 @@ public void connection_manager_rethrows_non_RedisException_too() { public void close_connection_on_response_with_closing_error() { // CallbackDispatcher throws ClosingException which causes ConnectionManager and CommandManager // to close the channel - var callbackDispatcher = new CallbackDispatcher(); + var callbackDispatcher = new CallbackDispatcher(null); var channelHandler = new TestChannelHandler(callbackDispatcher); var connectionManager = new ConnectionManager(channelHandler); @@ -205,7 +204,7 @@ public void dont_close_connection_when_callback_dispatcher_receives_response_wit // CallbackDispatcher throws a corresponding exception which should not cause // ConnectionManager and CommandManager to close the channel RequestErrorType errorType, Class exceptionType) { - var callbackDispatcher = new CallbackDispatcher(); + var callbackDispatcher = new CallbackDispatcher(null); var channelHandler = new TestChannelHandler(callbackDispatcher); var commandManager = new CommandManager(channelHandler); @@ -230,7 +229,7 @@ public void dont_close_connection_when_callback_dispatcher_receives_response_wit @SneakyThrows public void close_connection_on_response_without_error_but_with_incorrect_callback_id() { // CallbackDispatcher does the same as it received closing error - var callbackDispatcher = new CallbackDispatcher(); + var callbackDispatcher = new CallbackDispatcher(null); var channelHandler = new TestChannelHandler(callbackDispatcher); var connectionManager = new ConnectionManager(channelHandler); @@ -260,7 +259,7 @@ public void close_connection_on_response_without_error_but_with_incorrect_callba @Test public void response_resolver_does_not_expect_errors() { - var resolver = new BaseCommandResponseResolver(null); + var resolver = new BaseResponseResolver(null); var response1 = Response.newBuilder() @@ -274,6 +273,8 @@ public void response_resolver_does_not_expect_errors() { assertEquals("Unhandled response closing error", exception.getMessage()); } + // TODO add tests for error handling in MessageHandler + /** Create a config which causes connection failure. */ private static RedisClientConfiguration createDummyConfig() { return RedisClientConfiguration.builder().build(); @@ -311,11 +312,15 @@ public CompletableFuture connect( } /** Test ChannelHandler extension which aborts futures for all commands. */ - @RequiredArgsConstructor private static class TestCallbackDispatcher extends CallbackDispatcher { public final Throwable exceptionToThrow; + private TestCallbackDispatcher(Throwable exceptionToThrow) { + super(null); + this.exceptionToThrow = exceptionToThrow; + } + @Override public void completeRequest(Response response) { responses.values().forEach(future -> future.completeExceptionally(exceptionToThrow)); diff --git a/java/client/src/test/java/glide/api/RedisClientCreateTest.java b/java/client/src/test/java/glide/api/RedisClientCreateTest.java index ab6f3e9651..439c4205b6 100644 --- a/java/client/src/test/java/glide/api/RedisClientCreateTest.java +++ b/java/client/src/test/java/glide/api/RedisClientCreateTest.java @@ -2,6 +2,7 @@ package glide.api; import static glide.api.BaseClient.buildChannelHandler; +import static glide.api.BaseClient.buildMessageHandler; import static glide.api.RedisClient.CreateClient; import static glide.api.RedisClient.buildCommandManager; import static glide.api.RedisClient.buildConnectionManager; @@ -16,6 +17,7 @@ import glide.api.models.configuration.RedisClientConfiguration; import glide.api.models.exceptions.ClosingException; import glide.connectors.handlers.ChannelHandler; +import glide.connectors.handlers.MessageHandler; import glide.connectors.resources.ThreadPoolResource; import glide.connectors.resources.ThreadPoolResourceAllocator; import glide.managers.CommandManager; @@ -34,6 +36,7 @@ public class RedisClientCreateTest { private ChannelHandler channelHandler; private ConnectionManager connectionManager; private CommandManager commandManager; + private MessageHandler messageHandler; private ThreadPoolResource threadPoolResource; @BeforeEach @@ -43,11 +46,13 @@ public void init() { channelHandler = mock(ChannelHandler.class); commandManager = mock(CommandManager.class); connectionManager = mock(ConnectionManager.class); + messageHandler = mock(MessageHandler.class); threadPoolResource = mock(ThreadPoolResource.class); - mockedClient.when(() -> buildChannelHandler(any())).thenReturn(channelHandler); + mockedClient.when(() -> buildChannelHandler(any(), any())).thenReturn(channelHandler); mockedClient.when(() -> buildConnectionManager(channelHandler)).thenReturn(connectionManager); mockedClient.when(() -> buildCommandManager(channelHandler)).thenReturn(commandManager); + mockedClient.when(() -> buildMessageHandler(any())).thenReturn(messageHandler); mockedClient.when(() -> CreateClient(any(), any())).thenCallRealMethod(); var threadPoolResource = ThreadPoolResourceAllocator.getOrCreate(() -> null); @@ -121,4 +126,6 @@ public void createClient_error_on_connection_throws_ExecutionException() { // verify assertEquals(exception, executionException.getCause()); } + + // TODO check message queue and subscriptionConfiguration } diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 3c50ffd6ac..4a0b8144cd 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -180,6 +180,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.PfCount; import static redis_request.RedisRequestOuterClass.RequestType.PfMerge; import static redis_request.RedisRequestOuterClass.RequestType.Ping; +import static redis_request.RedisRequestOuterClass.RequestType.Publish; import static redis_request.RedisRequestOuterClass.RequestType.RPop; import static redis_request.RedisRequestOuterClass.RequestType.RPush; import static redis_request.RedisRequestOuterClass.RequestType.RPushX; @@ -327,7 +328,6 @@ import glide.api.models.commands.stream.StreamTrimOptions.MaxLen; import glide.api.models.commands.stream.StreamTrimOptions.MinId; import glide.managers.CommandManager; -import glide.managers.ConnectionManager; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -352,15 +352,12 @@ public class RedisClientTest { RedisClient service; - ConnectionManager connectionManager; - CommandManager commandManager; @BeforeEach public void setUp() { - connectionManager = mock(ConnectionManager.class); commandManager = mock(CommandManager.class); - service = new RedisClient(connectionManager, commandManager); + service = new RedisClient(new BaseClient.ClientBuilder(null, commandManager, null, null)); } @SneakyThrows @@ -9067,6 +9064,30 @@ public void unwatch_returns_success() { assertEquals(OK, payload); } + @SneakyThrows + @Test + public void publish_returns_success() { + // setup + String channel = "channel"; + String message = "message"; + String[] arguments = new String[] {channel, message}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(OK); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(Publish), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.publish(channel, message); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK, payload); + } + @SneakyThrows @Test public void sunion_returns_success() { diff --git a/java/client/src/test/java/glide/api/RedisClusterClientTest.java b/java/client/src/test/java/glide/api/RedisClusterClientTest.java index 7a82ff5e35..c43dbd3f15 100644 --- a/java/client/src/test/java/glide/api/RedisClusterClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClusterClientTest.java @@ -51,6 +51,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Lolwut; import static redis_request.RedisRequestOuterClass.RequestType.Ping; import static redis_request.RedisRequestOuterClass.RequestType.RandomKey; +import static redis_request.RedisRequestOuterClass.RequestType.SPublish; import static redis_request.RedisRequestOuterClass.RequestType.Sort; import static redis_request.RedisRequestOuterClass.RequestType.SortReadOnly; import static redis_request.RedisRequestOuterClass.RequestType.Time; @@ -68,7 +69,6 @@ import glide.api.models.configuration.RequestRoutingConfiguration.Route; import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute; import glide.managers.CommandManager; -import glide.managers.ConnectionManager; import glide.managers.RedisExceptionCheckedFunction; import java.util.EnumSet; import java.util.HashMap; @@ -86,17 +86,15 @@ public class RedisClusterClientTest { RedisClusterClient service; - ConnectionManager connectionManager; - CommandManager commandManager; private final String[] TEST_ARGS = new String[0]; @BeforeEach public void setUp() { - connectionManager = mock(ConnectionManager.class); commandManager = mock(CommandManager.class); - service = new RedisClusterClient(connectionManager, commandManager); + service = + new RedisClusterClient(new BaseClient.ClientBuilder(null, commandManager, null, null)); } @Test @@ -165,7 +163,7 @@ private static class TestClient extends RedisClusterClient { private final Object object; public TestClient(CommandManager commandManager, Object objectToReturn) { - super(null, commandManager); + super(new BaseClient.ClientBuilder(null, commandManager, null, null)); object = objectToReturn; } @@ -2092,6 +2090,30 @@ public void randomKey() { assertEquals(testResponse, response); } + @SneakyThrows + @Test + public void spublish_returns_success() { + // setup + String channel = "channel"; + String message = "message"; + String[] arguments = new String[] {channel, message}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(OK); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(SPublish), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.spublish(channel, message); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK, payload); + } + @SneakyThrows @Test public void sort_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java b/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java index c33a927792..0f109f9108 100644 --- a/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java +++ b/java/client/src/test/java/glide/api/models/ClusterTransactionTests.java @@ -7,6 +7,7 @@ import static glide.api.models.commands.SortBaseOptions.OrderBy.ASC; import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING; import static org.junit.jupiter.api.Assertions.assertEquals; +import static redis_request.RedisRequestOuterClass.RequestType.SPublish; import static redis_request.RedisRequestOuterClass.RequestType.Sort; import static redis_request.RedisRequestOuterClass.RequestType.SortReadOnly; @@ -14,24 +15,21 @@ import glide.api.models.commands.SortClusterOptions; import java.util.LinkedList; import java.util.List; -import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import redis_request.RedisRequestOuterClass; +import org.junit.jupiter.api.Test; +import redis_request.RedisRequestOuterClass.Command; +import redis_request.RedisRequestOuterClass.Command.ArgsArray; +import redis_request.RedisRequestOuterClass.RequestType; public class ClusterTransactionTests { - private static Stream getTransactionBuilders() { - return Stream.of( - Arguments.of(new ClusterTransaction()), Arguments.of(new ClusterTransaction())); - } - @ParameterizedTest - @MethodSource("getTransactionBuilders") - public void cluster_transaction_builds_protobuf_request(ClusterTransaction transaction) { - List> - results = new LinkedList<>(); + @Test + public void cluster_transaction_builds_protobuf_request() { + ClusterTransaction transaction = new ClusterTransaction(); + List> results = new LinkedList<>(); + + transaction.spublish("ch1", "msg"); + results.add(Pair.of(SPublish, buildArgs("ch1", "msg"))); transaction.sortReadOnly( "key1", @@ -83,7 +81,7 @@ public void cluster_transaction_builds_protobuf_request(ClusterTransaction trans var protobufTransaction = transaction.getProtobufTransaction().build(); for (int idx = 0; idx < protobufTransaction.getCommandsCount(); idx++) { - RedisRequestOuterClass.Command protobuf = protobufTransaction.getCommands(idx); + Command protobuf = protobufTransaction.getCommands(idx); assertEquals(results.get(idx).getLeft(), protobuf.getRequestType()); assertEquals( diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 9ff1d41d39..9c70ed1886 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -146,6 +146,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.PfCount; import static redis_request.RedisRequestOuterClass.RequestType.PfMerge; import static redis_request.RedisRequestOuterClass.RequestType.Ping; +import static redis_request.RedisRequestOuterClass.RequestType.Publish; import static redis_request.RedisRequestOuterClass.RequestType.RPop; import static redis_request.RedisRequestOuterClass.RequestType.RPush; import static redis_request.RedisRequestOuterClass.RequestType.RPushX; @@ -1159,6 +1160,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.lcsLen("key1", "key2"); results.add(Pair.of(LCS, buildArgs("key1", "key2", "LEN"))); + transaction.publish("ch1", "msg"); + results.add(Pair.of(Publish, buildArgs("ch1", "msg"))); + transaction.lcsIdx("key1", "key2"); results.add(Pair.of(LCS, buildArgs("key1", "key2", IDX_COMMAND_STRING))); diff --git a/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java b/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java index 27e00ef52b..86af8e7c05 100644 --- a/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java +++ b/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java @@ -39,7 +39,9 @@ public class ConnectionWithGlideMockTests extends RustCoreLibMockTestBase { public void createTestClient() { channelHandler = new ChannelHandler( - new CallbackDispatcher(), socketPath, Platform.getThreadPoolResourceSupplier().get()); + new CallbackDispatcher(null), + socketPath, + Platform.getThreadPoolResourceSupplier().get()); } @AfterEach @@ -185,7 +187,12 @@ public void rethrow_error_if_UDS_channel_closed() { private static class TestClient extends RedisClient { public TestClient(ChannelHandler channelHandler) { - super(new ConnectionManager(channelHandler), new CommandManager(channelHandler)); + super( + new ClientBuilder( + new ConnectionManager(channelHandler), + new CommandManager(channelHandler), + null, + null)); } } } diff --git a/java/client/src/test/java/glide/managers/CommandManagerTest.java b/java/client/src/test/java/glide/managers/CommandManagerTest.java index 876851cfb1..8935fc461b 100644 --- a/java/client/src/test/java/glide/managers/CommandManagerTest.java +++ b/java/client/src/test/java/glide/managers/CommandManagerTest.java @@ -76,7 +76,7 @@ public void submitNewCommand_return_Object_result() { service.submitNewCommand( CustomCommand, new String[0], - new BaseCommandResponseResolver((ptr) -> ptr == pointer ? respObject : null)); + new BaseResponseResolver((ptr) -> ptr == pointer ? respObject : null)); Object respPointer = result.get(); // verify @@ -98,7 +98,9 @@ public void submitNewCommand_return_Null_result() { service.submitNewCommand( CustomCommand, new String[0], - new BaseCommandResponseResolver((p) -> new RuntimeException(""))); + new BaseResponseResolver( + (p) -> + new RuntimeException("Testing: something went wrong if you see this error"))); Object respPointer = result.get(); // verify @@ -125,7 +127,7 @@ public void submitNewCommand_return_String_result() { service.submitNewCommand( CustomCommand, new String[0], - new BaseCommandResponseResolver((p) -> p == pointer ? testString : null)); + new BaseResponseResolver((p) -> p == pointer ? testString : null)); Object respPointer = result.get(); // verify diff --git a/java/client/src/test/java/glide/managers/ConnectionManagerTest.java b/java/client/src/test/java/glide/managers/ConnectionManagerTest.java index 792259799b..fb5dc86014 100644 --- a/java/client/src/test/java/glide/managers/ConnectionManagerTest.java +++ b/java/client/src/test/java/glide/managers/ConnectionManagerTest.java @@ -3,6 +3,8 @@ import static glide.api.models.configuration.NodeAddress.DEFAULT_HOST; import static glide.api.models.configuration.NodeAddress.DEFAULT_PORT; +import static glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode.EXACT; +import static glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode.PATTERN; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -13,10 +15,13 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.protobuf.ByteString; import connection_request.ConnectionRequestOuterClass; import connection_request.ConnectionRequestOuterClass.AuthenticationInfo; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; import connection_request.ConnectionRequestOuterClass.ConnectionRetryStrategy; +import connection_request.ConnectionRequestOuterClass.PubSubChannelsOrPatterns; +import connection_request.ConnectionRequestOuterClass.PubSubSubscriptions; import connection_request.ConnectionRequestOuterClass.TlsMode; import glide.api.models.configuration.BackoffStrategy; import glide.api.models.configuration.NodeAddress; @@ -24,9 +29,11 @@ import glide.api.models.configuration.RedisClientConfiguration; import glide.api.models.configuration.RedisClusterClientConfiguration; import glide.api.models.configuration.RedisCredentials; +import glide.api.models.configuration.StandaloneSubscriptionConfiguration; import glide.api.models.exceptions.ClosingException; import glide.connectors.handlers.ChannelHandler; import io.netty.channel.ChannelFuture; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import lombok.SneakyThrows; @@ -135,6 +142,12 @@ public void connection_request_protobuf_generation_with_all_fields_set() { .build()) .databaseId(DATABASE_ID) .clientName(CLIENT_NAME) + .subscriptionConfiguration( + StandaloneSubscriptionConfiguration.builder() + .subscription(EXACT, "channel_1") + .subscription(EXACT, "channel_2") + .subscription(PATTERN, "*chatRoom*") + .build()) .build(); ConnectionRequest expectedProtobufConnectionRequest = ConnectionRequest.newBuilder() @@ -162,6 +175,20 @@ public void connection_request_protobuf_generation_with_all_fields_set() { .build()) .setDatabaseId(DATABASE_ID) .setClientName(CLIENT_NAME) + .setPubsubSubscriptions( + PubSubSubscriptions.newBuilder() + .putAllChannelsOrPatternsByType( + Map.of( + EXACT.ordinal(), + PubSubChannelsOrPatterns.newBuilder() + .addChannelsOrPatterns(ByteString.copyFromUtf8("channel_1")) + .addChannelsOrPatterns(ByteString.copyFromUtf8("channel_2")) + .build(), + PATTERN.ordinal(), + PubSubChannelsOrPatterns.newBuilder() + .addChannelsOrPatterns(ByteString.copyFromUtf8("*chatRoom*")) + .build())) + .build()) .build(); CompletableFuture completedFuture = new CompletableFuture<>(); Response response = Response.newBuilder().setConstantResponse(ConstantResponse.OK).build(); diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java new file mode 100644 index 0000000000..7bdb8870c2 --- /dev/null +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -0,0 +1,893 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide; + +import static glide.TestConfiguration.REDIS_VERSION; +import static glide.TestUtilities.commonClientConfig; +import static glide.TestUtilities.commonClusterClientConfig; +import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assumptions.assumeFalse; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import glide.api.BaseClient; +import glide.api.RedisClient; +import glide.api.RedisClusterClient; +import glide.api.models.ClusterTransaction; +import glide.api.models.PubSubMessage; +import glide.api.models.Transaction; +import glide.api.models.configuration.BaseSubscriptionConfiguration.ChannelMode; +import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback; +import glide.api.models.configuration.ClusterSubscriptionConfiguration; +import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode; +import glide.api.models.configuration.StandaloneSubscriptionConfiguration; +import glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode; +import glide.api.models.exceptions.ConfigurationError; +import glide.api.models.exceptions.RequestException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import lombok.SneakyThrows; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +@Timeout(30) // sec +public class PubSubTests { + + // TODO protocol version + @SneakyThrows + @SuppressWarnings("unchecked") + private BaseClient createClientWithSubscriptions( + boolean standalone, + Map> subscriptions, + Optional callback, + Optional context) { + if (standalone) { + var subConfigBuilder = + StandaloneSubscriptionConfiguration.builder() + .subscriptions((Map>) subscriptions); + + if (callback.isPresent()) { + subConfigBuilder.callback(callback.get(), context.get()); + } + return RedisClient.CreateClient( + commonClientConfig().subscriptionConfiguration(subConfigBuilder.build()).build()) + .get(); + } else { + var subConfigBuilder = + ClusterSubscriptionConfiguration.builder() + .subscriptions((Map>) subscriptions); + + if (callback.isPresent()) { + subConfigBuilder.callback(callback.get(), context.get()); + } + + return RedisClusterClient.CreateClient( + commonClusterClientConfig() + .subscriptionConfiguration(subConfigBuilder.build()) + .build()) + .get(); + } + } + + private BaseClient createClientWithSubscriptions( + boolean standalone, Map> subscriptions) { + return createClientWithSubscriptions( + standalone, subscriptions, Optional.empty(), Optional.empty()); + } + + @SneakyThrows + private BaseClient createClient(boolean standalone) { + if (standalone) { + return RedisClient.CreateClient(commonClientConfig().build()).get(); + } + return RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get(); + } + + /** + * pubsubMessage queue used in callback to analyze received pubsubMessages. Number is a client ID. + */ + private final ConcurrentLinkedDeque> pubsubMessageQueue = + new ConcurrentLinkedDeque<>(); + + /** Clients used in a test. */ + private final List clients = new ArrayList<>(); + + private static final int MESSAGE_DELIVERY_DELAY = 500; // ms + + @BeforeEach + @SneakyThrows + public void cleanup() { + for (var client : clients) { + if (client instanceof RedisClusterClient) { + ((RedisClusterClient) client).customCommand(new String[] {"unsubscribe"}, ALL_NODES).get(); + ((RedisClusterClient) client).customCommand(new String[] {"punsubscribe"}, ALL_NODES).get(); + ((RedisClusterClient) client).customCommand(new String[] {"sunsubscribe"}, ALL_NODES).get(); + } else { + ((RedisClient) client).customCommand(new String[] {"unsubscribe"}).get(); + ((RedisClient) client).customCommand(new String[] {"punsubscribe"}).get(); + } + client.close(); + } + clients.clear(); + pubsubMessageQueue.clear(); + } + + private void verifyReceivedPubsubMessages( + Set> pubsubMessages, BaseClient listener, boolean callback) { + if (callback) { + assertEquals(pubsubMessages, new HashSet<>(pubsubMessageQueue)); + } else { + var received = new HashSet(pubsubMessages.size()); + PubSubMessage pubsubMessage; + while ((pubsubMessage = listener.tryGetPubSubMessage()) != null) { + received.add(pubsubMessage); + } + assertEquals( + pubsubMessages.stream().map(Pair::getValue).collect(Collectors.toSet()), received); + } + } + + private static Stream getTwoBoolPermutations() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false)); + } + + private ChannelMode exact(boolean standalone) { + return standalone ? PubSubChannelMode.EXACT : PubSubClusterChannelMode.EXACT; + } + + private ChannelMode pattern(boolean standalone) { + return standalone ? PubSubChannelMode.PATTERN : PubSubClusterChannelMode.PATTERN; + } + + @SuppressWarnings("unchecked") + private BaseClient createListener( + boolean standalone, + boolean useCallback, + int clientId, + Map> subscriptions) { + MessageCallback callback = + (msg, ctx) -> + ((ConcurrentLinkedDeque>) ctx) + .push(Pair.of(clientId, msg)); + return useCallback + ? createClientWithSubscriptions( + standalone, subscriptions, Optional.of(callback), Optional.of(pubsubMessageQueue)) + : createClientWithSubscriptions(standalone, subscriptions); + } + + // TODO add following tests from https://github.com/aws/glide-for-redis/pull/1643 + // test_pubsub_exact_happy_path_coexistence + // test_pubsub_exact_happy_path_many_channels_co_existence + // test_sharded_pubsub_co_existence + // test_pubsub_pattern_co_existence + // TODO tests below blocked by https://github.com/aws/glide-for-redis/issues/1649 + // test_pubsub_exact_max_size_PubsubMessage + // test_pubsub_sharded_max_size_PubsubMessage + // test_pubsub_exact_max_size_PubsubMessage_callback + // test_pubsub_sharded_max_size_PubsubMessage_callback + + // TODO why `publish` returns 0 on cluster or > 1 on standalone when there is only 1 receiver??? + // meanwhile, all pubsubMessages are delivered. + // debug this and add checks for `publish` return value + + // TODO: remove once fixed + private void skipTestsOnMac() { + assumeFalse( + System.getProperty("os.name").toLowerCase().contains("mac"), + "PubSub doesn't work on mac OS"); + } + + /** Similar to `test_pubsub_exact_happy_path` in python client tests. */ + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @MethodSource("getTwoBoolPermutations") + public void exact_happy_path(boolean standalone, boolean useCallback) { + skipTestsOnMac(); + String channel = UUID.randomUUID().toString(); + String message = UUID.randomUUID().toString(); + var subscriptions = Map.of(exact(standalone), Set.of(channel)); + + var listener = createListener(standalone, useCallback, 1, subscriptions); + var sender = createClient(standalone); + clients.addAll(List.of(listener, sender)); + + sender.publish(channel, message).get(); + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message + + verifyReceivedPubsubMessages( + Set.of(Pair.of(1, new PubSubMessage(message, channel))), listener, useCallback); + } + + /** Similar to `test_pubsub_exact_happy_path_many_channels` in python client tests. */ + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @MethodSource("getTwoBoolPermutations") + public void exact_happy_path_many_channels(boolean standalone, boolean useCallback) { + skipTestsOnMac(); + int numChannels = 256; + int messagesPerChannel = 256; + var messages = new ArrayList(numChannels * messagesPerChannel); + ChannelMode mode = exact(standalone); + Map> subscriptions = Map.of(mode, new HashSet<>()); + + for (var i = 0; i < numChannels; i++) { + var channel = i + "-" + UUID.randomUUID(); + subscriptions.get(mode).add(channel); + for (var j = 0; j < messagesPerChannel; j++) { + var message = i + "-" + j + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel)); + } + } + + var listener = createListener(standalone, useCallback, 1, subscriptions); + var sender = createClient(standalone); + clients.addAll(List.of(listener, sender)); + + for (var pubsubMessage : messages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + verifyReceivedPubsubMessages( + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), + listener, + useCallback); + } + + /** Similar to `test_sharded_pubsub` in python client tests. */ + @SneakyThrows + @ParameterizedTest(name = "use callback = {0}") + @ValueSource(booleans = {true, false}) + public void sharded_pubsub(boolean useCallback) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + + String channel = UUID.randomUUID().toString(); + String pubsubMessage = UUID.randomUUID().toString(); + var subscriptions = Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); + + var listener = createListener(false, useCallback, 1, subscriptions); + var sender = (RedisClusterClient) createClient(false); + clients.addAll(List.of(listener, sender)); + + sender.spublish(channel, pubsubMessage).get(); + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the message + + verifyReceivedPubsubMessages( + Set.of(Pair.of(1, new PubSubMessage(pubsubMessage, channel))), listener, useCallback); + } + + /** Similar to `test_sharded_pubsub_many_channels` in python client tests. */ + @SneakyThrows + @ParameterizedTest(name = "use callback = {0}") + @ValueSource(booleans = {true, false}) + public void sharded_pubsub_many_channels(boolean useCallback) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + + int numChannels = 256; + int pubsubMessagesPerChannel = 256; + var pubsubMessages = new ArrayList(numChannels * pubsubMessagesPerChannel); + PubSubClusterChannelMode mode = PubSubClusterChannelMode.SHARDED; + Map> subscriptions = Map.of(mode, new HashSet<>()); + + for (var i = 0; i < numChannels; i++) { + var channel = i + "-" + UUID.randomUUID(); + subscriptions.get(mode).add(channel); + for (var j = 0; j < pubsubMessagesPerChannel; j++) { + var message = i + "-" + j + "-" + UUID.randomUUID(); + pubsubMessages.add(new PubSubMessage(message, channel)); + } + } + + var listener = createListener(false, useCallback, 1, subscriptions); + var sender = (RedisClusterClient) createClient(false); + clients.addAll(List.of(listener, sender)); + + for (var pubsubMessage : pubsubMessages) { + sender.spublish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + sender.spublish(UUID.randomUUID().toString(), UUID.randomUUID().toString()).get(); + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + verifyReceivedPubsubMessages( + pubsubMessages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), + listener, + useCallback); + } + + /** Similar to `test_pubsub_pattern` in python client tests. */ + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @MethodSource("getTwoBoolPermutations") + public void pattern(boolean standalone, boolean useCallback) { + skipTestsOnMac(); + String prefix = "channel."; + String pattern = prefix + "*"; + Map message2channels = + Map.of( + prefix + "1", UUID.randomUUID().toString(), prefix + "2", UUID.randomUUID().toString()); + var subscriptions = + Map.of( + standalone ? PubSubChannelMode.PATTERN : PubSubClusterChannelMode.PATTERN, + Set.of(pattern)); + + var listener = createListener(standalone, useCallback, 1, subscriptions); + var sender = createClient(standalone); + clients.addAll(List.of(listener, sender)); + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // need some time to propagate subscriptions - why? + + for (var entry : message2channels.entrySet()) { + sender.publish(entry.getKey(), entry.getValue()).get(); + } + sender.publish("channel", UUID.randomUUID().toString()).get(); + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + var expected = + message2channels.entrySet().stream() + .map(e -> Pair.of(1, new PubSubMessage(e.getValue(), e.getKey(), pattern))) + .collect(Collectors.toSet()); + + verifyReceivedPubsubMessages(expected, listener, useCallback); + } + + /** Similar to `test_pubsub_pattern_many_channels` in python client tests. */ + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @MethodSource("getTwoBoolPermutations") + public void pattern_many_channels(boolean standalone, boolean useCallback) { + skipTestsOnMac(); + String prefix = "channel."; + String pattern = prefix + "*"; + int numChannels = 256; + int messagesPerChannel = 256; + ChannelMode mode = standalone ? PubSubChannelMode.PATTERN : PubSubClusterChannelMode.PATTERN; + var messages = new ArrayList(numChannels * messagesPerChannel); + var subscriptions = Map.of(mode, Set.of(pattern)); + + for (var i = 0; i < numChannels; i++) { + var channel = prefix + "-" + i + "-" + UUID.randomUUID(); + for (var j = 0; j < messagesPerChannel; j++) { + var message = i + "-" + j + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel, pattern)); + } + } + + var listener = createListener(standalone, useCallback, 1, subscriptions); + var sender = createClient(standalone); + clients.addAll(List.of(listener, sender)); + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // need some time to propagate subscriptions - why? + + for (var pubsubMessage : messages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + sender.publish("channel", UUID.randomUUID().toString()).get(); + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + verifyReceivedPubsubMessages( + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), + listener, + useCallback); + } + + /** Similar to `test_pubsub_combined_exact_and_pattern_one_client` in python client tests. */ + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @MethodSource("getTwoBoolPermutations") + public void combined_exact_and_pattern_one_client(boolean standalone, boolean useCallback) { + skipTestsOnMac(); + String prefix = "channel."; + String pattern = prefix + "*"; + int numChannels = 256; + int messagesPerChannel = 256; + var messages = new ArrayList(numChannels * messagesPerChannel); + ChannelMode mode = standalone ? PubSubChannelMode.EXACT : PubSubClusterChannelMode.EXACT; + Map> subscriptions = + Map.of( + mode, + new HashSet<>(), + standalone ? PubSubChannelMode.PATTERN : PubSubClusterChannelMode.PATTERN, + Set.of(pattern)); + + for (var i = 0; i < numChannels; i++) { + var channel = i + "-" + UUID.randomUUID(); + subscriptions.get(mode).add(channel); + for (var j = 0; j < messagesPerChannel; j++) { + var message = i + "-" + j + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel)); + } + } + + for (var j = 0; j < messagesPerChannel; j++) { + var pubsubMessage = j + "-" + UUID.randomUUID(); + var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(pubsubMessage, channel, pattern)); + } + + var listener = createListener(standalone, useCallback, 1, subscriptions); + var sender = createClient(standalone); + clients.addAll(List.of(listener, sender)); + + for (var pubsubMessage : messages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + verifyReceivedPubsubMessages( + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), + listener, + useCallback); + } + + /** + * Similar to `test_pubsub_combined_exact_and_pattern_multiple_clients` in python client tests. + */ + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @MethodSource("getTwoBoolPermutations") + public void combined_exact_and_pattern_multiple_clients(boolean standalone, boolean useCallback) { + skipTestsOnMac(); + String prefix = "channel."; + String pattern = prefix + "*"; + int numChannels = 256; + var messages = new ArrayList(numChannels * 2); + ChannelMode mode = exact(standalone); + Map> subscriptions = Map.of(mode, new HashSet<>()); + + for (var i = 0; i < numChannels; i++) { + var channel = i + "-" + UUID.randomUUID(); + subscriptions.get(mode).add(channel); + var message = i + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel)); + } + + for (var j = 0; j < numChannels; j++) { + var message = j + "-" + UUID.randomUUID(); + var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel, pattern)); + } + + var listenerExactSub = createListener(standalone, useCallback, 1, subscriptions); + + subscriptions = Map.of(pattern(standalone), Set.of(pattern)); + var listenerPatternSub = createListener(standalone, useCallback, 2, subscriptions); + + var sender = createClient(standalone); + clients.addAll(List.of(listenerExactSub, listenerPatternSub, sender)); + + for (var pubsubMessage : messages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + if (useCallback) { + verifyReceivedPubsubMessages( + messages.stream() + .map(m -> Pair.of(m.getPattern().isEmpty() ? 1 : 2, m)) + .collect(Collectors.toSet()), + listenerExactSub, + useCallback); + } else { + verifyReceivedPubsubMessages( + messages.stream() + .filter(m -> m.getPattern().isEmpty()) + .map(m -> Pair.of(1, m)) + .collect(Collectors.toSet()), + listenerExactSub, + useCallback); + verifyReceivedPubsubMessages( + messages.stream() + .filter(m -> m.getPattern().isPresent()) + .map(m -> Pair.of(2, m)) + .collect(Collectors.toSet()), + listenerPatternSub, + useCallback); + } + } + + /** + * Similar to `test_pubsub_combined_exact_pattern_and_sharded_one_client` in python client tests. + */ + @SneakyThrows + @ParameterizedTest(name = "use callback = {0}") + @ValueSource(booleans = {true, false}) + public void combined_exact_pattern_and_sharded_one_client(boolean useCallback) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + + String prefix = "channel."; + String pattern = prefix + "*"; + String shardPrefix = "{shard}"; + int numChannels = 256; + var messages = new ArrayList(numChannels * 2); + var shardedMessages = new ArrayList(numChannels); + Map> subscriptions = + Map.of( + PubSubClusterChannelMode.EXACT, new HashSet<>(), + PubSubClusterChannelMode.PATTERN, Set.of(pattern), + PubSubClusterChannelMode.SHARDED, new HashSet<>()); + + for (var i = 0; i < numChannels; i++) { + var channel = i + "-" + UUID.randomUUID(); + subscriptions.get(PubSubClusterChannelMode.EXACT).add(channel); + var message = i + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel)); + } + + for (var i = 0; i < numChannels; i++) { + var channel = shardPrefix + "-" + i + "-" + UUID.randomUUID(); + subscriptions.get(PubSubClusterChannelMode.SHARDED).add(channel); + var message = i + "-" + UUID.randomUUID(); + shardedMessages.add(new PubSubMessage(message, channel)); + } + + for (var j = 0; j < numChannels; j++) { + var message = j + "-" + UUID.randomUUID(); + var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + messages.add(new PubSubMessage(message, channel, pattern)); + } + + var listener = createListener(false, useCallback, 1, subscriptions); + var sender = (RedisClusterClient) createClient(false); + clients.addAll(List.of(listener, sender)); + + for (var pubsubMessage : messages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + for (var pubsubMessage : shardedMessages) { + sender.spublish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + messages.addAll(shardedMessages); + verifyReceivedPubsubMessages( + messages.stream().map(m -> Pair.of(1, m)).collect(Collectors.toSet()), + listener, + useCallback); + } + + /** + * Similar to `test_pubsub_combined_exact_pattern_and_sharded_multi_client` in python client + * tests. + */ + @SneakyThrows + @ParameterizedTest(name = "use callback = {0}") + @ValueSource(booleans = {true, false}) + public void combined_exact_pattern_and_sharded_multi_client(boolean useCallback) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + + String prefix = "channel."; + String pattern = prefix + "*"; + String shardPrefix = "{shard}"; + int numChannels = 256; + var exactMessages = new ArrayList(numChannels); + var patternMessages = new ArrayList(numChannels); + var shardedMessages = new ArrayList(numChannels); + Map> subscriptionsExact = + Map.of(PubSubClusterChannelMode.EXACT, new HashSet<>()); + Map> subscriptionsPattern = + Map.of(PubSubClusterChannelMode.PATTERN, Set.of(pattern)); + Map> subscriptionsSharded = + Map.of(PubSubClusterChannelMode.SHARDED, new HashSet<>()); + + for (var i = 0; i < numChannels; i++) { + var channel = i + "-" + UUID.randomUUID(); + subscriptionsExact.get(PubSubClusterChannelMode.EXACT).add(channel); + var pubsubMessage = i + "-" + UUID.randomUUID(); + exactMessages.add(new PubSubMessage(pubsubMessage, channel)); + } + + for (var i = 0; i < numChannels; i++) { + var channel = shardPrefix + "-" + i + "-" + UUID.randomUUID(); + subscriptionsSharded.get(PubSubClusterChannelMode.SHARDED).add(channel); + var message = i + "-" + UUID.randomUUID(); + shardedMessages.add(new PubSubMessage(message, channel)); + } + + for (var j = 0; j < numChannels; j++) { + var message = j + "-" + UUID.randomUUID(); + var channel = prefix + "-" + j + "-" + UUID.randomUUID(); + patternMessages.add(new PubSubMessage(message, channel, pattern)); + } + + var listenerExact = + createListener( + false, useCallback, PubSubClusterChannelMode.EXACT.ordinal(), subscriptionsExact); + var listenerPattern = + createListener( + false, useCallback, PubSubClusterChannelMode.PATTERN.ordinal(), subscriptionsPattern); + var listenerSharded = + createListener( + false, useCallback, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded); + + var sender = (RedisClusterClient) createClient(false); + clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded, sender)); + + for (var pubsubMessage : exactMessages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + for (var pubsubMessage : patternMessages) { + sender.publish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + for (var pubsubMessage : shardedMessages) { + sender.spublish(pubsubMessage.getChannel(), pubsubMessage.getMessage()).get(); + } + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + if (useCallback) { + var expected = new HashSet>(); + expected.addAll( + exactMessages.stream() + .map(m -> Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), m)) + .collect(Collectors.toSet())); + expected.addAll( + patternMessages.stream() + .map(m -> Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), m)) + .collect(Collectors.toSet())); + expected.addAll( + shardedMessages.stream() + .map(m -> Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), m)) + .collect(Collectors.toSet())); + + verifyReceivedPubsubMessages(expected, listenerExact, useCallback); + } else { + verifyReceivedPubsubMessages( + exactMessages.stream() + .map(m -> Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), m)) + .collect(Collectors.toSet()), + listenerExact, + useCallback); + verifyReceivedPubsubMessages( + patternMessages.stream() + .map(m -> Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), m)) + .collect(Collectors.toSet()), + listenerPattern, + useCallback); + verifyReceivedPubsubMessages( + shardedMessages.stream() + .map(m -> Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), m)) + .collect(Collectors.toSet()), + listenerSharded, + useCallback); + } + } + + /** + * Similar to `test_pubsub_three_publishing_clients_same_name_with_sharded` in python client + * tests. + */ + @SneakyThrows + @Test + public void three_publishing_clients_same_name_with_sharded_no_callback() { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + + String channel = UUID.randomUUID().toString(); + var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); + var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), channel, channel); + var shardedMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); + Map> subscriptionsExact = + Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); + Map> subscriptionsPattern = + Map.of(PubSubClusterChannelMode.PATTERN, Set.of(channel)); + Map> subscriptionsSharded = + Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); + + var listenerExact = + (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsExact); + var listenerPattern = + (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsPattern); + var listenerSharded = + (RedisClusterClient) createClientWithSubscriptions(false, subscriptionsSharded); + clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded)); + + listenerPattern.publish(channel, exactMessage.getMessage()).get(); + listenerSharded.publish(channel, patternMessage.getMessage()).get(); + listenerExact.spublish(channel, shardedMessage.getMessage()).get(); + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + verifyReceivedPubsubMessages( + Set.of( + Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage), + Pair.of( + PubSubClusterChannelMode.EXACT.ordinal(), + new PubSubMessage(patternMessage.getMessage(), channel))), + listenerExact, + false); + verifyReceivedPubsubMessages( + Set.of( + Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage), + Pair.of( + PubSubClusterChannelMode.PATTERN.ordinal(), + new PubSubMessage(exactMessage.getMessage(), channel, channel))), + listenerPattern, + false); + verifyReceivedPubsubMessages( + Set.of(Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)), + listenerSharded, + false); + } + + /** + * Similar to `test_pubsub_three_publishing_clients_same_name_with_sharded` in python client + * tests. + */ + @SneakyThrows + @Test + public void three_publishing_clients_same_name_with_sharded_with_callback() { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + + String channel = UUID.randomUUID().toString(); + var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); + var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), channel, channel); + var shardedMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); + Map> subscriptionsExact = + Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel)); + Map> subscriptionsPattern = + Map.of(PubSubClusterChannelMode.PATTERN, Set.of(channel)); + Map> subscriptionsSharded = + Map.of(PubSubClusterChannelMode.SHARDED, Set.of(channel)); + + var listenerExact = + (RedisClusterClient) + createListener( + false, true, PubSubClusterChannelMode.EXACT.ordinal(), subscriptionsExact); + var listenerPattern = + createListener( + false, true, PubSubClusterChannelMode.PATTERN.ordinal(), subscriptionsPattern); + var listenerSharded = + createListener( + false, true, PubSubClusterChannelMode.SHARDED.ordinal(), subscriptionsSharded); + + clients.addAll(List.of(listenerExact, listenerPattern, listenerSharded)); + + listenerPattern.publish(channel, exactMessage.getMessage()).get(); + listenerSharded.publish(channel, patternMessage.getMessage()).get(); + listenerExact.spublish(channel, shardedMessage.getMessage()).get(); + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + var expected = + Set.of( + Pair.of(PubSubClusterChannelMode.EXACT.ordinal(), exactMessage), + Pair.of( + PubSubClusterChannelMode.EXACT.ordinal(), + new PubSubMessage(patternMessage.getMessage(), channel)), + Pair.of(PubSubClusterChannelMode.PATTERN.ordinal(), patternMessage), + Pair.of( + PubSubClusterChannelMode.PATTERN.ordinal(), + new PubSubMessage(exactMessage.getMessage(), channel, channel)), + Pair.of(PubSubClusterChannelMode.SHARDED.ordinal(), shardedMessage)); + + verifyReceivedPubsubMessages(expected, listenerExact, true); + } + + @SneakyThrows + @Test + public void error_cases() { + skipTestsOnMac(); + // client isn't configured with subscriptions + var client = createClient(true); + assertThrows(ConfigurationError.class, client::tryGetPubSubMessage); + client.close(); + + // client configured with callback and doesn't return pubsubMessages via API + MessageCallback callback = (msg, ctx) -> fail(); + client = + createClientWithSubscriptions( + true, Map.of(), Optional.of(callback), Optional.of(pubsubMessageQueue)); + assertThrows(ConfigurationError.class, client::tryGetPubSubMessage); + client.close(); + + // using sharded channels from different slots in a transaction causes a cross slot error + var clusterClient = (RedisClusterClient) createClient(false); + var transaction = + new ClusterTransaction() + .spublish("abc", "one") + .spublish("mnk", "two") + .spublish("xyz", "three"); + var exception = + assertThrows(ExecutionException.class, () -> clusterClient.exec(transaction).get()); + assertInstanceOf(RequestException.class, exception.getCause()); + assertTrue(exception.getMessage().toLowerCase().contains("crossslot")); + + // TODO test when callback throws an exception - currently nothing happens now + // it should terminate the client + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}, use callback = {1}") + @MethodSource("getTwoBoolPermutations") + public void transaction_with_all_types_of_PubsubMessages( + boolean standalone, boolean useCallback) { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + skipTestsOnMac(); + assumeTrue( + standalone, // TODO activate tests after fix + "Test doesn't work on cluster due to Cross Slot error, probably a bug in `redis-rs`"); + + String prefix = "channel"; + String pattern = prefix + "*"; + String shardPrefix = "{shard}"; + String channel = UUID.randomUUID().toString(); + var exactMessage = new PubSubMessage(UUID.randomUUID().toString(), channel); + var patternMessage = new PubSubMessage(UUID.randomUUID().toString(), prefix, pattern); + var shardedMessage = new PubSubMessage(UUID.randomUUID().toString(), shardPrefix); + + Map> subscriptions = + standalone + ? Map.of( + PubSubChannelMode.EXACT, + Set.of(channel), + PubSubChannelMode.PATTERN, + Set.of(pattern)) + : Map.of( + PubSubClusterChannelMode.EXACT, + Set.of(channel), + PubSubClusterChannelMode.PATTERN, + Set.of(pattern), + PubSubClusterChannelMode.SHARDED, + Set.of(shardPrefix)); + + var listener = createListener(standalone, useCallback, 1, subscriptions); + var sender = createClient(standalone); + clients.addAll(List.of(listener, sender)); + + if (standalone) { + var transaction = + new Transaction() + .publish(exactMessage.getChannel(), exactMessage.getMessage()) + .publish(patternMessage.getChannel(), patternMessage.getMessage()); + ((RedisClient) sender).exec(transaction).get(); + } else { + var transaction = + new ClusterTransaction() + .spublish(shardedMessage.getChannel(), shardedMessage.getMessage()) + .publish(exactMessage.getChannel(), exactMessage.getMessage()) + .publish(patternMessage.getChannel(), patternMessage.getMessage()); + ((RedisClusterClient) sender).exec(transaction).get(); + } + + Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages + + var expected = + standalone + ? Set.of(Pair.of(1, exactMessage), Pair.of(1, patternMessage)) + : Set.of( + Pair.of(1, exactMessage), Pair.of(1, patternMessage), Pair.of(1, shardedMessage)); + verifyReceivedPubsubMessages(expected, listener, useCallback); + } +} diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index 59125da25f..b55c127247 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -97,7 +97,9 @@ public static Stream getCommonTransactionBuilders() { "Geospatial Commands", (TransactionBuilder) TransactionTestUtilities::geospatialCommands), Arguments.of( - "Bitmap Commands", (TransactionBuilder) TransactionTestUtilities::bitmapCommands)); + "Bitmap Commands", (TransactionBuilder) TransactionTestUtilities::bitmapCommands), + Arguments.of( + "PubSub Commands", (TransactionBuilder) TransactionTestUtilities::pubsubCommands)); } /** Generate test samples for parametrized tests. Could be routed to primary nodes only. */ @@ -1183,4 +1185,12 @@ private static Object[] bitmapCommands(BaseTransaction transaction) { } return expectedResults; } + + private static Object[] pubsubCommands(BaseTransaction transaction) { + transaction.publish("Tchannel", "message"); + + return new Object[] { + 0L, // publish("Tchannel", "message") + }; + } } diff --git a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java index 4f24b6b327..f65d06ccbc 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java @@ -273,6 +273,15 @@ public void unwatch() { assertEquals(foobarString, clusterClient.get(key2).get()); } + @Test + @SneakyThrows + public void spublish() { + assumeTrue(REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in redis 7"); + ClusterTransaction transaction = new ClusterTransaction().spublish("Schannel", "message"); + + assertArrayEquals(new Object[] {0L}, clusterClient.exec(transaction).get()); + } + @Test @SneakyThrows public void sort() { diff --git a/java/src/lib.rs b/java/src/lib.rs index 8bb054d888..1b1cc7a1d1 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -39,17 +39,7 @@ fn redis_value_to_java<'local>( Ok(JObject::from(env.byte_array_from_slice(&data)?)) } } - Value::Array(array) => { - let items: JObjectArray = - env.new_object_array(array.len() as i32, "java/lang/Object", JObject::null())?; - - for (i, item) in array.into_iter().enumerate() { - let java_value = redis_value_to_java(env, item, encoding_utf8)?; - env.set_object_array_element(&items, i as i32, java_value)?; - } - - Ok(items.into()) - } + Value::Array(array) => array_to_java_array(env, array, encoding_utf8), Value::Map(map) => { let linked_hash_map = env.new_object("java/util/LinkedHashMap", "()V", &[])?; @@ -89,10 +79,59 @@ fn redis_value_to_java<'local>( data: _, attributes: _, } => todo!(), - Value::Push { kind: _, data: _ } => todo!(), + // Create a java `Map` with two keys: + // - "kind" which corresponds to the push type, stored as a `String` + // - "values" which corresponds to the array of values received, stored as `Object[]` + // Only string messages are supported now by Redis and `redis-rs`. + Value::Push { kind, data } => { + let hash_map = env.new_object("java/util/HashMap", "()V", &[])?; + + let kind_str = env.new_string("kind")?; + let kind_value_str = env.new_string(format!("{kind:?}"))?; + + let _ = env.call_method( + &hash_map, + "put", + "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", + &[(&kind_str).into(), (&kind_value_str).into()], + )?; + + let values_str = env.new_string("values")?; + let values = array_to_java_array(env, data, encoding_utf8)?; + + let _ = env.call_method( + &hash_map, + "put", + "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", + &[(&values_str).into(), (&values).into()], + )?; + + Ok(hash_map) + } } } +/// Convert an array of values into java array of corresponding values. +/// +/// Recursively calls to [`redis_value_to_java`] for every element. +/// +/// Returns an arbitrary java `Object[]`. +fn array_to_java_array<'local>( + env: &mut JNIEnv<'local>, + values: Vec, + encoding_utf8: bool, +) -> Result, FFIError> { + let items: JObjectArray = + env.new_object_array(values.len() as i32, "java/lang/Object", JObject::null())?; + + for (i, item) in values.into_iter().enumerate() { + let java_value = redis_value_to_java(env, item, encoding_utf8)?; + env.set_object_array_element(&items, i as i32, java_value)?; + } + + Ok(items.into()) +} + #[no_mangle] pub extern "system" fn Java_glide_ffi_resolvers_RedisValueResolver_valueFromPointer<'local>( mut env: JNIEnv<'local>,