diff --git a/src/main/java/com/github/fppt/jedismock/operations/streams/XRead.java b/src/main/java/com/github/fppt/jedismock/operations/streams/XRead.java index f24ea8119..184357554 100644 --- a/src/main/java/com/github/fppt/jedismock/operations/streams/XRead.java +++ b/src/main/java/com/github/fppt/jedismock/operations/streams/XRead.java @@ -87,7 +87,7 @@ protected Slice response() { mapKeyToBeginEntryId.append( key, "$".equalsIgnoreCase(id.toString()) - ? new StreamId(0, 1) // lowest possible id + ? new StreamId(0, 0) // lowest possible id : new StreamId(id) ); } else { diff --git a/src/test/java/com/github/fppt/jedismock/comparisontests/scripting/EvalTest.java b/src/test/java/com/github/fppt/jedismock/comparisontests/scripting/EvalTest.java index fe42c54e2..de792ddc6 100644 --- a/src/test/java/com/github/fppt/jedismock/comparisontests/scripting/EvalTest.java +++ b/src/test/java/com/github/fppt/jedismock/comparisontests/scripting/EvalTest.java @@ -16,6 +16,7 @@ import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.InstanceOfAssertFactories.LIST; @ExtendWith(ComparisonBase.class) public class EvalTest { @@ -58,7 +59,7 @@ void evalTableOfStringsTest(Jedis jedis) { void evalTableOfLongTest(Jedis jedis) { Object eval_return = jedis.eval("return { 1, 2, 3 }", 0); assertThat(eval_return).isInstanceOf(ArrayList.class) - .asList().containsExactly(1L, 2L, 3L); + .asInstanceOf(LIST).containsExactly(1L, 2L, 3L); assertThat(((List) eval_return).get(0)).isInstanceOf(Long.class); } @@ -66,7 +67,7 @@ void evalTableOfLongTest(Jedis jedis) { void evalDeepListTest(Jedis jedis) { Object eval_return = jedis.eval("return { 'test', 2, {'test', 2} }", 0); assertThat(eval_return).isInstanceOf(ArrayList.class) - .asList().containsExactly("test", 2L, Arrays.asList("test", 2L)); + .asInstanceOf(LIST).containsExactly("test", 2L, asList("test", 2L)); assertThat(((List) eval_return).get(0)).isInstanceOf(String.class); assertThat(((List) eval_return).get(1)).isInstanceOf(Long.class); assertThat(((List) eval_return).get(2)).isInstanceOf(ArrayList.class); @@ -76,7 +77,7 @@ void evalDeepListTest(Jedis jedis) { void evalDictTest(Jedis jedis) { Object eval_return = jedis.eval("return { a = 1, 2 }", 0); assertThat(eval_return).isInstanceOf(ArrayList.class) - .asList().containsExactly(2L); + .asInstanceOf(LIST).containsExactly(2L); assertThat(((List) eval_return).get(0)).isInstanceOf(Long.class); } diff --git a/src/test/java/com/github/fppt/jedismock/comparisontests/streams/XReadTests.java b/src/test/java/com/github/fppt/jedismock/comparisontests/streams/XReadTests.java index e310d1a2f..29b571385 100644 --- a/src/test/java/com/github/fppt/jedismock/comparisontests/streams/XReadTests.java +++ b/src/test/java/com/github/fppt/jedismock/comparisontests/streams/XReadTests.java @@ -26,6 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.InstanceOfAssertFactories.LIST; @ExtendWith(ComparisonBase.class) public class XReadTests { @@ -68,7 +69,7 @@ void blockingXREADforStreamThatRanDry(Jedis jedis) throws ExecutionException, In .hasSize(1) .first() .extracting(Map.Entry::getValue) - .asList() + .asInstanceOf(LIST) .hasSize(1) .first() .usingRecursiveComparison() @@ -98,7 +99,7 @@ void whenAddIsInvokedWithSingleStream_EnsureTemporaryBlockedXreadIsAwaken(Jedis .hasSize(1) .first() .extracting(Map.Entry::getValue) - .asList() + .asInstanceOf(LIST) .hasSize(1) .first() .usingRecursiveComparison() @@ -146,7 +147,7 @@ void whenAddIsInvokedWithSeveralStreams_EnsureTemporaryBlockedXreadAwakenImmedia .hasSize(1) .first() .extracting(Map.Entry::getValue) - .asList() + .asInstanceOf(LIST) .hasSize(1) .first() .usingRecursiveComparison() @@ -186,4 +187,50 @@ void whenAddIsInvokedWithSeveralStreams_EnsureTemporaryBlockedXreadAwakenImmedia assertThat(blockingReadJob.isCancelled()).isFalse(); } + + @TestTemplate + void whenLastEntryIsCalledOnEmptyStream_EnsureXaddAwakes(Jedis jedis) + throws ExecutionException, InterruptedException { + Future blockingReadJob = scheduledThreadPool.submit(() -> { + List>> answer = blockedClient.xread( + XReadParams.xReadParams().block(0).count(1), + Collections.singletonMap("test:jedis", StreamEntryID.LAST_ENTRY) + ); + + assertThat(answer) + .hasSize(1) + .first() + .extracting(Map.Entry::getValue) + .asInstanceOf(LIST) + .hasSize(1) + .first() + .usingRecursiveComparison() + .comparingOnlyFields("fields") + .isEqualTo( + new StreamEntry( + new StreamEntryID(0, 1), + Collections.singletonMap("a", "b") + ) + ); + }); + + ScheduledFuture addJob = scheduledThreadPool.schedule(() -> { + jedis.xadd( + "test:jedis", + XAddParams.xAddParams(), + Collections.singletonMap("a", "b") + ); + }, 2, TimeUnit.SECONDS); + + + ScheduledFuture cancellationJob = scheduledThreadPool.schedule(() -> { + blockingReadJob.cancel(true); + }, 15, TimeUnit.SECONDS); + + addJob.get(); + cancellationJob.get(); + blockingReadJob.get(); + + assertThat(blockingReadJob.isCancelled()).isFalse(); + } }