Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

report block error when use with reactor mode #3168 (6.5.x) #3170

Merged
merged 1 commit into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.RedisCommand;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static io.lettuce.core.ClientOptions.DEFAULT_JSON_PARSER;
import static io.lettuce.core.protocol.CommandType.EXEC;
Expand Down Expand Up @@ -87,7 +87,7 @@ public abstract class AbstractRedisAsyncCommands<K, V> implements RedisAclAsyncC

private final RedisJsonCommandBuilder<K, V> jsonCommandBuilder;

private final Mono<JsonParser> parser;
private final Supplier<JsonParser> parser;

/**
* Initialize a new instance.
Expand All @@ -96,7 +96,8 @@ public abstract class AbstractRedisAsyncCommands<K, V> implements RedisAclAsyncC
* @param codec the codec for command encoding
* @param parser the implementation of the {@link JsonParser} to use
*/
public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec, Mono<JsonParser> parser) {
public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec,
Supplier<JsonParser> parser) {
this.parser = parser;
this.connection = connection;
this.commandBuilder = new RedisCommandBuilder<>(codec);
Expand Down Expand Up @@ -3391,7 +3392,7 @@ public RedisFuture<List<Map<String, Object>>> clusterLinks() {

@Override
public JsonParser getJsonParser() {
return this.parser.block();
return this.parser.get();
}

private byte[] encodeFunction(String functionCode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public abstract class AbstractRedisReactiveCommands<K, V>

private final RedisJsonCommandBuilder<K, V> jsonCommandBuilder;

private final Mono<JsonParser> parser;
private final Supplier<JsonParser> parser;

private final ClientResources clientResources;

Expand All @@ -112,7 +112,8 @@ public abstract class AbstractRedisReactiveCommands<K, V>
* @param codec the codec for command encoding.
* @param parser the implementation of the {@link JsonParser} to use
*/
public AbstractRedisReactiveCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec, Mono<JsonParser> parser) {
public AbstractRedisReactiveCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec,
Supplier<JsonParser> parser) {
this.connection = connection;
this.parser = parser;
this.commandBuilder = new RedisCommandBuilder<>(codec);
Expand Down Expand Up @@ -149,7 +150,7 @@ private EventExecutorGroup getScheduler() {

@Override
public JsonParser getJsonParser() {
return parser.block();
return parser.get();
}

@Override
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.function.Supplier;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.internal.LettuceAssert;
Expand Down Expand Up @@ -69,15 +70,15 @@ public class ClientOptions implements Serializable {

public static final SocketOptions DEFAULT_SOCKET_OPTIONS = SocketOptions.create();

public static final Mono<JsonParser> DEFAULT_JSON_PARSER = Mono.defer(() -> Mono.fromCallable(() -> {
public static final Supplier<JsonParser> DEFAULT_JSON_PARSER = () -> {
try {
Iterator<JsonParser> services = ServiceLoader.load(JsonParser.class).iterator();
return services.hasNext() ? services.next() : null;
} catch (ServiceConfigurationError e) {
throw new RedisJsonException("Could not load JsonParser, please consult the guide"
+ "at https://redis.github.io/lettuce/user-guide/redis-json/", e);
}
}));
};

public static final SslOptions DEFAULT_SSL_OPTIONS = SslOptions.create();

Expand Down Expand Up @@ -105,7 +106,7 @@ public class ClientOptions implements Serializable {

private final Charset scriptCharset;

private final Mono<JsonParser> jsonParser;
private final Supplier<JsonParser> jsonParser;

private final SocketOptions socketOptions;

Expand Down Expand Up @@ -204,7 +205,7 @@ public static class Builder {

private Charset scriptCharset = DEFAULT_SCRIPT_CHARSET;

private Mono<JsonParser> jsonParser = DEFAULT_JSON_PARSER;
private Supplier<JsonParser> jsonParser = DEFAULT_JSON_PARSER;

private SocketOptions socketOptions = DEFAULT_SOCKET_OPTIONS;

Expand Down Expand Up @@ -399,7 +400,7 @@ public Builder scriptCharset(Charset scriptCharset) {
* @see JsonParser
* @since 6.5
*/
public Builder jsonParser(Mono<JsonParser> parser) {
public Builder jsonParser(Supplier<JsonParser> parser) {

LettuceAssert.notNull(parser, "JsonParser must not be null");
this.jsonParser = parser;
Expand Down Expand Up @@ -652,7 +653,7 @@ public Charset getScriptCharset() {
* @return the implementation of the {@link JsonParser} to use.
* @since 6.5
*/
public Mono<JsonParser> getJsonParser() {
public Supplier<JsonParser> getJsonParser() {
return jsonParser;
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/lettuce/core/RedisAsyncCommandsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.json.JsonParser;
import reactor.core.publisher.Mono;
import java.util.function.Supplier;

/**
* An asynchronous and thread-safe API for a Redis connection.
Expand All @@ -24,7 +24,8 @@ public class RedisAsyncCommandsImpl<K, V> extends AbstractRedisAsyncCommands<K,
* @param codec the codec for command encoding
* @param parser the implementation of the {@link JsonParser} to use
*/
public RedisAsyncCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec, Mono<JsonParser> parser) {
public RedisAsyncCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec,
Supplier<JsonParser> parser) {
super(connection, codec, parser);
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/lettuce/core/RedisJsonCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.lettuce.core.protocol.BaseRedisCommandBuilder;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandArgs;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.function.Supplier;

import static io.lettuce.core.protocol.CommandType.*;

Expand All @@ -34,9 +34,9 @@
*/
class RedisJsonCommandBuilder<K, V> extends BaseRedisCommandBuilder<K, V> {

private final Mono<JsonParser> parser;
private final Supplier<JsonParser> parser;

RedisJsonCommandBuilder(RedisCodec<K, V> codec, Mono<JsonParser> theParser) {
RedisJsonCommandBuilder(RedisCodec<K, V> codec, Supplier<JsonParser> theParser) {
super(codec);
parser = theParser;
}
Expand Down Expand Up @@ -118,7 +118,7 @@ Command<K, V, List<JsonValue>> jsonArrpop(K key, JsonPath jsonPath, int index) {
}
}

return createCommand(JSON_ARRPOP, new JsonValueListOutput<>(codec, parser.block()), args);
return createCommand(JSON_ARRPOP, new JsonValueListOutput<>(codec, parser.get()), args);
}

Command<K, V, List<Long>> jsonArrtrim(K key, JsonPath jsonPath, JsonRangeArgs range) {
Expand Down Expand Up @@ -167,7 +167,7 @@ Command<K, V, List<JsonValue>> jsonGet(K key, JsonGetArgs options, JsonPath... j
}
}

return createCommand(JSON_GET, new JsonValueListOutput<>(codec, parser.block()), args);
return createCommand(JSON_GET, new JsonValueListOutput<>(codec, parser.get()), args);
}

Command<K, V, String> jsonMerge(K key, JsonPath jsonPath, JsonValue value) {
Expand All @@ -194,7 +194,7 @@ Command<K, V, List<JsonValue>> jsonMGet(JsonPath jsonPath, K... keys) {
args.add(jsonPath.toString());
}

return createCommand(JSON_MGET, new JsonValueListOutput<>(codec, parser.block()), args);
return createCommand(JSON_MGET, new JsonValueListOutput<>(codec, parser.get()), args);
}

Command<K, V, String> jsonMSet(List<JsonMsetArgs<K, V>> arguments) {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/lettuce/core/RedisReactiveCommandsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import io.lettuce.core.json.JsonParser;
import reactor.core.publisher.Mono;

import java.util.function.Supplier;

/**
* A reactive and thread-safe API for a Redis Sentinel connection.
*
Expand All @@ -25,7 +27,7 @@ public class RedisReactiveCommandsImpl<K, V> extends AbstractRedisReactiveComman
* @param parser the implementation of the {@link JsonParser} to use
*/
public RedisReactiveCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.lettuce.core.api.StatefulRedisConnection;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class StatefulRedisConnectionImpl<K, V> extends RedisChannelHandler<K, V>

private final PushHandler pushHandler;

private final Mono<JsonParser> parser;
private final Supplier<JsonParser> parser;

protected MultiOutput<K, V> multi;

Expand All @@ -94,7 +95,7 @@ public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHa
* @param parser the parser to use for JSON commands.
*/
public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHandler, RedisCodec<K, V> codec,
Duration timeout, Mono<JsonParser> parser) {
Duration timeout, Supplier<JsonParser> parser) {

super(writer, timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.lettuce.core.*;
Expand Down Expand Up @@ -64,7 +64,6 @@
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ConnectionIntent;
import reactor.core.publisher.Mono;

/**
* An advanced asynchronous and thread-safe API for a Redis Cluster connection.
Expand All @@ -89,11 +88,11 @@ public class RedisAdvancedClusterAsyncCommandsImpl<K, V> extends AbstractRedisAs
* @param codec Codec used to encode/decode keys and values.
* @param parser the implementation of the {@link JsonParser} to use
* @deprecated since 5.1, use
* {@link #RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Mono)}.
* {@link #RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Supplier)}.
*/
@Deprecated
public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
this.codec = codec;
}
Expand All @@ -104,7 +103,7 @@ public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<
* @param connection the stateful connection
* @param codec Codec used to encode/decode keys and values.
* @deprecated since 5.1, use
* {@link #RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Mono)}.
* {@link #RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Supplier)}.
*/
@Deprecated
public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec) {
Expand All @@ -120,7 +119,7 @@ public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<
* @param parser the implementation of the {@link JsonParser} to use
*/
public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnection<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
this.codec = codec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.lettuce.core.json.JsonParser;
Expand Down Expand Up @@ -78,11 +79,11 @@ public class RedisAdvancedClusterReactiveCommandsImpl<K, V> extends AbstractRedi
* @param codec Codec used to encode/decode keys and values.
* @param parser the implementation of the {@link JsonParser} to use
* @deprecated since 5.2, use
* {@link #RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Mono)}.
* {@link #RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Supplier)}.
*/
@Deprecated
public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
this.codec = codec;
}
Expand All @@ -93,7 +94,7 @@ public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionIm
* @param connection the stateful connection.
* @param codec Codec used to encode/decode keys and values.
* @deprecated since 5.2, use
* {@link #RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Mono)}.
* {@link #RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection, RedisCodec, Supplier)}.
*/
@Deprecated
public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection,
Expand All @@ -110,7 +111,7 @@ public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionIm
* @param parser the implementation of the {@link JsonParser} to use
*/
public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection<K, V> connection, RedisCodec<K, V> codec,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
super(connection, codec, parser);
this.codec = codec;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisC
* @return new instance of StatefulRedisConnectionImpl
*/
protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter,
PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout, Mono<JsonParser> parser) {
PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout, Supplier<JsonParser> parser) {
return new StatefulRedisConnectionImpl<>(channelWriter, pushHandler, codec, timeout, parser);
}

Expand Down Expand Up @@ -726,7 +726,7 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl
*/
protected <V, K> StatefulRedisClusterConnectionImpl<K, V> newStatefulRedisClusterConnection(
RedisChannelWriter channelWriter, ClusterPushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout,
Mono<JsonParser> parser) {
Supplier<JsonParser> parser) {
return new StatefulRedisClusterConnectionImpl(channelWriter, pushHandler, codec, timeout, parser);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.lettuce.core.AbstractRedisClient;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class StatefulRedisClusterConnectionImpl<K, V> extends RedisChannelHandle

protected final RedisCodec<K, V> codec;

protected final Mono<JsonParser> parser;
protected final Supplier<JsonParser> parser;

protected final RedisAdvancedClusterCommands<K, V> sync;

Expand Down Expand Up @@ -113,7 +114,7 @@ public StatefulRedisClusterConnectionImpl(RedisChannelWriter writer, ClusterPush
* @param parser the JSON parser
*/
public StatefulRedisClusterConnectionImpl(RedisChannelWriter writer, ClusterPushHandler pushHandler, RedisCodec<K, V> codec,
Duration timeout, Mono<JsonParser> parser) {
Duration timeout, Supplier<JsonParser> parser) {

super(writer, timeout);
this.pushHandler = pushHandler;
Expand Down
Loading