diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTask.java new file mode 100644 index 00000000000..a59183b14ac --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTask.java @@ -0,0 +1,163 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask.task; + +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskValidationResponse; +import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage; +import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage; +import org.hyperledger.besu.ethereum.mainnet.BodyValidation; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements PeerTask for getting block bodies from peers, and matches headers to bodies to supply + * full blocks + */ +public class GetBodiesFromPeerTask implements PeerTask> { + + private static final Logger LOG = LoggerFactory.getLogger(GetBodiesFromPeerTask.class); + + private static final int DEFAULT_RETRIES_AGAINST_OTHER_PEERS = 5; + + private final List blockHeaders; + private final ProtocolSchedule protocolSchedule; + private final int allowedRetriesAgainstOtherPeers; + + private final long requiredBlockchainHeight; + private final List blocks = new ArrayList<>(); + private final boolean isPoS; + + public GetBodiesFromPeerTask( + final List blockHeaders, final ProtocolSchedule protocolSchedule) { + this(blockHeaders, protocolSchedule, DEFAULT_RETRIES_AGAINST_OTHER_PEERS); + } + + public GetBodiesFromPeerTask( + final List blockHeaders, + final ProtocolSchedule protocolSchedule, + final int allowedRetriesAgainstOtherPeers) { + if (blockHeaders == null || blockHeaders.isEmpty()) { + throw new IllegalArgumentException("Block headers must not be empty"); + } + + this.blockHeaders = blockHeaders; + this.protocolSchedule = protocolSchedule; + this.allowedRetriesAgainstOtherPeers = allowedRetriesAgainstOtherPeers; + + this.requiredBlockchainHeight = + blockHeaders.stream() + .mapToLong(BlockHeader::getNumber) + .max() + .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); + this.isPoS = protocolSchedule.getByBlockHeader(blockHeaders.getLast()).isPoS(); + } + + @Override + public SubProtocol getSubProtocol() { + return EthProtocol.get(); + } + + @Override + public MessageData getRequestMessage() { + return GetBlockBodiesMessage.create( + blockHeaders.stream().map(BlockHeader::getBlockHash).toList()); + } + + @Override + public List processResponse(final MessageData messageData) + throws InvalidPeerTaskResponseException { + // Blocks returned by this method are in the same order as the headers, but might not be + // complete + if (messageData == null) { + throw new InvalidPeerTaskResponseException(); + } + final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(messageData); + final List blockBodies = blocksMessage.bodies(protocolSchedule); + if (blockBodies.isEmpty() || blockBodies.size() > blockHeaders.size()) { + throw new InvalidPeerTaskResponseException(); + } + + for (int i = 0; i < blockBodies.size(); i++) { + final BlockBody blockBody = blockBodies.get(i); + final BlockHeader blockHeader = blockHeaders.get(i); + if (!blockBodyMatchesBlockHeader(blockBody, blockHeader)) { + LOG.atDebug().setMessage("Received block body does not match block header").log(); + throw new InvalidPeerTaskResponseException(); + } + + blocks.add(new Block(blockHeader, blockBody)); + } + return blocks; + } + + @Override + public int getRetriesWithOtherPeer() { + return allowedRetriesAgainstOtherPeers; + } + + private boolean blockBodyMatchesBlockHeader( + final BlockBody blockBody, final BlockHeader blockHeader) { + // this method validates that the block body matches the block header by calculating the roots + // of the block body and comparing them to the roots in the block header + if (!BodyValidation.transactionsRoot(blockBody.getTransactions()) + .equals(blockHeader.getTransactionsRoot())) { + return false; + } + if (!BodyValidation.ommersHash(blockBody.getOmmers()).equals(blockHeader.getOmmersHash())) { + return false; + } + if (!blockBody + .getWithdrawals() + .map(BodyValidation::withdrawalsRoot) + .equals(blockHeader.getWithdrawalsRoot())) { + return false; + } + + return true; + } + + @Override + public Predicate getPeerRequirementFilter() { + return (ethPeer) -> + isPoS || ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight; + } + + @Override + public PeerTaskValidationResponse validateResult(final List result) { + if (result.isEmpty()) { + return PeerTaskValidationResponse.NO_RESULTS_RETURNED; + } + return PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD; + } + + public List getBlockHeaders() { + return blockHeaders; + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 43dfa275eba..c872f195103 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -137,6 +137,7 @@ public DefaultSynchronizer( syncState, metricsSystem, terminationCondition, + peerTaskExecutor, syncDurationMetrics)); if (SyncMode.FAST.equals(syncConfig.getSyncMode())) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java index 26fc7b544ce..d65d8e82407 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksTask; +import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksWithPeerTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -31,19 +32,38 @@ public class DownloadBodiesStep private final ProtocolSchedule protocolSchedule; private final EthContext ethContext; private final MetricsSystem metricsSystem; + private final SynchronizerConfiguration synchronizerConfiguration; public DownloadBodiesStep( final ProtocolSchedule protocolSchedule, final EthContext ethContext, + final SynchronizerConfiguration synchronizerConfiguration, final MetricsSystem metricsSystem) { this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; + this.synchronizerConfiguration = synchronizerConfiguration; this.metricsSystem = metricsSystem; } @Override public CompletableFuture> apply(final List blockHeaders) { - return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem) - .run(); + if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { + return ethContext + .getScheduler() + .scheduleServiceTask(() -> getBodiesWithPeerTaskSystem(blockHeaders)); + } else { + return CompleteBlocksTask.forHeaders( + protocolSchedule, ethContext, blockHeaders, metricsSystem) + .run(); + } + } + + private CompletableFuture> getBodiesWithPeerTaskSystem( + final List headers) { + + final CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask(protocolSchedule, headers, ethContext.getPeerTaskExecutor()); + final List blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers(); + return CompletableFuture.completedFuture(blocks); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java index c1ce88398e4..0f968e90839 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java @@ -16,6 +16,9 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlocksFromPeersTask; @@ -53,9 +56,9 @@ public CompletableFuture possibleRequestBodies(final List blo LOG.atDebug() .setMessage("Requesting {} blocks {}->{} ({})") .addArgument(blockHeaders::size) - .addArgument(() -> blockHeaders.get(0).getNumber()) - .addArgument(() -> blockHeaders.get(blockHeaders.size() - 1).getNumber()) - .addArgument(() -> blockHeaders.get(0).getHash().toHexString()) + .addArgument(() -> blockHeaders.getFirst().getNumber()) + .addArgument(() -> blockHeaders.getLast().getNumber()) + .addArgument(() -> blockHeaders.getFirst().getHash().toHexString()) .log(); return requestBodies(blockHeaders) .thenApply(this::saveBlocks) @@ -76,23 +79,47 @@ public CompletableFuture possibleRequestBodies(final List blo @VisibleForTesting protected CompletableFuture> requestBodies(final List blockHeaders) { - final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask = - RetryingGetBlocksFromPeersTask.forHeaders( - context.getProtocolSchedule(), - context.getEthContext(), - context.getMetricsSystem(), - context.getEthContext().getEthPeers().peerCount(), - blockHeaders); + CompletableFuture> blocksFuture; + if (context.getSynchronizerConfiguration().isPeerTaskSystemEnabled()) { + blocksFuture = + context + .getEthContext() + .getScheduler() + .scheduleServiceTask( + () -> { + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask( + blockHeaders, + context.getProtocolSchedule(), + context.getEthContext().getEthPeers().peerCount()); + PeerTaskExecutorResult> taskResult = + context.getEthContext().getPeerTaskExecutor().execute(task); + if (taskResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS + && taskResult.result().isPresent()) { + return CompletableFuture.completedFuture(taskResult.result().get()); + } else { + return CompletableFuture.failedFuture( + new RuntimeException(taskResult.responseCode().toString())); + } + }); + } else { + final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask = + RetryingGetBlocksFromPeersTask.forHeaders( + context.getProtocolSchedule(), + context.getEthContext(), + context.getMetricsSystem(), + context.getEthContext().getEthPeers().peerCount(), + blockHeaders); - final CompletableFuture>> run = - getBodiesFromPeerTask.run(); - return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult) - .thenApply( - blocks -> { - LOG.debug("Got {} blocks from peers", blocks.size()); - blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber())); - return blocks; - }); + blocksFuture = + getBodiesFromPeerTask.run().thenApply(AbstractPeerTask.PeerTaskResult::getResult); + } + return blocksFuture.thenApply( + blocks -> { + LOG.debug("Got {} blocks from peers", blocks.size()); + blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber())); + return blocks; + }); } @VisibleForTesting diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java index 707fc6aa13f..8eebd2ea42d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java @@ -140,7 +140,7 @@ public Pipeline createDownloadPipelineForSyncTarget(final SyncT final RangeHeadersValidationStep validateHeadersJoinUpStep = new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy); final DownloadBodiesStep downloadBodiesStep = - new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem); + new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem); final DownloadReceiptsStep downloadReceiptsStep = new DownloadReceiptsStep(protocolSchedule, ethContext, syncConfig, metricsSystem); final ImportBlocksStep importBlockStep = diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java index 3a0f6edb086..5be4942603f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -35,7 +36,8 @@ public static ChainDownloader create( final SyncState syncState, final MetricsSystem metricsSystem, final SyncTerminationCondition terminationCondition, - final SyncDurationMetrics syncDurationMetrics) { + final SyncDurationMetrics syncDurationMetrics, + final PeerTaskExecutor peerTaskExecutor) { final FullSyncTargetManager syncTargetManager = new FullSyncTargetManager( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java index 570ef303779..109dd42b1a3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java @@ -67,7 +67,8 @@ public FullSyncDownloadPipelineFactory( this.ethContext = ethContext; this.metricsSystem = metricsSystem; this.fullSyncTerminationCondition = syncTerminationCondition; - betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers()); + this.betterSyncTargetEvaluator = + new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers()); } @Override @@ -105,7 +106,7 @@ public Pipeline createDownloadPipelineForSyncTarget(final SyncTarget target) final RangeHeadersValidationStep validateHeadersJoinUpStep = new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy); final DownloadBodiesStep downloadBodiesStep = - new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem); + new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem); final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep(); final FullImportBlockStep importBlockStep = new FullImportBlockStep( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java index 8f1aca792c3..97bf38c95ec 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; @@ -45,6 +46,7 @@ public FullSyncDownloader( final SyncState syncState, final MetricsSystem metricsSystem, final SyncTerminationCondition terminationCondition, + final PeerTaskExecutor peerTaskExecutor, final SyncDurationMetrics syncDurationMetrics) { this.syncConfig = syncConfig; this.protocolContext = protocolContext; @@ -59,7 +61,8 @@ public FullSyncDownloader( syncState, metricsSystem, terminationCondition, - syncDurationMetrics); + syncDurationMetrics, + peerTaskExecutor); } public CompletableFuture start() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTask.java new file mode 100644 index 00000000000..70e17ca1fc2 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTask.java @@ -0,0 +1,135 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.tasks; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Given a set of headers, "completes" them by repeatedly requesting additional data (bodies) needed + * to create the blocks that correspond to the supplied headers. + */ +public class CompleteBlocksWithPeerTask { + private static final Logger LOG = LoggerFactory.getLogger(CompleteBlocksWithPeerTask.class); + + private final ProtocolSchedule protocolSchedule; + private final List headersToGet = new ArrayList<>(); + private final PeerTaskExecutor peerTaskExecutor; + + private final Block[] result; + private final int resultSize; + private int nextIndex = 0; + private int remainingBlocks; + + public CompleteBlocksWithPeerTask( + final ProtocolSchedule protocolSchedule, + final List headers, + final PeerTaskExecutor peerTaskExecutor) { + checkArgument(!headers.isEmpty(), "Must supply a non-empty headers list"); + this.protocolSchedule = protocolSchedule; + this.peerTaskExecutor = peerTaskExecutor; + + resultSize = headers.size(); + result = new Block[resultSize]; + remainingBlocks = resultSize; + + for (int i = 0; i < resultSize; i++) { + final BlockHeader header = headers.get(i); + if (BlockHeader.hasEmptyBlock(header)) { + final Block emptyBlock = + new Block(header, createEmptyBodyBasedOnProtocolSchedule(protocolSchedule, header)); + result[i] = emptyBlock; + remainingBlocks--; + } else { + headersToGet.add(header); + } + } + this.nextIndex = findNextIndex(0); + } + + private BlockBody createEmptyBodyBasedOnProtocolSchedule( + final ProtocolSchedule protocolSchedule, final BlockHeader header) { + return new BlockBody( + Collections.emptyList(), + Collections.emptyList(), + isWithdrawalsEnabled(protocolSchedule, header) + ? Optional.of(Collections.emptyList()) + : Optional.empty()); + } + + private boolean isWithdrawalsEnabled( + final ProtocolSchedule protocolSchedule, final BlockHeader header) { + return protocolSchedule.getByBlockHeader(header).getWithdrawalsProcessor().isPresent(); + } + + /** + * Retrieves all remaining blocks from connected peers. Subsequent calls will have no affect. + * + * @return A List of all blocks for the headers supplied when constructing this + * CompleteBlocksWithPeerTask + */ + public List retrieveBlocksFromPeers() { + while (remainingBlocks > 0) { + LOG.atDebug() + .setMessage("Requesting {} bodies from peer") + .addArgument(headersToGet.size()) + .log(); + final GetBodiesFromPeerTask task = new GetBodiesFromPeerTask(headersToGet, protocolSchedule); + final PeerTaskExecutorResult> executionResult = peerTaskExecutor.execute(task); + if (executionResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS + && executionResult.result().isPresent()) { + final List blockList = executionResult.result().get(); + LOG.atDebug() + .setMessage("Received {} bodies out of {} from peer") + .addArgument(blockList.size()) + .addArgument(headersToGet.size()) + .log(); + blockList.forEach( + block -> { + remainingBlocks--; + result[nextIndex] = block; + headersToGet.removeFirst(); + nextIndex = findNextIndex(nextIndex + 1); + }); + } + } + return List.of(result); + } + + private int findNextIndex(final int startIndex) { + for (int i = startIndex; i < resultSize; i++) { + if (result[i] == null) { + return i; + } + } + return -1; // This only happens when we have finished processing all headers + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 5eee6495ebd..9a8529c033c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -27,11 +27,11 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractGetHeadersFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; -import org.hyperledger.besu.ethereum.eth.manager.task.GetBodiesFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.ValidationPolicy; @@ -323,26 +323,44 @@ private CompletableFuture markBadBlock(final BlockHeader badHeader, final Eth // even though the header is known bad we are downloading the block body for the debug_badBlocks // RPC final BadBlockManager badBlockManager = protocolContext.getBadBlockManager(); - return GetBodiesFromPeerTask.forHeaders( - protocolSchedule, ethContext, List.of(badHeader), metricsSystem) - .assignPeer(badPeer) - .run() - .whenComplete( - (blockPeerTaskResult, error) -> { - final HeaderValidationMode validationMode = - validationPolicy.getValidationModeForNextBlock(); - final String description = - String.format("Failed header validation (%s)", validationMode); - final BadBlockCause cause = BadBlockCause.fromValidationFailure(description); - if (blockPeerTaskResult != null) { - final Optional block = blockPeerTaskResult.getResult().stream().findFirst(); - block.ifPresentOrElse( - (b) -> badBlockManager.addBadBlock(b, cause), - () -> badBlockManager.addBadHeader(badHeader, cause)); - } else { - badBlockManager.addBadHeader(badHeader, cause); - } - }); + CompletableFuture blockFuture; + if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { + blockFuture = + ethContext + .getScheduler() + .scheduleServiceTask( + () -> { + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(badHeader), protocolSchedule); + PeerTaskExecutorResult> taskResult = + ethContext.getPeerTaskExecutor().executeAgainstPeer(task, badPeer); + if (taskResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS) { + return CompletableFuture.completedFuture( + taskResult.result().map(List::getFirst).orElse(null)); + } else { + return CompletableFuture.failedFuture(new RuntimeException()); + } + }); + } else { + blockFuture = + org.hyperledger.besu.ethereum.eth.manager.task.GetBodiesFromPeerTask.forHeaders( + protocolSchedule, ethContext, List.of(badHeader), metricsSystem) + .assignPeer(badPeer) + .run() + .thenApply((blockPeerTaskResult) -> blockPeerTaskResult.getResult().getFirst()); + } + return blockFuture.whenComplete( + (blockResult, error) -> { + final HeaderValidationMode validationMode = + validationPolicy.getValidationModeForNextBlock(); + final String description = String.format("Failed header validation (%s)", validationMode); + final BadBlockCause cause = BadBlockCause.fromValidationFailure(description); + if (blockResult != null) { + badBlockManager.addBadBlock(blockResult, cause); + } else { + badBlockManager.addBadHeader(badHeader, cause); + } + }); } private boolean checkHeaderInRange(final BlockHeader header) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTaskTest.java new file mode 100644 index 00000000000..288ddd4ce5a --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTaskTest.java @@ -0,0 +1,189 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask.task; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.core.encoding.EncodingContext; +import org.hyperledger.besu.ethereum.core.encoding.TransactionDecoder; +import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.ChainState; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; +import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage; +import org.hyperledger.besu.ethereum.eth.messages.EthPV62; +import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage; +import org.hyperledger.besu.ethereum.mainnet.BodyValidation; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.apache.tuweni.bytes.Bytes; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class GetBodiesFromPeerTaskTest { + + private static final String FRONTIER_TX_RLP = + "0xf901fc8032830138808080b901ae60056013565b6101918061001d6000396000f35b3360008190555056006001600060e060020a6000350480630a874df61461003a57806341c0e1b514610058578063a02b161e14610066578063dbbdf0831461007757005b610045600435610149565b80600160a060020a031660005260206000f35b610060610161565b60006000f35b6100716004356100d4565b60006000f35b61008560043560243561008b565b60006000f35b600054600160a060020a031632600160a060020a031614156100ac576100b1565b6100d0565b8060018360005260205260406000208190555081600060005260206000a15b5050565b600054600160a060020a031633600160a060020a031614158015610118575033600160a060020a0316600182600052602052604060002054600160a060020a031614155b61012157610126565b610146565b600060018260005260205260406000208190555080600060005260206000a15b50565b60006001826000526020526040600020549050919050565b600054600160a060020a031633600160a060020a0316146101815761018f565b600054600160a060020a0316ff5b561ca0c5689ed1ad124753d54576dfb4b571465a41900a1dff4058d8adf16f752013d0a01221cbd70ec28c94a3b55ec771bcbc70778d6ee0b51ca7ea9514594c861b1884"; + + private static final Transaction TX = + TransactionDecoder.decodeRLP( + new BytesValueRLPInput(Bytes.fromHexString(FRONTIER_TX_RLP), false), + EncodingContext.BLOCK_BODY); + public static final List TRANSACTION_LIST = List.of(TX); + public static final BlockBody BLOCK_BODY = + new BlockBody(TRANSACTION_LIST, Collections.emptyList(), Optional.empty()); + private static ProtocolSchedule protocolSchedule; + + @BeforeAll + public static void setup() { + protocolSchedule = mock(ProtocolSchedule.class); + final ProtocolSpec protocolSpec = mock(ProtocolSpec.class); + when(protocolSpec.isPoS()).thenReturn(true); + when(protocolSchedule.getByBlockHeader(Mockito.any())).thenReturn(protocolSpec); + } + + @Test + public void testGetSubProtocol() { + + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(mockBlockHeader(0)), protocolSchedule); + Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol()); + } + + @Test + public void testGetRequestMessage() { + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask( + List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), protocolSchedule); + + MessageData messageData = task.getRequestMessage(); + GetBlockBodiesMessage getBlockBodiesMessage = GetBlockBodiesMessage.readFrom(messageData); + + Assertions.assertEquals(EthPV62.GET_BLOCK_BODIES, getBlockBodiesMessage.getCode()); + Iterable hashesInMessage = getBlockBodiesMessage.hashes(); + List expectedHashes = + List.of( + Hash.fromHexString(StringUtils.repeat("00", 31) + "11"), + Hash.fromHexString(StringUtils.repeat("00", 31) + "21"), + Hash.fromHexString(StringUtils.repeat("00", 31) + "31")); + List actualHashes = new ArrayList<>(); + hashesInMessage.forEach(actualHashes::add); + + Assertions.assertEquals(3, actualHashes.size()); + Assertions.assertEquals( + expectedHashes.stream().sorted().toList(), actualHashes.stream().sorted().toList()); + } + + @Test + public void testParseResponseWithNullResponseMessage() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> new GetBodiesFromPeerTask(Collections.emptyList(), protocolSchedule)); + } + + @Test + public void testParseResponseForInvalidResponse() { + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(mockBlockHeader(1)), protocolSchedule); + // body does not match header + BlockBodiesMessage bodiesMessage = BlockBodiesMessage.create(List.of(BLOCK_BODY)); + + Assertions.assertThrows( + InvalidPeerTaskResponseException.class, () -> task.processResponse(bodiesMessage)); + } + + @Test + public void testParseResponse() throws InvalidPeerTaskResponseException { + final BlockHeader nonEmptyBlockHeaderMock = + getNonEmptyBlockHeaderMock(BodyValidation.transactionsRoot(TRANSACTION_LIST).toString()); + + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(nonEmptyBlockHeaderMock), protocolSchedule); + + final BlockBodiesMessage blockBodiesMessage = BlockBodiesMessage.create(List.of(BLOCK_BODY)); + + List result = task.processResponse(blockBodiesMessage); + + assertThat(result.size()).isEqualTo(1); + assertThat(result.getFirst().getBody().getTransactions()).isEqualTo(TRANSACTION_LIST); + } + + @Test + public void testGetPeerRequirementFilter() { + BlockHeader blockHeader1 = mockBlockHeader(1); + BlockHeader blockHeader2 = mockBlockHeader(2); + BlockHeader blockHeader3 = mockBlockHeader(3); + + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask( + List.of(blockHeader1, blockHeader2, blockHeader3), protocolSchedule); + + EthPeer successfulCandidate = mockPeer(5); + + Assertions.assertTrue(task.getPeerRequirementFilter().test(successfulCandidate)); + } + + private BlockHeader mockBlockHeader(final long blockNumber) { + BlockHeader blockHeader = Mockito.mock(BlockHeader.class); + Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber); + // second to last hex digit indicates the blockNumber, last hex digit indicates the usage of the + // hash + Mockito.when(blockHeader.getHash()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1")); + Mockito.when(blockHeader.getBlockHash()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1")); + Mockito.when(blockHeader.getReceiptsRoot()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "2")); + + return blockHeader; + } + + private static BlockHeader getNonEmptyBlockHeaderMock(final String transactionsRootHexString) { + final BlockHeader blockHeader = mock(BlockHeader.class); + when(blockHeader.getTransactionsRoot()) + .thenReturn(Hash.fromHexStringLenient(transactionsRootHexString)); + when(blockHeader.getOmmersHash()).thenReturn(Hash.EMPTY_LIST_HASH); + when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty()); + return blockHeader; + } + + private EthPeer mockPeer(final long chainHeight) { + EthPeer ethPeer = Mockito.mock(EthPeer.class); + ChainState chainState = Mockito.mock(ChainState.class); + + Mockito.when(ethPeer.chainState()).thenReturn(chainState); + Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight); + + return ethPeer; + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStepTest.java index 7104ae31730..86e1aaa687c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStepTest.java @@ -35,6 +35,11 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestBuilder; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; @@ -47,6 +52,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.assertj.core.api.Assertions; @@ -70,6 +76,9 @@ public class ForwardSyncStepTest { @Mock(answer = Answers.RETURNS_DEEP_STUBS) private BackwardSyncContext context; + @Mock private SynchronizerConfiguration syncConfig; + @Mock private PeerTaskExecutor peerTaskExecutor; + private MutableBlockchain remoteBlockchain; private RespondingEthPeer peer; @@ -128,7 +137,12 @@ public void setup() { when(context.getProtocolContext().getBlockchain()).thenReturn(localBlockchain); when(context.getProtocolSchedule()).thenReturn(protocolSchedule); when(context.getBatchSize()).thenReturn(2); - EthProtocolManager ethProtocolManager = EthProtocolManagerTestBuilder.builder().build(); + when(context.getSynchronizerConfiguration()).thenReturn(syncConfig); + EthProtocolManager ethProtocolManager = + EthProtocolManagerTestBuilder.builder() + .setSynchronizerConfiguration(syncConfig) + .setPeerTaskExecutor(peerTaskExecutor) + .build(); peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); EthContext ethContext = ethProtocolManager.ethContext(); @@ -147,10 +161,28 @@ public void setup() { ForestReferenceTestWorldState.create(Collections.emptyMap()), blockDataGenerator.receipts(block)))); }); + + when(peerTaskExecutor.execute(any(GetBodiesFromPeerTask.class))) + .thenAnswer( + (invocationOnMock) -> { + GetBodiesFromPeerTask task = + invocationOnMock.getArgument(0, GetBodiesFromPeerTask.class); + List blocks = + task.getBlockHeaders().stream() + .map( + (bh) -> + new Block(bh, remoteBlockchain.getBlockBody(bh.getBlockHash()).get())) + .collect(Collectors.toList()); + return new PeerTaskExecutorResult>( + Optional.of(blocks), + PeerTaskExecutorResponseCode.SUCCESS, + Optional.of(peer.getEthPeer())); + }); } @Test public void shouldExecuteForwardSyncWhenPossible() throws Exception { + when(syncConfig.isPeerTaskSystemEnabled()).thenReturn(false); final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 3); ForwardSyncStep step = new ForwardSyncStep(context, backwardChain); @@ -173,6 +205,17 @@ public void shouldExecuteForwardSyncWhenPossible() throws Exception { completableFuture.get(); } + @Test + public void shouldExecuteForwardSyncWhenPossibleUsingPeerTaskSystem() throws Exception { + when(syncConfig.isPeerTaskSystemEnabled()).thenReturn(true); + final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 3); + ForwardSyncStep step = new ForwardSyncStep(context, backwardChain); + + final CompletableFuture completableFuture = step.executeAsync(); + + completableFuture.get(); + } + @Test public void shouldNotRequestWhenNull() { ForwardSyncStep phase = new ForwardSyncStep(context, null); @@ -202,6 +245,20 @@ public void shouldFindBlockWhenRequested() throws Exception { .containsExactlyInAnyOrder(getBlockByNumber(LOCAL_HEIGHT + 1)); } + @Test + public void shouldFindBlockWhenRequestedUsingPeerTaskSystem() throws Exception { + when(syncConfig.isPeerTaskSystemEnabled()).thenReturn(true); + ForwardSyncStep step = + new ForwardSyncStep(context, createBackwardChain(LOCAL_HEIGHT + 1, LOCAL_HEIGHT + 3)); + + final CompletableFuture> future = + step.requestBodies(List.of(getBlockByNumber(LOCAL_HEIGHT + 1).getHeader())); + final List blocks = future.get(); + Assertions.assertThat(blocks) + .hasSize(1) + .containsExactlyInAnyOrder(getBlockByNumber(LOCAL_HEIGHT + 1)); + } + private BackwardChain createBackwardChain(final int from, final int until) { BackwardChain chain = backwardChainFromBlock(until); for (int i = until; i > from; --i) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java index e73d286fb0b..98a8a26dbb7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.core.Difficulty; @@ -36,6 +37,7 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTaskExecutorAnswer; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; @@ -59,6 +61,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -167,6 +170,21 @@ public void setup(final DataStorageFormat dataStorageFormat) { when(peerTaskExecutor.execute(any(GetHeadersFromPeerTask.class))).thenAnswer(getHeadersAnswer); when(peerTaskExecutor.executeAgainstPeer(any(GetHeadersFromPeerTask.class), any(EthPeer.class))) .thenAnswer(getHeadersAnswer); + + Answer>> getBlockBodiesAnswer = + (invocationOnMock) -> { + GetBodiesFromPeerTask task = invocationOnMock.getArgument(0, GetBodiesFromPeerTask.class); + List blocks = + task.getBlockHeaders().stream() + .map((bh) -> new Block(bh, otherBlockchain.getBlockBody(bh.getBlockHash()).get())) + .collect(Collectors.toList()); + return new PeerTaskExecutorResult>( + Optional.of(blocks), PeerTaskExecutorResponseCode.SUCCESS, Optional.empty()); + }; + when(peerTaskExecutor.execute(any(GetBodiesFromPeerTask.class))) + .thenAnswer(getBlockBodiesAnswer); + when(peerTaskExecutor.executeAgainstPeer(any(GetBodiesFromPeerTask.class), any(EthPeer.class))) + .thenAnswer(getBlockBodiesAnswer); } @AfterEach diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java index fb518c305f7..1b3745c5f8f 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fullsync; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; @@ -28,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -93,7 +95,8 @@ private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) { syncState, metricsSystem, SyncTerminationCondition.never(), - SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS); + SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS, + mock(PeerTaskExecutor.class)); } private ChainDownloader downloader() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java index 2c3c1ecdc8b..4f2577e497a 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; +import static org.mockito.Mockito.mock; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; @@ -35,6 +36,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; @@ -125,7 +127,8 @@ private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) { syncState, metricsSystem, SyncTerminationCondition.never(), - SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS); + SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS, + mock(PeerTaskExecutor.class)); } private ChainDownloader downloader() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java index d79a2209241..55d96acb1c2 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fullsync; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; @@ -28,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -111,7 +113,8 @@ private ChainDownloader downloader( syncState, metricsSystem, terminalCondition, - SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS); + SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS, + mock(PeerTaskExecutor.class)); } private SynchronizerConfiguration.Builder syncConfigBuilder() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java index f50b7edfcff..3953c75882a 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java @@ -100,6 +100,7 @@ private FullSyncDownloader downloader(final SynchronizerConfiguration syncConfig syncState, metricsSystem, SyncTerminationCondition.never(), + null, SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTaskTest.java new file mode 100644 index 00000000000..e2fc9e50744 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTaskTest.java @@ -0,0 +1,253 @@ +/* + * Copyright contributors to Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.tasks; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.mainnet.WithdrawalsProcessor; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class CompleteBlocksWithPeerTaskTest { + + @BeforeAll + public static void setUp() {} + + @Test + public void shouldFailWhenEmptyHeaders() { + assertThatThrownBy(() -> new CompleteBlocksWithPeerTask(null, Collections.emptyList(), null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Must supply a non-empty headers list"); + } + + @Test + public void shouldReturnEmptyBlock() { + final ProtocolSchedule protocolSchedule = getProtocolScheduleMock(); + final BlockHeader blockHeader = getEmptyBlockHeaderMock(); + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + + CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask(protocolSchedule, List.of(blockHeader), peerTaskExecutor); + final List blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers(); + assertThat(blocks).isNotEmpty(); + assertThat(blocks.size()).isEqualTo(1); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(0).getHeader())).isTrue(); + + verify(peerTaskExecutor, Mockito.never()).execute(any()); + } + + @Test + public void shouldCreateWithdrawalsAwareEmptyBlock_whenWithdrawalsAreEnabled() { + final ProtocolSchedule mockProtocolSchedule = Mockito.mock(ProtocolSchedule.class); + final ProtocolSpec mockParisSpec = Mockito.mock(ProtocolSpec.class); + final ProtocolSpec mockShanghaiSpec = Mockito.mock(ProtocolSpec.class); + final WithdrawalsProcessor mockWithdrawalsProcessor = Mockito.mock(WithdrawalsProcessor.class); + + final BlockHeader header1 = + new BlockHeaderTestFixture().number(1).withdrawalsRoot(null).buildHeader(); + final BlockHeader header2 = + new BlockHeaderTestFixture().number(2).withdrawalsRoot(Hash.EMPTY_TRIE_HASH).buildHeader(); + + when(mockProtocolSchedule.getByBlockHeader((eq(header1)))).thenReturn(mockParisSpec); + when(mockParisSpec.getWithdrawalsProcessor()).thenReturn(Optional.empty()); + when(mockProtocolSchedule.getByBlockHeader((eq(header2)))).thenReturn(mockShanghaiSpec); + when(mockShanghaiSpec.getWithdrawalsProcessor()) + .thenReturn(Optional.of(mockWithdrawalsProcessor)); + + final List expectedBlocks = getExpectedBlocks(header1, header2); + + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + when(peerTaskExecutor.execute(any())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(expectedBlocks), + PeerTaskExecutorResponseCode.SUCCESS, + Optional.empty())); + + final CompleteBlocksWithPeerTask task = + new CompleteBlocksWithPeerTask( + mockProtocolSchedule, asList(header1, header2), peerTaskExecutor); + final List blocks = task.retrieveBlocksFromPeers(); + + assertThat(blocks).isEqualTo(expectedBlocks); + } + + @Test + public void shouldReturnNonEmptyBlock() { + final Block block = mock(Block.class); + final ProtocolSchedule protocolSchedule = getProtocolScheduleMock(); + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + final BlockHeader nonEmptyBlockHeaderMock = getNonEmptyBlockHeaderMock("0x01", "0x02"); + when(peerTaskExecutor.execute(any())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of(block)), + PeerTaskExecutorResponseCode.SUCCESS, + Optional.empty())); + + CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask( + protocolSchedule, List.of(nonEmptyBlockHeaderMock), peerTaskExecutor); + + final List blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers(); + assertThat(blocks).isNotEmpty(); + assertThat(blocks.size()).isEqualTo(1); + assertThat(blocks.get(0)).isEqualTo(block); + } + + @Test + public void shouldReturnBlocksInRightOrderWhenEmptyAndNonEmptyBlocksRequested() { + final Block block1 = mock(Block.class); + final Block block3 = mock(Block.class); + final BlockHeader emptyBlockHeaderMock = getEmptyBlockHeaderMock(); + final BlockHeader nonEmptyBlockHeaderMock1 = getNonEmptyBlockHeaderMock("0x01", "0x02"); + final BlockHeader nonEmptyBlockHeaderMock3 = getNonEmptyBlockHeaderMock("0x03", "0x04"); + + final ProtocolSchedule protocolSchedule = getProtocolScheduleMock(); + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + when(peerTaskExecutor.execute(any())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of(block1, block3)), + PeerTaskExecutorResponseCode.SUCCESS, + Optional.empty())); + + CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask( + protocolSchedule, + List.of( + nonEmptyBlockHeaderMock1, + emptyBlockHeaderMock, + nonEmptyBlockHeaderMock3, + emptyBlockHeaderMock), + peerTaskExecutor); + + final List blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers(); + assertThat(blocks).isNotEmpty(); + assertThat(blocks.size()).isEqualTo(4); + assertThat(blocks.get(0)).isEqualTo(block1); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(1).getHeader())).isTrue(); + assertThat(blocks.get(2)).isEqualTo(block3); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(3).getHeader())).isTrue(); + } + + @Test + public void shouldRequestMoreBodiesUntilFinished() { + final Block block1 = mock(Block.class); + final Block block3 = mock(Block.class); + final BlockHeader emptyBlockHeaderMock = getEmptyBlockHeaderMock(); + final BlockHeader nonEmptyBlockHeaderMock1 = getNonEmptyBlockHeaderMock("0x01", "0x02"); + final BlockHeader nonEmptyBlockHeaderMock3 = getNonEmptyBlockHeaderMock("0x03", "0x04"); + + final ProtocolSchedule protocolSchedule = getProtocolScheduleMock(); + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + when(peerTaskExecutor.execute(any())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of(block1)), + PeerTaskExecutorResponseCode.SUCCESS, + Optional.empty())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of(block3)), + PeerTaskExecutorResponseCode.SUCCESS, + Optional.empty())); + + CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask( + protocolSchedule, + List.of( + nonEmptyBlockHeaderMock1, + emptyBlockHeaderMock, + nonEmptyBlockHeaderMock3, + emptyBlockHeaderMock), + peerTaskExecutor); + + final List blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers(); + assertThat(blocks).isNotEmpty(); + assertThat(blocks.size()).isEqualTo(4); + assertThat(blocks.get(0)).isEqualTo(block1); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(1).getHeader())).isTrue(); + assertThat(blocks.get(2)).isEqualTo(block3); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(3).getHeader())).isTrue(); + } + + private static ProtocolSchedule getProtocolScheduleMock() { + final ProtocolSchedule protocolSchedule = mock(ProtocolSchedule.class); + final ProtocolSpec protocolSpec = mock(ProtocolSpec.class); + final Optional optional = Optional.of(mock(WithdrawalsProcessor.class)); + when(protocolSpec.getWithdrawalsProcessor()).thenReturn(optional); + when(protocolSchedule.getByBlockHeader(any())).thenReturn(protocolSpec); + return protocolSchedule; + } + + private static BlockHeader getEmptyBlockHeaderMock() { + final BlockHeader blockHeader = mock(BlockHeader.class); + when(blockHeader.getTransactionsRoot()).thenReturn(Hash.EMPTY_TRIE_HASH); + when(blockHeader.getOmmersHash()).thenReturn(Hash.EMPTY_LIST_HASH); + when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty()); + return blockHeader; + } + + private static BlockHeader getNonEmptyBlockHeaderMock( + final String transactionsRootHexString, final String ommersHash) { + final BlockHeader blockHeader = mock(BlockHeader.class); + when(blockHeader.getTransactionsRoot()) + .thenReturn(Hash.fromHexStringLenient(transactionsRootHexString)); + when(blockHeader.getOmmersHash()).thenReturn(Hash.fromHexStringLenient(ommersHash)); + when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty()); + return blockHeader; + } + + private static List getExpectedBlocks( + final BlockHeader header1, final BlockHeader header2) { + final Block block1 = + new Block( + header1, + new BlockBody(Collections.emptyList(), Collections.emptyList(), Optional.empty())); + final Block block2 = + new Block( + header2, + new BlockBody( + Collections.emptyList(), + Collections.emptyList(), + Optional.of(Collections.emptyList()))); + + return asList(block1, block2); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java index f594942afac..ffda7217bb3 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java @@ -26,12 +26,14 @@ import org.hyperledger.besu.ethereum.chain.BadBlockManager; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; @@ -303,6 +305,26 @@ public void marksBadBlockWhenHeaderValidationFailsUsingPeerTaskSystem() { Optional.of(respondingPeer.getEthPeer())); }); + Mockito.when( + peerTaskExecutor.executeAgainstPeer( + Mockito.any(GetBodiesFromPeerTask.class), Mockito.any(EthPeer.class))) + .thenAnswer( + (invocationOnMock) -> { + GetBodiesFromPeerTask task = + invocationOnMock.getArgument(0, GetBodiesFromPeerTask.class); + EthPeer peer = invocationOnMock.getArgument(1, EthPeer.class); + List blocks = + task.getBlockHeaders().stream() + .map( + (blockHeader) -> + new Block( + blockHeader, + blockchain.getBlockBody(blockHeader.getBlockHash()).get())) + .toList(); + return new PeerTaskExecutorResult>( + Optional.of(blocks), PeerTaskExecutorResponseCode.SUCCESS, Optional.of(peer)); + }); + // Execute the task final BlockHeader referenceHeader = chain.get(blockCount - 1).getHeader(); final EthTask> task =