Skip to content

Commit

Permalink
Move mutable rewind strategy to builder where it can be scoped to onl…
Browse files Browse the repository at this point in the history
…y the rewind supporting build method
  • Loading branch information
Palmr committed Mar 18, 2023
1 parent 47fd018 commit 992576a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 51 deletions.
31 changes: 10 additions & 21 deletions src/main/java/com/lmax/disruptor/BatchEventProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ public final class BatchEventProcessor<T>
private final int batchLimitOffset;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RewindHandler rewindHandler;
private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy();
private int retriesAttempted = 0;

BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandlerBase<? super T> eventHandler,
final int maxBatchSize
final int maxBatchSize,
final BatchRewindStrategy batchRewindStrategy
)
{
this.dataProvider = dataProvider;
Expand All @@ -63,7 +63,7 @@ public final class BatchEventProcessor<T>
this.batchLimitOffset = maxBatchSize - 1;

this.rewindHandler = eventHandler instanceof RewindableEventHandler
? new TryRewindHandler()
? new TryRewindHandler(batchRewindStrategy)
: new NoRewindHandler();
}

Expand Down Expand Up @@ -101,24 +101,6 @@ public void setExceptionHandler(final ExceptionHandler<? super T> exceptionHandl
this.exceptionHandler = exceptionHandler;
}

/**
* Set a new {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}
* Which can include whether the batch should be rewound and reattempted,
* or simply thrown and move on to the next sequence
* the default is a {@link SimpleBatchRewindStrategy} which always rewinds
*
* @param batchRewindStrategy to replace the existing rewindStrategy.
*/
public void setRewindStrategy(final BatchRewindStrategy batchRewindStrategy)
{
if (null == batchRewindStrategy)
{
throw new NullPointerException();
}

this.batchRewindStrategy = batchRewindStrategy;
}

/**
* It is ok to have another thread rerun this method after a halt().
*
Expand Down Expand Up @@ -298,6 +280,13 @@ private ExceptionHandler<? super T> getExceptionHandler()

private class TryRewindHandler implements RewindHandler
{
private final BatchRewindStrategy batchRewindStrategy;

TryRewindHandler(final BatchRewindStrategy batchRewindStrategy)
{
this.batchRewindStrategy = batchRewindStrategy;
}

@Override
public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) throws RewindableException
{
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public <T> BatchEventProcessor<T> build(
final EventHandler<? super T> eventHandler)
{
final BatchEventProcessor<T> processor = new BatchEventProcessor<>(
dataProvider, sequenceBarrier, eventHandler, maxBatchSize
dataProvider, sequenceBarrier, eventHandler, maxBatchSize, null
);
eventHandler.setSequenceCallback(processor.getSequence());

Expand All @@ -65,16 +65,23 @@ public <T> BatchEventProcessor<T> build(
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param rewindableEventHandler is the delegate to which events are dispatched.
* @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}.
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @return the BatchEventProcessor
*/
public <T> BatchEventProcessor<T> build(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final RewindableEventHandler<? super T> rewindableEventHandler)
final RewindableEventHandler<? super T> rewindableEventHandler,
final BatchRewindStrategy batchRewindStrategy)
{
if (null == batchRewindStrategy)
{
throw new NullPointerException("batchRewindStrategy cannot be null when building a BatchEventProcessor");
}

return new BatchEventProcessor<>(
dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize
dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, batchRewindStrategy
);
}
}
64 changes: 37 additions & 27 deletions src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class RewindBatchEventProcessorTest
{
Expand All @@ -38,7 +39,7 @@ public void shouldRewindOnFirstEventOfBatchSizeOfOne()
fill(ringBuffer, 1);

final TestEventHandler eventHandler = new TestEventHandler(values, asList(rewind(0, 1)), 0, -1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -59,7 +60,7 @@ public void shouldRewindOnFirstEventOfBatch()
singletonList(rewind(0, 1)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -80,7 +81,7 @@ public void shouldRewindOnEventInMiddleOfBatch()
singletonList(rewind(8, 1)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -104,7 +105,7 @@ public void shouldRewindOnLastEventOfBatch()
-1
);

final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -125,7 +126,7 @@ public void shouldRunBatchCompleteOnLastEventOfBatch()
singletonList(rewind(4, 1)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -141,7 +142,7 @@ public void shouldRunBatchCompleteOnLastEventOfBatchOfOne()
fill(ringBuffer, 1);

final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 1)), 0, -1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -161,7 +162,7 @@ public void shouldRewindMultipleTimes()
singletonList(rewind(8, 3)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -184,7 +185,7 @@ public void shouldRewindMultipleTimesOnLastEventInBatch()
singletonList(rewind(lastSequenceNumber, 3)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -207,7 +208,7 @@ public void shouldRewindMultipleTimesInSameBatch()
asList(rewind(5, 3), rewind(7, 3)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -228,7 +229,7 @@ public void shouldRewindMultipleTimesOnBatchOfOne()
fill(ringBuffer, 1);

final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 3)), 0, -1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -246,7 +247,7 @@ public void shouldFallOverWhenNonRewindableExceptionIsThrown()
fill(ringBuffer, ringBufferEntries);

final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, 8);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

AtomicReference<Throwable> exceptionHandled = new AtomicReference<>();
Expand All @@ -263,7 +264,7 @@ public void shouldProcessUpToMaxBatchSizeForEachGivenBatch()
fill(ringBuffer, ringBufferEntries);

final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, -1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -285,7 +286,7 @@ public void shouldOnlyRewindBatch()
singletonList(rewind(15, 3)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy());
eventHandler.setRewindable(eventProcessor);

eventProcessor.run();
Expand All @@ -308,11 +309,11 @@ void shouldInvokeRewindPauseStrategyOnRewind()
singletonList(rewind(15, 3)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy();
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, rewindPauseStrategy);

eventHandler.setRewindable(eventProcessor);

CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy();
eventProcessor.setRewindStrategy(rewindPauseStrategy);
eventProcessor.run();

assertThat(values, containsExactSequence(
Expand All @@ -335,11 +336,11 @@ void shouldNotInvokeRewindPauseStrategyWhenNoRewindsOccur()
singletonList(rewind(-1, -1)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy();
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, rewindPauseStrategy);

eventHandler.setRewindable(eventProcessor);

CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy();
eventProcessor.setRewindStrategy(rewindPauseStrategy);
eventProcessor.run();

assertThat(values, containsExactSequence(
Expand All @@ -359,10 +360,10 @@ void shouldCopeWithTheNanosecondRewindPauseStrategy()
singletonList(rewind(15, 3)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new NanosecondPauseBatchRewindStrategy(1000));

eventHandler.setRewindable(eventProcessor);

eventProcessor.setRewindStrategy(new NanosecondPauseBatchRewindStrategy(1000));
eventProcessor.run();

assertThat(values, containsExactSequence(
Expand All @@ -381,19 +382,18 @@ void shouldGiveUpWhenUsingTheGiveUpRewindStrategy()
int lastSequenceNumber = ringBufferEntries - 1;
fill(ringBuffer, ringBufferEntries);

EventuallyGiveUpBatchRewindStrategy batchRewindStrategy = new EventuallyGiveUpBatchRewindStrategy(3);

final TestEventHandler eventHandler = new TestEventHandler(values,
asList(rewind(15, 99), rewind(25, 99)),
lastSequenceNumber,
-1);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
EventuallyGiveUpBatchRewindStrategy batchRewindStrategy = new EventuallyGiveUpBatchRewindStrategy(3);
final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, batchRewindStrategy);

eventHandler.setRewindable(eventProcessor);

AtomicReference<Throwable> exceptionHandled = new AtomicReference<>();
eventProcessor.setExceptionHandler(new StubExceptionHandler(exceptionHandled));

eventProcessor.setRewindStrategy(batchRewindStrategy);
eventProcessor.run();

assertThat(values, containsExactSequence(
Expand All @@ -406,6 +406,16 @@ void shouldGiveUpWhenUsingTheGiveUpRewindStrategy()
event(26, lastSequenceNumber))); // unable to process 25 so it ends up skipping it
}

@Test
void shouldNotAllowNullBatchRewindStrategy()
{
final TestEventHandler eventHandler = new TestEventHandler(values,
asList(rewind(15, 99), rewind(25, 99)),
-1,
-1);
final BatchEventProcessorBuilder batchEventProcessorBuilder = new BatchEventProcessorBuilder();
assertThrows(NullPointerException.class, () -> batchEventProcessorBuilder.build(ringBuffer, ringBuffer.newBarrier(), eventHandler, null));
}

private static ForceRewindSequence rewind(final long sequenceNumberToFailOn, final long timesToFail)
{
Expand All @@ -417,10 +427,10 @@ private EventRangeExpectation event(final long sequenceStart, final long sequenc
return new EventRangeExpectation(sequenceStart, sequenceEnd, false);
}

private BatchEventProcessor<LongEvent> create(final TestEventHandler eventHandler)
private BatchEventProcessor<LongEvent> create(final TestEventHandler eventHandler, final BatchRewindStrategy batchRewindStrategy)
{
return new BatchEventProcessorBuilder()
.build(ringBuffer, ringBuffer.newBarrier(), eventHandler);
.build(ringBuffer, ringBuffer.newBarrier(), eventHandler, batchRewindStrategy);
}

private final class TestEventHandler implements RewindableEventHandler<LongEvent>
Expand Down

0 comments on commit 992576a

Please sign in to comment.