Skip to content

Commit

Permalink
Swap to creating a new listener and directly dispatching the ISS noti…
Browse files Browse the repository at this point in the history
…fication to it in DefaultAppNotifier

Signed-off-by: Tim Farber-Newman <tim.farber-newman@swirldslabs.com>
  • Loading branch information
timfn-hg committed Jan 9, 2025
1 parent 7409480 commit 652d835
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
import com.swirlds.platform.system.SwirldMain;
import com.swirlds.platform.system.address.AddressBook;
import com.swirlds.platform.system.events.Event;
import com.swirlds.platform.system.state.notifications.AsyncIssListener;
import com.swirlds.platform.system.state.notifications.FatalIssListener;
import com.swirlds.platform.system.state.notifications.StateHashedListener;
import com.swirlds.platform.system.status.PlatformStatus;
import com.swirlds.platform.system.transaction.Transaction;
Expand Down Expand Up @@ -1004,7 +1004,7 @@ private void initializeDagger(@NonNull final State state, @NonNull final InitTri
notifications.unregister(PlatformStatusChangeListener.class, this);
notifications.unregister(ReconnectCompleteListener.class, daggerApp.reconnectListener());
notifications.unregister(StateWriteToDiskCompleteListener.class, daggerApp.stateWriteToDiskListener());
notifications.unregister(AsyncIssListener.class, daggerApp.asyncIssListener());
notifications.unregister(FatalIssListener.class, daggerApp.fatalIssListener());
if (blockStreamEnabled) {
notifications.unregister(StateHashedListener.class, daggerApp.blockStreamManager());
}
Expand Down Expand Up @@ -1058,7 +1058,7 @@ private void initializeDagger(@NonNull final State state, @NonNull final InitTri
notifications.register(PlatformStatusChangeListener.class, this);
notifications.register(ReconnectCompleteListener.class, daggerApp.reconnectListener());
notifications.register(StateWriteToDiskCompleteListener.class, daggerApp.stateWriteToDiskListener());
notifications.register(AsyncIssListener.class, daggerApp.asyncIssListener());
notifications.register(FatalIssListener.class, daggerApp.fatalIssListener());
if (blockStreamEnabled) {
notifications.register(StateHashedListener.class, daggerApp.blockStreamManager());
daggerApp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import com.swirlds.platform.listeners.StateWriteToDiskCompleteListener;
import com.swirlds.platform.system.InitTrigger;
import com.swirlds.platform.system.Platform;
import com.swirlds.platform.system.state.notifications.AsyncIssListener;
import com.swirlds.platform.system.state.notifications.FatalIssListener;
import com.swirlds.state.State;
import com.swirlds.state.lifecycle.StartupNetworks;
import com.swirlds.state.lifecycle.info.NetworkInfo;
Expand Down Expand Up @@ -147,7 +147,7 @@ public interface HederaInjectionComponent {

SubmissionManager submissionManager();

AsyncIssListener asyncIssListener();
FatalIssListener fatalIssListener();

@Component.Builder
interface Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package com.hedera.node.app.platform;

import com.hedera.node.app.annotations.CommonExecutor;
import com.hedera.node.app.state.listeners.IssDetectedListener;
import com.hedera.node.app.state.listeners.FatalIssListenerImpl;
import com.hedera.node.app.state.listeners.ReconnectListener;
import com.hedera.node.app.state.listeners.WriteStateToDiskListener;
import com.swirlds.common.stream.Signer;
import com.swirlds.platform.listeners.ReconnectCompleteListener;
import com.swirlds.platform.listeners.StateWriteToDiskCompleteListener;
import com.swirlds.platform.system.Platform;
import com.swirlds.platform.system.state.notifications.AsyncIssListener;
import com.swirlds.platform.system.state.notifications.FatalIssListener;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
Expand Down Expand Up @@ -73,5 +73,5 @@ static IntSupplier provideFrontendThrottleSplit(@NonNull final Platform platform

@Binds
@Singleton
AsyncIssListener bindAsyncIssListener(IssDetectedListener listener);
FatalIssListener bindFatalIssListener(FatalIssListenerImpl listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@

package com.hedera.node.app.state.listeners;

import com.swirlds.platform.system.state.notifications.AsyncIssListener;
import com.swirlds.platform.system.state.notifications.FatalIssListener;
import com.swirlds.platform.system.state.notifications.IssNotification;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Singleton
public class IssDetectedListener implements AsyncIssListener {
public class FatalIssListenerImpl implements FatalIssListener {

private static final Logger log = LogManager.getLogger(IssDetectedListener.class);
private static final Logger log = LogManager.getLogger(FatalIssListenerImpl.class);

@Inject
public IssDetectedListener() {
public FatalIssListenerImpl() {
// no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,8 +932,7 @@ public IssHandler buildIssHandler() {
// halt without needing to be stopped here. This should eventually be cleaned up.
},
SystemExitUtils::handleFatalError,
blocks.issScratchpad(),
blocks.notificationEngine());
blocks.issScratchpad());
}
return issHandler;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
* Copyright (C) 2024-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,8 +24,10 @@
import com.swirlds.platform.listeners.ReconnectCompleteNotification;
import com.swirlds.platform.listeners.StateWriteToDiskCompleteListener;
import com.swirlds.platform.listeners.StateWriteToDiskCompleteNotification;
import com.swirlds.platform.system.state.notifications.FatalIssListener;
import com.swirlds.platform.system.state.notifications.IssListener;
import com.swirlds.platform.system.state.notifications.IssNotification;
import com.swirlds.platform.system.state.notifications.IssNotification.IssType;
import com.swirlds.platform.system.state.notifications.NewSignedStateListener;
import com.swirlds.platform.system.state.notifications.StateHashedListener;
import com.swirlds.platform.system.state.notifications.StateHashedNotification;
Expand Down Expand Up @@ -84,5 +86,10 @@ public void sendLatestCompleteStateNotification(
@Override
public void sendIssNotification(@NonNull final IssNotification notification) {
notificationEngine.dispatch(IssListener.class, notification);

if (IssType.CATASTROPHIC_ISS == notification.getIssType() || IssType.SELF_ISS == notification.getIssType()) {
// Forward notification to application
notificationEngine.dispatch(FatalIssListener.class, notification);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.merkle.utility.SerializableLong;
import com.swirlds.common.notification.NotificationEngine;
import com.swirlds.platform.components.common.output.FatalErrorConsumer;
import com.swirlds.platform.config.StateConfig;
import com.swirlds.platform.scratchpad.Scratchpad;
import com.swirlds.platform.state.iss.IssHandler;
import com.swirlds.platform.state.iss.IssScratchpad;
import com.swirlds.platform.system.SystemExitCode;
import com.swirlds.platform.system.state.notifications.AsyncIssListener;
import com.swirlds.platform.system.state.notifications.IssNotification;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
Expand All @@ -39,7 +37,6 @@ public class DefaultIssHandler implements IssHandler {
private final Consumer<String> haltRequestedConsumer;
private final FatalErrorConsumer fatalErrorConsumer;
private final Scratchpad<IssScratchpad> issScratchpad;
private final NotificationEngine notificationEngine;

private boolean halted;

Expand All @@ -55,13 +52,11 @@ public DefaultIssHandler(
@NonNull final PlatformContext platformContext,
@NonNull final Consumer<String> haltRequestedConsumer,
@NonNull final FatalErrorConsumer fatalErrorConsumer,
@NonNull final Scratchpad<IssScratchpad> issScratchpad,
@NonNull final NotificationEngine notificationEngine) {
@NonNull final Scratchpad<IssScratchpad> issScratchpad) {
this.haltRequestedConsumer = Objects.requireNonNull(haltRequestedConsumer);
this.fatalErrorConsumer = Objects.requireNonNull(fatalErrorConsumer);
this.stateConfig = platformContext.getConfiguration().getConfigData(StateConfig.class);
this.issScratchpad = Objects.requireNonNull(issScratchpad);
this.notificationEngine = Objects.requireNonNull(notificationEngine);
}

/**
Expand Down Expand Up @@ -123,9 +118,6 @@ private void selfIssObserved(@NonNull final IssNotification notification) {
return;
}

// For self-ISS events, forward the notification asynchronously to the app
notificationEngine.dispatch(AsyncIssListener.class, notification);

updateIssRoundInScratchpad(notification.getRound());

if (stateConfig.haltOnAnyIss()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import com.swirlds.common.notification.Listener;

/**
* Async listener for ISS events. If you require ordered, synchronous dispatch use {@link IssListener}.
* Listener for fatal ISS events (i.e. of type SELF or CATASTROPHIC). This listener is unordered and asynchronous.
* If you require ordered and synchronous dispatch that includes all ISS events, then use {@link IssListener}.
*/
@DispatchModel(mode = DispatchMode.ASYNC, order = DispatchOrder.UNORDERED)
public interface AsyncIssListener extends Listener<IssNotification> {}
public interface FatalIssListener extends Listener<IssNotification> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright (C) 2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.components;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.swirlds.common.crypto.DigestType;
import com.swirlds.common.crypto.Hash;
import com.swirlds.common.notification.NotificationEngine;
import com.swirlds.common.notification.NotificationResult;
import com.swirlds.common.threading.futures.StandardFuture.CompletionCallback;
import com.swirlds.platform.components.appcomm.CompleteStateNotificationWithCleanup;
import com.swirlds.platform.listeners.PlatformStatusChangeListener;
import com.swirlds.platform.listeners.PlatformStatusChangeNotification;
import com.swirlds.platform.listeners.ReconnectCompleteListener;
import com.swirlds.platform.listeners.ReconnectCompleteNotification;
import com.swirlds.platform.listeners.StateWriteToDiskCompleteListener;
import com.swirlds.platform.listeners.StateWriteToDiskCompleteNotification;
import com.swirlds.platform.system.SwirldState;
import com.swirlds.platform.system.state.notifications.FatalIssListener;
import com.swirlds.platform.system.state.notifications.IssListener;
import com.swirlds.platform.system.state.notifications.IssNotification;
import com.swirlds.platform.system.state.notifications.IssNotification.IssType;
import com.swirlds.platform.system.state.notifications.NewSignedStateListener;
import com.swirlds.platform.system.state.notifications.NewSignedStateNotification;
import com.swirlds.platform.system.state.notifications.StateHashedListener;
import com.swirlds.platform.system.state.notifications.StateHashedNotification;
import com.swirlds.platform.system.status.PlatformStatus;
import java.time.Instant;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;

public class DefaultAppNotifierTest {

NotificationEngine notificationEngine;
AppNotifier notifier;

@BeforeEach
void beforeEach() {
notificationEngine = mock(NotificationEngine.class);
notifier = new DefaultAppNotifier(notificationEngine);
}

@Test
void testStateWrittenToDiskNotificationSent() {
final StateWriteToDiskCompleteNotification notification =
new StateWriteToDiskCompleteNotification(100, Instant.now(), false);

assertDoesNotThrow(() -> notifier.sendStateWrittenToDiskNotification(notification));
verify(notificationEngine, times(1)).dispatch(StateWriteToDiskCompleteListener.class, notification);
verifyNoMoreInteractions(notificationEngine);
}

@Test
void testStateHashNotificationSent() {
final StateHashedNotification notification = new StateHashedNotification(100L, new Hash(DigestType.SHA_384));

assertDoesNotThrow(() -> notifier.sendStateHashedNotification(notification));
verify(notificationEngine, times(1)).dispatch(StateHashedListener.class, notification);
verifyNoMoreInteractions(notificationEngine);
}

@Test
void testReconnectCompleteNotificationSent() {
final SwirldState state = mock(SwirldState.class);
final ReconnectCompleteNotification notification =
new ReconnectCompleteNotification(100L, Instant.now(), state);

assertDoesNotThrow(() -> notifier.sendReconnectCompleteNotification(notification));
verify(notificationEngine, times(1)).dispatch(ReconnectCompleteListener.class, notification);
verifyNoMoreInteractions(notificationEngine);
}

@Test
void testPlatformStatusChangeNotificationSent() {
final PlatformStatus status = PlatformStatus.ACTIVE;
final ArgumentCaptor<PlatformStatusChangeNotification> captor =
ArgumentCaptor.forClass(PlatformStatusChangeNotification.class);

assertDoesNotThrow(() -> notifier.sendPlatformStatusChangeNotification(status));
verify(notificationEngine, times(1)).dispatch(eq(PlatformStatusChangeListener.class), captor.capture());
verifyNoMoreInteractions(notificationEngine);

final PlatformStatusChangeNotification notification = captor.getValue();
assertNotNull(notification);
assertEquals(status, notification.getNewStatus());
}

@Test
void testLatestCompleteStateNotificationSent() {
final SwirldState state = mock(SwirldState.class);
final CompletionCallback<NotificationResult<NewSignedStateNotification>> cleanup =
mock(CompletionCallback.class);
final NewSignedStateNotification signedStateNotification =
new NewSignedStateNotification(state, 100L, Instant.now());
final CompleteStateNotificationWithCleanup notificationWithCleanup =
new CompleteStateNotificationWithCleanup(signedStateNotification, cleanup);

assertDoesNotThrow(() -> notifier.sendLatestCompleteStateNotification(notificationWithCleanup));
verify(notificationEngine, times(1)).dispatch(NewSignedStateListener.class, signedStateNotification, cleanup);
verifyNoMoreInteractions(notificationEngine);
}

public static List<Arguments> issTypes() {
return List.of(
Arguments.of(IssType.CATASTROPHIC_ISS, true),
Arguments.of(IssType.SELF_ISS, true),
Arguments.of(IssType.OTHER_ISS, false));
}

@ParameterizedTest
@MethodSource("issTypes")
void testIssNotificationSent(final IssType type, final boolean isFatal) {
final IssNotification notification = new IssNotification(100L, type);

assertDoesNotThrow(() -> notifier.sendIssNotification(notification));

// verify the ISS notification is always sent to the IssListener
verify(notificationEngine, times(1)).dispatch(IssListener.class, notification);

if (isFatal) {
// if the ISS event is considered fatal to the local node, verify the event is also sent to the
// FatalIssListener
verify(notificationEngine, times(1)).dispatch(FatalIssListener.class, notification);
}

verifyNoMoreInteractions(notificationEngine);
}
}
Loading

0 comments on commit 652d835

Please sign in to comment.