From 992576aedb67bfd24822aca9d5cc31157af18384 Mon Sep 17 00:00:00 2001 From: Nick Palmer Date: Sat, 18 Mar 2023 17:15:48 +0000 Subject: [PATCH] Move mutable rewind strategy to builder where it can be scoped to only the rewind supporting build method --- .../lmax/disruptor/BatchEventProcessor.java | 31 +++------ .../disruptor/BatchEventProcessorBuilder.java | 13 +++- .../RewindBatchEventProcessorTest.java | 64 +++++++++++-------- 3 files changed, 57 insertions(+), 51 deletions(-) diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java index 0e64cff8e..a493f6732 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java @@ -42,14 +42,14 @@ public final class BatchEventProcessor private final int batchLimitOffset; private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final RewindHandler rewindHandler; - private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy(); private int retriesAttempted = 0; BatchEventProcessor( final DataProvider dataProvider, final SequenceBarrier sequenceBarrier, final EventHandlerBase eventHandler, - final int maxBatchSize + final int maxBatchSize, + final BatchRewindStrategy batchRewindStrategy ) { this.dataProvider = dataProvider; @@ -63,7 +63,7 @@ public final class BatchEventProcessor this.batchLimitOffset = maxBatchSize - 1; this.rewindHandler = eventHandler instanceof RewindableEventHandler - ? new TryRewindHandler() + ? new TryRewindHandler(batchRewindStrategy) : new NoRewindHandler(); } @@ -101,24 +101,6 @@ public void setExceptionHandler(final ExceptionHandler exceptionHandl this.exceptionHandler = exceptionHandler; } - /** - * Set a new {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException} - * Which can include whether the batch should be rewound and reattempted, - * or simply thrown and move on to the next sequence - * the default is a {@link SimpleBatchRewindStrategy} which always rewinds - * - * @param batchRewindStrategy to replace the existing rewindStrategy. - */ - public void setRewindStrategy(final BatchRewindStrategy batchRewindStrategy) - { - if (null == batchRewindStrategy) - { - throw new NullPointerException(); - } - - this.batchRewindStrategy = batchRewindStrategy; - } - /** * It is ok to have another thread rerun this method after a halt(). * @@ -298,6 +280,13 @@ private ExceptionHandler getExceptionHandler() private class TryRewindHandler implements RewindHandler { + private final BatchRewindStrategy batchRewindStrategy; + + TryRewindHandler(final BatchRewindStrategy batchRewindStrategy) + { + this.batchRewindStrategy = batchRewindStrategy; + } + @Override public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) throws RewindableException { diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java b/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java index 77e8ff9ae..2b02c9918 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java @@ -51,7 +51,7 @@ public BatchEventProcessor build( final EventHandler eventHandler) { final BatchEventProcessor processor = new BatchEventProcessor<>( - dataProvider, sequenceBarrier, eventHandler, maxBatchSize + dataProvider, sequenceBarrier, eventHandler, maxBatchSize, null ); eventHandler.setSequenceCallback(processor.getSequence()); @@ -65,16 +65,23 @@ public BatchEventProcessor build( * @param dataProvider to which events are published. * @param sequenceBarrier on which it is waiting. * @param rewindableEventHandler is the delegate to which events are dispatched. + * @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}. * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. * @return the BatchEventProcessor */ public BatchEventProcessor build( final DataProvider dataProvider, final SequenceBarrier sequenceBarrier, - final RewindableEventHandler rewindableEventHandler) + final RewindableEventHandler rewindableEventHandler, + final BatchRewindStrategy batchRewindStrategy) { + if (null == batchRewindStrategy) + { + throw new NullPointerException("batchRewindStrategy cannot be null when building a BatchEventProcessor"); + } + return new BatchEventProcessor<>( - dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize + dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, batchRewindStrategy ); } } diff --git a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java index 12f31eff2..8ec2f5cbd 100644 --- a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java @@ -19,6 +19,7 @@ import static java.util.Collections.singletonList; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class RewindBatchEventProcessorTest { @@ -38,7 +39,7 @@ public void shouldRewindOnFirstEventOfBatchSizeOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, asList(rewind(0, 1)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -59,7 +60,7 @@ public void shouldRewindOnFirstEventOfBatch() singletonList(rewind(0, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -80,7 +81,7 @@ public void shouldRewindOnEventInMiddleOfBatch() singletonList(rewind(8, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -104,7 +105,7 @@ public void shouldRewindOnLastEventOfBatch() -1 ); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -125,7 +126,7 @@ public void shouldRunBatchCompleteOnLastEventOfBatch() singletonList(rewind(4, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -141,7 +142,7 @@ public void shouldRunBatchCompleteOnLastEventOfBatchOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 1)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -161,7 +162,7 @@ public void shouldRewindMultipleTimes() singletonList(rewind(8, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -184,7 +185,7 @@ public void shouldRewindMultipleTimesOnLastEventInBatch() singletonList(rewind(lastSequenceNumber, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -207,7 +208,7 @@ public void shouldRewindMultipleTimesInSameBatch() asList(rewind(5, 3), rewind(7, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -228,7 +229,7 @@ public void shouldRewindMultipleTimesOnBatchOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 3)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -246,7 +247,7 @@ public void shouldFallOverWhenNonRewindableExceptionIsThrown() fill(ringBuffer, ringBufferEntries); final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, 8); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); AtomicReference exceptionHandled = new AtomicReference<>(); @@ -263,7 +264,7 @@ public void shouldProcessUpToMaxBatchSizeForEachGivenBatch() fill(ringBuffer, ringBufferEntries); final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -285,7 +286,7 @@ public void shouldOnlyRewindBatch() singletonList(rewind(15, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -308,11 +309,11 @@ void shouldInvokeRewindPauseStrategyOnRewind() singletonList(rewind(15, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy(); + final BatchEventProcessor eventProcessor = create(eventHandler, rewindPauseStrategy); + eventHandler.setRewindable(eventProcessor); - CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy(); - eventProcessor.setRewindStrategy(rewindPauseStrategy); eventProcessor.run(); assertThat(values, containsExactSequence( @@ -335,11 +336,11 @@ void shouldNotInvokeRewindPauseStrategyWhenNoRewindsOccur() singletonList(rewind(-1, -1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy(); + final BatchEventProcessor eventProcessor = create(eventHandler, rewindPauseStrategy); + eventHandler.setRewindable(eventProcessor); - CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy(); - eventProcessor.setRewindStrategy(rewindPauseStrategy); eventProcessor.run(); assertThat(values, containsExactSequence( @@ -359,10 +360,10 @@ void shouldCopeWithTheNanosecondRewindPauseStrategy() singletonList(rewind(15, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new NanosecondPauseBatchRewindStrategy(1000)); + eventHandler.setRewindable(eventProcessor); - eventProcessor.setRewindStrategy(new NanosecondPauseBatchRewindStrategy(1000)); eventProcessor.run(); assertThat(values, containsExactSequence( @@ -381,19 +382,18 @@ void shouldGiveUpWhenUsingTheGiveUpRewindStrategy() int lastSequenceNumber = ringBufferEntries - 1; fill(ringBuffer, ringBufferEntries); - EventuallyGiveUpBatchRewindStrategy batchRewindStrategy = new EventuallyGiveUpBatchRewindStrategy(3); - final TestEventHandler eventHandler = new TestEventHandler(values, asList(rewind(15, 99), rewind(25, 99)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + EventuallyGiveUpBatchRewindStrategy batchRewindStrategy = new EventuallyGiveUpBatchRewindStrategy(3); + final BatchEventProcessor eventProcessor = create(eventHandler, batchRewindStrategy); + eventHandler.setRewindable(eventProcessor); AtomicReference exceptionHandled = new AtomicReference<>(); eventProcessor.setExceptionHandler(new StubExceptionHandler(exceptionHandled)); - eventProcessor.setRewindStrategy(batchRewindStrategy); eventProcessor.run(); assertThat(values, containsExactSequence( @@ -406,6 +406,16 @@ void shouldGiveUpWhenUsingTheGiveUpRewindStrategy() event(26, lastSequenceNumber))); // unable to process 25 so it ends up skipping it } + @Test + void shouldNotAllowNullBatchRewindStrategy() + { + final TestEventHandler eventHandler = new TestEventHandler(values, + asList(rewind(15, 99), rewind(25, 99)), + -1, + -1); + final BatchEventProcessorBuilder batchEventProcessorBuilder = new BatchEventProcessorBuilder(); + assertThrows(NullPointerException.class, () -> batchEventProcessorBuilder.build(ringBuffer, ringBuffer.newBarrier(), eventHandler, null)); + } private static ForceRewindSequence rewind(final long sequenceNumberToFailOn, final long timesToFail) { @@ -417,10 +427,10 @@ private EventRangeExpectation event(final long sequenceStart, final long sequenc return new EventRangeExpectation(sequenceStart, sequenceEnd, false); } - private BatchEventProcessor create(final TestEventHandler eventHandler) + private BatchEventProcessor create(final TestEventHandler eventHandler, final BatchRewindStrategy batchRewindStrategy) { return new BatchEventProcessorBuilder() - .build(ringBuffer, ringBuffer.newBarrier(), eventHandler); + .build(ringBuffer, ringBuffer.newBarrier(), eventHandler, batchRewindStrategy); } private final class TestEventHandler implements RewindableEventHandler