Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: create latest immutable state nexus #10291

Merged
merged 15 commits into from
Dec 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* the type of the wrapped object
*/
public class AutoCloseableWrapper<T> implements AutoCloseable {

private static final AutoCloseableWrapper<?> EMPTY = new AutoCloseableWrapper<>(null, () -> {});
private final T object;
private final Runnable closeCallback;

Expand Down Expand Up @@ -56,4 +56,14 @@ public T get() {
public void close() {
closeCallback.run();
}

/**
* Get an empty wrapper.
*
* @return an empty wrapper
*/
@SuppressWarnings("unchecked")
public static <T> AutoCloseableWrapper<T> empty() {
return (AutoCloseableWrapper<T>) EMPTY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;
import static com.swirlds.logging.legacy.LogMarker.RECONNECT;
import static com.swirlds.logging.legacy.LogMarker.STARTUP;
import static com.swirlds.logging.legacy.LogMarker.STATE_TO_DISK;
import static com.swirlds.platform.event.creation.EventCreationManagerFactory.buildEventCreationManager;
import static com.swirlds.platform.event.creation.EventCreationManagerFactory.buildLegacyEventCreationManager;
import static com.swirlds.platform.state.address.AddressBookMetrics.registerAddressBookMetrics;
Expand Down Expand Up @@ -153,6 +154,7 @@
import com.swirlds.platform.state.iss.IssScratchpad;
import com.swirlds.platform.state.nexus.EmergencyStateNexus;
import com.swirlds.platform.state.nexus.LatestCompleteStateNexus;
import com.swirlds.platform.state.nexus.SignedStateNexus;
import com.swirlds.platform.state.signed.ReservedSignedState;
import com.swirlds.platform.state.signed.SavedStateInfo;
import com.swirlds.platform.state.signed.SignedState;
Expand All @@ -161,6 +163,7 @@
import com.swirlds.platform.state.signed.SignedStateMetrics;
import com.swirlds.platform.state.signed.SourceOfSignedState;
import com.swirlds.platform.state.signed.StartupStateUtils;
import com.swirlds.platform.state.signed.StateDumpRequest;
import com.swirlds.platform.state.signed.StateToDiskReason;
import com.swirlds.platform.stats.StatConstructor;
import com.swirlds.platform.system.InitTrigger;
Expand Down Expand Up @@ -239,6 +242,18 @@ public class SwirldsPlatform implements Platform {
private final long initialMinimumGenerationNonAncient;

private final StateManagementComponent stateManagementComponent;
/** Manages the pipeline of signed states to be written to disk */
private final SignedStateFileManager signedStateFileManager;

/**
* Holds the latest state that is immutable. May be unhashed (in the future), may or may not have all required
* signatures. State is returned with a reservation.
* <p>
* NOTE: This is currently set when a state has finished hashing. In the future, this will be set at the moment a
* new state is created, before it is hashed.
*/
private final SignedStateNexus latestImmutableState = new SignedStateNexus();

private final QueueThread<GossipEvent> intakeQueue;
private final QueueThread<ReservedSignedState> stateHashSignQueue;
private final EventLinker eventLinker;
Expand Down Expand Up @@ -494,8 +509,7 @@ public class SwirldsPlatform implements Platform {

components.add(new IssMetrics(platformContext.getMetrics(), currentAddressBook));

// Manages the pipeline of signed states to be written to disk
final SignedStateFileManager signedStateFileManager = new SignedStateFileManager(
signedStateFileManager = new SignedStateFileManager(
platformContext,
new SignedStateMetrics(platformContext.getMetrics()),
Time.getCurrent(),
Expand Down Expand Up @@ -535,7 +549,6 @@ public class SwirldsPlatform implements Platform {
newLatestCompleteStateConsumer,
this::handleFatalError,
savedStateController,
platformWiring.getDumpStateToDiskInput()::put,
platformWiring.getSignStateInput()::put);

// Load the minimum generation into the pre-consensus event writer
Expand Down Expand Up @@ -610,6 +623,7 @@ public class SwirldsPlatform implements Platform {
appVersion);

final InterruptableConsumer<ReservedSignedState> newSignedStateFromTransactionsConsumer = rs -> {
latestImmutableState.setState(rs.getAndReserve("newSignedStateFromTransactionsConsumer"));
latestCompleteState.newIncompleteState(rs.get().getRound());
stateManagementComponent.newSignedStateFromTransactions(rs);
};
Expand Down Expand Up @@ -842,7 +856,7 @@ public class SwirldsPlatform implements Platform {
} else {
initialMinimumGenerationNonAncient =
initialState.getState().getPlatformState().getPlatformData().getMinimumGenerationNonAncient();

latestImmutableState.setState(initialState.reserve("set latest immutable to initial state"));
stateManagementComponent.stateToLoad(initialState, SourceOfSignedState.DISK);
consensusRoundHandler.loadDataFromSignedState(initialState, false);

Expand Down Expand Up @@ -1095,6 +1109,7 @@ private void loadReconnectState(final SignedState signedState) {
// kick off transition to RECONNECT_COMPLETE before beginning to save the reconnect state to disk
// this guarantees that the platform status will be RECONNECT_COMPLETE before the state is saved
platformStatusManager.submitStatusAction(new ReconnectCompleteAction(signedState.getRound()));
latestImmutableState.setState(signedState.reserve("set latest immutable to reconnect state"));
stateManagementComponent.stateToLoad(signedState, SourceOfSignedState.RECONNECT);

loadStateIntoConsensus(signedState);
Expand Down Expand Up @@ -1277,7 +1292,22 @@ public void performPcesRecovery() {
eventCreator.start();
}
replayPreconsensusEvents();
stateManagementComponent.dumpLatestImmutableState(StateToDiskReason.PCES_RECOVERY_COMPLETE, true);
try (final ReservedSignedState reservedState = latestImmutableState.getState("Get PCES recovery state")) {
if (reservedState == null) {
logger.warn(
STATE_TO_DISK.getMarker(),
"Trying to dump PCES recovery state to disk, but no state is available.");
} else {
final SignedState signedState = reservedState.get();
signedState.markAsStateToSave(StateToDiskReason.PCES_RECOVERY_COMPLETE);

final StateDumpRequest request =
StateDumpRequest.create(signedState.reserve("dumping PCES recovery state"));

signedStateFileManager.dumpStateTask(request);
request.waitForFinished().run();
}
}
}

/**
Expand Down Expand Up @@ -1305,8 +1335,8 @@ private void replayPreconsensusEvents() {
intakeQueue,
consensusRoundHandler,
stateHashSignQueue,
stateManagementComponent,
initialMinimumGenerationNonAncient,
() -> latestImmutableState.getState("PCES replay"),
platformWiring::flushIntakePipeline);
}

Expand Down Expand Up @@ -1363,10 +1393,12 @@ public AddressBook getAddressBook() {
*/
@SuppressWarnings("unchecked")
@Override
public <T extends SwirldState> AutoCloseableWrapper<T> getLatestImmutableState(@NonNull final String reason) {
final ReservedSignedState wrapper = stateManagementComponent.getLatestImmutableState(reason);
return new AutoCloseableWrapper<>(
wrapper.isNull() ? null : (T) wrapper.get().getState().getSwirldState(), wrapper::close);
public @NonNull <T extends SwirldState> AutoCloseableWrapper<T> getLatestImmutableState(
@NonNull final String reason) {
final ReservedSignedState wrapper = latestImmutableState.getState(reason);
return wrapper == null
? AutoCloseableWrapper.empty()
: new AutoCloseableWrapper<>((T) wrapper.get().getState().getSwirldState(), wrapper::close);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package com.swirlds.platform.components.state;

import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;
import static com.swirlds.logging.legacy.LogMarker.STATE_TO_DISK;

import com.swirlds.base.time.Time;
import com.swirlds.common.config.StateConfig;
import com.swirlds.common.context.PlatformContext;
Expand All @@ -37,26 +34,17 @@
import com.swirlds.platform.state.signed.SignedStateMetrics;
import com.swirlds.platform.state.signed.SignedStateSentinel;
import com.swirlds.platform.state.signed.SourceOfSignedState;
import com.swirlds.platform.state.signed.StateDumpRequest;
import com.swirlds.platform.state.signed.StateToDiskReason;
import com.swirlds.platform.util.HashLogger;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* The default implementation of {@link StateManagementComponent}.
*/
public class DefaultStateManagementComponent implements StateManagementComponent {

private static final Logger logger = LogManager.getLogger(DefaultStateManagementComponent.class);

/**
* Signed states are deleted on this background thread.
*/
Expand All @@ -83,7 +71,7 @@ public class DefaultStateManagementComponent implements StateManagementComponent
private final SignedStateSentinel signedStateSentinel;

private final SavedStateController savedStateController;
private final Consumer<StateDumpRequest> stateDumpConsumer;

private final Consumer<ReservedSignedState> stateSigner;

/**
Expand All @@ -94,7 +82,6 @@ public class DefaultStateManagementComponent implements StateManagementComponent
* @param newLatestCompleteStateConsumer consumer to invoke when there is a new latest complete signed state
* @param fatalErrorConsumer consumer to invoke when a fatal error has occurred
* @param savedStateController controls which states are saved to disk
* @param stateDumpConsumer consumer to invoke when a state is requested to be dumped to disk
*/
public DefaultStateManagementComponent(
@NonNull final PlatformContext platformContext,
Expand All @@ -103,7 +90,6 @@ public DefaultStateManagementComponent(
@NonNull final NewLatestCompleteStateConsumer newLatestCompleteStateConsumer,
@NonNull final FatalErrorConsumer fatalErrorConsumer,
@NonNull final SavedStateController savedStateController,
@NonNull final Consumer<StateDumpRequest> stateDumpConsumer,
@NonNull final Consumer<ReservedSignedState> stateSigner) {

Objects.requireNonNull(platformContext);
Expand All @@ -116,7 +102,6 @@ public DefaultStateManagementComponent(
this.signedStateGarbageCollector = new SignedStateGarbageCollector(threadManager, signedStateMetrics);
this.signedStateSentinel = new SignedStateSentinel(platformContext, threadManager, Time.getCurrent());
this.savedStateController = Objects.requireNonNull(savedStateController);
this.stateDumpConsumer = Objects.requireNonNull(stateDumpConsumer);
this.stateSigner = Objects.requireNonNull(stateSigner);

hashLogger =
Expand Down Expand Up @@ -167,36 +152,10 @@ private void newSignedStateBeingTracked(final SignedState signedState, final Sou
}
}

/**
* Checks if the signed state's round is older than the round of the latest state in the signed state manager.
*
* @param signedState the signed state whose round needs to be compared to the latest state in the signed state
* manager.
* @return true if the signed state's round is < the round of the latest state in the signed state manager,
* otherwise false.
*/
private boolean stateRoundIsTooOld(final SignedState signedState) {
final long roundOfLatestState = signedStateManager.getLastImmutableStateRound();
if (signedState.getRound() < roundOfLatestState) {
logger.error(
EXCEPTION.getMarker(),
"State received from transactions is in an incorrect order. "
+ "Latest state is from round {}, provided state is from round {}",
roundOfLatestState,
signedState.getRound());
return true;
}
return false;
}

@Override
public void newSignedStateFromTransactions(@NonNull final ReservedSignedState signedState) {
try (signedState) {
signedState.get().setGarbageCollector(signedStateGarbageCollector);

if (stateRoundIsTooOld(signedState.get())) {
return; // do not process older states.
}
signedStateHasher.hashState(signedState.get());

newSignedStateBeingTracked(signedState.get(), SourceOfSignedState.TRANSACTIONS);
Expand All @@ -207,14 +166,6 @@ public void newSignedStateFromTransactions(@NonNull final ReservedSignedState si
}
}

/**
* {@inheritDoc}
*/
@Override
public ReservedSignedState getLatestImmutableState(@NonNull final String reason) {
return signedStateManager.getLatestImmutableState(reason);
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -251,76 +202,6 @@ public void stop() {
signedStateGarbageCollector.stop();
}

/**
* {@inheritDoc}
*/
@NonNull
@Override
public ReservedSignedState find(final @NonNull Predicate<SignedState> criteria, @NonNull final String reason) {
return signedStateManager.find(criteria, reason);
}

@Override
public void dumpLatestImmutableState(@NonNull final StateToDiskReason reason, final boolean blocking) {
Objects.requireNonNull(reason);

try (final ReservedSignedState reservedState = signedStateManager.getLatestImmutableState(
"DefaultStateManagementComponent.dumpLatestImmutableState()")) {

if (reservedState.isNull()) {
logger.warn(STATE_TO_DISK.getMarker(), "State dump requested, but no state is available.");
} else {
dumpState(reservedState.get(), reason, blocking);
}
}
}

/**
* Dump a state to disk out-of-band.
* <p>
* Writing a state "out-of-band" means the state is being written for the sake of a human, whether for debug
* purposes, or because of a fault. States written out-of-band will not be read automatically by the platform, and
* will not be used as an initial state at boot time.
* <p>
* A dumped state will be saved in a subdirectory of the signed states base directory, with the subdirectory being
* named after the reason the state is being written out-of-band.
*
* @param signedState the signed state to write to disk
* @param reason the reason why the state is being written out-of-band
* @param blocking if true then block until the state has been fully written to disk
*/
private void dumpState(
@NonNull final SignedState signedState, @NonNull final StateToDiskReason reason, final boolean blocking) {
Objects.requireNonNull(signedState);
Objects.requireNonNull(reason);
signedState.markAsStateToSave(reason);

final StateDumpRequest request = StateDumpRequest.create(signedState.reserve("dumping to disk"));

stateDumpConsumer.accept(request);

if (blocking) {
request.waitForFinished().run();
}
}

/**
* {@inheritDoc}
*/
@Override
@Nullable
public Instant getFirstStateTimestamp() {
return signedStateManager.getFirstStateTimestamp();
}

/**
* {@inheritDoc}
*/
@Override
public long getFirstStateRound() {
return signedStateManager.getFirstStateRound();
}

/**
* {@inheritDoc}
*/
Expand Down
Loading
Loading