Skip to content

Commit

Permalink
fix: Flush intake prior to finishing replay (#10452)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Littley <austin@swirldslabs.com>
  • Loading branch information
litt3 authored Dec 12, 2023
1 parent cf6e9e0 commit 6c00199
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,8 @@ private void replayPreconsensusEvents() {
consensusRoundHandler,
stateHashSignQueue,
stateManagementComponent,
initialMinimumGenerationNonAncient);
initialMinimumGenerationNonAncient,
platformWiring::flushIntakePipeline);
}

consensusHashManager.signalEndOfPreconsensusReplay();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.swirlds.logging.legacy.LogMarker.STARTUP;

import com.swirlds.base.time.Time;
import com.swirlds.common.config.EventConfig;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.formatting.UnitFormatter;
import com.swirlds.common.io.IOIterator;
Expand Down Expand Up @@ -61,6 +62,8 @@ private PreconsensusEventReplayWorkflow() {}
* @param stateHashSignQueue the queue thread for hashing and signing states
* @param stateManagementComponent manages various copies of the state
* @param initialMinimumGenerationNonAncient the minimum generation of events to replay
* @param flushIntakePipeline flushes the intake pipeline. only used if the new intake pipeline is
* enabled
*/
public static void replayPreconsensusEvents(
@NonNull final PlatformContext platformContext,
Expand All @@ -73,7 +76,8 @@ public static void replayPreconsensusEvents(
@NonNull final ConsensusRoundHandler consensusRoundHandler,
@NonNull final QueueThread<ReservedSignedState> stateHashSignQueue,
@NonNull final StateManagementComponent stateManagementComponent,
final long initialMinimumGenerationNonAncient) {
final long initialMinimumGenerationNonAncient,
@NonNull Runnable flushIntakePipeline) {

Objects.requireNonNull(platformContext);
Objects.requireNonNull(threadManager);
Expand Down Expand Up @@ -101,7 +105,13 @@ public static void replayPreconsensusEvents(
new PreconsensusEventReplayPipeline(platformContext, threadManager, iterator, intakeHandler);
eventReplayPipeline.replayEvents();

waitForReplayToComplete(intakeQueue, consensusRoundHandler, stateHashSignQueue);
final boolean useLegacyIntake = platformContext
.getConfiguration()
.getConfigData(EventConfig.class)
.useLegacyIntake();

waitForReplayToComplete(
intakeQueue, consensusRoundHandler, stateHashSignQueue, useLegacyIntake, flushIntakePipeline);

final Instant finish = time.now();
final Duration elapsed = Duration.between(start, finish);
Expand All @@ -127,12 +137,21 @@ public static void replayPreconsensusEvents(
private static void waitForReplayToComplete(
@NonNull final QueueThread<GossipEvent> intakeQueue,
@NonNull final ConsensusRoundHandler consensusRoundHandler,
@NonNull final QueueThread<ReservedSignedState> stateHashSignQueue)
@NonNull final QueueThread<ReservedSignedState> stateHashSignQueue,
final boolean useLegacyIntake,
@NonNull final Runnable flushIntakePipeline)
throws InterruptedException {

// Wait until all events from the preconsensus event stream have been fully ingested.
intakeQueue.waitUntilNotBusy();

if (!useLegacyIntake) {
// The old intake has an empty intake pipeline as soon as the intake queue is empty.
// The new intake has more steps to the intake pipeline, so we need to flush it before certifying that
// the replay is complete.
flushIntakePipeline.run();
}

// Wait until all rounds from the preconsensus event stream have been fully processed.
consensusRoundHandler.waitUntilNotBusy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ public PlatformCoordinator(
this.eventCreationManagerWiring = Objects.requireNonNull(eventCreationManagerWiring);
}

/**
* Flushes the intake pipeline
*/
public void flushIntakePipeline() {
internalEventValidatorWiring.flushRunnable().run();
eventDeduplicatorWiring.flushRunnable().run();
eventSignatureValidatorWiring.flushRunnable().run();
orphanBufferWiring.flushRunnable().run();
eventCreationManagerWiring.flush();
inOrderLinkerWiring.flushRunnable().run();
linkedEventIntakeWiring.flushRunnable().run();
}

/**
* Safely clears the intake pipeline
* <p>
Expand All @@ -77,13 +90,7 @@ public void clear() {

// Phase 2: flush
// Flush everything remaining in the intake pipeline out into the void.
internalEventValidatorWiring.flushRunnable().run();
eventDeduplicatorWiring.flushRunnable().run();
eventSignatureValidatorWiring.flushRunnable().run();
orphanBufferWiring.flushRunnable().run();
eventCreationManagerWiring.flush();
inOrderLinkerWiring.flushRunnable().run();
linkedEventIntakeWiring.flushRunnable().run();
flushIntakePipeline();

// Phase 3: clear
// Data is no longer moving through the system. clear all the internal data structures in the wiring objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,13 @@ public void updateMinimumGenerationNonAncient(final long minimumGenerationNonAnc
eventCreationManagerWiring.minimumGenerationNonAncientInput().inject(minimumGenerationNonAncient);
}

/**
* Flush the intake pipeline.
*/
public void flushIntakePipeline() {
platformCoordinator.flushIntakePipeline();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ void testBasicReplayWorkflow() throws InterruptedException {
consensusRoundHandler,
stateHashSignQueue,
stateManagementComponent,
minimumGenerationNonAncient);
minimumGenerationNonAncient,
() -> {});

assertEquals(TestPhase.TEST_FINISHED, phase.get());
}
Expand Down

0 comments on commit 6c00199

Please sign in to comment.