Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2138] Download receipts during fast sync and import without processing transactions #701

Merged
merged 8 commits into from
Jan 30, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ public boolean validateHeader(
switch (mode) {
case NONE:
return true;
case LIGHT_DETACHED_ONLY:
return applyRules(
header,
parent,
protocolContext,
rule -> rule.includeInLightValidation() && rule.isDetachedSupported());
case LIGHT_SKIP_DETACHED:
return applyRules(
header,
parent,
protocolContext,
rule -> rule.includeInLightValidation() && !rule.isDetachedSupported());
case LIGHT:
return applyRules(header, parent, protocolContext, Rule::includeInLightValidation);
case DETACHED_ONLY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ public enum HeaderValidationMode {
/** No Validation. data must be pre-validated */
NONE,

/** Skip proof of work validation */
LIGHT_DETACHED_ONLY,

/** Skip proof of work validation */
LIGHT_SKIP_DETACHED,

/** Skip proof of work validation */
LIGHT,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.mainnet;

import static org.apache.logging.log4j.LogManager.getLogger;

import tech.pegasys.pantheon.ethereum.BlockValidator;
import tech.pegasys.pantheon.ethereum.BlockValidator.BlockProcessingOutputs;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
Expand All @@ -25,12 +23,8 @@
import java.util.List;
import java.util.Optional;

import org.apache.logging.log4j.Logger;

public class MainnetBlockImporter<C> implements BlockImporter<C> {

private static final Logger LOG = getLogger();

final BlockValidator<C> blockValidator;

public MainnetBlockImporter(final BlockValidator<C> blockValidator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class ChainDownloader<C> {
private final CheckpointHeaderManager<C> checkpointHeaderManager;
private final BlockImportTaskFactory blockImportTaskFactory;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final CompletableFuture<Void> downloadFuture = new CompletableFuture<>();

private int chainSegmentTimeouts = 0;

Expand All @@ -73,9 +74,10 @@ public ChainDownloader(
this.blockImportTaskFactory = blockImportTaskFactory;
}

public void start() {
public CompletableFuture<Void> start() {
if (started.compareAndSet(false, true)) {
executeDownload();
return downloadFuture;
} else {
throw new IllegalStateException(
"Attempt to start an already started " + this.getClass().getSimpleName() + ".");
Expand Down Expand Up @@ -110,8 +112,11 @@ private CompletableFuture<?> executeDownload() {
ethContext
.getScheduler()
.scheduleFutureTask(this::executeDownload, Duration.ofSeconds(2));
} else {
} else if (syncTargetManager.shouldContinueDownloading()) {
executeDownload();
} else {
LOG.info("Chain download complete");
downloadFuture.complete(null);
}
});
return currentTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,6 @@ public void clearSyncTarget(final SyncTarget syncTarget) {
}

public abstract boolean shouldSwitchSyncTarget(final SyncTarget currentTarget);

public abstract boolean shouldContinueDownloading();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;

import java.util.List;

class BlockWithReceipts {
private final Block block;
private final List<TransactionReceipt> receipts;

BlockWithReceipts(final Block block, final List<TransactionReceipt> receipts) {
this.block = block;
this.receipts = receipts;
}

public BlockHeader getHeader() {
return block.getHeader();
}

public Block getBlock() {
return block;
}

public List<TransactionReceipt> getReceipts() {
return receipts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ public CompletableFuture<Void> downloadChain(final FastSyncState currentState) {
syncState,
ethTasksTimer,
currentState.getPivotBlockHeader().get());
downloader.start();
return new CompletableFuture<>();
return downloader.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FastSyncBlockHandler<C> implements BlockHandler<BlockWithReceipts> {
private static final Logger LOG = LogManager.getLogger();

private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final LabelledMetric<OperationTimer> ethTasksTimer;

public FastSyncBlockHandler(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
}

@Override
public CompletableFuture<List<BlockWithReceipts>> downloadBlocks(
final List<BlockHeader> headers) {
return downloadBodies(headers)
.thenCombine(downloadReceipts(headers), this::combineBlocksAndReceipts);
}

private CompletableFuture<List<Block>> downloadBodies(final List<BlockHeader> headers) {
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer)
.run();
}

private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> downloadReceipts(
final List<BlockHeader> headers) {
return GetReceiptsFromPeerTask.forHeaders(ethContext, headers, ethTasksTimer)
.run()
.thenApply(PeerTaskResult::getResult);
}

private List<BlockWithReceipts> combineBlocksAndReceipts(
final List<Block> blocks, final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader) {
return blocks
.stream()
.map(
block -> {
final List<TransactionReceipt> receipts =
receiptsByHeader.getOrDefault(block.getHeader(), emptyList());
return new BlockWithReceipts(block, receipts);
})
.collect(Collectors.toList());
}

@Override
public CompletableFuture<List<BlockWithReceipts>> validateAndImportBlocks(
final List<BlockWithReceipts> blocksWithReceipts) {
LOG.debug(
"Storing blocks {} to {}",
blocksWithReceipts.get(0).getHeader().getNumber(),
blocksWithReceipts.get(blocksWithReceipts.size() - 1).getHeader().getNumber());
blocksWithReceipts.forEach(
block -> {
final BlockImporter<C> blockImporter =
protocolSchedule.getByBlockNumber(block.getHeader().getNumber()).getBlockImporter();
// TODO: This is still doing full ommer validation. Is that required?
blockImporter.fastImportBlock(
protocolContext,
block.getBlock(),
block.getReceipts(),
HeaderValidationMode.LIGHT_SKIP_DETACHED);
});
return CompletableFuture.completedFuture(blocksWithReceipts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FastSyncChainDownloader<C> {
private final ChainDownloader<C> chainDownloader;
Expand Down Expand Up @@ -77,8 +79,8 @@ public class FastSyncChainDownloader<C> {
this::importBlocksForCheckpoints);
}

public void start() {
chainDownloader.start();
public CompletableFuture<Void> start() {
return chainDownloader.start();
}

private CompletableFuture<List<Block>> importBlocksForCheckpoints(
Expand All @@ -91,14 +93,21 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
return CompletableFuture.completedFuture(emptyList());
}
}
final PipelinedImportChainSegmentTask<C> importTask =
final PipelinedImportChainSegmentTask<C, BlockWithReceipts> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
config.downloaderParallelism(),
ethTasksTimer,
new FastSyncBlockHandler<>(
protocolSchedule, protocolContext, ethContext, ethTasksTimer),
HeaderValidationMode.LIGHT_DETACHED_ONLY,
checkpointHeaders);
return importTask.run();
return importTask
.run()
.thenApply(
results ->
results.stream().map(BlockWithReceipts::getBlock).collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand All @@ -39,6 +40,7 @@ class FastSyncTargetManager<C> extends SyncTargetManager<C> {
private static final Logger LOG = LogManager.getLogger();

private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final BlockHeader pivotBlockHeader;
Expand All @@ -53,6 +55,7 @@ public FastSyncTargetManager(
final BlockHeader pivotBlockHeader) {
super(config, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
this.pivotBlockHeader = pivotBlockHeader;
Expand Down Expand Up @@ -89,7 +92,7 @@ private CompletableFuture<Optional<EthPeer>> confirmPivotBlockHeader(final EthPe
.timeout(task)
.thenApply(
result -> {
if (result.size() != 1 || !result.get(0).equals(pivotBlockHeader)) {
if (peerHasDifferentPivotBlock(result)) {
bestPeer.disconnect(DisconnectReason.USELESS_PEER);
return Optional.<EthPeer>empty();
} else {
Expand All @@ -103,8 +106,17 @@ private CompletableFuture<Optional<EthPeer>> confirmPivotBlockHeader(final EthPe
});
}

private boolean peerHasDifferentPivotBlock(final List<BlockHeader> result) {
return result.size() != 1 || !result.get(0).equals(pivotBlockHeader);
}

@Override
public boolean shouldSwitchSyncTarget(final SyncTarget currentTarget) {
return false;
}

@Override
public boolean shouldContinueDownloading() {
return !protocolContext.getBlockchain().getChainHeadHash().equals(pivotBlockHeader.getHash());
}
}
Loading