diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java index 46dffcace..3359790ab 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java @@ -33,6 +33,7 @@ public final class BatchEventProcessor private static final int IDLE = 0; private static final int HALTED = IDLE + 1; private static final int RUNNING = HALTED + 1; + private static final int DEFAULT_MAX_BATCH_SIZE = Integer.MAX_VALUE; private final AtomicInteger running = new AtomicInteger(IDLE); private ExceptionHandler exceptionHandler; @@ -124,7 +125,7 @@ public BatchEventProcessor( final EventHandler eventHandler ) { - this(dataProvider, sequenceBarrier, eventHandler, Integer.MAX_VALUE); + this(dataProvider, sequenceBarrier, eventHandler, DEFAULT_MAX_BATCH_SIZE); } /** @@ -141,7 +142,7 @@ public BatchEventProcessor( final RewindableEventHandler rewindableEventHandler ) { - this(dataProvider, sequenceBarrier, rewindableEventHandler, Integer.MAX_VALUE); + this(dataProvider, sequenceBarrier, rewindableEventHandler, DEFAULT_MAX_BATCH_SIZE); } @Override @@ -251,9 +252,9 @@ private void processEvents() final long availableSequence = sequenceBarrier.waitFor(nextSequence); final long endOfBatchSequence = min(nextSequence + batchLimitOffset, availableSequence); - if (endOfBatchSequence >= nextSequence) + if (nextSequence <= endOfBatchSequence) { - eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1); + eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1, availableSequence - nextSequence + 1); } while (nextSequence <= endOfBatchSequence) diff --git a/src/main/java/com/lmax/disruptor/EventHandlerBase.java b/src/main/java/com/lmax/disruptor/EventHandlerBase.java index 2ed0db9f6..006125a50 100644 --- a/src/main/java/com/lmax/disruptor/EventHandlerBase.java +++ b/src/main/java/com/lmax/disruptor/EventHandlerBase.java @@ -37,8 +37,9 @@ interface EventHandlerBase * Invoked by {@link BatchEventProcessor} prior to processing a batch of events * * @param batchSize the size of the batch that is starting + * @param queueDepth the total number of queued up events including the batch about to be processed */ - default void onBatchStart(long batchSize) + default void onBatchStart(long batchSize, long queueDepth) { } diff --git a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java index ec11f4e70..bc49610da 100644 --- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java @@ -136,7 +136,7 @@ public void reset(final CountDownLatch latch, final long expectedCount) } @Override - public void onBatchStart(final long batchSize) + public void onBatchStart(final long batchSize, final long queueDepth) { batchesProcessed.increment(); } diff --git a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java index f25885f00..3cf7cb3f1 100644 --- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java @@ -140,7 +140,7 @@ public void reset(final CountDownLatch latch, final long expectedCount) } @Override - public void onBatchStart(final long batchSize) + public void onBatchStart(final long batchSize, final long queueDepth) { batchesProcessed.increment(); } diff --git a/src/perftest/java/com/lmax/disruptor/support/LongArrayEventHandler.java b/src/perftest/java/com/lmax/disruptor/support/LongArrayEventHandler.java index 4f1b5eae5..9b787a00f 100644 --- a/src/perftest/java/com/lmax/disruptor/support/LongArrayEventHandler.java +++ b/src/perftest/java/com/lmax/disruptor/support/LongArrayEventHandler.java @@ -60,7 +60,7 @@ public void onEvent(final long[] event, final long sequence, final boolean endOf } @Override - public void onBatchStart(final long batchSize) + public void onBatchStart(final long batchSize, final long queueDepth) { batchesProcessed.increment(); } diff --git a/src/perftest/java/com/lmax/disruptor/support/ValueAdditionEventHandler.java b/src/perftest/java/com/lmax/disruptor/support/ValueAdditionEventHandler.java index f1582cbe5..1953902b0 100644 --- a/src/perftest/java/com/lmax/disruptor/support/ValueAdditionEventHandler.java +++ b/src/perftest/java/com/lmax/disruptor/support/ValueAdditionEventHandler.java @@ -57,7 +57,7 @@ public void onEvent(final ValueEvent event, final long sequence, final boolean e } @Override - public void onBatchStart(final long batchSize) + public void onBatchStart(final long batchSize, final long queueDepth) { batchesProcessed.increment(); } diff --git a/src/perftest/java/com/lmax/disruptor/support/ValueMutationEventHandler.java b/src/perftest/java/com/lmax/disruptor/support/ValueMutationEventHandler.java index 03c70899c..b7e20981e 100644 --- a/src/perftest/java/com/lmax/disruptor/support/ValueMutationEventHandler.java +++ b/src/perftest/java/com/lmax/disruptor/support/ValueMutationEventHandler.java @@ -63,7 +63,7 @@ public void onEvent(final ValueEvent event, final long sequence, final boolean e } @Override - public void onBatchStart(final long batchSize) + public void onBatchStart(final long batchSize, final long queueDepth) { batchesProcessed.increment(); } diff --git a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java index 8153533fd..4e42e4a12 100644 --- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java @@ -162,7 +162,7 @@ final class LoopbackEventHandler { @Override - public void onBatchStart(final long batchSize) + public void onBatchStart(final long batchSize, final long queueDepth) { batchSizes.add(batchSize); } @@ -364,7 +364,7 @@ public void onEvent(final StubEvent event, final long sequence, final boolean en } @Override - public void onBatchStart(final long batchSize) + public void onBatchStart(final long batchSize, final long queueDepth) { final Integer currentCount = batchSizeToCountMap.get(batchSize); final int nextCount = null == currentCount ? 1 : currentCount + 1; diff --git a/src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java b/src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java index f436be377..cda7e41b8 100644 --- a/src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java @@ -57,33 +57,18 @@ void setUp() @Test public void shouldLimitTheBatchToConfiguredMaxBatchSize() throws Exception { - long sequence = 0; - for (int i = 0; i < PUBLISH_COUNT; i++) - { - sequence = ringBuffer.next(); - } - ringBuffer.publish(sequence); - - //Wait for consumer to process all events - countDownLatch.await(); + publishEvents(); assertEquals(eventHandler.batchedSequences, Arrays.asList(Arrays.asList(0L, 1L, 2L), Arrays.asList(3L, 4L))); } @Test - public void shouldAnnounceBatchSizeAtTheStartOfBatch() throws Exception + public void shouldAnnounceBatchSizeAndQueueDepthAtTheStartOfBatch() throws Exception { - long sequence = 0; - for (int i = 0; i < PUBLISH_COUNT; i++) - { - sequence = ringBuffer.next(); - } - ringBuffer.publish(sequence); - - //Wait for consumer to process all events - countDownLatch.await(); + publishEvents(); assertEquals(eventHandler.announcedBatchSizes, Arrays.asList(3L, 2L)); + assertEquals(eventHandler.announcedQueueDepths, Arrays.asList(5L, 2L)); } @AfterEach @@ -93,12 +78,26 @@ void tearDown() throws InterruptedException thread.join(); } + private void publishEvents() throws InterruptedException + { + long sequence = 0; + for (int i = 0; i < PUBLISH_COUNT; i++) + { + sequence = ringBuffer.next(); + } + ringBuffer.publish(sequence); + + //Wait for consumer to process all events + countDownLatch.await(); + } + private static class BatchLimitRecordingHandler implements EventHandler { public final List> batchedSequences = new ArrayList<>(); private List currentSequences; private final CountDownLatch countDownLatch; private final List announcedBatchSizes = new ArrayList<>(); + private final List announcedQueueDepths = new ArrayList<>(); BatchLimitRecordingHandler(final CountDownLatch countDownLatch) { @@ -119,10 +118,11 @@ public void onEvent(final StubEvent event, final long sequence, final boolean en } @Override - public void onBatchStart(final long batchSize) + public void onBatchStart(final long batchSize, final long queueDepth) { currentSequences = new ArrayList<>(); announcedBatchSizes.add(batchSize); + announcedQueueDepths.add(queueDepth); } } }