Skip to content

Commit

Permalink
Change from using a Mono to a Supplier (#3169)
Browse files Browse the repository at this point in the history
  • Loading branch information
tishun authored Feb 8, 2025
1 parent c420392 commit d33237b
Show file tree
Hide file tree
Showing 19 changed files with 61 additions and 50 deletions.
9 changes: 5 additions & 4 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.RedisCommand;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static io.lettuce.core.ClientOptions.DEFAULT_JSON_PARSER;
import static io.lettuce.core.protocol.CommandType.EXEC;
Expand Down Expand Up @@ -87,7 +87,7 @@ public abstract class AbstractRedisAsyncCommands<K, V> implements RedisAclAsyncC

private final RedisJsonCommandBuilder<K, V> jsonCommandBuilder;

private final Mono<JsonParser> parser;
private final Supplier<JsonParser> parser;

/**
* Initialize a new instance.
Expand All @@ -96,7 +96,8 @@ public abstract class AbstractRedisAsyncCommands<K, V> implements RedisAclAsyncC
* @param codec the codec for command encoding
* @param parser the implementation of the {@link JsonParser} to use
*/
public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec, Mono<JsonParser> parser) {
public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec,
Supplier<JsonParser> parser) {
this.parser = parser;
this.connection = connection;
this.commandBuilder = new RedisCommandBuilder<>(codec);
Expand Down Expand Up @@ -3396,7 +3397,7 @@ public RedisFuture<List<Map<String, Object>>> clusterLinks() {

@Override
public JsonParser getJsonParser() {
return this.parser.block();
return this.parser.get();
}

private byte[] encodeFunction(String functionCode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public abstract class AbstractRedisReactiveCommands<K, V>

private final RedisJsonCommandBuilder<K, V> jsonCommandBuilder;

private final Mono<JsonParser> parser;
private final Supplier<JsonParser> parser;

private final ClientResources clientResources;

Expand All @@ -112,7 +112,8 @@ public abstract class AbstractRedisReactiveCommands<K, V>
* @param codec the codec for command encoding.
* @param parser the implementation of the {@link JsonParser} to use
*/
public AbstractRedisReactiveCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec, Mono<JsonParser> parser) {
public AbstractRedisReactiveCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec,
Supplier<JsonParser> parser) {
this.connection = connection;
this.parser = parser;
this.commandBuilder = new RedisCommandBuilder<>(codec);
Expand Down Expand Up @@ -149,7 +150,7 @@ private EventExecutorGroup getScheduler() {

@Override
public JsonParser getJsonParser() {
return parser.block();
return parser.get();
}

@Override
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.function.Supplier;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.internal.LettuceAssert;
Expand Down Expand Up @@ -71,15 +72,15 @@ public class ClientOptions implements Serializable {

public static final SocketOptions DEFAULT_SOCKET_OPTIONS = SocketOptions.create();

public static final Mono<JsonParser> DEFAULT_JSON_PARSER = Mono.defer(() -> Mono.fromCallable(() -> {
public static final Supplier<JsonParser> DEFAULT_JSON_PARSER = () -> {
try {
Iterator<JsonParser> services = ServiceLoader.load(JsonParser.class).iterator();
return services.hasNext() ? services.next() : null;
} catch (ServiceConfigurationError e) {
throw new RedisJsonException("Could not load JsonParser, please consult the guide"
+ "at https://redis.github.io/lettuce/user-guide/redis-json/", e);
}
}));
};

public static final SslOptions DEFAULT_SSL_OPTIONS = SslOptions.create();

Expand Down Expand Up @@ -111,7 +112,7 @@ public class ClientOptions implements Serializable {

private final Charset scriptCharset;

private final Mono<JsonParser> jsonParser;
private final Supplier<JsonParser> jsonParser;

private final SocketOptions socketOptions;

Expand Down Expand Up @@ -216,7 +217,7 @@ public static class Builder {

private Charset scriptCharset = DEFAULT_SCRIPT_CHARSET;

private Mono<JsonParser> jsonParser = DEFAULT_JSON_PARSER;
private Supplier<JsonParser> jsonParser = DEFAULT_JSON_PARSER;

private SocketOptions socketOptions = DEFAULT_SOCKET_OPTIONS;

Expand Down Expand Up @@ -429,7 +430,7 @@ public Builder scriptCharset(Charset scriptCharset) {
* @see JsonParser
* @since 6.5
*/
public Builder jsonParser(Mono<JsonParser> parser) {
public Builder jsonParser(Supplier<JsonParser> parser) {

LettuceAssert.notNull(parser, "JsonParser must not be null");
this.jsonParser = parser;
Expand Down Expand Up @@ -705,7 +706,7 @@ public Charset getScriptCharset() {
* @return the implementation of the {@link JsonParser} to use.
* @since 6.5
*/
public Mono<JsonParser> getJsonParser() {
public Supplier<JsonParser> getJsonParser() {
return jsonParser;
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/lettuce/core/RedisAsyncCommandsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.json.JsonParser;
import reactor.core.publisher.Mono;
import java.util.function.Supplier;

/**
* An asynchronous and thread-safe API for a Redis connection.
Expand All @@ -24,7 +24,8 @@ public class RedisAsyncCommandsImpl<K, V> extends AbstractRedisAsyncCommands<K,
* @param codec the codec for command encoding
* @param parser the implementation of the {@link JsonParser} to use
*/
public RedisAsyncCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec, Mono<JsonParser> parser) {
public RedisAsyncCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec,
Supplier<JsonParser> parser) {
super(connection, codec, parser);
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/lettuce/core/RedisJsonCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.lettuce.core.protocol.BaseRedisCommandBuilder;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandArgs;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.function.Supplier;

import static io.lettuce.core.protocol.CommandType.*;

Expand All @@ -34,9 +34,9 @@
*/
class RedisJsonCommandBuilder<K, V> extends BaseRedisCommandBuilder<K, V> {

private final Mono<JsonParser> parser;
private final Supplier<JsonParser> parser;

RedisJsonCommandBuilder(RedisCodec<K, V> codec, Mono<JsonParser> theParser) {
RedisJsonCommandBuilder(RedisCodec<K, V> codec, Supplier<JsonParser> theParser) {
super(codec);
parser = theParser;
}
Expand Down Expand Up @@ -118,7 +118,7 @@ Command<K, V, List<JsonValue>> jsonArrpop(K key, JsonPath jsonPath, int index) {
}
}

return createCommand(JSON_ARRPOP, new JsonValueListOutput<>(codec, parser.block()), args);
return createCommand(JSON_ARRPOP, new JsonValueListOutput<>(codec, parser.get()), args);
}

Command<K, V, List<Long>> jsonArrtrim(K key, JsonPath jsonPath, JsonRangeArgs range) {
Expand Down Expand Up @@ -167,7 +167,7 @@ Command<K, V, List<JsonValue>> jsonGet(K key, JsonGetArgs options, JsonPath... j
}
}

return createCommand(JSON_GET, new JsonValueListOutput<>(codec, parser.block()), args);
return createCommand(JSON_GET, new JsonValueListOutput<>(codec, parser.get()), args);
}

Command<K, V, String> jsonMerge(K key, JsonPath jsonPath, JsonValue value) {
Expand All @@ -194,7 +194,7 @@ Command<K, V, List<JsonValue>> jsonMGet(JsonPath jsonPath, K... keys) {
args.add(jsonPath.toString());
}

return createCommand(JSON_MGET, new JsonValueListOutput<>(codec, parser.block()), args);
return createCommand(JSON_MGET, new JsonValueListOutput<>(codec, parser.get()), args);
}

Command<K, V, String> jsonMSet(List<JsonMsetArgs<K, V>> arguments) {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/lettuce/core/RedisReactiveCommandsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import io.lettuce.core.json.JsonParser;
import reactor.core.publisher.Mono;

import java.util.function.Supplier;

/**
* A reactive and thread-safe API for a Redis Sentinel connection.
*
Expand All @@ -25,7 +27,7 @@ public class RedisReactiveCommandsImpl<K, V> extends AbstractRedisReactiveComman
* @param parser the implementation of the {@link JsonParser} to use
*/
public RedisReactiveCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.lettuce.core.api.StatefulRedisConnection;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class StatefulRedisConnectionImpl<K, V> extends RedisChannelHandler<K, V>

private final PushHandler pushHandler;

private final Mono<JsonParser> parser;
private final Supplier<JsonParser> parser;

protected MultiOutput<K, V> multi;

Expand Down Expand Up @@ -96,7 +97,7 @@ public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHa
* @param parser the parser to use for JSON commands.
*/
public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
Duration timeout, Mono<JsonParser> parser) {
Duration timeout, Supplier<JsonParser> parser) {

super(writer, timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.lettuce.core.*;
Expand Down Expand Up @@ -64,7 +64,6 @@
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ConnectionIntent;
import reactor.core.publisher.Mono;

/**
* An advanced asynchronous and thread-safe API for a Redis Cluster connection.
Expand All @@ -89,11 +88,11 @@ public class RedisAdvancedClusterAsyncCommandsImpl<K, V> extends AbstractRedisAs
* @param codec Codec used to encode/decode keys and values.
* @param parser the implementation of the {@link JsonParser} to use
* @deprecated since 5.1, use
* {@link #RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Mono)}.
* {@link #RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Supplier)}.
*/
@Deprecated
public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
this.codec = codec;
}
Expand All @@ -104,7 +103,7 @@ public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<
* @param connection the stateful connection
* @param codec Codec used to encode/decode keys and values.
* @deprecated since 5.1, use
* {@link #RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Mono)}.
* {@link #RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Supplier)}.
*/
@Deprecated
public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec) {
Expand All @@ -120,7 +119,7 @@ public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<
* @param parser the implementation of the {@link JsonParser} to use
*/
public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
this.codec = codec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.lettuce.core.json.JsonParser;
Expand Down Expand Up @@ -78,11 +79,11 @@ public class RedisAdvancedClusterReactiveCommandsImpl<K, V> extends AbstractRedi
* @param codec Codec used to encode/decode keys and values.
* @param parser the implementation of the {@link JsonParser} to use
* @deprecated since 5.2, use
* {@link #RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Mono)}.
* {@link #RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Supplier)}.
*/
@Deprecated
public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
this.codec = codec;
}
Expand All @@ -93,7 +94,7 @@ public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionIm
* @param connection the stateful connection.
* @param codec Codec used to encode/decode keys and values.
* @deprecated since 5.2, use
* {@link #RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Mono)}.
* {@link #RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Supplier)}.
*/
@Deprecated
public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection,
Expand All @@ -110,7 +111,7 @@ public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionIm
* @param parser the implementation of the {@link JsonParser} to use
*/
public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
this.codec = codec;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisC
* @return new instance of StatefulRedisConnectionImpl
*/
protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter,
PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout, Mono<JsonParser> parser) {
PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout, Supplier<JsonParser> parser) {
return new StatefulRedisConnectionImpl<>(channelWriter, pushHandler, codec, timeout, parser);
}

Expand Down Expand Up @@ -734,7 +734,7 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl
*/
protected <V, K> StatefulRedisClusterConnectionImpl<K, V> newStatefulRedisClusterConnection(
RedisChannelWriter channelWriter, ClusterPushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
return new StatefulRedisClusterConnectionImpl(channelWriter, pushHandler, codec, timeout, parser);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.lettuce.core.AbstractRedisClient;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class StatefulRedisClusterConnectionImpl<K, V> extends RedisChannelHandle

protected final RedisCodec<K, V> codec;

protected final Mono<JsonParser> parser;
protected final Supplier<JsonParser> parser;

protected final RedisAdvancedClusterCommands<K, V> sync;

Expand Down Expand Up @@ -113,7 +114,7 @@ public StatefulRedisClusterConnectionImpl(RedisChannelWriter writer, ClusterPush
* @param parser the JSON parser
*/
public StatefulRedisClusterConnectionImpl(RedisChannelWriter writer, ClusterPushHandler pushHandler, RedisCodec<K, V> codec,
Duration timeout, Mono<JsonParser> parser) {
Duration timeout, Supplier<JsonParser> parser) {

super(writer, timeout);
this.pushHandler = pushHandler;
Expand Down
Loading

0 comments on commit d33237b

Please sign in to comment.