diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 04cbf1d4c3..caa34ce70d 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -197,6 +197,7 @@ With this release, we took the opportunity to introduce a series of changes that * Remove Spring support classes #1358 * Replace io.lettuce.core.resource.Futures utility with Netty's PromiseCombiner #1283 * XGROUP DELCONSUMER should return pending message count #1377 (xgroupDelconsumer(…) now returns `Long`) +* Change hgetall return type from Mono to Flux #1434 * Script Commands: `eval`, `digest`, `scriptLoad` methods now only accept `String` and `byte[]` argument types. Previously `digest` and `scriptLoad` accepted the script contents as Codec value type which caused issues especially when marshalling values using JSON or Java Serialization. The script charset can be configured via `ClientOptions` (`ClientOptions.builder().scriptCharset(StandardCharsets.US_ASCII).build();`), defaulting to UTF-8. * Connection: Removal of deprecated timeout methods accepting `TimeUnit`. Use methods accepting `Duration` instead. * Async Commands: `RedisAsyncCommands.select(…)` and `.auth(…)` methods return now futures instead if being blocking methods. diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 5764274276..ea1d3083f8 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -785,8 +785,8 @@ public Mono hget(K key, K field) { } @Override - public Mono> hgetall(K key) { - return createMono(() -> commandBuilder.hgetall(key)); + public Flux> hgetall(K key) { + return createDissolvingFlux(() -> commandBuilder.hgetallKeyValue(key)); } @Override diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index 34a5cba890..47be9d9002 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -939,6 +939,12 @@ Command> hgetall(K key) { return createCommand(HGETALL, new MapOutput<>(codec), key); } + Command>> hgetallKeyValue(K key) { + notNullKey(key); + + return createCommand(HGETALL, new KeyValueListOutput<>(codec), key); + } + Command hgetall(KeyValueStreamingChannel channel, K key) { notNullKey(key); notNull(channel); diff --git a/src/main/java/io/lettuce/core/api/reactive/RedisHashReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisHashReactiveCommands.java index 35bf42ce26..64e69b6442 100644 --- a/src/main/java/io/lettuce/core/api/reactive/RedisHashReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisHashReactiveCommands.java @@ -94,7 +94,7 @@ public interface RedisHashReactiveCommands { * @return Map<K,V> array-reply list of fields and their values stored in the hash, or an empty list when {@code key} * does not exist. */ - Mono> hgetall(K key); + Flux> hgetall(K key); /** * Stream over all the fields and values in a hash. diff --git a/src/main/java/io/lettuce/core/output/KeyValueListOutput.java b/src/main/java/io/lettuce/core/output/KeyValueListOutput.java index 01588793f7..bf105759ca 100644 --- a/src/main/java/io/lettuce/core/output/KeyValueListOutput.java +++ b/src/main/java/io/lettuce/core/output/KeyValueListOutput.java @@ -25,7 +25,8 @@ import io.lettuce.core.internal.LettuceAssert; /** - * {@link List} of values output. + * {@link List} of {@link KeyValue} output. Can be either used to decode key-value tuples (e.g. {@code HGETALL}) of for a pure + * value response where keys are supplied as input (for e.g. {@code HMGET}). * * @param Key type. * @param Value type. @@ -39,10 +40,18 @@ public class KeyValueListOutput extends CommandOutput> subscriber; - private Iterable keys; + private final Iterable keys; private Iterator keyIterator; + private K key; + + public KeyValueListOutput(RedisCodec codec) { + super(codec, Collections.emptyList()); + setSubscriber(ListSubscriber.instance()); + this.keys = null; + } + public KeyValueListOutput(RedisCodec codec, Iterable keys) { super(codec, Collections.emptyList()); setSubscriber(ListSubscriber.instance()); @@ -52,18 +61,31 @@ public KeyValueListOutput(RedisCodec codec, Iterable keys) { @Override public void set(ByteBuffer bytes) { - if (keyIterator == null) { - keyIterator = keys.iterator(); - } + if (keys == null) { + if (key == null) { + key = codec.decodeKey(bytes); + return; + } + + K key = this.key; + this.key = null; + subscriber.onNext(output, KeyValue.fromNullable(key, bytes == null ? null : codec.decodeValue(bytes))); - subscriber.onNext(output, KeyValue.fromNullable(keyIterator.next(), bytes == null ? null : codec.decodeValue(bytes))); + } else { + if (keyIterator == null) { + keyIterator = keys.iterator(); + } + + subscriber.onNext(output, + KeyValue.fromNullable(keyIterator.next(), bytes == null ? null : codec.decodeValue(bytes))); + } } @Override public void multi(int count) { if (!initialized) { - output = OutputFactory.newList(count); + output = OutputFactory.newList(keys == null ? count / 2 : count); initialized = true; } } diff --git a/src/test/java/io/lettuce/core/cluster/commands/reactive/HashClusterReactiveCommandIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/commands/reactive/HashClusterReactiveCommandIntegrationTests.java index d933341e59..36d0acbb21 100644 --- a/src/test/java/io/lettuce/core/cluster/commands/reactive/HashClusterReactiveCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/commands/reactive/HashClusterReactiveCommandIntegrationTests.java @@ -17,6 +17,9 @@ import javax.inject.Inject; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.commands.HashCommandIntegrationTests; import io.lettuce.test.ReactiveSyncInvocationHandler; @@ -30,4 +33,16 @@ class HashClusterReactiveCommandIntegrationTests extends HashCommandIntegrationT HashClusterReactiveCommandIntegrationTests(StatefulRedisClusterConnection connection) { super(ReactiveSyncInvocationHandler.sync(connection)); } + + @Test + @Disabled("API differences") + public void hgetall() { + + } + + @Test + @Disabled("API differences") + public void hgetallStreaming() { + + } } diff --git a/src/test/java/io/lettuce/core/commands/HashCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/HashCommandIntegrationTests.java index 43e2e02b9c..66d6a43447 100644 --- a/src/test/java/io/lettuce/core/commands/HashCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/HashCommandIntegrationTests.java @@ -88,7 +88,7 @@ void hget() { } @Test - void hgetall() { + public void hgetall() { assertThat(redis.hgetall(key).isEmpty()).isTrue(); redis.hset(key, "zero", "0"); @@ -102,7 +102,7 @@ void hgetall() { } @Test - void hgetallStreaming() { + public void hgetallStreaming() { KeyValueStreamingAdapter adapter = new KeyValueStreamingAdapter<>(); diff --git a/src/test/java/io/lettuce/core/commands/reactive/HashReactiveCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/reactive/HashReactiveCommandIntegrationTests.java index 4c81cea008..35d4db8706 100644 --- a/src/test/java/io/lettuce/core/commands/reactive/HashReactiveCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/reactive/HashReactiveCommandIntegrationTests.java @@ -15,8 +15,18 @@ */ package io.lettuce.core.commands.reactive; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.stream.Collectors; + import javax.inject.Inject; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import reactor.test.StepVerifier; +import io.lettuce.core.KeyValue; +import io.lettuce.core.Value; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.commands.HashCommandIntegrationTests; import io.lettuce.test.ReactiveSyncInvocationHandler; @@ -26,8 +36,31 @@ */ class HashReactiveCommandIntegrationTests extends HashCommandIntegrationTests { + private final StatefulRedisConnection connection; + @Inject HashReactiveCommandIntegrationTests(StatefulRedisConnection connection) { super(ReactiveSyncInvocationHandler.sync(connection)); + this.connection = connection; + } + + @Test + public void hgetall() { + + connection.sync().hset(key, "zero", "0"); + connection.sync().hset(key, "one", "1"); + connection.sync().hset(key, "two", "2"); + + connection.reactive().hgetall(key).collect(Collectors.toMap(KeyValue::getKey, Value::getValue)).as(StepVerifier::create) + .assertNext(actual -> { + + assertThat(actual).containsEntry("zero", "0").containsEntry("one", "1").containsEntry("two", "2"); + }).verifyComplete(); + } + + @Test + @Disabled("API differences") + public void hgetallStreaming() { + } }