Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 10116 intake clearing #10306

Merged
merged 13 commits into from
Dec 8, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -839,15 +839,27 @@

final Clearable pauseEventCreation = eventCreator::pauseEventCreation;

clearAllPipelines = new LoggingClearables(
RECONNECT.getMarker(),
List.of(
Pair.of(pauseEventCreation, "eventCreator"),
Pair.of(gossip, "gossip"),
Pair.of(preConsensusEventHandler, "preConsensusEventHandler"),
Pair.of(consensusRoundHandler, "consensusRoundHandler"),
Pair.of(transactionPool, "transactionPool"),
Pair.of(platformWiring, "platformWiring")));
if (eventConfig.useLegacyIntake()) {
clearAllPipelines = new LoggingClearables(
RECONNECT.getMarker(),
List.of(
Pair.of(pauseEventCreation, "eventCreator"),
Pair.of(gossip, "gossip"),
Pair.of(preConsensusEventHandler, "preConsensusEventHandler"),
Pair.of(consensusRoundHandler, "consensusRoundHandler"),
Pair.of(transactionPool, "transactionPool")));

Check warning on line 850 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L843-L850

Added lines #L843 - L850 were not covered by tests
} else {
clearAllPipelines = new LoggingClearables(
RECONNECT.getMarker(),
List.of(
Pair.of(pauseEventCreation, "eventCreator"),
Pair.of(intakeQueue, "intakeQueue"),
Pair.of(platformWiring, "platformWiring"),
Pair.of(shadowGraph, "shadowGraph"),
Pair.of(preConsensusEventHandler, "preConsensusEventHandler"),
Pair.of(consensusRoundHandler, "consensusRoundHandler"),
Pair.of(transactionPool, "transactionPool")));

Check warning on line 861 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java#L852-L861

Added lines #L852 - L861 were not covered by tests
}

if (platformContext.getConfiguration().getConfigData(ThreadConfig.class).jvmAnchor()) {
components.add(new JvmAnchor(threadManager));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@
*/
private final IntakeEventCounter intakeEventCounter;

/**
* Whether or not the linked event intake is paused.
* <p>
* When paused, all received events will be tossed into the void
*/
private boolean paused;

/**
* Constructor
*
Expand Down Expand Up @@ -103,6 +110,8 @@
this.prehandleEvent = Objects.requireNonNull(prehandleEvent);
this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter);

this.paused = false;

Check warning on line 113 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java#L113

Added line #L113 was not covered by tests

final EventConfig eventConfig = platformContext.getConfiguration().getConfigData(EventConfig.class);

final BlockingQueue<Runnable> prehandlePoolQueue = new LinkedBlockingQueue<>();
Expand All @@ -128,9 +137,16 @@
public List<ConsensusRound> addEvent(@NonNull final EventImpl event) {
Objects.requireNonNull(event);

if (paused) {
// If paused, throw everything into the void
event.clear();
return List.of();

Check warning on line 143 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java#L142-L143

Added lines #L142 - L143 were not covered by tests
}

try {
if (event.getGeneration() < consensusSupplier.get().getMinGenerationNonAncient()) {
// ancient events *may* be discarded, and stale events *must* be discarded
event.clear();

Check warning on line 149 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java#L149

Added line #L149 was not covered by tests
return List.of();
}

Expand Down Expand Up @@ -162,6 +178,15 @@
}
}

/**
* Pause or unpause this object.
*
* @param paused whether or not this object should be paused
*/
public void setPaused(final boolean paused) {
this.paused = paused;
}

Check warning on line 188 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/components/LinkedEventIntake.java#L187-L188

Added lines #L187 - L188 were not covered by tests

/**
* Build a task that will prehandle transactions in an event. Executed on a thread pool.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.gossip.IntakeEventCounter;
import com.swirlds.platform.metrics.EventIntakeMetrics;
import com.swirlds.platform.wiring.ClearTrigger;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -145,4 +146,13 @@

observedEvents.shiftWindow(minimumGenerationNonAncient);
}

/**
* Clear the internal state of this deduplicator.
*
* @param ignored ignored trigger object
*/
public void clear(@NonNull final ClearTrigger ignored) {
observedEvents.clear();
}

Check warning on line 157 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/deduplication/EventDeduplicator.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/deduplication/EventDeduplicator.java#L156-L157

Added lines #L156 - L157 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.gossip.IntakeEventCounter;
import com.swirlds.platform.internal.EventImpl;
import com.swirlds.platform.wiring.ClearTrigger;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Duration;
Expand Down Expand Up @@ -251,4 +252,14 @@
parentDescriptorMap.shiftWindow(
minimumGenerationNonAncient, (descriptor, event) -> parentHashMap.remove(descriptor.getHash()));
}

/**
* Clear the internal state of this linker.
*
* @param ignored ignored trigger object
*/
public void clear(@NonNull final ClearTrigger ignored) {
parentDescriptorMap.clear();
parentHashMap.clear();
}

Check warning on line 264 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/linking/InOrderLinker.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/linking/InOrderLinker.java#L262-L264

Added lines #L262 - L264 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.swirlds.common.system.events.EventDescriptor;
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.gossip.IntakeEventCounter;
import com.swirlds.platform.wiring.ClearTrigger;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.ArrayList;
import java.util.Deque;
Expand Down Expand Up @@ -80,15 +81,6 @@
private final SequenceMap<EventDescriptor, List<OrphanedEvent>> missingParentMap =
new StandardSequenceMap<>(0, INITIAL_CAPACITY, true, EventDescriptor::getGeneration);

/**
* Whether or not the orphan buffer is paused.
* <p>
* When the orphan buffer is paused, it will not process any updates to the minimum generation non-ancient. This
* guarantees that the orphan buffer cannot cyclically cause itself to receive additional input by emitting events
* that cause the minimum generation non-ancient to be updated, thus causing more events to be emitted.
*/
private boolean paused;

/**
* Constructor
*
Expand All @@ -107,8 +99,6 @@
PLATFORM_CATEGORY, "orphanBufferSize", Integer.class, this::getCurrentOrphanCount)
.withDescription("number of orphaned events currently in the orphan buffer")
.withUnit("events"));

this.paused = false;
}

/**
Expand Down Expand Up @@ -151,11 +141,6 @@
*/
@NonNull
public List<GossipEvent> setMinimumGenerationNonAncient(final long minimumGenerationNonAncient) {
if (paused) {
// If the orphan buffer is paused, don't process any updates to the minimum generation non-ancient.
return List.of();
}

this.minimumGenerationNonAncient = minimumGenerationNonAncient;

eventsWithParents.shiftWindow(minimumGenerationNonAncient);
Expand All @@ -175,18 +160,6 @@
return unorphanedEvents;
}

/**
* Pause or unpause the orphan buffer.
* <p>
* The orphan buffer must be paused prior to being flushed, since its output can cyclically cause it to receive
* additional input, via an update to minimum generation non ancient.
*
* @param paused whether or not the orphan buffer should be paused
*/
public void setPaused(final boolean paused) {
this.paused = paused;
}

/**
* Called when a parent becomes ancient.
* <p>
Expand Down Expand Up @@ -293,4 +266,18 @@
public Integer getCurrentOrphanCount() {
return currentOrphanCount;
}

/**
* Clears the orphan buffer.
*
* @param ignored ignored trigger object
*/
public void clear(@NonNull final ClearTrigger ignored) {
eventsWithParents.clear();

Check warning on line 276 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/OrphanBuffer.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/OrphanBuffer.java#L276

Added line #L276 was not covered by tests

// clearing this map here is safe, under the assumption that the intake event counter will be reset
// before gossip starts back up
missingParentMap.clear();
currentOrphanCount = 0;
}

Check warning on line 282 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/OrphanBuffer.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/orphan/OrphanBuffer.java#L280-L282

Added lines #L280 - L282 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -702,15 +702,15 @@
final boolean knownOP = shadow(e.getOtherParent()) != null;
final boolean expiredOP = expired(e.getOtherParent());
if (!knownOP && !expiredOP) {
logger.error(EXCEPTION.getMarker(), "Missing non-expired other parent for {}", e::toMediumString);
logger.warn(STARTUP.getMarker(), "Missing non-expired other parent for {}", e::toMediumString);

Check warning on line 705 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/shadowgraph/ShadowGraph.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/shadowgraph/ShadowGraph.java#L705

Added line #L705 was not covered by tests
}
}

if (hasSP) {
final boolean knownSP = shadow(e.getSelfParent()) != null;
final boolean expiredSP = expired(e.getSelfParent());
if (!knownSP && !expiredSP) {
logger.error(EXCEPTION.getMarker(), "Missing non-expired self parent for {}", e::toMediumString);
logger.warn(STARTUP.getMarker(), "Missing non-expired self parent for {}", e::toMediumString);

Check warning on line 713 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/shadowgraph/ShadowGraph.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/shadowgraph/ShadowGraph.java#L713

Added line #L713 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.swirlds.base.time.Time;
import com.swirlds.base.utility.Pair;
import com.swirlds.common.config.EventConfig;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.merkle.synchronization.config.ReconnectConfig;
import com.swirlds.common.system.NodeId;
Expand Down Expand Up @@ -109,9 +110,15 @@
loadReconnectState,
clearAllPipelinesForReconnect);

clearAllInternalPipelines = new LoggingClearables(
RECONNECT.getMarker(),
List.of(Pair.of(intakeQueue, "intakeQueue"), Pair.of(shadowGraph, "shadowGraph")));
if (platformContext.getConfiguration().getConfigData(EventConfig.class).useLegacyIntake()) {
// legacy intake clears these things as part of gossip
clearAllInternalPipelines = new LoggingClearables(
RECONNECT.getMarker(),
List.of(Pair.of(intakeQueue, "intakeQueue"), Pair.of(shadowGraph, "shadowGraph")));

Check warning on line 117 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SingleNodeSyncGossip.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SingleNodeSyncGossip.java#L115-L117

Added lines #L115 - L117 were not covered by tests
} else {
// the new intake pipeline clears everything from the top level, rather than delegating to gossip
clearAllInternalPipelines = null;

Check warning on line 120 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SingleNodeSyncGossip.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SingleNodeSyncGossip.java#L120

Added line #L120 was not covered by tests
}
}

/**
Expand Down Expand Up @@ -151,7 +158,9 @@
*/
@Override
public void clear() {
clearAllInternalPipelines.clear();
if (clearAllInternalPipelines != null) {
clearAllInternalPipelines.clear();

Check warning on line 162 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SingleNodeSyncGossip.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SingleNodeSyncGossip.java#L162

Added line #L162 was not covered by tests
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,18 @@
false,
() -> {});

clearAllInternalPipelines = new LoggingClearables(
RECONNECT.getMarker(),
List.of(
Pair.of(intakeQueue, "intakeQueue"),
Pair.of(new PauseAndClear(intakeQueue, eventLinker), "eventLinker"),
Pair.of(shadowGraph, "shadowGraph")));
if (eventConfig.useLegacyIntake()) {
// legacy intake clears these things as part of gossip
clearAllInternalPipelines = new LoggingClearables(
RECONNECT.getMarker(),
List.of(
Pair.of(intakeQueue, "intakeQueue"),
Pair.of(new PauseAndClear(intakeQueue, eventLinker), "eventLinker"),
Pair.of(shadowGraph, "shadowGraph")));

Check warning on line 207 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SyncGossip.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SyncGossip.java#L202-L207

Added lines #L202 - L207 were not covered by tests
} else {
// the new intake pipeline clears everything from the top level, rather than delegating to gossip
clearAllInternalPipelines = null;

Check warning on line 210 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SyncGossip.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SyncGossip.java#L210

Added line #L210 was not covered by tests
}

final ReconnectConfig reconnectConfig =
platformContext.getConfiguration().getConfigData(ReconnectConfig.class);
Expand Down Expand Up @@ -358,7 +364,10 @@
*/
@Override
public void clear() {
clearAllInternalPipelines.clear();
// this will be null if the new intake pipeline is being used
if (clearAllInternalPipelines != null) {
clearAllInternalPipelines.clear();

Check warning on line 369 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SyncGossip.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/gossip/sync/SyncGossip.java#L369

Added line #L369 was not covered by tests
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.wiring;

/**
* A placeholder object, to provide a type to the input `clear` wire for wiring components.
*/
public record ClearTrigger() {}

Check warning on line 22 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/ClearTrigger.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/ClearTrigger.java#L22

Added line #L22 was not covered by tests
edward-swirldslabs marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
*
* @param eventInput the input wire for events to be deduplicated
* @param minimumGenerationNonAncientInput the input wire for the minimum generation non-ancient
* @param clearInput the input wire to clear the internal state of the deduplicator
* @param eventOutput the output wire for deduplicated events
* @param flushRunnable the runnable to flush the deduplicator
*/
public record EventDeduplicatorWiring(
@NonNull InputWire<GossipEvent> eventInput,
@NonNull InputWire<Long> minimumGenerationNonAncientInput,
@NonNull InputWire<ClearTrigger> clearInput,
@NonNull OutputWire<GossipEvent> eventOutput,
@NonNull Runnable flushRunnable) {

Expand All @@ -48,6 +50,7 @@ public static EventDeduplicatorWiring create(@NonNull final TaskScheduler<Gossip
return new EventDeduplicatorWiring(
taskScheduler.buildInputWire("non-deduplicated events"),
taskScheduler.buildInputWire("minimum generation non ancient"),
taskScheduler.buildInputWire("clear"),
taskScheduler.getOutputWire(),
taskScheduler::flush);
}
Expand All @@ -61,5 +64,6 @@ public void bind(@NonNull final EventDeduplicator deduplicator) {
((BindableInputWire<GossipEvent, GossipEvent>) eventInput).bind(deduplicator::handleEvent);
((BindableInputWire<Long, GossipEvent>) minimumGenerationNonAncientInput)
.bind(deduplicator::setMinimumGenerationNonAncient);
((BindableInputWire<ClearTrigger, GossipEvent>) clearInput).bind(deduplicator::clear);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
*
* @param eventInput the input wire for events to be linked
* @param minimumGenerationNonAncientInput the input wire for the minimum generation non-ancient
* @param clearInput the input wire to clear the internal state of the linker
* @param eventOutput the output wire for linked events
* @param flushRunnable the runnable to flush the linker
*/
public record InOrderLinkerWiring(
@NonNull InputWire<GossipEvent> eventInput,
@NonNull InputWire<Long> minimumGenerationNonAncientInput,
@NonNull InputWire<ClearTrigger> clearInput,
@NonNull OutputWire<EventImpl> eventOutput,
@NonNull Runnable flushRunnable) {

Expand All @@ -49,6 +51,7 @@ public static InOrderLinkerWiring create(@NonNull final TaskScheduler<EventImpl>
return new InOrderLinkerWiring(
taskScheduler.buildInputWire("unlinked events"),
taskScheduler.buildInputWire("minimum generation non ancient"),
taskScheduler.buildInputWire("clear"),
taskScheduler.getOutputWire(),
taskScheduler::flush);
}
Expand All @@ -62,5 +65,6 @@ public void bind(@NonNull final InOrderLinker inOrderLinker) {
((BindableInputWire<GossipEvent, EventImpl>) eventInput).bind(inOrderLinker::linkEvent);
((BindableInputWire<Long, EventImpl>) minimumGenerationNonAncientInput)
.bind(inOrderLinker::setMinimumGenerationNonAncient);
((BindableInputWire<ClearTrigger, EventImpl>) clearInput).bind(inOrderLinker::clear);
}
}
Loading
Loading