Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[NC-1273] Start of fast sync downloader (#613)
Browse files Browse the repository at this point in the history
* Add support for initiating fast sync to DefaultSynchronizer, starting full sync once that completes.

* Wait for a minimum number of peers to be available before starting fast sync.

* Select pivot block.

* Fetch the pivot block header.

* Ensure that a majority of peers (which have the pivot block) agree on the block.

* Pull isRetryingError and assignPeer up to AbstractRetryingPeerTask so it can be reused.
  • Loading branch information
ajsutton authored Jan 23, 2019
1 parent c8f1748 commit aa58d67
Show file tree
Hide file tree
Showing 25 changed files with 1,145 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,26 @@

import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* A task that will retry a fixed number of times before completing the associated CompletableFuture
* exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link
* #executePeerTask(Optional)} completes with a non-empty list, the retry counter is reset.
*
* @param <T> The type as a typed list that the peer task can get partial or full results in.
*/
Expand All @@ -40,6 +44,7 @@ public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends
private final int maxRetries;
private int retryCount = 0;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private Optional<EthPeer> assignedPeer = Optional.empty();

/**
* @param ethContext The context of the current Eth network we are attached to.
Expand All @@ -56,6 +61,10 @@ public AbstractRetryingPeerTask(
this.maxRetries = maxRetries;
}

/**
 * Pins this task to a specific peer. The stored peer is passed to
 * {@link #executePeerTask(Optional)} on every subsequent attempt.
 *
 * @param peer the peer to use; must not be null ({@link Optional#of} rejects null)
 */
public void assignPeer(final EthPeer peer) {
  this.assignedPeer = Optional.of(peer);
}

@Override
protected void executeTask() {
if (result.get().isDone()) {
Expand All @@ -68,7 +77,7 @@ protected void executeTask() {
}

retryCount += 1;
executePeerTask()
executePeerTask(assignedPeer)
.whenComplete(
(peerResult, error) -> {
if (error != null) {
Expand All @@ -83,7 +92,7 @@ protected void executeTask() {
});
}

protected abstract CompletableFuture<T> executePeerTask();
/**
 * Performs one attempt of the underlying peer task. Invoked from {@code executeTask()} after
 * the retry counter has been incremented.
 *
 * @param assignedPeer the peer previously supplied through {@link #assignPeer(EthPeer)}, or
 *     empty when no peer has been assigned
 * @return a future holding the outcome of this attempt
 */
protected abstract CompletableFuture<T> executePeerTask(Optional<EthPeer> assignedPeer);

private void handleTaskError(final Throwable error) {
final Throwable cause = ExceptionUtils.rootCause(error);
Expand Down Expand Up @@ -118,5 +127,12 @@ private void handleTaskError(final Throwable error) {
.scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1)));
}

protected abstract boolean isRetryableError(Throwable error);
/**
 * Decides whether a failed attempt should be retried.
 *
 * <p>A {@link TimeoutException} is always retryable. Peer-related failures (protocol breach,
 * disconnect, no peers available) are retryable only when no peer has been assigned to this
 * task.
 *
 * @param error the failure to classify
 * @return {@code true} if the task should be attempted again
 */
private boolean isRetryableError(final Throwable error) {
  if (error instanceof TimeoutException) {
    return true;
  }
  if (assignedPeer.isPresent()) {
    // Peer errors are not retried once a specific peer has been assigned.
    return false;
  }
  return error instanceof PeerBreachedProtocolException
      || error instanceof PeerDisconnectedException
      || error instanceof NoAvailablePeersException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncActions;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -35,7 +40,8 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final SyncState syncState;
private final AtomicBoolean started = new AtomicBoolean(false);
private final BlockPropagationManager<C> blockPropagationManager;
private final Downloader<C> downloader;
private final FullSyncDownloader<C> fullSyncDownloader;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;

public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
Expand All @@ -54,28 +60,60 @@ public DefaultSynchronizer(
syncState,
new PendingBlocks(),
ethTasksTimer);
this.downloader =
new Downloader<>(
this.fullSyncDownloader =
new FullSyncDownloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
if (syncConfig.syncMode().equals(SyncMode.FAST)) {
if (syncConfig.syncMode() == SyncMode.FAST) {
LOG.info("Fast sync enabled.");
this.fastSyncDownloader =
Optional.of(
new FastSyncDownloader<>(
new FastSyncActions<>(
syncConfig, protocolSchedule, protocolContext, ethContext, ethTasksTimer)));
} else {
this.fastSyncDownloader = Optional.empty();
}
}

/**
 * Starts the synchronizer. Only the first call takes effect; any later call fails.
 *
 * @throws IllegalStateException if the synchronizer has already been started
 */
@Override
public void start() {
// compareAndSet lets exactly one caller through, even under concurrent invocation.
if (started.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
// NOTE(review): this view appears to interleave removed and added diff lines. The statements
// from the LOG.info above through downloader.start() look like the pre-change body that
// startFullSync() now covers. Confirm against the actual file.
downloader.start();
if (fastSyncDownloader.isPresent()) {
// Fast sync runs first; handleFastSyncResult is invoked once its future completes,
// whether normally or exceptionally.
fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult);
} else {
startFullSync();
}
} else {
throw new IllegalStateException("Attempt to start an already started synchronizer.");
}
}

/**
 * Completion callback for the fast sync future: reports the outcome, then starts full sync.
 *
 * <p>Full sync is started whether fast sync succeeded or failed. Because this method runs
 * inside {@code whenComplete} and nothing observes the resulting future, an exception thrown
 * while reporting the outcome would otherwise be swallowed and full sync would never start.
 * The hand-over is therefore performed in a {@code finally} block.
 *
 * @param result the fast sync state on success; only read when {@code error} is null
 * @param error the failure that terminated fast sync, or null on success
 */
private void handleFastSyncResult(final FastSyncState result, final Throwable error) {
  try {
    if (error == null) {
      LOG.info(
          "Fast sync completed successfully with pivot block {}",
          result.getPivotBlockNumber().getAsLong());
    } else {
      // Only unwrap when there is something to unwrap; avoids relying on how the helper
      // treats a null argument.
      final Throwable rootCause = ExceptionUtils.rootCause(error);
      if (rootCause instanceof FastSyncException) {
        LOG.error(
            "Fast sync failed ({}), switching to full sync.",
            ((FastSyncException) rootCause).getError());
      } else {
        LOG.error("Fast sync failed, switching to full sync.", error);
      }
    }
  } finally {
    // Always fall through to full sync, even if reporting the outcome above threw.
    startFullSync();
  }
}

/**
 * Starts the block propagation manager followed by the full sync downloader. Called directly
 * from {@code start()} when no fast sync downloader is configured, and from
 * {@code handleFastSyncResult} once fast sync has finished.
 */
private void startFullSync() {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
fullSyncDownloader.start();
}

@Override
public Optional<SyncStatus> getSyncStatus() {
if (!started.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Downloader<C> {
public class FullSyncDownloader<C> {
private static final Logger LOG = LogManager.getLogger();

private final SynchronizerConfiguration config;
Expand All @@ -73,7 +73,7 @@ public class Downloader<C> {
private long syncTargetDisconnectListenerId;
protected CompletableFuture<?> currentTask;

Downloader(
FullSyncDownloader(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.time.Duration;
import java.util.Optional;

import com.google.common.collect.Range;
Expand All @@ -30,10 +31,14 @@ public class SynchronizerConfiguration {
// TODO: Determine reasonable defaults here
public static int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500;
public static float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofSeconds(3);

// Fast sync config
private final int fastSyncPivotDistance;
private final float fastSyncFullValidationRate;
private final int fastSyncMinimumPeerCount;
private final Duration fastSyncMaximumPeerWaitTime;

// Block propagation config
private final Range<Long> blockPropagationRange;
Expand All @@ -58,6 +63,8 @@ private SynchronizerConfiguration(
final SyncMode requestedSyncMode,
final int fastSyncPivotDistance,
final float fastSyncFullValidationRate,
final int fastSyncMinimumPeerCount,
final Duration fastSyncMaximumPeerWaitTime,
final Range<Long> blockPropagationRange,
final Optional<SyncMode> syncMode,
final long downloaderChangeTargetThresholdByHeight,
Expand All @@ -73,6 +80,8 @@ private SynchronizerConfiguration(
this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount;
this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime;
this.blockPropagationRange = blockPropagationRange;
this.syncMode = syncMode;
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
Expand Down Expand Up @@ -115,6 +124,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) {
requestedSyncMode,
fastSyncPivotDistance,
fastSyncFullValidationRate,
fastSyncMinimumPeerCount,
fastSyncMaximumPeerWaitTime,
blockPropagationRange,
Optional.of(actualSyncMode),
downloaderChangeTargetThresholdByHeight,
Expand Down Expand Up @@ -222,6 +233,14 @@ public float fastSyncFullValidationRate() {
return fastSyncFullValidationRate;
}

/**
 * @return the minimum peer count configured for fast sync
 */
public int getFastSyncMinimumPeerCount() {
  return this.fastSyncMinimumPeerCount;
}

/**
 * @return the configured maximum peer wait time for fast sync (presumably the longest time to
 *     wait for the minimum peer count to be reached; verify against the caller)
 */
public Duration getFastSyncMaximumPeerWaitTime() {
  return this.fastSyncMaximumPeerWaitTime;
}

public static class Builder {
private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;
private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE;
Expand Down Expand Up @@ -318,6 +337,8 @@ public SynchronizerConfiguration build() {
syncMode,
fastSyncPivotDistance,
fastSyncFullValidationRate,
DEFAULT_FAST_SYNC_MINIMUM_PEERS,
DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME,
blockPropagationRange,
Optional.empty(),
downloaderChangeTargetThresholdByHeight,
Expand Down
Loading

0 comments on commit aa58d67

Please sign in to comment.