diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java index fd0a704e652f..9e9c07717d96 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java @@ -1319,7 +1319,8 @@ private void replayPreconsensusEvents() { consensusRoundHandler, stateHashSignQueue, stateManagementComponent, - initialMinimumGenerationNonAncient); + initialMinimumGenerationNonAncient, + platformWiring::flushIntakePipeline); } consensusHashManager.signalEndOfPreconsensusReplay(); diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventReplayWorkflow.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventReplayWorkflow.java index cefdea8d512b..67d95ff4aa7f 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventReplayWorkflow.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/PreconsensusEventReplayWorkflow.java @@ -22,6 +22,7 @@ import static com.swirlds.logging.legacy.LogMarker.STARTUP; import com.swirlds.base.time.Time; +import com.swirlds.common.config.EventConfig; import com.swirlds.common.context.PlatformContext; import com.swirlds.common.formatting.UnitFormatter; import com.swirlds.common.io.IOIterator; @@ -61,6 +62,8 @@ private PreconsensusEventReplayWorkflow() {} * @param stateHashSignQueue the queue thread for hashing and signing states * @param stateManagementComponent manages various copies of the state * @param initialMinimumGenerationNonAncient the minimum generation of events to replay + * @param flushIntakePipeline flushes the intake pipeline. only used if the new intake pipeline is + * enabled */ public static void replayPreconsensusEvents( @NonNull final PlatformContext platformContext, @@ -73,7 +76,8 @@ public static void replayPreconsensusEvents( @NonNull final ConsensusRoundHandler consensusRoundHandler, @NonNull final QueueThread stateHashSignQueue, @NonNull final StateManagementComponent stateManagementComponent, - final long initialMinimumGenerationNonAncient) { + final long initialMinimumGenerationNonAncient, + @NonNull Runnable flushIntakePipeline) { Objects.requireNonNull(platformContext); Objects.requireNonNull(threadManager); @@ -101,7 +105,13 @@ public static void replayPreconsensusEvents( new PreconsensusEventReplayPipeline(platformContext, threadManager, iterator, intakeHandler); eventReplayPipeline.replayEvents(); - waitForReplayToComplete(intakeQueue, consensusRoundHandler, stateHashSignQueue); + final boolean useLegacyIntake = platformContext + .getConfiguration() + .getConfigData(EventConfig.class) + .useLegacyIntake(); + + waitForReplayToComplete( + intakeQueue, consensusRoundHandler, stateHashSignQueue, useLegacyIntake, flushIntakePipeline); final Instant finish = time.now(); final Duration elapsed = Duration.between(start, finish); @@ -127,12 +137,21 @@ public static void replayPreconsensusEvents( private static void waitForReplayToComplete( @NonNull final QueueThread intakeQueue, @NonNull final ConsensusRoundHandler consensusRoundHandler, - @NonNull final QueueThread stateHashSignQueue) + @NonNull final QueueThread stateHashSignQueue, + final boolean useLegacyIntake, + @NonNull final Runnable flushIntakePipeline) throws InterruptedException { // Wait until all events from the preconsensus event stream have been fully ingested. intakeQueue.waitUntilNotBusy(); + if (!useLegacyIntake) { + // The old intake has an empty intake pipeline as soon as the intake queue is empty. + // The new intake has more steps to the intake pipeline, so we need to flush it before certifying that + // the replay is complete. + flushIntakePipeline.run(); + } + // Wait until all rounds from the preconsensus event stream have been fully processed. consensusRoundHandler.waitUntilNotBusy(); diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformCoordinator.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformCoordinator.java index 2ac48132fbb7..1a67eeb787b1 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformCoordinator.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformCoordinator.java @@ -61,6 +61,19 @@ public PlatformCoordinator( this.eventCreationManagerWiring = Objects.requireNonNull(eventCreationManagerWiring); } + /** + * Flushes the intake pipeline + */ + public void flushIntakePipeline() { + internalEventValidatorWiring.flushRunnable().run(); + eventDeduplicatorWiring.flushRunnable().run(); + eventSignatureValidatorWiring.flushRunnable().run(); + orphanBufferWiring.flushRunnable().run(); + eventCreationManagerWiring.flush(); + inOrderLinkerWiring.flushRunnable().run(); + linkedEventIntakeWiring.flushRunnable().run(); + } + /** * Safely clears the intake pipeline *

@@ -77,13 +90,7 @@ public void clear() { // Phase 2: flush // Flush everything remaining in the intake pipeline out into the void. - internalEventValidatorWiring.flushRunnable().run(); - eventDeduplicatorWiring.flushRunnable().run(); - eventSignatureValidatorWiring.flushRunnable().run(); - orphanBufferWiring.flushRunnable().run(); - eventCreationManagerWiring.flush(); - inOrderLinkerWiring.flushRunnable().run(); - linkedEventIntakeWiring.flushRunnable().run(); + flushIntakePipeline(); // Phase 3: clear // Data is no longer moving through the system. clear all the internal data structures in the wiring objects. diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java index 825b2cf64af1..1898cabd83f2 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformWiring.java @@ -296,6 +296,13 @@ public void updateMinimumGenerationNonAncient(final long minimumGenerationNonAnc eventCreationManagerWiring.minimumGenerationNonAncientInput().inject(minimumGenerationNonAncient); } + /** + * Flush the intake pipeline. + */ + public void flushIntakePipeline() { + platformCoordinator.flushIntakePipeline(); + } + /** * {@inheritDoc} */ diff --git a/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/test/java/com/swirlds/platform/test/event/preconsensus/PreconsensusEventReplayWorkflowTests.java b/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/test/java/com/swirlds/platform/test/event/preconsensus/PreconsensusEventReplayWorkflowTests.java index 7a3bf0a521b5..76165edc61c6 100644 --- a/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/test/java/com/swirlds/platform/test/event/preconsensus/PreconsensusEventReplayWorkflowTests.java +++ b/platform-sdk/swirlds-unit-tests/core/swirlds-platform-test/src/test/java/com/swirlds/platform/test/event/preconsensus/PreconsensusEventReplayWorkflowTests.java @@ -175,7 +175,8 @@ void testBasicReplayWorkflow() throws InterruptedException { consensusRoundHandler, stateHashSignQueue, stateManagementComponent, - minimumGenerationNonAncient); + minimumGenerationNonAncient, + () -> {}); assertEquals(TestPhase.TEST_FINISHED, phase.get()); }