Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-1273] Start of fast sync downloader #613

Merged
merged 27 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f2d5f68
Add support for initiating fast sync to DefaultSynchronizer, starting…
ajsutton Jan 19, 2019
cb8cfb2
Wait for a minimum number of peers to be available before starting fa…
ajsutton Jan 20, 2019
8b10aed
Add tests for fast sync waiting for peers and the overall coordinatio…
ajsutton Jan 21, 2019
6f3abb0
Select pivot block.
ajsutton Jan 21, 2019
27b044c
Fetch the pivot block header.
ajsutton Jan 21, 2019
d813ffc
Switch to throwing an exception to abort the fast sync pipeline inste…
ajsutton Jan 21, 2019
6ecea9c
waitForSuitablePeers doesn't need to return a FastSyncState.
ajsutton Jan 21, 2019
89b134d
Add a basic test for downloadPivotBlockHeader.
ajsutton Jan 21, 2019
bf5188e
Create a task specifically for getting the pivot block header so that…
ajsutton Jan 21, 2019
f67c9f9
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 21, 2019
815d385
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 21, 2019
c1e1c2f
Add basic tests for GetPivotBlockHeader.
ajsutton Jan 21, 2019
b7266d1
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 21, 2019
32bdd13
Add check to ensure that a majority of peers (which have the pivot bl…
ajsutton Jan 22, 2019
ae0a318
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 22, 2019
7f42f9a
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 22, 2019
c43524a
Merge branch 'master' into NC-2136
ajsutton Jan 22, 2019
5329db7
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 22, 2019
1ac4f4f
Throw exceptions all the way back out the top instead of mapping to a…
ajsutton Jan 22, 2019
3598197
Move PivotBlockRetriever to the fastsync package.
ajsutton Jan 22, 2019
6a7fac0
Call wait for peers directly instead of sending to the worker pool.
ajsutton Jan 22, 2019
1bdc38d
Simplify check for any available peers.
ajsutton Jan 22, 2019
1ba9c7f
Pull isRetryingError and assignPeer up to AbstractRetryingPeerTask so…
ajsutton Jan 23, 2019
599af66
Merge branch 'NC-2136' of github.com:ajsutton/pantheon into NC-2136
ajsutton Jan 23, 2019
d0b6fac
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 23, 2019
0f2ec34
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 23, 2019
e733789
Ensure we repeatedly print messages to indicate we're waiting for a p…
ajsutton Jan 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,26 @@

import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* A task that will retry a fixed number of times before completing the associated CompletableFuture
* exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link
* #executePeerTask()} is complete with a non-empty list the retry counter is reset.
* #executePeerTask(Optional)} is complete with a non-empty list the retry counter is reset.
*
* @param <T> The type as a typed list that the peer task can get partial or full results in.
*/
Expand All @@ -40,6 +44,7 @@ public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends
private final int maxRetries;
private int retryCount = 0;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private Optional<EthPeer> assignedPeer = Optional.empty();

/**
* @param ethContext The context of the current Eth network we are attached to.
Expand All @@ -56,6 +61,10 @@ public AbstractRetryingPeerTask(
this.maxRetries = maxRetries;
}

public void assignPeer(final EthPeer peer) {
assignedPeer = Optional.of(peer);
}

@Override
protected void executeTask() {
if (result.get().isDone()) {
Expand All @@ -68,7 +77,7 @@ protected void executeTask() {
}

retryCount += 1;
executePeerTask()
executePeerTask(assignedPeer)
.whenComplete(
(peerResult, error) -> {
if (error != null) {
Expand All @@ -83,7 +92,7 @@ protected void executeTask() {
});
}

protected abstract CompletableFuture<T> executePeerTask();
protected abstract CompletableFuture<T> executePeerTask(Optional<EthPeer> assignedPeer);

private void handleTaskError(final Throwable error) {
final Throwable cause = ExceptionUtils.rootCause(error);
Expand Down Expand Up @@ -118,5 +127,12 @@ private void handleTaskError(final Throwable error) {
.scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1)));
}

protected abstract boolean isRetryableError(Throwable error);
private boolean isRetryableError(final Throwable error) {
final boolean isPeerError =
error instanceof PeerBreachedProtocolException
|| error instanceof PeerDisconnectedException
|| error instanceof NoAvailablePeersException;

return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncActions;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -35,7 +40,8 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final SyncState syncState;
private final AtomicBoolean started = new AtomicBoolean(false);
private final BlockPropagationManager<C> blockPropagationManager;
private final Downloader<C> downloader;
private final FullSyncDownloader<C> fullSyncDownloader;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;

public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
Expand All @@ -54,28 +60,60 @@ public DefaultSynchronizer(
syncState,
new PendingBlocks(),
ethTasksTimer);
this.downloader =
new Downloader<>(
this.fullSyncDownloader =
new FullSyncDownloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
if (syncConfig.syncMode().equals(SyncMode.FAST)) {
if (syncConfig.syncMode() == SyncMode.FAST) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious - why prefer == here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Purely ease of reading and because I was confused by the .equals for a bit thinking it wasn't actually an enum.

LOG.info("Fast sync enabled.");
this.fastSyncDownloader =
Optional.of(
new FastSyncDownloader<>(
new FastSyncActions<>(
syncConfig, protocolSchedule, protocolContext, ethContext, ethTasksTimer)));
} else {
this.fastSyncDownloader = Optional.empty();
}
}

@Override
public void start() {
if (started.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
downloader.start();
if (fastSyncDownloader.isPresent()) {
fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult);
} else {
startFullSync();
}
} else {
throw new IllegalStateException("Attempt to start an already started synchronizer.");
}
}

private void handleFastSyncResult(final FastSyncState result, final Throwable error) {

final Throwable rootCause = ExceptionUtils.rootCause(error);
if (rootCause instanceof FastSyncException) {
LOG.error(
"Fast sync failed ({}), switching to full sync.",
((FastSyncException) rootCause).getError());
} else if (error != null) {
LOG.error("Fast sync failed, switching to full sync.", error);
} else {
LOG.info(
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
}
startFullSync();
}

private void startFullSync() {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
fullSyncDownloader.start();
}

@Override
public Optional<SyncStatus> getSyncStatus() {
if (!started.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Downloader<C> {
public class FullSyncDownloader<C> {
private static final Logger LOG = LogManager.getLogger();

private final SynchronizerConfiguration config;
Expand All @@ -73,7 +73,7 @@ public class Downloader<C> {
private long syncTargetDisconnectListenerId;
protected CompletableFuture<?> currentTask;

Downloader(
FullSyncDownloader(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.time.Duration;
import java.util.Optional;

import com.google.common.collect.Range;
Expand All @@ -30,10 +31,14 @@ public class SynchronizerConfiguration {
// TODO: Determine reasonable defaults here
public static int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500;
public static float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofSeconds(3);

// Fast sync config
private final int fastSyncPivotDistance;
private final float fastSyncFullValidationRate;
private final int fastSyncMinimumPeerCount;
private final Duration fastSyncMaximumPeerWaitTime;

// Block propagation config
private final Range<Long> blockPropagationRange;
Expand All @@ -58,6 +63,8 @@ private SynchronizerConfiguration(
final SyncMode requestedSyncMode,
final int fastSyncPivotDistance,
final float fastSyncFullValidationRate,
final int fastSyncMinimumPeerCount,
final Duration fastSyncMaximumPeerWaitTime,
final Range<Long> blockPropagationRange,
final Optional<SyncMode> syncMode,
final long downloaderChangeTargetThresholdByHeight,
Expand All @@ -73,6 +80,8 @@ private SynchronizerConfiguration(
this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount;
this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime;
this.blockPropagationRange = blockPropagationRange;
this.syncMode = syncMode;
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
Expand Down Expand Up @@ -115,6 +124,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) {
requestedSyncMode,
fastSyncPivotDistance,
fastSyncFullValidationRate,
fastSyncMinimumPeerCount,
fastSyncMaximumPeerWaitTime,
blockPropagationRange,
Optional.of(actualSyncMode),
downloaderChangeTargetThresholdByHeight,
Expand Down Expand Up @@ -222,6 +233,14 @@ public float fastSyncFullValidationRate() {
return fastSyncFullValidationRate;
}

public int getFastSyncMinimumPeerCount() {
return fastSyncMinimumPeerCount;
}

public Duration getFastSyncMaximumPeerWaitTime() {
return fastSyncMaximumPeerWaitTime;
}

public static class Builder {
private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;
private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE;
Expand Down Expand Up @@ -318,6 +337,8 @@ public SynchronizerConfiguration build() {
syncMode,
fastSyncPivotDistance,
fastSyncFullValidationRate,
DEFAULT_FAST_SYNC_MINIMUM_PEERS,
DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME,
blockPropagationRange,
Optional.empty(),
downloaderChangeTargetThresholdByHeight,
Expand Down
Loading