This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Use pipeline for world state download #1096

Merged
merged 76 commits into from
Mar 14, 2019
a6ba7bf
Add a basic pipeline framework.
ajsutton Mar 6, 2019
2a75bd9
Support introducing batches into the pipeline.
ajsutton Mar 6, 2019
1eecb21
Make buffer size configurable.
ajsutton Mar 6, 2019
fa06ace
Add async processing and parallel processing support to pipelines.
ajsutton Mar 7, 2019
7fa7c59
Extract common looping code.
ajsutton Mar 8, 2019
91b53ca
Introduce finalize step to make InputOutputStep more useful.
ajsutton Mar 8, 2019
a3970c1
Move AsyncProcessStage over to SingleStepStage.
ajsutton Mar 8, 2019
c9b23f1
Reduce duplication.
ajsutton Mar 8, 2019
2ed23fc
Add tests for flat map.
ajsutton Mar 8, 2019
2f2ce8a
Fix issue where output pipe from a parallel processing stage was clos…
ajsutton Mar 8, 2019
ea8e0c4
Support aborting pipelines.
ajsutton Mar 8, 2019
55ccbcc
Abort the pipeline if any stage throws an unhandled exception.
ajsutton Mar 8, 2019
6811215
Tidy up access levels.
ajsutton Mar 8, 2019
02e4aa1
Add some javadoc.
ajsutton Mar 8, 2019
ebf0feb
Exceptions cause the pipeline to abort quite violently so we can't as…
ajsutton Mar 8, 2019
be717e0
Add missing javadoc parameters.
ajsutton Mar 8, 2019
4b8c544
Don't use a static executor service.
ajsutton Mar 8, 2019
8181abb
Simplify test & fix spotless.
ajsutton Mar 8, 2019
7b9961f
Ignore PipelineBuilderTest to see if that's what's causing the build …
ajsutton Mar 8, 2019
6a78181
Try to find the one test case that's stalling.
ajsutton Mar 11, 2019
e3f0f84
Unignore some more tests.
ajsutton Mar 11, 2019
1e3a6b0
Unignore one more test.
ajsutton Mar 11, 2019
0a4f8cf
Unignore final test.
ajsutton Mar 11, 2019
e92fce2
Synchronize on overallFuture.
ajsutton Mar 11, 2019
f12e681
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 11, 2019
3515376
Add more logging.
ajsutton Mar 11, 2019
c9e671e
Remove unused executor.
ajsutton Mar 11, 2019
5f9899a
Don't wait forever for vertx services to shutdown. Log warnings if s…
ajsutton Mar 11, 2019
da999d6
Merge branch 'runner-fix' into pipeline-framework
ajsutton Mar 11, 2019
38b3b66
Revert "Add more logging."
ajsutton Mar 11, 2019
3d8c447
Rename PipelineSource to ProducingStage.
ajsutton Mar 11, 2019
088838d
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 11, 2019
ca65957
Fix race condition when aborting.
ajsutton Mar 11, 2019
803784e
Spotless.
ajsutton Mar 11, 2019
7c799c0
Add javadoc.
ajsutton Mar 11, 2019
824867b
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 11, 2019
dc87314
More clearly separate stages (which are runnable) from processors (wh…
ajsutton Mar 11, 2019
714d62c
Add a counter to track metrics for each stage output count.
ajsutton Mar 12, 2019
6b2cfb9
Put the metrics in the pipe not the stage.
ajsutton Mar 12, 2019
4cf7cca
Abort the pipeline when the returned future is cancelled.
ajsutton Mar 12, 2019
f8df301
Rename InputPipe to ReadPipe and OutputPipe to WritePipe to avoid the…
ajsutton Mar 12, 2019
9d0be66
Make Pipeline constructor package private.
ajsutton Mar 12, 2019
30d2893
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 12, 2019
5b336aa
Check batch size is greater than 0. Pipe's don't have remaining capa…
ajsutton Mar 12, 2019
589c109
Add RetryingGetNodeDataFromPeer.
ajsutton Mar 7, 2019
62ce997
Switch WorldStateDownloader to use pipeline.
ajsutton Mar 7, 2019
a0aac9e
Only count a request as not having made progress if it returned empty…
ajsutton Mar 8, 2019
c24bca6
Mostly bad stuff. Split into two pipelines so data available locally…
ajsutton Mar 11, 2019
df7330b
Avoid spamming logs when the pipeline is aborted.
ajsutton Mar 11, 2019
d92df5d
Introduce WorldStateDownloadProcess to encapsulate the creation and m…
ajsutton Mar 12, 2019
103f81f
Use clearer step names for metrics.
ajsutton Mar 12, 2019
942ed69
Report inflight task count and remove them from the outstanding list …
ajsutton Mar 12, 2019
6cac5cc
Add pipe stage name to thread while executing to make it easier to id…
ajsutton Mar 12, 2019
25b9bb1
Combine enqueueChildren with markComplete step since they both want a…
ajsutton Mar 12, 2019
a61b629
Fix sync config.
ajsutton Mar 12, 2019
ac9600a
Don't delete completed tasks from the database and skip initing from …
ajsutton Mar 13, 2019
8f3b5db
Ensure threads waiting for new items in the queue are notified after …
ajsutton Mar 13, 2019
8b06584
Only propagate unexpected errors from the pipeline to the download st…
ajsutton Mar 13, 2019
03f420d
Ensure threads waiting for dequeueBlocking are notified when the down…
ajsutton Mar 13, 2019
b6d9795
Ensure the completion pipeline input is closed when the fetch data pi…
ajsutton Mar 13, 2019
9720086
Get tests passing.
ajsutton Mar 13, 2019
1c002f4
Remove tests that expect RocksDB task queue to be able to resume.
ajsutton Mar 13, 2019
e0ca47a
Fix javadoc.
ajsutton Mar 13, 2019
2ce3dc1
Add tests.
ajsutton Mar 13, 2019
4da94c8
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 13, 2019
e901457
Fix CompleteTaskStepTest.
ajsutton Mar 13, 2019
1b097a8
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 14, 2019
6023804
Support flat mapping with multiple threads.
ajsutton Mar 14, 2019
e51a4a1
Use multiple threads to check if local data is available.
ajsutton Mar 14, 2019
c7f99e2
Add missing param.
ajsutton Mar 14, 2019
8179869
Add tests for RetryingMessageTask.
ajsutton Mar 14, 2019
2098c84
Add test for RequestDataStep.
ajsutton Mar 14, 2019
b710e99
Add tests for RequestDataStep.
ajsutton Mar 14, 2019
48bcd45
Merge branch 'master' of github.com:PegaSysEng/pantheon into pipeline…
ajsutton Mar 14, 2019
30c7d9d
Add comment explaining why CancellationException is not propagated.
ajsutton Mar 14, 2019
960c9cf
Update tests based on feedback.
ajsutton Mar 14, 2019
1 change: 1 addition & 0 deletions ethereum/eth/build.gradle
@@ -33,6 +33,7 @@ dependencies {
  implementation project(':ethereum:permissioning')
  implementation project(':metrics')
  implementation project(':services:kvstore')
  implementation project(':services:pipeline')
  implementation project(':services:tasks')

  implementation 'io.vertx:vertx-core'
@@ -19,6 +19,7 @@

import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
@@ -117,6 +118,13 @@ public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
    return serviceFuture;
  }

  public CompletableFuture<Void> startPipeline(final Pipeline pipeline) {
    final CompletableFuture<Void> pipelineFuture = pipeline.start(servicesExecutor);
    serviceFutures.add(pipelineFuture);
    pipelineFuture.whenComplete((r, t) -> serviceFutures.remove(pipelineFuture));
    return pipelineFuture;
  }

  public <T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
    return CompletableFuture.supplyAsync(computation, computationExecutor);
  }
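The `startPipeline` change above registers the pipeline's future with the scheduler and removes it again once it completes. A minimal standalone sketch of that bookkeeping pattern follows. The class `TrackedFutures` and its methods are hypothetical names for illustration and are not part of Pantheon; the sketch assumes the tracked collection is a concurrent set, which the diff does not show.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not Pantheon code: tracks futures while they are in flight.
final class TrackedFutures {
  // Concurrent set (an assumption) because completion callbacks may run on other threads.
  private final Set<CompletableFuture<?>> inFlight = ConcurrentHashMap.newKeySet();

  <T> CompletableFuture<T> track(final CompletableFuture<T> future) {
    inFlight.add(future);
    // Remove the future whether it completed normally, exceptionally, or by cancellation.
    future.whenComplete((result, error) -> inFlight.remove(future));
    return future;
  }

  int inFlightCount() {
    return inFlight.size();
  }

  void cancelAll() {
    // Cancelling triggers the whenComplete callback, which removes each future from the set.
    for (final CompletableFuture<?> future : inFlight) {
      future.cancel(true);
    }
  }
}
```

Tracking the future this way lets a shutdown routine cancel whatever is still running without each caller having to deregister itself.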
@@ -0,0 +1,69 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager.task;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class RetryingGetNodeDataFromPeerTask
    extends AbstractRetryingPeerTask<Map<Hash, BytesValue>> {

  private final EthContext ethContext;
  private final Set<Hash> hashes;
  private final long pivotBlockNumber;
  private final MetricsSystem metricsSystem;

  private RetryingGetNodeDataFromPeerTask(
      final EthContext ethContext,
      final Collection<Hash> hashes,
      final long pivotBlockNumber,
      final MetricsSystem metricsSystem) {
    super(ethContext, 3, data -> false, metricsSystem);
    this.ethContext = ethContext;
    this.hashes = new HashSet<>(hashes);
    this.pivotBlockNumber = pivotBlockNumber;
    this.metricsSystem = metricsSystem;
  }

  public static RetryingGetNodeDataFromPeerTask forHashes(
      final EthContext ethContext,
      final Collection<Hash> hashes,
      final long pivotBlockNumber,
      final MetricsSystem metricsSystem) {
    return new RetryingGetNodeDataFromPeerTask(ethContext, hashes, pivotBlockNumber, metricsSystem);
  }

  @Override
  protected CompletableFuture<Map<Hash, BytesValue>> executePeerTask(
      final Optional<EthPeer> assignedPeer) {
    final GetNodeDataFromPeerTask task =
        GetNodeDataFromPeerTask.forHashes(ethContext, hashes, pivotBlockNumber, metricsSystem);
    assignedPeer.ifPresent(task::assignPeer);
    return executeSubTask(task::run)
        .thenApply(
            peerResult -> {
              result.get().complete(peerResult.getResult());
              return peerResult.getResult();
            });
  }
}
@@ -0,0 +1,76 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.tasks.Task;

public class CompleteTaskStep {

  private final WorldStateStorage worldStateStorage;
  private final Counter completedRequestsCounter;
  private final Counter retriedRequestsCounter;

  public CompleteTaskStep(
      final WorldStateStorage worldStateStorage, final MetricsSystem metricsSystem) {
    this.worldStateStorage = worldStateStorage;

    completedRequestsCounter =
        metricsSystem.createCounter(
            MetricCategory.SYNCHRONIZER,
            "world_state_completed_requests_total",
            "Total number of node data requests completed as part of fast sync world state download");
    retriedRequestsCounter =
        metricsSystem.createCounter(
            MetricCategory.SYNCHRONIZER,
            "world_state_retried_requests_total",
            "Total number of node data requests repeated as part of fast sync world state download");
  }

  public void markAsCompleteOrFailed(
      final BlockHeader header,
      final WorldDownloadState downloadState,
      final Task<NodeDataRequest> task) {
    if (task.getData().getData() != null) {
      enqueueChildren(task, header, downloadState);
      completedRequestsCounter.inc();
      task.markCompleted();
      downloadState.checkCompletion(worldStateStorage, header);
    } else {
      retriedRequestsCounter.inc();
      task.markFailed();
      // Marking the task as failed will add it back to the queue so make sure any threads
      // waiting to read from the queue are notified.
      downloadState.notifyTaskAvailable();
    }
  }

  private void enqueueChildren(
      final Task<NodeDataRequest> task,
      final BlockHeader blockHeader,
      final WorldDownloadState downloadState) {
    final NodeDataRequest request = task.getData();
    // Only queue rootnode children if we started from scratch
    if (!downloadState.downloadWasResumed() || !isRootState(blockHeader, request)) {
      downloadState.enqueueRequests(request.getChildRequests());
    }
  }

  private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
    return request.getHash().equals(blockHeader.getStateRoot());
  }
}
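`CompleteTaskStep` treats a task with data as complete and a task without data as failed, and the comment in `markAsCompleteOrFailed` notes that a failed task goes back on the queue. The sketch below shows that complete-or-requeue decision in isolation. `CompleteOrRetrySketch` and its nested `Request` type are hypothetical stand-ins; the real step also enqueues child requests, updates metrics, and notifies waiting threads, all of which are omitted here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not Pantheon code: complete a request or put it back for retry.
final class CompleteOrRetrySketch {
  /** Illustrative request: data stays null until some source supplies it. */
  static final class Request {
    final String hash;
    String data;

    Request(final String hash) {
      this.hash = hash;
    }
  }

  private final Deque<Request> pending = new ArrayDeque<>();
  private int completed;
  private int retried;

  void markAsCompleteOrFailed(final Request request) {
    if (request.data != null) {
      // Data arrived, so the request is finished.
      completed++;
    } else {
      // No data: count the retry and requeue the request so it is attempted again.
      retried++;
      pending.addLast(request);
    }
  }

  int completedCount() {
    return completed;
  }

  int retriedCount() {
    return retried;
  }

  int pendingCount() {
    return pending.size();
  }
}
```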
@@ -0,0 +1,54 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipe;
import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Optional;
import java.util.stream.Stream;

public class LoadLocalDataStep {

  private final WorldStateStorage worldStateStorage;
  private final Counter existingNodeCounter;

  public LoadLocalDataStep(
      final WorldStateStorage worldStateStorage, final MetricsSystem metricsSystem) {
    this.worldStateStorage = worldStateStorage;
    existingNodeCounter =
        metricsSystem.createCounter(
            MetricCategory.SYNCHRONIZER,
            "world_state_existing_nodes_total",
            "Total number of node data requests completed using existing data");
  }

  public Stream<Task<NodeDataRequest>> loadLocalData(
      final Task<NodeDataRequest> task, final Pipe<Task<NodeDataRequest>> completedTasks) {
    final NodeDataRequest request = task.getData();
    final Optional<BytesValue> existingData = worldStateStorage.getNodeData(request.getHash());
    if (existingData.isPresent()) {
      existingNodeCounter.inc();
      request.setData(existingData.get());
      request.setRequiresPersisting(false);
      completedTasks.put(task);
      return Stream.empty();
    }
    return Stream.of(task);
  }
}
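`loadLocalData` is shaped for use as a flat-map step: it returns an empty stream when the data is already stored locally (after handing the task to the `completedTasks` pipe) and a one-element stream otherwise. The following sketch illustrates that diversion using plain `java.util` collections. The `Map` standing in for storage and the `Queue` standing in for the pipe are assumptions made for illustration, as is the name `LocalLookupSketch`.

```java
import java.util.Map;
import java.util.Queue;
import java.util.stream.Stream;

// Hypothetical sketch, not Pantheon code: divert locally available items out of the main flow.
final class LocalLookupSketch {
  static Stream<String> divertIfLocal(
      final String key, final Map<String, String> localStore, final Queue<String> completed) {
    if (localStore.containsKey(key)) {
      // Already available locally: send it to the side channel and emit nothing downstream.
      completed.add(key);
      return Stream.empty();
    }
    // Not available locally: pass it on to the next stage (for example, a network request).
    return Stream.of(key);
  }
}
```

Used with `Stream.flatMap`, only the keys that still need fetching continue down the main flow, while the locally available ones collect in the side queue.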
@@ -0,0 +1,52 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.services.tasks.Task;

import java.util.List;

public class PersistDataStep {
  private final WorldStateStorage worldStateStorage;

  public PersistDataStep(final WorldStateStorage worldStateStorage) {
    this.worldStateStorage = worldStateStorage;
  }

  public List<Task<NodeDataRequest>> persist(
      final List<Task<NodeDataRequest>> tasks,
      final BlockHeader blockHeader,
      final WorldDownloadState downloadState) {
    final Updater updater = worldStateStorage.updater();
    tasks.stream()
        .map(Task::getData)
        .filter(request -> request.getData() != null)
        .forEach(
            request -> {
              if (isRootState(blockHeader, request)) {
                downloadState.setRootNodeData(request.getData());
              } else {
                request.persist(updater);
              }
            });
    updater.commit();
    return tasks;
  }

  private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
    return request.getHash().equals(blockHeader.getStateRoot());
  }
}
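`PersistDataStep.persist` writes a whole batch of tasks through one `Updater` and commits once at the end, skipping requests that have no data. A rough sketch of that batching idea is below. The nested `Updater` class is a hypothetical in-memory stand-in, not the `WorldStateStorage.Updater` API, and the special handling of the root node is left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Pantheon code: buffer a batch of writes and commit them together.
final class BatchPersistSketch {
  /** Illustrative updater: holds writes in memory until commit() applies them. */
  static final class Updater {
    private final Map<String, String> target;
    private final Map<String, String> pending = new HashMap<>();
    private int commits;

    Updater(final Map<String, String> target) {
      this.target = target;
    }

    void put(final String key, final String value) {
      pending.put(key, value);
    }

    void commit() {
      target.putAll(pending);
      pending.clear();
      commits++;
    }

    int commitCount() {
      return commits;
    }
  }

  /** Writes every entry that has a value, then commits once. Returns the number written. */
  static int persistBatch(
      final List<? extends Map.Entry<String, String>> entries, final Updater updater) {
    int written = 0;
    for (final Map.Entry<String, String> entry : entries) {
      if (entry.getValue() == null) {
        continue; // Nothing was retrieved for this entry, so there is nothing to store.
      }
      updater.put(entry.getKey(), entry.getValue());
      written++;
    }
    updater.commit();
    return written;
  }
}
```

Committing once per batch rather than once per item is the design choice being illustrated; whether it reduces storage overhead in practice depends on the backing store.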
@@ -0,0 +1,103 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.task.RetryingGetNodeDataFromPeerTask;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.tasks.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class RequestDataStep {
  private static final Logger LOG = LogManager.getLogger();
  private final BiFunction<List<Hash>, Long, EthTask<Map<Hash, BytesValue>>> getNodeDataTaskFactory;

  public RequestDataStep(final EthContext ethContext, final MetricsSystem metricsSystem) {
    this(
        (hashes, pivotBlockNumber) ->
            RetryingGetNodeDataFromPeerTask.forHashes(
                ethContext, hashes, pivotBlockNumber, metricsSystem));
  }

  RequestDataStep(
      final BiFunction<List<Hash>, Long, EthTask<Map<Hash, BytesValue>>> getNodeDataTaskFactory) {
    this.getNodeDataTaskFactory = getNodeDataTaskFactory;
  }

  public CompletableFuture<List<Task<NodeDataRequest>>> requestData(
      final List<Task<NodeDataRequest>> requestTasks,
      final BlockHeader blockHeader,
      final WorldDownloadState downloadState) {
    final List<Hash> hashes =
        requestTasks.stream()
            .map(Task::getData)
            .map(NodeDataRequest::getHash)
            .distinct()
            .collect(Collectors.toList());
    return sendRequest(blockHeader, hashes, downloadState)
        .thenApply(
            data -> {
              for (final Task<NodeDataRequest> task : requestTasks) {
                final NodeDataRequest request = task.getData();
                final BytesValue matchingData = data.get(request.getHash());
                if (matchingData != null) {
                  request.setData(matchingData);
                }
              }
              return requestTasks;
            });
  }

  private CompletableFuture<Map<Hash, BytesValue>> sendRequest(
      final BlockHeader blockHeader,
      final List<Hash> hashes,
      final WorldDownloadState downloadState) {
    final EthTask<Map<Hash, BytesValue>> task =
        getNodeDataTaskFactory.apply(hashes, blockHeader.getNumber());
    downloadState.addOutstandingTask(task);
    return task.run()
        .handle(
            (result, error) -> {
              downloadState.removeOutstandingTask(task);
              if (error != null) {
                final Throwable rootCause = ExceptionUtils.rootCause(error);
                if (!(rootCause instanceof TimeoutException
                    || rootCause instanceof InterruptedException
                    || rootCause instanceof CancellationException
                    || rootCause instanceof EthTaskException)) {
                  LOG.debug("GetNodeDataRequest failed", error);
                }
                return Collections.emptyMap();
              }
              downloadState.requestComplete(!result.isEmpty());
              return result;
            });
  }
}
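`sendRequest` converts any failure into an empty result rather than propagating it, and only logs failures whose root cause is not one of the expected kinds (timeouts, interruption, cancellation, `EthTaskException`). Here is a self-contained sketch of that `handle` pattern. `EmptyOnFailureSketch`, its `rootCause` helper (a plain cause-chain walk standing in for Pantheon's `ExceptionUtils.rootCause`), and the use of `System.err` in place of a logger are all assumptions for illustration; `EthTaskException` is omitted because it is Pantheon-specific.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch, not Pantheon code: turn request failures into empty results.
final class EmptyOnFailureSketch {
  /** Walks the cause chain to its end; a simplified stand-in for a root-cause utility. */
  static Throwable rootCause(final Throwable error) {
    Throwable current = error;
    while (current.getCause() != null && current.getCause() != current) {
      current = current.getCause();
    }
    return current;
  }

  /** Failures that are a normal part of talking to peers and are not worth logging. */
  static boolean isExpected(final Throwable error) {
    final Throwable root = rootCause(error);
    return root instanceof TimeoutException
        || root instanceof InterruptedException
        || root instanceof CancellationException;
  }

  static <K, V> CompletableFuture<Map<K, V>> emptyOnFailure(
      final CompletableFuture<Map<K, V>> request) {
    return request.handle(
        (result, error) -> {
          if (error != null) {
            if (!isExpected(error)) {
              System.err.println("Request failed: " + rootCause(error));
            }
            // Downstream stages see "no data" and can retry instead of aborting.
            return Collections.<K, V>emptyMap();
          }
          return result;
        });
  }
}
```

Because the returned future never completes exceptionally, a downstream stage can treat an empty map as "nothing received" and leave the affected tasks to be retried.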