Skip to content

Commit

Permalink
[PAN-3183] Less verbose synching subscriptions (hyperledger#59)
Browse files Browse the repository at this point in the history
We should not send a sync status for every forking block state update.
Yes, we send status updates for detected forks as well as new canonical
heads.

Instead we should send a synching message for status changes as well as
when we reorg the chain.

Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
Signed-off-by: edwardmack <ed@edwardmack.com>
  • Loading branch information
shemnon authored and edwardmack committed Nov 4, 2019
1 parent 6ad582e commit dfce021
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,14 @@ public class NotSynchronisingResult implements JsonRpcResult {
public boolean getResult() {
return false;
}

@Override
public boolean equals(final Object o) {
return (this == o) || (o != null && getClass() == o.getClass());
}

@Override
public int hashCode() {
return "NotSyncingResult".hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.plugin.data.SyncStatus;

import java.util.Optional;

public class SyncingSubscriptionService {

private final SubscriptionManager subscriptionManager;
private Optional<Boolean> lastMessageWasInSync = Optional.empty();

public SyncingSubscriptionService(
final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) {
Expand All @@ -37,15 +40,21 @@ private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
Subscription.class,
syncingSubscriptions -> {
if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s ->
subscriptionManager.sendMessage(
s.getSubscriptionId(), new NotSynchronisingResult()));
if (!lastMessageWasInSync.orElse(Boolean.FALSE)) {
syncingSubscriptions.forEach(
s ->
subscriptionManager.sendMessage(
s.getSubscriptionId(), new NotSynchronisingResult()));
lastMessageWasInSync = Optional.of(Boolean.TRUE);
}
} else {
syncingSubscriptions.forEach(
s ->
subscriptionManager.sendMessage(
s.getSubscriptionId(), new SyncingResult(syncStatus)));
if (lastMessageWasInSync.orElse(Boolean.TRUE)) {
syncingSubscriptions.forEach(
s ->
subscriptionManager.sendMessage(
s.getSubscriptionId(), new SyncingResult(syncStatus)));
lastMessageWasInSync = Optional.of(Boolean.FALSE);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.syncing;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.JsonRpcResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.SyncingResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
Expand Down Expand Up @@ -102,4 +106,89 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() {
ArgumentMatchers.eq(subscription.getSubscriptionId()),
any(NotSynchronisingResult.class));
}

@Test
public void shouldNotRepeatOutOfSyncMessages() {
final SyncingSubscription subscription =
new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING);
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final SyncStatus syncStatus = new SyncStatus(0L, 1L, 3L);
final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatusChanged(syncStatus);
syncStatusListener.onSyncStatusChanged(syncStatus);

verify(subscriptionManager, atMostOnce())
.sendMessage(
ArgumentMatchers.eq(subscription.getSubscriptionId()), eq(expectedSyncingResult));
}

@Test
public void shouldNotRepeatInSyncMessages() {
final SyncingSubscription subscription =
new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING);
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final SyncStatus syncStatus = new SyncStatus(0L, 3L, 3L);
final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatusChanged(syncStatus);
syncStatusListener.onSyncStatusChanged(syncStatus);

verify(subscriptionManager, atMostOnce())
.sendMessage(
ArgumentMatchers.eq(subscription.getSubscriptionId()), eq(expectedSyncingResult));
}

@Test
public void shouldOnlyReportSyncChange() {
final SyncingSubscription subscription =
new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING);
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);

final SyncStatus inSyncStatus = new SyncStatus(0L, 3L, 3L);
final SyncStatus outOfSyncStatus = new SyncStatus(0L, 1L, 3L);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatusChanged(outOfSyncStatus);
syncStatusListener.onSyncStatusChanged(inSyncStatus);
syncStatusListener.onSyncStatusChanged(inSyncStatus);
syncStatusListener.onSyncStatusChanged(outOfSyncStatus);
syncStatusListener.onSyncStatusChanged(outOfSyncStatus);
syncStatusListener.onSyncStatusChanged(inSyncStatus);
syncStatusListener.onSyncStatusChanged(inSyncStatus);

final var resultCaptor = ArgumentCaptor.forClass(JsonRpcResult.class);
final NotSynchronisingResult inSyncResult = new NotSynchronisingResult();
final SyncingResult outOfSyncingResult = new SyncingResult(outOfSyncStatus);

verify(subscriptionManager, times(4)).sendMessage(any(), resultCaptor.capture());
assertThat(resultCaptor.getAllValues())
.containsOnly(outOfSyncingResult, inSyncResult, outOfSyncingResult, inSyncResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ public final class SyncStatus implements org.hyperledger.besu.plugin.data.SyncSt
private final long startingBlock;
private final long currentBlock;
private final long highestBlock;
private final boolean inSync;

public SyncStatus(final long startingBlock, final long currentBlock, final long highestBlock) {
this(startingBlock, currentBlock, highestBlock, currentBlock == highestBlock);
}

public SyncStatus(
final long startingBlock,
final long currentBlock,
final long highestBlock,
final boolean inSync) {
this.startingBlock = startingBlock;
this.currentBlock = currentBlock;
this.highestBlock = highestBlock;
this.inSync = inSync;
}

@Override
Expand All @@ -45,7 +55,7 @@ public long getHighestBlock() {

@Override
public boolean inSync() {
return currentBlock == highestBlock;
return inSync;
}

@Override
Expand All @@ -56,14 +66,15 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
SyncStatus that = (SyncStatus) o;
final SyncStatus that = (SyncStatus) o;
return startingBlock == that.startingBlock
&& currentBlock == that.currentBlock
&& highestBlock == that.highestBlock;
&& highestBlock == that.highestBlock
&& inSync == that.inSync;
}

@Override
public int hashCode() {
return Objects.hash(startingBlock, currentBlock, highestBlock);
return Objects.hash(startingBlock, currentBlock, highestBlock, inSync);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SyncState {
private final Blockchain blockchain;
private final EthPeers ethPeers;

private final long startingBlock;
private long startingBlock;
private boolean lastInSync = true;
private final Subscribers<InSyncListener> inSyncListeners = Subscribers.create();
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
Expand All @@ -49,7 +49,17 @@ public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
if (event.isNewCanonicalHead()) {
checkInSync();
}
publishSyncStatus();
switch (event.getEventType()) {
case CHAIN_REORG:
publishReorg();
// fall through
case HEAD_ADVANCED:
publishSyncStatus();
break;
case FORK:
// don't broadcast detected forks
break;
}
});
}

Expand All @@ -59,6 +69,15 @@ public void publishSyncStatus() {
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}

private void publishReorg() {
final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber();
final SyncStatus syncStatus =
new SyncStatus(
startingBlock, chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber), false);

syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}

public void addInSyncListener(final InSyncListener observer) {
inSyncListeners.subscribe(observer);
}
Expand All @@ -74,11 +93,7 @@ public void removeSyncStatusListener(final long listenerId) {
public SyncStatus syncStatus() {
final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber();
return new SyncStatus(
startingBlock(), chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber));
}

public long startingBlock() {
return startingBlock;
startingBlock, chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber));
}

public Optional<SyncTarget> syncTarget() {
Expand Down Expand Up @@ -153,6 +168,10 @@ private synchronized void checkInSync() {
final boolean currentInSync = isInSync();
if (lastInSync != currentInSync) {
lastInSync = currentInSync;
if (!currentInSync) {
// when we fall out of sync change our starting block
startingBlock = blockchain.getChainHeadBlockNumber();
}
inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentInSync));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -33,6 +34,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.SyncStatus;
import org.hyperledger.besu.ethereum.eth.manager.ChainState;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
Expand All @@ -57,6 +59,7 @@ public class SyncStateTest {
private final Blockchain blockchain = mock(Blockchain.class);
private final EthPeers ethPeers = mock(EthPeers.class);
private final SyncState.InSyncListener inSyncListener = mock(SyncState.InSyncListener.class);
private final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
private final EthPeer syncTargetPeer = mock(EthPeer.class);
private final ChainState syncTargetPeerChainState = spy(new ChainState());
private final EthPeer otherPeer = mock(EthPeer.class);
Expand All @@ -80,6 +83,7 @@ public void setUp() {
syncState = new SyncState(blockchain, ethPeers);
blockAddedObserver = captor.getValue();
syncState.addInSyncListener(inSyncListener);
syncState.addSyncStatusListener(syncStatusListener);
}

@Test
Expand Down Expand Up @@ -229,7 +233,7 @@ public void shouldBecomeInSyncWhenOurBlockchainCatchesUp() {

@Test
public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() {
SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
syncState.addSyncStatusListener(syncStatusListener);

blockAddedObserver.onBlockAdded(
Expand All @@ -242,6 +246,26 @@ public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() {
verify(syncStatusListener).onSyncStatusChanged(eq(syncState.syncStatus()));
}

@Test
public void shouldReportReorgEvents() {
when(blockchain.getChainHeadBlockNumber()).thenReturn(TARGET_CHAIN_HEIGHT);

blockAddedObserver.onBlockAdded(
BlockAddedEvent.createForChainReorg(
new Block(
targetBlockHeader(),
new BlockBody(Collections.emptyList(), Collections.emptyList())),
Collections.emptyList(),
Collections.emptyList()),
blockchain);

assertThat(syncState.isInSync()).isTrue();
final ArgumentCaptor<SyncStatus> captor = ArgumentCaptor.forClass(SyncStatus.class);
verify(syncStatusListener, times(2)).onSyncStatusChanged(captor.capture());
assertThat(captor.getAllValues().get(0).inSync()).isFalse();
assertThat(captor.getAllValues().get(1).inSync()).isTrue();
}

private void setupOutOfSyncState() {
updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));
Expand Down

0 comments on commit dfce021

Please sign in to comment.