Skip to content

Commit

Permalink
Add trie log pruning triggered after trie log persist
Browse files Browse the repository at this point in the history
TrieLogPruner loads all trie logs on startup.
Each time a trie log is persisted, the pruner runs and uses a pruning window (currently hardcoded to 1000) to chip away at an initially large backlog of trie logs.
Once the backlog is cleared, each prune run should remove just a single trie log.

Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
  • Loading branch information
siladu committed Oct 12, 2023
1 parent 4276a40 commit 757ab62
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
Expand Down Expand Up @@ -203,6 +204,10 @@ public Optional<byte[]> getTrieLog(final Hash blockHash) {
return trieLogStorage.get(blockHash.toArrayUnsafe());
}

/**
 * Exposes the keys held in the trie log storage segment as a stream.
 *
 * <p>NOTE(review): the stream is handed back exactly as produced by {@code streamKeys()}; it
 * presumably holds underlying storage resources, so callers should close it - confirm against
 * the storage implementation.
 *
 * @return the stream returned by {@code trieLogStorage.streamKeys()}
 */
public Stream<byte[]> streamTrieLogs() {
  final Stream<byte[]> trieLogKeys = trieLogStorage.streamKeys();
  return trieLogKeys;
}

public Optional<Bytes> getStateTrieNode(final Bytes location) {
return composedWorldStateStorage
.get(TRIE_BRANCH_STORAGE, location.toArrayUnsafe())
Expand Down Expand Up @@ -335,6 +340,10 @@ public long prune(final Predicate<byte[]> inUseCheck) {
throw new RuntimeException("Bonsai Tries do not work with pruning.");
}

/**
 * Asks the trie log storage to delete the entry stored under the given key.
 *
 * @param blockHashBytes the raw key of the trie log entry to delete
 * @return the result of {@code trieLogStorage.tryDelete(blockHashBytes)}
 */
public boolean pruneTrieLog(final byte[] blockHashBytes) {
  final boolean deleted = trieLogStorage.tryDelete(blockHashBytes);
  return deleted;
}

@Override
public long addNodeAddedListener(final NodesAddedListener listener) {
throw new RuntimeException("addNodeAddedListener not available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public abstract class AbstractTrieLogManager implements TrieLogManager {
protected final Subscribers<TrieLogObserver> trieLogObservers = Subscribers.create();

protected final TrieLogFactory trieLogFactory;
private final TrieLogPruner trieLogPruner;

protected AbstractTrieLogManager(
final Blockchain blockchain,
Expand All @@ -61,6 +62,7 @@ protected AbstractTrieLogManager(
this.cachedWorldStatesByHash = cachedWorldStatesByHash;
this.maxLayersToLoad = maxLayersToLoad;
this.trieLogFactory = setupTrieLogFactory(pluginContext);
this.trieLogPruner = new TrieLogPruner(worldStateStorage, blockchain);
}

protected abstract TrieLogFactory setupTrieLogFactory(final BesuContext pluginContext);
Expand Down Expand Up @@ -88,6 +90,9 @@ public synchronized void saveTrieLog(
} finally {
if (success) {
stateUpdater.commit();
trieLogPruner.rememberTrieLogKeyForPruning(
forBlockHeader.getNumber(), forBlockHeader.getBlockHash().toArrayUnsafe());
trieLogPruner.prune();
} else {
stateUpdater.rollback();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.ethereum.bonsai.trielog;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps an index of known trie log keys, grouped by block number, and deletes the trie logs of
 * blocks that are more than {@code numBlocksToRetain} behind the chain head.
 *
 * <p>The index is populated from storage when the pruner is constructed and extended through
 * {@link #rememberTrieLogKeyForPruning(long, byte[])}. Each call to {@link #prune()} deletes the
 * trie logs of at most {@code pruningWindowSize} distinct block numbers, so a large backlog is
 * worked off over several calls.
 *
 * <p>This class performs no synchronization of its own.
 */
public class TrieLogPruner {

  private static final Logger LOG = LoggerFactory.getLogger(TrieLogPruner.class);

  // Blocks != TrieLogLayers due to forks
  private static final int DEFAULT_MAX_BLOCKS_TO_PRUNE = 1000;

  private final BonsaiWorldStateKeyValueStorage rootWorldStateStorage;
  private final Blockchain blockchain;
  private final long numBlocksToRetain;
  private final int pruningWindowSize;

  // Per-instance index (block number -> trie log keys), iterated from the highest block number
  // down. It is deliberately not static: each pruner must only track the keys of the storage it
  // was created for. Values are ordered by array content rather than by hash code, so two
  // different keys at the same block number can never be mistaken for duplicates.
  private final Multimap<Long, byte[]> knownTrieLogKeysByDescendingBlockNumber =
      TreeMultimap.create(Comparator.reverseOrder(), Arrays::compare);

  /**
   * Creates a pruner that retains {@code AbstractTrieLogManager.RETAINED_LAYERS} blocks and
   * prunes at most {@code DEFAULT_MAX_BLOCKS_TO_PRUNE} block numbers per run.
   *
   * @param rootWorldStateStorage storage the trie logs are read from and deleted from
   * @param blockchain used to resolve block headers and the chain head number
   */
  public TrieLogPruner(
      final BonsaiWorldStateKeyValueStorage rootWorldStateStorage, final Blockchain blockchain) {
    this(
        rootWorldStateStorage,
        blockchain,
        AbstractTrieLogManager.RETAINED_LAYERS,
        DEFAULT_MAX_BLOCKS_TO_PRUNE);
  }

  /**
   * Creates a pruner with explicit limits and immediately loads the existing trie log keys.
   *
   * @param rootWorldStateStorage storage the trie logs are read from and deleted from
   * @param blockchain used to resolve block headers and the chain head number
   * @param numBlocksToRetain how far behind the chain head a block must be before it is pruned
   * @param pruningWindowSize maximum number of distinct block numbers pruned per {@link #prune()}
   */
  @VisibleForTesting
  TrieLogPruner(
      final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
      final Blockchain blockchain,
      final long numBlocksToRetain,
      final int pruningWindowSize) {
    this.rootWorldStateStorage = rootWorldStateStorage;
    this.blockchain = blockchain;
    this.numBlocksToRetain = numBlocksToRetain;
    this.pruningWindowSize = pruningWindowSize;
    loadTrieLogs();
  }

  /**
   * Reads every trie log key from storage and indexes those whose block header can be resolved
   * through the blockchain. Keys without a known header are skipped and therefore never pruned
   * by this instance.
   */
  void loadTrieLogs() {
    LOG.atInfo().log("Loading trie logs from database...");
    final AtomicLong count = new AtomicLong();
    // Close the stream once consumed so that any resources backing it are released.
    try (final Stream<byte[]> trieLogs = rootWorldStateStorage.streamTrieLogs()) {
      trieLogs.forEach(
          hashAsBytes -> {
            final Hash hash = Hash.wrap(Bytes32.wrap(hashAsBytes));
            final Optional<BlockHeader> header = blockchain.getBlockHeader(hash);
            if (header.isPresent()) {
              knownTrieLogKeysByDescendingBlockNumber.put(header.get().getNumber(), hashAsBytes);
              count.getAndIncrement();
            }
          });
    }
    LOG.atInfo().log("Loaded {} trie logs from database", count);
  }

  /**
   * Adds a trie log key to the index so that a later {@link #prune()} can delete it.
   *
   * @param blockNumber number of the block the trie log belongs to
   * @param trieLogKey storage key of the trie log
   */
  void rememberTrieLogKeyForPruning(final long blockNumber, final byte[] trieLogKey) {
    knownTrieLogKeysByDescendingBlockNumber.put(blockNumber, trieLogKey);
  }

  /**
   * Deletes the trie logs of up to {@code pruningWindowSize} block numbers that are at or below
   * {@code chainHead - numBlocksToRetain}, then drops those block numbers from the index.
   *
   * <p>Because the index is iterated from the highest block number down, the newest prunable
   * block numbers are handled first. The result of each storage delete is not inspected; the
   * key is forgotten either way.
   */
  void prune() {
    // Read the chain head once so the logged value is the one actually used for the cut-off.
    final long chainHeadNumber = blockchain.getChainHeadBlockNumber();
    final long retainAboveThisBlock = chainHeadNumber - numBlocksToRetain;
    LOG.atDebug()
        .setMessage("(chainHeadNumber: {} - numBlocksToRetain: {}) = retainAboveThisBlock: {}")
        .addArgument(chainHeadNumber)
        .addArgument(numBlocksToRetain)
        .addArgument(retainAboveThisBlock)
        .log();

    final var pruneWindowEntries =
        knownTrieLogKeysByDescendingBlockNumber.asMap().entrySet().stream()
            .dropWhile((e) -> e.getKey() > retainAboveThisBlock)
            .limit(pruningWindowSize);

    // Collected separately: the index must not be modified while it is being streamed.
    final List<Long> blockNumbersToRemove = new ArrayList<>();

    final AtomicInteger count = new AtomicInteger();
    pruneWindowEntries.forEach(
        (e) -> {
          for (final byte[] trieLogKey : e.getValue()) {
            rootWorldStateStorage.pruneTrieLog(trieLogKey);
            count.getAndIncrement();
          }
          blockNumbersToRemove.add(e.getKey());
        });

    blockNumbersToRemove.forEach(knownTrieLogKeysByDescendingBlockNumber::removeAll);
    LOG.atTrace()
        .setMessage("pruned {} trie logs for blocks {}")
        .addArgument(count)
        .addArgument(blockNumbersToRemove)
        .log();
    LOG.atDebug()
        .setMessage("pruned {} trie logs from {} blocks")
        .addArgument(count)
        .addArgument(blockNumbersToRemove.size())
        .log();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.ethereum.bonsai.trielog;

import static org.mockito.Mockito.times;

import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.chain.Blockchain;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

/** Unit test for {@link TrieLogPruner} using mocked storage and blockchain collaborators. */
public class TrieLogPrunerTest {

  private BonsaiWorldStateKeyValueStorage rootWorldStateStorage;
  private Blockchain blockchain;

  /** Creates fresh mocks before every test. */
  @BeforeEach
  public void setup() {
    blockchain = Mockito.mock(Blockchain.class);
    rootWorldStateStorage = Mockito.mock(BonsaiWorldStateKeyValueStorage.class);
  }

  /**
   * Registers trie log keys for seven entries across six block numbers, then checks that two
   * consecutive prune runs each delete the keys of the two highest block numbers that are no
   * longer retained, highest block number first.
   */
  @SuppressWarnings("BannedMethod")
  @Test
  public void trieLogs_pruned_in_reverse_order_within_pruning_window() {
    Configurator.setLevel(LogManager.getLogger(TrieLogPruner.class).getName(), Level.TRACE);

    // Given: retain 3 blocks behind the head, prune at most 2 block numbers per run
    final long retainedBlockCount = 3;
    final int windowSize = 2;
    final TrieLogPruner pruner =
        new TrieLogPruner(rootWorldStateStorage, blockchain, retainedBlockCount, windowSize);

    final long oldestBlock = 1000L;
    final long forkedBlock = 1001L;
    final long prunableBlock = 1002L;
    final long retainedBlockA = 1003L;
    final long retainedBlockB = 1004L;
    final long headBlock = 1005L;

    final byte[] oldestKey = new byte[] {1, 2, 3}; // prunable, but beyond the first window
    final byte[] forkKeyA = new byte[] {1, 2, 3}; // first key at the forked block number
    final byte[] forkKeyB = new byte[] {4, 5, 6}; // second key at the same block number (fork)
    final byte[] prunableKey = new byte[] {7, 8, 9}; // highest block number in the first window
    final byte[] retainedKeyA = new byte[] {10, 11, 12}; // retained during the first run
    final byte[] retainedKeyB = new byte[] {13, 14, 15}; // retained during both runs
    final byte[] headKey = new byte[] {7, 8, 9}; // retained during both runs

    pruner.rememberTrieLogKeyForPruning(oldestBlock, oldestKey);
    pruner.rememberTrieLogKeyForPruning(forkedBlock, forkKeyA);
    pruner.rememberTrieLogKeyForPruning(forkedBlock, forkKeyB);
    pruner.rememberTrieLogKeyForPruning(prunableBlock, prunableKey);
    pruner.rememberTrieLogKeyForPruning(retainedBlockA, retainedKeyA);
    pruner.rememberTrieLogKeyForPruning(retainedBlockB, retainedKeyB);
    pruner.rememberTrieLogKeyForPruning(headBlock, headKey);

    Mockito.when(blockchain.getChainHeadBlockNumber()).thenReturn(headBlock);

    // When
    pruner.prune();

    // Then: block 1002 first, followed by both fork keys of block 1001
    final InOrder deletions = Mockito.inOrder(rootWorldStateStorage);
    deletions.verify(rootWorldStateStorage, times(1)).pruneTrieLog(prunableKey);
    deletions.verify(rootWorldStateStorage, times(1)).pruneTrieLog(forkKeyA);
    deletions.verify(rootWorldStateStorage, times(1)).pruneTrieLog(forkKeyB);

    // A new head moves the retention boundary up by one block; the second run then deletes the
    // keys of block 1003 and of the remaining block 1000
    final long newHeadBlock = 1006L;
    pruner.rememberTrieLogKeyForPruning(newHeadBlock, new byte[] {1, 2, 3});
    Mockito.when(blockchain.getChainHeadBlockNumber()).thenReturn(newHeadBlock);

    pruner.prune();

    deletions.verify(rootWorldStateStorage, times(1)).pruneTrieLog(retainedKeyA);
    deletions.verify(rootWorldStateStorage, times(1)).pruneTrieLog(oldestKey);
  }
}

0 comments on commit 757ab62

Please sign in to comment.