Skip to content

Commit

Permalink
Merge pull request #440 from Palmr/rewindable-event-handler-separation
Browse files Browse the repository at this point in the history
Rewindable event handler separation
  • Loading branch information
grumpyjames authored Mar 24, 2023
2 parents ae0e471 + 63d5af7 commit e6f783b
Show file tree
Hide file tree
Showing 30 changed files with 375 additions and 230 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.lmax.disruptor.examples;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BatchEventProcessorBuilder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
Expand Down Expand Up @@ -52,11 +53,11 @@ public static void main(final String[] args) throws InterruptedException
// Construct 2 batch event processors.
DynamicHandler handler1 = new DynamicHandler();
BatchEventProcessor<StubEvent> processor1 =
new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1);
new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), handler1);

DynamicHandler handler2 = new DynamicHandler();
BatchEventProcessor<StubEvent> processor2 =
new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(processor1.getSequence()), handler2);
new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(processor1.getSequence()), handler2);

// Dynamically add both sequences to the ring buffer
ringBuffer.addGatingSequences(processor1.getSequence(), processor2.getSequence());
Expand Down
111 changes: 48 additions & 63 deletions src/main/java/com/lmax/disruptor/BatchEventProcessor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011 LMAX Ltd.
* Copyright 2022 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,66 +28,43 @@
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
*/
public final class BatchEventProcessor<T>
implements EventProcessor
implements EventProcessor
{
private static final int IDLE = 0;
private static final int HALTED = IDLE + 1;
private static final int RUNNING = HALTED + 1;
private static final int DEFAULT_MAX_BATCH_SIZE = Integer.MAX_VALUE;

private final AtomicInteger running = new AtomicInteger(IDLE);
private ExceptionHandler<? super T> exceptionHandler;
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final EventHandlerBase<? super T> eventHandler;
private final int batchLimitOffset;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy();
private final RewindHandler rewindHandler;
private int retriesAttempted = 0;

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
* @param maxBatchSize limits number of events processed in a batch before updating the sequence.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler,
final int maxBatchSize
BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandlerBase<? super T> eventHandler,
final int maxBatchSize,
final BatchRewindStrategy batchRewindStrategy
)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;

if (maxBatchSize < 1)
{
throw new IllegalArgumentException("maxBatchSize must be greater than 0");
}
this.batchLimitOffset = maxBatchSize - 1;

eventHandler.setSequenceCallback(sequence);
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler
)
{
this(dataProvider, sequenceBarrier, eventHandler, DEFAULT_MAX_BATCH_SIZE);
this.rewindHandler = eventHandler instanceof RewindableEventHandler
? new TryRewindHandler(batchRewindStrategy)
: new NoRewindHandler();
}

@Override
Expand Down Expand Up @@ -124,23 +101,6 @@ public void setExceptionHandler(final ExceptionHandler<? super T> exceptionHandl
this.exceptionHandler = exceptionHandler;
}

/**
* Set a new {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}
* Which can include whether the batch should be rewound and reattempted,
* or simply thrown and move on to the next sequence
* the default is a {@link SimpleBatchRewindStrategy} which always rewinds
* @param batchRewindStrategy to replace the existing rewindStrategy.
*/
public void setRewindStrategy(final BatchRewindStrategy batchRewindStrategy)
{
if (null == batchRewindStrategy)
{
throw new NullPointerException();
}

this.batchRewindStrategy = batchRewindStrategy;
}

/**
* It is ok to have another thread rerun this method after a halt().
*
Expand Down Expand Up @@ -214,15 +174,7 @@ private void processEvents()
}
catch (final RewindableException e)
{
if (this.batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND)
{
nextSequence = startOfBatchSequence;
}
else
{
retriesAttempted = 0;
throw e;
}
nextSequence = rewindHandler.attemptRewindGetNextSequence(e, startOfBatchSequence);
}
}
catch (final TimeoutException e)
Expand Down Expand Up @@ -325,4 +277,37 @@ private ExceptionHandler<? super T> getExceptionHandler()
ExceptionHandler<? super T> handler = exceptionHandler;
return handler == null ? ExceptionHandlers.defaultHandler() : handler;
}

private class TryRewindHandler implements RewindHandler
{
private final BatchRewindStrategy batchRewindStrategy;

TryRewindHandler(final BatchRewindStrategy batchRewindStrategy)
{
this.batchRewindStrategy = batchRewindStrategy;
}

@Override
public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) throws RewindableException
{
if (batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND)
{
return startOfBatchSequence;
}
else
{
retriesAttempted = 0;
throw e;
}
}
}

private static class NoRewindHandler implements RewindHandler
{
@Override
public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence)
{
throw new UnsupportedOperationException("Rewindable Exception thrown from a non-rewindable event handler", e);
}
}
}
87 changes: 87 additions & 0 deletions src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2023 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.lmax.disruptor;

public final class BatchEventProcessorBuilder
{
private int maxBatchSize = Integer.MAX_VALUE;

/**
* Set the maximum number of events that will be processed in a batch before updating the sequence.
*
* @param maxBatchSize max number of events to process in one batch.
* @return The builder
*/
public BatchEventProcessorBuilder setMaxBatchSize(final int maxBatchSize)
{
this.maxBatchSize = maxBatchSize;
return this;
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* <p>The created {@link BatchEventProcessor} will not support batch rewind,
* but {@link EventHandler#setSequenceCallback(Sequence)} will be supported.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @return the BatchEventProcessor
*/
public <T> BatchEventProcessor<T> build(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
final BatchEventProcessor<T> processor = new BatchEventProcessor<>(
dataProvider, sequenceBarrier, eventHandler, maxBatchSize, null
);
eventHandler.setSequenceCallback(processor.getSequence());

return processor;
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param rewindableEventHandler is the delegate to which events are dispatched.
* @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}.
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @return the BatchEventProcessor
*/
public <T> BatchEventProcessor<T> build(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final RewindableEventHandler<? super T> rewindableEventHandler,
final BatchRewindStrategy batchRewindStrategy)
{
if (null == batchRewindStrategy)
{
throw new NullPointerException("batchRewindStrategy cannot be null when building a BatchEventProcessor");
}

return new BatchEventProcessor<>(
dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, batchRewindStrategy
);
}
}
42 changes: 3 additions & 39 deletions src/main/java/com/lmax/disruptor/EventHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011 LMAX Ltd.
* Copyright 2022 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
*/
public interface EventHandler<T>
public interface EventHandler<T> extends EventHandlerBase<T>
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
Expand All @@ -36,35 +36,9 @@ public interface EventHandler<T>
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
@Override
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;

/**
* Invoked by {@link BatchEventProcessor} prior to processing a batch of events
*
* @param batchSize the size of the batch that is starting
* @param queueDepth the total number of queued up events including the batch about to be processed
*/
default void onBatchStart(long batchSize, long queueDepth)
{
}

/**
* Called once on thread start before first event is available.
*/
default void onStart()
{
}

/**
* Called once just before the event processing thread is shutdown.
*
* <p>Sequence event processing will already have stopped before this method is called. No events will
* be processed after this message.
*/
default void onShutdown()
{
}

/**
* Used by the {@link BatchEventProcessor} to set a callback allowing the {@link EventHandler} to notify
* when it has finished consuming an event if this happens after the {@link EventHandler#onEvent(Object, long, boolean)} call.
Expand All @@ -78,14 +52,4 @@ default void onShutdown()
default void setSequenceCallback(Sequence sequenceCallback)
{
}

/**
* Invoked when a {@link BatchEventProcessor}'s {@link WaitStrategy} throws a {@link TimeoutException}.
*
* @param sequence - the last processed sequence.
* @throws Exception if the implementation is unable to handle this timeout.
*/
default void onTimeout(long sequence) throws Exception
{
}
}
Loading

0 comments on commit e6f783b

Please sign in to comment.