From ec8429f800845f2af0728161119f77762f97f219 Mon Sep 17 00:00:00 2001 From: Simon Dudley Date: Tue, 30 Jul 2024 16:51:06 +1000 Subject: [PATCH] Refactor TrieLogPruner preload timeout to be more testable (#7393) Also update logging Signed-off-by: Simon Dudley --- .../common/trielog/TrieLogPruner.java | 62 ++++++++++--------- .../trielog/TrieLogPrunerTest.java | 53 ++++++++++++++-- 2 files changed, 82 insertions(+), 33 deletions(-) rename ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/{bonsai => common}/trielog/TrieLogPrunerTest.java (87%) diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/trielog/TrieLogPruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/trielog/TrieLogPruner.java index 98bc4246ebe..02ff553fe11 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/trielog/TrieLogPruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/trielog/TrieLogPruner.java @@ -26,16 +26,17 @@ import java.util.Comparator; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.TreeMultimap; @@ -91,43 +92,44 @@ public TrieLogPruner( } public void initialize() { - preloadQueueWithTimeout(); + preloadQueueWithTimeout(PRELOAD_TIMEOUT_IN_SECONDS); } - private void preloadQueueWithTimeout() { + @VisibleForTesting + void preloadQueueWithTimeout(final int timeoutInSeconds) { + LOG.info("Trie log pruner queue preload starting..."); LOG.atInfo() .setMessage("Attempting to load first {} trie logs from database...") .addArgument(loadingLimit) .log(); - try (final ScheduledExecutorService preloadExecutor = Executors.newScheduledThreadPool(1)) { + try (final ExecutorService preloadExecutor = Executors.newSingleThreadExecutor()) { + final Future future = preloadExecutor.submit(this::preloadQueue); - final AtomicBoolean timeoutOccurred = new AtomicBoolean(false); - final Runnable timeoutTask = - () -> { - timeoutOccurred.set(true); - LOG.atWarn() - .setMessage( - "Timeout occurred while loading and processing {} trie logs from database") - .addArgument(loadingLimit) - .log(); - }; - - final ScheduledFuture timeoutFuture = - preloadExecutor.schedule(timeoutTask, PRELOAD_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); LOG.atInfo() .setMessage( "Trie log pruning will timeout after {} seconds. If this is timing out, consider using `besu storage trie-log prune` subcommand, see https://besu.hyperledger.org/public-networks/how-to/bonsai-limit-trie-logs") - .addArgument(PRELOAD_TIMEOUT_IN_SECONDS) + .addArgument(timeoutInSeconds) .log(); - preloadQueue(timeoutOccurred, timeoutFuture); + try { + future.get(timeoutInSeconds, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error loading trie logs from database", e); + future.cancel(true); + } catch (TimeoutException e) { + future.cancel(true); + LOG.atWarn() + .setMessage("Timeout occurred while loading and processing {} trie logs from database") + .addArgument(loadingLimit) + .log(); + } } + LOG.info("Trie log pruner queue preload complete."); } - private void preloadQueue( - final AtomicBoolean timeoutOccurred, final ScheduledFuture timeoutFuture) { + private void preloadQueue() { try (final Stream trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) { @@ -135,9 +137,9 @@ private void preloadQueue( final AtomicLong orphansPruned = new AtomicLong(); trieLogKeys.forEach( blockHashAsBytes -> { - if (timeoutOccurred.get()) { + if (Thread.currentThread().isInterrupted()) { throw new RuntimeException( - new TimeoutException("Timeout occurred while preloading trie log prune queue")); + new InterruptedException("Thread interrupted during trie log processing.")); } final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes)); final Optional header = blockchain.getBlockHeader(blockHash); @@ -152,17 +154,19 @@ private void preloadQueue( } }); - timeoutFuture.cancel(true); LOG.atDebug().log("Pruned {} orphaned trie logs from database...", orphansPruned.intValue()); LOG.atInfo().log( "Added {} trie logs to prune queue. Commencing pruning of eligible trie logs...", addToPruneQueueCount.intValue()); int prunedCount = pruneFromQueue(); - LOG.atInfo().log("Pruned {} trie logs.", prunedCount); + LOG.atInfo().log("Pruned {} trie logs", prunedCount); } catch (Exception e) { - if (e.getCause() != null && e.getCause() instanceof TimeoutException) { + if (e instanceof InterruptedException + || (e.getCause() != null && e.getCause() instanceof InterruptedException)) { + LOG.info("Operation interrupted, but will attempt to prune what's in the queue so far..."); int prunedCount = pruneFromQueue(); - LOG.atInfo().log("Operation timed out, but still pruned {} trie logs.", prunedCount); + LOG.atInfo().log("...pruned {} trie logs", prunedCount); + Thread.currentThread().interrupt(); // Preserve interrupt status } else { LOG.error("Error loading trie logs from database, nothing pruned", e); } diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/trielog/TrieLogPrunerTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/common/trielog/TrieLogPrunerTest.java similarity index 87% rename from ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/trielog/TrieLogPrunerTest.java rename to ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/common/trielog/TrieLogPrunerTest.java index 621e73711ab..5f985b1f89c 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/trielog/TrieLogPrunerTest.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/common/trielog/TrieLogPrunerTest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.trie.diffbased.bonsai.trielog; +package org.hyperledger.besu.ethereum.trie.diffbased.common.trielog; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -26,9 +26,6 @@ import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage; -import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogAddedEvent; -import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogLayer; -import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogPruner; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import java.util.Optional; @@ -43,6 +40,7 @@ import org.junit.jupiter.api.Test; import org.mockito.InOrder; import org.mockito.Mockito; +import org.mockito.internal.stubbing.answers.AnswersWithDelay; public class TrieLogPrunerTest { @@ -82,6 +80,53 @@ public void initialize_preloads_queue_and_prunes_orphaned_blocks() { verify(worldState, times(1)).pruneTrieLog(header2.getBlockHash()); } + @Test + public void preloadQueueWithTimeout_handles_timeout_during_streamTrieLogKeys() { + // Given + final int timeoutInSeconds = 1; + final long timeoutInMillis = timeoutInSeconds * 1000; + final int loadingLimit = 2; + TrieLogPruner trieLogPruner = + new TrieLogPruner( + worldState, blockchain, executeAsync, 3, loadingLimit, false, new NoOpMetricsSystem()); + + // Simulate a long-running operation + when(worldState.streamTrieLogKeys(loadingLimit)) + .thenAnswer(new AnswersWithDelay(timeoutInMillis * 2, invocation -> Stream.empty())); + + // When + long startTime = System.currentTimeMillis(); + trieLogPruner.preloadQueueWithTimeout(timeoutInSeconds); + long elapsedTime = System.currentTimeMillis() - startTime; + + // Then + assertThat(elapsedTime).isLessThan(timeoutInMillis * 2); + } + + @Test + public void preloadQueueWithTimeout_handles_timeout_during_getBlockHeader() { + // Given + final int timeoutInSeconds = 1; + final long timeoutInMillis = timeoutInSeconds * 1000; + TrieLogPruner trieLogPruner = setupPrunerAndFinalizedBlock(3, 1); + + // Simulate a long-running operation + when(blockchain.getBlockHeader(any(Hash.class))) + // delay on first invocation, then return empty + .thenAnswer(new AnswersWithDelay(timeoutInMillis * 2, invocation -> Optional.empty())) + .thenReturn(Optional.empty()); + + // When + long startTime = System.currentTimeMillis(); + trieLogPruner.preloadQueueWithTimeout(timeoutInSeconds); + long elapsedTime = System.currentTimeMillis() - startTime; + + // Then + assertThat(elapsedTime).isLessThan(timeoutInMillis * 2); + verify(worldState, times(1)).pruneTrieLog(key(1)); + verify(worldState, times(1)).pruneTrieLog(key(2)); + } + @Test public void trieLogs_pruned_in_reverse_order_within_pruning_window() { // Given