Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify EventHandler implementations. #461

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/main/java/com/lmax/disruptor/BatchEventProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class BatchEventProcessor<T>
private ExceptionHandler<? super T> exceptionHandler;
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandlerBase<? super T> eventHandler;
private final EventHandler<? super T> eventHandler;
private final int batchLimitOffset;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RewindHandler rewindHandler;
Expand All @@ -47,7 +47,7 @@ public final class BatchEventProcessor<T>
BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandlerBase<? super T> eventHandler,
final EventHandler<? super T> eventHandler,
final int maxBatchSize,
final BatchRewindStrategy batchRewindStrategy
)
Expand All @@ -61,10 +61,11 @@ public final class BatchEventProcessor<T>
throw new IllegalArgumentException("maxBatchSize must be greater than 0");
}
this.batchLimitOffset = maxBatchSize - 1;
this.rewindHandler = batchRewindStrategy == null ? NoRewindHandler.getInstance() : new TryRewindHandler(batchRewindStrategy);
}

this.rewindHandler = eventHandler instanceof RewindableEventHandler
? new TryRewindHandler(batchRewindStrategy)
: new NoRewindHandler();
public boolean isRewindable() {
return !(rewindHandler instanceof NoRewindHandler);
}

@Override
Expand Down Expand Up @@ -304,10 +305,19 @@ public long attemptRewindGetNextSequence(final RewindableException e, final long

private static class NoRewindHandler implements RewindHandler
{

private static final RewindHandler INSTANCE = new NoRewindHandler();

private NoRewindHandler() {}

@Override
public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence)
{
throw new UnsupportedOperationException("Rewindable Exception thrown from a non-rewindable event handler", e);
}

public static RewindHandler getInstance() {
return INSTANCE;
}
}
}
32 changes: 13 additions & 19 deletions src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,32 @@ public <T> BatchEventProcessor<T> build(
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
final BatchEventProcessor<T> processor = new BatchEventProcessor<>(
dataProvider, sequenceBarrier, eventHandler, maxBatchSize, null
);
eventHandler.setSequenceCallback(processor.getSequence());

return processor;
return build(dataProvider, sequenceBarrier, eventHandler, null);
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param rewindableEventHandler is the delegate to which events are dispatched.
* @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}.
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
* @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}.
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @return the BatchEventProcessor
*/
public <T> BatchEventProcessor<T> build(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final RewindableEventHandler<? super T> rewindableEventHandler,
final EventHandler<? super T> eventHandler,
final BatchRewindStrategy batchRewindStrategy)
{
if (null == batchRewindStrategy)
{
throw new NullPointerException("batchRewindStrategy cannot be null when building a BatchEventProcessor");
}

return new BatchEventProcessor<>(
dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, batchRewindStrategy
final BatchEventProcessor<T> processor = new BatchEventProcessor<>(
dataProvider, sequenceBarrier, eventHandler, maxBatchSize, batchRewindStrategy
);

eventHandler.setSequenceCallback(processor.getSequence());

return processor;
}
}
49 changes: 40 additions & 9 deletions src/main/java/com/lmax/disruptor/EventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@
*/
package com.lmax.disruptor;

/**
* Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
*/
public interface EventHandler<T> extends EventHandlerBase<T>
public interface EventHandler<T> extends EventHandlerIdentity
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
Expand All @@ -34,11 +28,48 @@ public interface EventHandler<T> extends EventHandlerBase<T>
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
* @throws Exception if the EventHandler would like the exception handled further up the chain or possible rewind
* the batch if a {@link RewindableException} is thrown.
*/
@Override
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;

/**
* Invoked by {@link BatchEventProcessor} prior to processing a batch of events
*
* @param batchSize the size of the batch that is starting
* @param queueDepth the total number of queued up events including the batch about to be processed
*/
default void onBatchStart(long batchSize, long queueDepth)
{
}

/**
* Called once on thread start before first event is available.
*/
default void onStart()
{
}

/**
* Called once just before the event processing thread is shutdown.
*
* <p>Sequence event processing will already have stopped before this method is called. No events will
* be processed after this message.
*/
default void onShutdown()
{
}

/**
* Invoked when a {@link BatchEventProcessor}'s {@link WaitStrategy} throws a {@link TimeoutException}.
*
* @param sequence - the last processed sequence.
* @throws Exception if the implementation is unable to handle this timeout.
*/
default void onTimeout(long sequence) throws Exception
{
}

/**
* Used by the {@link BatchEventProcessor} to set a callback allowing the {@link EventHandler} to notify
* when it has finished consuming an event if this happens after the {@link EventHandler#onEvent(Object, long, boolean)} call.
Expand Down
72 changes: 0 additions & 72 deletions src/main/java/com/lmax/disruptor/EventHandlerBase.java

This file was deleted.

43 changes: 0 additions & 43 deletions src/main/java/com/lmax/disruptor/RewindableEventHandler.java

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/java/com/lmax/disruptor/RewindableException.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* On throwing this exception the {@link BatchEventProcessor} can choose to rewind and replay the batch or throw
* depending on the {@link BatchRewindStrategy}
*/
public class RewindableException extends Throwable
public class RewindableException extends RuntimeException
{
/**
* @param cause The underlying cause of the exception.
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/com/lmax/disruptor/dsl/Disruptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RewindableEventHandler;
import com.lmax.disruptor.RewindableException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
Expand Down Expand Up @@ -147,7 +146,7 @@ public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>
@SuppressWarnings("varargs")
@SafeVarargs
public final EventHandlerGroup<T> handleEventsWith(final BatchRewindStrategy batchRewindStrategy,
final RewindableEventHandler<? super T>... handlers)
final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], batchRewindStrategy, handlers);
}
Expand Down Expand Up @@ -536,7 +535,7 @@ EventHandlerGroup<T> createEventProcessors(
EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final BatchRewindStrategy batchRewindStrategy,
final RewindableEventHandler<? super T>[] eventHandlers)
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();

Expand All @@ -545,7 +544,7 @@ EventHandlerGroup<T> createEventProcessors(

for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final RewindableEventHandler<? super T> eventHandler = eventHandlers[i];
final EventHandler<? super T> eventHandler = eventHandlers[i];

final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessorBuilder().build(ringBuffer, barrier, eventHandler, batchRewindStrategy);
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.lmax.disruptor.BatchRewindStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.RewindableEventHandler;
import com.lmax.disruptor.RewindableException;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
Expand Down Expand Up @@ -115,7 +114,7 @@ public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers
*/
@SafeVarargs
public final EventHandlerGroup<T> then(final BatchRewindStrategy batchRewindStrategy,
final RewindableEventHandler<? super T>... handlers)
final EventHandler<? super T>... handlers)
{
return handleEventsWith(batchRewindStrategy, handlers);
}
Expand Down Expand Up @@ -169,7 +168,7 @@ public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>
*/
@SafeVarargs
public final EventHandlerGroup<T> handleEventsWith(final BatchRewindStrategy batchRewindStrategy,
final RewindableEventHandler<? super T>... handlers)
final EventHandler<? super T>... handlers)
{
return disruptor.createEventProcessors(sequences, batchRewindStrategy, handlers);
}
Expand Down
Loading