Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some mark sweep pruner bugs where nodes that should be kept were being swept (Re-merge of #38) #50

Merged
merged 23 commits into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,7 @@ allprojects {
}

// Below this line are currently only license header tasks
format 'groovy', {
target '**/src/*/grovy/**/*.groovy'
}
format 'groovy', { target '**/src/*/grovy/**/*.groovy' }

// Currently disabled due to referencetest issues
// format 'bash', {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.worldstate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;

import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.WorldState;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.MockExecutorService;
import org.hyperledger.besu.util.bytes.Bytes32;
import org.hyperledger.besu.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.junit.Test;

public class PrunerIntegrationTest {

private final BlockDataGenerator gen = new BlockDataGenerator();
private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Map<BytesValue, byte[]> hashValueStore = new HashMap<>();
private final InMemoryKeyValueStorage stateStorage = new TestInMemoryStorage(hashValueStore);
private final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage);
private final WorldStateArchive worldStateArchive =
new WorldStateArchive(
worldStateStorage, new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage()));
private final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage();
private final Block genesisBlock = gen.genesisBlock();
private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock);

@Test
public void pruner_smallState_manyOpsPerTx() throws InterruptedException {
testPruner(3, 1, 1, 4, 1000);
}

@Test
public void pruner_largeState_fewOpsPerTx() throws InterruptedException {
testPruner(2, 5, 5, 6, 5);
}

@Test
public void pruner_emptyBlocks() throws InterruptedException {
testPruner(5, 0, 2, 5, 10);
}

@Test
public void pruner_markChainhead() throws InterruptedException {
testPruner(4, 2, 1, 10, 20);
}

@Test
public void pruner_lowRelativeBlockConfirmations() throws InterruptedException {
testPruner(3, 2, 1, 4, 20);
}

@Test
public void pruner_highRelativeBlockConfirmations() throws InterruptedException {
testPruner(3, 2, 9, 10, 20);
}

private void testPruner(
final int numCycles,
final int accountsPerBlock,
final long blockConfirmations,
final int numBlocksToKeep,
final int opsPerTransaction)
throws InterruptedException {

final var markSweepPruner =
new MarkSweepPruner(
worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction);
final var pruner =
new Pruner(
markSweepPruner,
blockchain,
new MockExecutorService(),
new PruningConfiguration(blockConfirmations, numBlocksToKeep));

pruner.start();

for (int cycle = 0; cycle < numCycles; ++cycle) {
int numBlockInCycle =
numBlocksToKeep
+ 1; // +1 to get it to switch from MARKING_COMPLETE TO SWEEPING on each cycle
var fullyMarkedBlockNum = cycle * numBlockInCycle + 1;

// This should cause a full mark and sweep cycle
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);
generateBlockchainData(numBlockInCycle, accountsPerBlock);
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);

// Collect the nodes we expect to keep
final Set<BytesValue> expectedNodes = new HashSet<>();
for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
collectWorldStateNodes(stateRoot, expectedNodes);
}

if (accountsPerBlock != 0) {
assertThat(hashValueStore.size())
.isGreaterThanOrEqualTo(expectedNodes.size()); // Sanity check
}

// Assert that blocks from mark point onward are still accessible
for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
assertThat(worldStateArchive.get(stateRoot)).isPresent();
final WorldState markedState = worldStateArchive.get(stateRoot).get();
// Traverse accounts and make sure all are accessible
final int expectedAccounts = accountsPerBlock * i;
final long accounts =
markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count();
assertThat(accounts).isEqualTo(expectedAccounts);
// Traverse storage to ensure that all storage is accessible
markedState
.streamAccounts(Bytes32.ZERO, expectedAccounts * 2)
.forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000));
}

// All other state roots should have been removed
for (int i = 0; i < fullyMarkedBlockNum; i++) {
final BlockHeader curHeader = blockchain.getBlockHeader(i).get();
if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) {
assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty();
}
}

// Check that storage contains only the values we expect
assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size());
assertThat(hashValueStore.values())
.containsExactlyInAnyOrderElementsOf(
expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet()));
}

pruner.stop();
}

private void generateBlockchainData(final int numBlocks, final int numAccounts) {
Block parentBlock = blockchain.getChainHeadBlock();
for (int i = 0; i < numBlocks; i++) {
final MutableWorldState worldState =
worldStateArchive.getMutable(parentBlock.getHeader().getStateRoot()).get();
gen.createRandomContractAccountsWithNonEmptyStorage(worldState, numAccounts);
final Hash stateRoot = worldState.rootHash();

final Block block =
gen.block(
BlockOptions.create()
.setStateRoot(stateRoot)
.setBlockNumber(parentBlock.getHeader().getNumber() + 1L)
.setParentHash(parentBlock.getHash()));
final List<TransactionReceipt> receipts = gen.receipts(block);
blockchain.appendBlock(block, receipts);
parentBlock = block;
}
}

private Set<BytesValue> collectWorldStateNodes(
final Hash stateRootHash, final Set<BytesValue> collector) {
final List<Hash> storageRoots = new ArrayList<>();
final MerklePatriciaTrie<Bytes32, BytesValue> stateTrie = createStateTrie(stateRootHash);

// Collect storage roots and code
stateTrie
.entriesFrom(Bytes32.ZERO, 1000)
.forEach(
(key, val) -> {
final StateTrieAccountValue accountValue =
StateTrieAccountValue.readFrom(RLP.input(val));
stateStorage
.get(accountValue.getCodeHash().getArrayUnsafe())
.ifPresent(v -> collector.add(BytesValue.wrap(v)));
storageRoots.add(accountValue.getStorageRoot());
});

// Collect state nodes
collectTrieNodes(stateTrie, collector);
// Collect storage nodes
for (Hash storageRoot : storageRoots) {
final MerklePatriciaTrie<Bytes32, BytesValue> storageTrie = createStorageTrie(storageRoot);
collectTrieNodes(storageTrie, collector);
}

return collector;
}

private void collectTrieNodes(
final MerklePatriciaTrie<Bytes32, BytesValue> trie, final Set<BytesValue> collector) {
final Bytes32 rootHash = trie.getRootHash();
trie.visitAll(
(node) -> {
if (node.isReferencedByHash() || node.getHash().equals(rootHash)) {
collector.add(node.getRlp());
}
});
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode,
rootHash,
Function.identity(),
Function.identity());
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStorageTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStorageTrieNode,
rootHash,
Function.identity(),
Function.identity());
}

// Proxy class so that we have access to the constructor that takes our own map
private static class TestInMemoryStorage extends InMemoryKeyValueStorage {

public TestInMemoryStorage(final Map<BytesValue, byte[]> hashValueStore) {
super(hashValueStore);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class MarkSweepPruner {
private final Counter sweptNodesCounter;
private volatile long nodeAddedListenerId;
private final ReentrantLock markLock = new ReentrantLock(true);
private final Set<BytesValue> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Bytes32> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());

public MarkSweepPruner(
final WorldStateStorage worldStateStorage,
Expand Down Expand Up @@ -98,7 +98,9 @@ public MarkSweepPruner(

public void prepare() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case.
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes);
markStorage.clear();
pendingMarks.clear();
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
}

public void cleanup() {
Expand All @@ -107,7 +109,6 @@ public void cleanup() {

public void mark(final Hash rootHash) {
markOperationCounter.inc();
markStorage.clear();
createStateTrie(rootHash)
.visitAll(
node -> {
Expand All @@ -119,13 +120,12 @@ public void mark(final Hash rootHash) {
markNode(node.getHash());
node.getValue().ifPresent(this::processAccountState);
});
LOG.info("Completed marking used nodes for pruning");
LOG.debug("Completed marking used nodes for pruning");
}

public void sweepBefore(final long markedBlockNumber) {
flushPendingMarks();
sweepOperationCounter.inc();
LOG.info("Sweeping unused nodes");
LOG.debug("Sweeping unused nodes");
// Sweep state roots first, walking backwards until we get to a state root that isn't in the
// storage
long prunedNodeCount = 0;
Expand All @@ -138,7 +138,7 @@ public void sweepBefore(final long markedBlockNumber) {
break;
}

if (!markStorage.containsKey(candidateStateRootHash.getArrayUnsafe())) {
if (!isMarked(candidateStateRootHash)) {
updater.removeAccountStateTrieNode(candidateStateRootHash);
prunedNodeCount++;
if (prunedNodeCount % operationsPerTransaction == 0) {
Expand All @@ -149,11 +149,19 @@ public void sweepBefore(final long markedBlockNumber) {
}
updater.commit();
// Sweep non-state-root nodes
prunedNodeCount += worldStateStorage.prune(markStorage::containsKey);
prunedNodeCount += worldStateStorage.prune(this::isMarked);
sweptNodesCounter.inc(prunedNodeCount);
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
markStorage.clear();
LOG.info("Completed sweeping unused nodes");
LOG.debug("Completed sweeping unused nodes");
}

private boolean isMarked(final Bytes32 key) {
return pendingMarks.contains(key) || markStorage.containsKey(key.getArrayUnsafe());
}

private boolean isMarked(final byte[] key) {
return pendingMarks.contains(Bytes32.wrap(key)) || markStorage.containsKey(key);
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
Expand Down Expand Up @@ -182,10 +190,14 @@ private void processAccountState(final BytesValue value) {

@VisibleForTesting
void markNode(final Bytes32 hash) {
markedNodesCounter.inc();
markNodes(Collections.singleton(hash));
}

private void markNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markLock.lock();
try {
pendingMarks.add(hash);
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
Expand All @@ -209,15 +221,4 @@ void flushPendingMarks() {
markLock.unlock();
}
}

private void markNewNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markLock.lock();
try {
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
}
}
Loading