diff --git a/RELEASES.md b/RELEASES.md
index c4beb051e1..29febf1347 100644
--- a/RELEASES.md
+++ b/RELEASES.md
@@ -2,11 +2,15 @@
## Pending Release
-## Updates
+* Refactored trie_prefetcher.go to be structurally similar to upstream.
+
+## [v0.7.0](https://github.com/ava-labs/subnet-evm/releases/tag/v0.7.0)
+
+### Updates
- Changed default write option from `Sync` to `NoSync` in PebbleDB
-## Fixes
+### Fixes
- Fixed database close on shutdown
diff --git a/core/blockchain.go b/core/blockchain.go
index 7fc6c62688..19615c80ba 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1349,16 +1349,6 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
blockContentValidationTimer.Inc(time.Since(substart).Milliseconds())
// No validation errors for the block
- var activeState *state.StateDB
- defer func() {
- // The chain importer is starting and stopping trie prefetchers. If a bad
- // block or other error is hit however, an early return may not properly
- // terminate the background threads. This defer ensures that we clean up
- // and dangling prefetcher, without deferring each and holding on live refs.
- if activeState != nil {
- activeState.StopPrefetcher()
- }
- }()
// Retrieve the parent block to determine which root to build state on
substart = time.Now()
@@ -1377,8 +1367,8 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
blockStateInitTimer.Inc(time.Since(substart).Milliseconds())
// Enable prefetching to pull in trie node paths while processing transactions
- statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
- activeState = statedb
+ statedb.StartPrefetcher("chain", state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism))
+ defer statedb.StopPrefetcher()
// Process block using the parent state as reference point
pstart := time.Now()
@@ -1736,10 +1726,8 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block)
}
// Enable prefetching to pull in trie node paths while processing transactions
- statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
- defer func() {
- statedb.StopPrefetcher()
- }()
+ statedb.StartPrefetcher("chain", state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism))
+ defer statedb.StopPrefetcher()
// Process previously stored block
receipts, _, usedGas, err := bc.processor.Process(current, parent.Header(), statedb, vm.Config{})
diff --git a/core/state/statedb.go b/core/state/statedb.go
index e73cf9accb..b4c2da9566 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -41,6 +41,7 @@ import (
"github.com/ava-labs/subnet-evm/trie"
"github.com/ava-labs/subnet-evm/trie/trienode"
"github.com/ava-labs/subnet-evm/trie/triestate"
+ "github.com/ava-labs/subnet-evm/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
@@ -200,16 +201,33 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St
return sdb, nil
}
+type workerPool struct {
+ *utils.BoundedWorkers
+}
+
+func (wp *workerPool) Done() {
+ // Done is guaranteed to be called only after all work is complete, so
+ // Wait()ing is not strictly necessary, but it also releases resources.
+ wp.BoundedWorkers.Wait()
+}
+
+func WithConcurrentWorkers(prefetchers int) PrefetcherOption {
+ pool := &workerPool{
+ BoundedWorkers: utils.NewBoundedWorkers(prefetchers),
+ }
+ return WithWorkerPools(func() WorkerPool { return pool })
+}
+
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
-func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) {
+func (s *StateDB) StartPrefetcher(namespace string, opts ...PrefetcherOption) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher = nil
}
if s.snap != nil {
- s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, maxConcurrency)
+ s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, opts...)
}
}
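
Note on the API change above: StartPrefetcher now accepts variadic PrefetcherOptions rather than a raw concurrency integer. Below is a minimal sketch of a migrated call site, assuming the import path shown in this diff; the helper name is hypothetical and mirrors the pattern used in core/blockchain.go:

```go
package example

import "github.com/ava-labs/subnet-evm/core/state"

// startChainPrefetcher shows the migrated call: the parallelism integer is
// wrapped in state.WithConcurrentWorkers instead of being passed directly.
// The returned stop function is deferred by the caller, as in blockchain.go.
func startChainPrefetcher(statedb *state.StateDB, parallelism int) (stop func()) {
	statedb.StartPrefetcher("chain", state.WithConcurrentWorkers(parallelism))
	return statedb.StopPrefetcher
}
```
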
diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go
index 6c6ddeab07..8465deae2a 100644
--- a/core/state/trie_prefetcher.go
+++ b/core/state/trie_prefetcher.go
@@ -1,13 +1,3 @@
-// (c) 2019-2020, Ava Labs, Inc.
-//
-// This file is a derived work, based on the go-ethereum library whose original
-// notices appear below.
-//
-// It is distributed under a license compatible with the licensing terms of the
-// original code from which it is derived.
-//
-// Much love to the original authors for their work.
-// **********
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
@@ -28,16 +18,17 @@ package state
import (
"sync"
- "time"
+ "github.com/ava-labs/subnet-evm/libevm/options"
"github.com/ava-labs/subnet-evm/metrics"
- "github.com/ava-labs/subnet-evm/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
-// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
-const triePrefetchMetricsPrefix = "trie/prefetch/"
+var (
+ // triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
+ triePrefetchMetricsPrefix = "trie/prefetch/"
+)
// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
@@ -50,91 +41,61 @@ type triePrefetcher struct {
fetches map[string]Trie // Partially or fully fetched tries. Only populated for inactive copies.
fetchers map[string]*subfetcher // Subfetchers for each trie
- maxConcurrency int
- workers *utils.BoundedWorkers
-
- subfetcherWorkersMeter metrics.Meter
- subfetcherWaitTimer metrics.Counter
- subfetcherCopiesMeter metrics.Meter
-
+ deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
accountSkipMeter metrics.Meter
accountWasteMeter metrics.Meter
+ storageLoadMeter metrics.Meter
+ storageDupMeter metrics.Meter
+ storageSkipMeter metrics.Meter
+ storageWasteMeter metrics.Meter
- storageFetchersMeter metrics.Meter
- storageLoadMeter metrics.Meter
- storageLargestLoadMeter metrics.Meter
- storageDupMeter metrics.Meter
- storageSkipMeter metrics.Meter
- storageWasteMeter metrics.Meter
+ options []PrefetcherOption
}
-func newTriePrefetcher(db Database, root common.Hash, namespace string, maxConcurrency int) *triePrefetcher {
+func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...PrefetcherOption) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
- return &triePrefetcher{
+ p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
- maxConcurrency: maxConcurrency,
- workers: utils.NewBoundedWorkers(maxConcurrency), // Scale up as needed to [maxConcurrency]
-
- subfetcherWorkersMeter: metrics.GetOrRegisterMeter(prefix+"/subfetcher/workers", nil),
- subfetcherWaitTimer: metrics.GetOrRegisterCounter(prefix+"/subfetcher/wait", nil),
- subfetcherCopiesMeter: metrics.GetOrRegisterMeter(prefix+"/subfetcher/copies", nil),
-
+ deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
+ storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
+ storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
+ storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
+ storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
- storageFetchersMeter: metrics.GetOrRegisterMeter(prefix+"/storage/fetchers", nil),
- storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
- storageLargestLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/lload", nil),
- storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
- storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
- storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
+ options: opts,
}
+ return p
}
// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
- // If the prefetcher is an inactive one, bail out
- if p.fetches != nil {
- return
- }
-
- // Collect stats from all fetchers
- var (
- storageFetchers int64
- largestLoad int64
- )
for _, fetcher := range p.fetchers {
- fetcher.abort() // safe to call multiple times (should be a no-op on happy path)
+ fetcher.abort() // safe to do multiple times
if metrics.Enabled {
- p.subfetcherCopiesMeter.Mark(int64(fetcher.copies()))
-
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
- p.accountSkipMeter.Mark(int64(fetcher.skips()))
+ p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
- storageFetchers++
- oseen := int64(len(fetcher.seen))
- if oseen > largestLoad {
- largestLoad = oseen
- }
- p.storageLoadMeter.Mark(oseen)
+ p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
- p.storageSkipMeter.Mark(int64(fetcher.skips()))
+ p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
@@ -143,20 +104,7 @@ func (p *triePrefetcher) close() {
}
}
}
- if metrics.Enabled {
- p.storageFetchersMeter.Mark(storageFetchers)
- p.storageLargestLoadMeter.Mark(largestLoad)
- }
-
- // Stop all workers once fetchers are aborted (otherwise
- // could stop while waiting)
- //
- // Record number of workers that were spawned during this run
- workersUsed := int64(p.workers.Wait())
- if metrics.Enabled {
- p.subfetcherWorkersMeter.Mark(workersUsed)
- }
-
+ p.releaseWorkerPools()
// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}
@@ -169,23 +117,19 @@ func (p *triePrefetcher) copy() *triePrefetcher {
copy := &triePrefetcher{
db: p.db,
root: p.root,
- fetches: make(map[string]Trie), // Active prefetchers use the fetchers map
-
- subfetcherWorkersMeter: p.subfetcherWorkersMeter,
- subfetcherWaitTimer: p.subfetcherWaitTimer,
- subfetcherCopiesMeter: p.subfetcherCopiesMeter,
+ fetches: make(map[string]Trie), // Active prefetchers use the fetches map
+ deliveryMissMeter: p.deliveryMissMeter,
accountLoadMeter: p.accountLoadMeter,
accountDupMeter: p.accountDupMeter,
accountSkipMeter: p.accountSkipMeter,
accountWasteMeter: p.accountWasteMeter,
+ storageLoadMeter: p.storageLoadMeter,
+ storageDupMeter: p.storageDupMeter,
+ storageSkipMeter: p.storageSkipMeter,
+ storageWasteMeter: p.storageWasteMeter,
- storageFetchersMeter: p.storageFetchersMeter,
- storageLoadMeter: p.storageLoadMeter,
- storageLargestLoadMeter: p.storageLargestLoadMeter,
- storageDupMeter: p.storageDupMeter,
- storageSkipMeter: p.storageSkipMeter,
- storageWasteMeter: p.storageWasteMeter,
+ options: p.options,
}
// If the prefetcher is already a copy, duplicate the data
if p.fetches != nil {
@@ -210,12 +154,11 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
if p.fetches != nil {
return
}
-
// Active fetcher, schedule the retrievals
id := p.trieID(owner, root)
fetcher := p.fetchers[id]
if fetcher == nil {
- fetcher = newSubfetcher(p, owner, root, addr)
+ fetcher = newSubfetcher(p.db, p.root, owner, root, addr, p.options...)
p.fetchers[id] = fetcher
}
fetcher.schedule(keys)
@@ -229,27 +172,24 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
if p.fetches != nil {
trie := p.fetches[id]
if trie == nil {
+ p.deliveryMissMeter.Mark(1)
return nil
}
return p.db.CopyTrie(trie)
}
-
// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
fetcher := p.fetchers[id]
if fetcher == nil {
+ p.deliveryMissMeter.Mark(1)
return nil
}
+ // Interrupt the prefetcher if it's by any chance still running and return
+ // a copy of any pre-loaded trie.
+ fetcher.abort() // safe to do multiple times
- // Wait for the fetcher to finish and shutdown orchestrator, if it exists
- start := time.Now()
- fetcher.wait()
- if metrics.Enabled {
- p.subfetcherWaitTimer.Inc(time.Since(start).Milliseconds())
- }
-
- // Return a copy of one of the prefetched tries
trie := fetcher.peek()
if trie == nil {
+ p.deliveryMissMeter.Mark(1)
return nil
}
return trie
@@ -276,365 +216,175 @@ func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string {
// main prefetcher is paused and either all requested items are processed or if
// the trie being worked on is retrieved from the prefetcher.
type subfetcher struct {
- p *triePrefetcher
-
db Database // Database to load trie nodes through
state common.Hash // Root hash of the state to prefetch
owner common.Hash // Owner of the trie, usually account hash
root common.Hash // Root hash of the trie to prefetch
addr common.Address // Address of the account that the trie belongs to
+ trie Trie // Trie being populated with nodes
+
+ tasks [][]byte // Items queued up for retrieval
+ lock sync.Mutex // Lock protecting the task queue
- to *trieOrchestrator // Orchestrate concurrent fetching of a single trie
+ wake chan struct{} // Wake channel if a new task is scheduled
+ stop chan struct{} // Channel to interrupt processing
+ term chan struct{} // Channel to signal interruption
+ copy chan chan Trie // Channel to request a copy of the current trie
seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
used [][]byte // Tracks the entries used in the end
+
+ pool *subfetcherPool
}
// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
-func newSubfetcher(p *triePrefetcher, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
+func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, opts ...PrefetcherOption) *subfetcher {
sf := &subfetcher{
- p: p,
- db: p.db,
- state: p.root,
+ db: db,
+ state: state,
owner: owner,
root: root,
addr: addr,
+ wake: make(chan struct{}, 1),
+ stop: make(chan struct{}),
+ term: make(chan struct{}),
+ copy: make(chan chan Trie),
seen: make(map[string]struct{}),
}
- sf.to = newTrieOrchestrator(sf)
- if sf.to != nil {
- go sf.to.processTasks()
- }
- // We return [sf] here to ensure we don't try to re-create if
- // we aren't able to setup a [newTrieOrchestrator] the first time.
+ options.As[prefetcherConfig](opts...).applyTo(sf)
+ go sf.loop()
return sf
}
// schedule adds a batch of trie keys to the queue to prefetch.
-// This should never block, so an array is used instead of a channel.
-//
-// This is not thread-safe.
func (sf *subfetcher) schedule(keys [][]byte) {
// Append the tasks to the current queue
- tasks := make([][]byte, 0, len(keys))
- for _, key := range keys {
- // Check if keys already seen
- sk := string(key)
- if _, ok := sf.seen[sk]; ok {
- sf.dups++
- continue
- }
- sf.seen[sk] = struct{}{}
- tasks = append(tasks, key)
- }
+ sf.lock.Lock()
+ sf.tasks = append(sf.tasks, keys...)
+ sf.lock.Unlock()
- // After counting keys, exit if they can't be prefetched
- if sf.to == nil {
- return
+ // Notify the prefetcher, it's fine if it's already terminated
+ select {
+ case sf.wake <- struct{}{}:
+ default:
}
-
- // Add tasks to queue for prefetching
- sf.to.enqueueTasks(tasks)
}
// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
// is currently.
func (sf *subfetcher) peek() Trie {
- if sf.to == nil {
- return nil
- }
- return sf.to.copyBase()
-}
+ ch := make(chan Trie)
+ select {
+ case sf.copy <- ch:
+ // Subfetcher still alive, return copy from it
+ return <-ch
-// wait must only be called if [triePrefetcher] has not been closed. If this happens,
-// workers will not finish.
-func (sf *subfetcher) wait() {
- if sf.to == nil {
- // Unable to open trie
- return
+ case <-sf.term:
+ // Subfetcher already terminated, return a copy directly
+ if sf.trie == nil {
+ return nil
+ }
+ return sf.db.CopyTrie(sf.trie)
}
- sf.to.wait()
}
+// abort interrupts the subfetcher immediately. It is safe to call abort multiple
+// times, but it is not thread safe.
func (sf *subfetcher) abort() {
- if sf.to == nil {
- // Unable to open trie
- return
- }
- sf.to.abort()
-}
-
-func (sf *subfetcher) skips() int {
- if sf.to == nil {
- // Unable to open trie
- return 0
- }
- return sf.to.skipCount()
-}
-
-func (sf *subfetcher) copies() int {
- if sf.to == nil {
- // Unable to open trie
- return 0
+ select {
+ case <-sf.stop:
+ default:
+ close(sf.stop)
}
- return sf.to.copies
+ <-sf.term
}
-// trieOrchestrator is not thread-safe.
-type trieOrchestrator struct {
- sf *subfetcher
-
- // base is an unmodified Trie we keep for
- // creating copies for each worker goroutine.
- //
- // We care more about quick copies than good copies
- // because most (if not all) of the nodes that will be populated
- // in the copy will come from the underlying triedb cache. Ones
- // that don't come from this cache probably had to be fetched
- // from disk anyways.
- base Trie
- baseLock sync.Mutex
-
- tasksAllowed bool
- skips int // number of tasks skipped
- pendingTasks [][]byte
- taskLock sync.Mutex
-
- processingTasks sync.WaitGroup
-
- wake chan struct{}
- stop chan struct{}
- stopOnce sync.Once
- loopTerm chan struct{}
-
- copies int
- copyChan chan Trie
- copySpawner chan struct{}
-}
+// loop waits for new tasks to be scheduled and keeps loading them until it runs
+// out of tasks or its underlying trie is retrieved for committing.
+func (sf *subfetcher) loop() {
+ // No matter how the loop stops, signal anyone waiting that it's terminated
+ defer func() {
+ sf.pool.wait()
+ close(sf.term)
+ }()
-func newTrieOrchestrator(sf *subfetcher) *trieOrchestrator {
// Start by opening the trie and stop processing if it fails
- var (
- base Trie
- err error
- )
if sf.owner == (common.Hash{}) {
- base, err = sf.db.OpenTrie(sf.root)
+ trie, err := sf.db.OpenTrie(sf.root)
if err != nil {
log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
- return nil
+ return
}
+ sf.trie = trie
} else {
// The trie argument can be nil as verkle doesn't support prefetching
// yet. TODO FIX IT(rjl493456442), otherwise code will panic here.
- base, err = sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil)
+ trie, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil)
if err != nil {
log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
- return nil
+ return
}
+ sf.trie = trie
}
+ // Trie opened successfully, keep prefetching items
+ for {
+ select {
+ case <-sf.wake:
+ // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
+ sf.lock.Lock()
+ tasks := sf.tasks
+ sf.tasks = nil
+ sf.lock.Unlock()
+
+ // Prefetch any tasks until the loop is interrupted
+ for _, task := range tasks {
+ select {
+ case ch := <-sf.copy:
+ // Somebody wants a copy of the current trie, grant them
+ ch <- sf.db.CopyTrie(sf.trie)
+
+ default:
+ // No termination request yet, prefetch the next entry
+ if _, ok := sf.seen[string(task)]; ok {
+ sf.dups++
+ } else {
+ if len(task) == common.AddressLength {
+ sf.pool.GetAccount(common.BytesToAddress(task))
+ } else {
+ sf.pool.GetStorage(sf.addr, task)
+ }
+ sf.seen[string(task)] = struct{}{}
+ }
+ }
+ }
- // Instantiate trieOrchestrator
- to := &trieOrchestrator{
- sf: sf,
- base: base,
-
- tasksAllowed: true,
- wake: make(chan struct{}, 1),
- stop: make(chan struct{}),
- loopTerm: make(chan struct{}),
-
- copyChan: make(chan Trie, sf.p.maxConcurrency),
- copySpawner: make(chan struct{}, sf.p.maxConcurrency),
- }
-
- // Create initial trie copy
- to.copies++
- to.copySpawner <- struct{}{}
- to.copyChan <- to.copyBase()
- return to
-}
-
-func (to *trieOrchestrator) copyBase() Trie {
- to.baseLock.Lock()
- defer to.baseLock.Unlock()
-
- return to.sf.db.CopyTrie(to.base)
-}
-
-func (to *trieOrchestrator) skipCount() int {
- to.taskLock.Lock()
- defer to.taskLock.Unlock()
-
- return to.skips
-}
-
-func (to *trieOrchestrator) enqueueTasks(tasks [][]byte) {
- to.taskLock.Lock()
- defer to.taskLock.Unlock()
-
- if len(tasks) == 0 {
- return
- }
-
- // Add tasks to [pendingTasks]
- if !to.tasksAllowed {
- to.skips += len(tasks)
- return
- }
- to.processingTasks.Add(len(tasks))
- to.pendingTasks = append(to.pendingTasks, tasks...)
-
- // Wake up processor
- select {
- case to.wake <- struct{}{}:
- default:
- }
-}
-
-func (to *trieOrchestrator) handleStop(remaining int) {
- to.taskLock.Lock()
- to.skips += remaining
- to.taskLock.Unlock()
- to.processingTasks.Add(-remaining)
-}
+ case ch := <-sf.copy:
+ // Somebody wants a copy of the current trie, grant them
+ ch <- sf.db.CopyTrie(sf.trie)
-func (to *trieOrchestrator) processTasks() {
- defer close(to.loopTerm)
+ case <-sf.stop:
+ //libevm:start
+ //
+ // This is copied, with alteration, from ethereum/go-ethereum#29519
+ // and can be deleted once we update to include that change.
- for {
- // Determine if we should process or exit
- select {
- case <-to.wake:
- case <-to.stop:
- return
- }
+ // Termination is requested, abort if no more tasks are pending. If
+ // there are some, exhaust them first.
+ sf.lock.Lock()
+ done := len(sf.tasks) == 0
+ sf.lock.Unlock()
- // Get current tasks
- to.taskLock.Lock()
- tasks := to.pendingTasks
- to.pendingTasks = nil
- to.taskLock.Unlock()
-
- // Enqueue more work as soon as trie copies are available
- lt := len(tasks)
- for i := 0; i < lt; i++ {
- // Try to stop as soon as possible, if channel is closed
- remaining := lt - i
- select {
- case <-to.stop:
- to.handleStop(remaining)
+ if done {
return
- default:
}
- // Try to create to get an active copy first (select is non-deterministic,
- // so we may end up creating a new copy when we don't need to)
- var t Trie
select {
- case t = <-to.copyChan:
+ case sf.wake <- struct{}{}:
default:
- // Wait for an available copy or create one, if we weren't
- // able to get a previously created copy
- select {
- case <-to.stop:
- to.handleStop(remaining)
- return
- case t = <-to.copyChan:
- case to.copySpawner <- struct{}{}:
- to.copies++
- t = to.copyBase()
- }
- }
-
- // Enqueue work, unless stopped.
- fTask := tasks[i]
- f := func() {
- // Perform task
- var err error
- if len(fTask) == common.AddressLength {
- _, err = t.GetAccount(common.BytesToAddress(fTask))
- } else {
- _, err = t.GetStorage(to.sf.addr, fTask)
- }
- if err != nil {
- log.Error("Trie prefetcher failed fetching", "root", to.sf.root, "err", err)
- }
- to.processingTasks.Done()
-
- // Return copy when we are done with it, so someone else can use it
- //
- // channel is buffered and will not block
- to.copyChan <- t
}
-
- // Enqueue task for processing (may spawn new goroutine
- // if not at [maxConcurrency])
- //
- // If workers are stopped before calling [Execute], this function may
- // panic.
- to.sf.p.workers.Execute(f)
+ //libevm:end
}
}
}
-
-func (to *trieOrchestrator) stopAcceptingTasks() {
- to.taskLock.Lock()
- defer to.taskLock.Unlock()
-
- if !to.tasksAllowed {
- return
- }
- to.tasksAllowed = false
-
- // We don't clear [to.pendingTasks] here because
- // it will be faster to prefetch them even though we
- // are still waiting.
-}
-
-// wait stops accepting new tasks and waits for ongoing tasks to complete. If
-// wait is called, it is not necessary to call [abort].
-//
-// It is safe to call wait multiple times.
-func (to *trieOrchestrator) wait() {
- // Prevent more tasks from being enqueued
- to.stopAcceptingTasks()
-
- // Wait for processing tasks to complete
- to.processingTasks.Wait()
-
- // Stop orchestrator loop
- to.stopOnce.Do(func() {
- close(to.stop)
- })
- <-to.loopTerm
-}
-
-// abort stops any ongoing tasks and shuts down the orchestrator loop. If abort
-// is called, it is not necessary to call [wait].
-//
-// It is safe to call abort multiple times.
-func (to *trieOrchestrator) abort() {
- // Prevent more tasks from being enqueued
- to.stopAcceptingTasks()
-
- // Stop orchestrator loop
- to.stopOnce.Do(func() {
- close(to.stop)
- })
- <-to.loopTerm
-
- // Capture any dangling pending tasks (processTasks
- // may exit before enqueing all pendingTasks)
- to.taskLock.Lock()
- pendingCount := len(to.pendingTasks)
- to.skips += pendingCount
- to.pendingTasks = nil
- to.taskLock.Unlock()
- to.processingTasks.Add(-pendingCount)
-
- // Wait for processing tasks to complete
- to.processingTasks.Wait()
-}
diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go
new file mode 100644
index 0000000000..d59bd4eb54
--- /dev/null
+++ b/core/state/trie_prefetcher.libevm.go
@@ -0,0 +1,126 @@
+// Copyright 2024 the libevm authors.
+//
+// The libevm additions to go-ethereum are free software: you can redistribute
+// them and/or modify them under the terms of the GNU Lesser General Public License
+// as published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The libevm additions are distributed in the hope that they will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package state
+
+import (
+ "github.com/ava-labs/subnet-evm/libevm/options"
+ "github.com/ava-labs/subnet-evm/libevm/sync"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+// A PrefetcherOption configures behaviour of trie prefetching.
+type PrefetcherOption = options.Option[prefetcherConfig]
+
+type prefetcherConfig struct {
+ newWorkers func() WorkerPool
+}
+
+// A WorkerPool executes functions asynchronously. Done() is called to signal
+// that the pool is no longer needed and that Execute() is guaranteed to not be
+// called again.
+type WorkerPool interface {
+ Execute(func())
+ Done()
+}
+
+// WithWorkerPools configures trie prefetching to execute asynchronously. The
+// provided constructor is called once for each trie being fetched but it MAY
+// return the same pool.
+func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
+ return options.Func[prefetcherConfig](func(c *prefetcherConfig) {
+ c.newWorkers = ctor
+ })
+}
+
+type subfetcherPool struct {
+ workers WorkerPool
+ tries sync.Pool[Trie]
+ wg sync.WaitGroup
+}
+
+// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided
+// with a [PrefetcherOption].
+func (c *prefetcherConfig) applyTo(sf *subfetcher) {
+ sf.pool = &subfetcherPool{
+ tries: sync.Pool[Trie]{
+ // Although the workers may be shared between all subfetchers, each
+ // MUST have its own Trie pool.
+ New: func() Trie {
+ return sf.db.CopyTrie(sf.trie)
+ },
+ },
+ }
+ if c.newWorkers != nil {
+ sf.pool.workers = c.newWorkers()
+ }
+}
+
+// releaseWorkerPools calls Done() on all [WorkerPool]s. This MUST only be
+// called after [subfetcher.abort] returns on ALL fetchers as a pool is allowed
+// to be shared between them. This is because we guarantee in the public API
+// that no further calls will be made to Execute() after a call to Done().
+func (p *triePrefetcher) releaseWorkerPools() {
+ for _, f := range p.fetchers {
+ if w := f.pool.workers; w != nil {
+ w.Done()
+ }
+ }
+}
+
+func (p *subfetcherPool) wait() {
+ p.wg.Wait()
+}
+
+// execute runs the provided function with a copy of the subfetcher's Trie.
+// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was
+// configured with a [WorkerPool] then it is used for function execution,
+// otherwise `fn` is just called directly.
+func (p *subfetcherPool) execute(fn func(Trie)) {
+ p.wg.Add(1)
+ do := func() {
+ t := p.tries.Get()
+ fn(t)
+ p.tries.Put(t)
+ p.wg.Done()
+ }
+
+ if w := p.workers; w != nil {
+ w.Execute(do)
+ } else {
+ do()
+ }
+}
+
+// GetAccount optimistically pre-fetches an account, dropping the returned value
+// and logging errors. See [subfetcherPool.execute] re worker pools.
+func (p *subfetcherPool) GetAccount(addr common.Address) {
+ p.execute(func(t Trie) {
+ if _, err := t.GetAccount(addr); err != nil {
+ log.Error("account prefetching failed", "address", addr, "err", err)
+ }
+ })
+}
+
+// GetStorage is the storage equivalent of [subfetcherPool.GetAccount].
+func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) {
+ p.execute(func(t Trie) {
+ if _, err := t.GetStorage(addr, key); err != nil {
+ log.Error("storage prefetching failed", "address", addr, "key", key, "err", err)
+ }
+ })
+}
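
The WorkerPool interface above lets callers supply their own executor through WithWorkerPools; the constructor may return the same pool for every trie, which is why releaseWorkerPools only calls Done() after all fetchers have been aborted. Below is a minimal sketch of a custom pool, with hypothetical package and type names; the built-in WithConcurrentWorkers path in statedb.go uses utils.BoundedWorkers instead:

```go
package example

import (
	"sync"

	"github.com/ava-labs/subnet-evm/core/state"
)

// goroutinePool is an illustrative WorkerPool: Execute runs each function on
// its own goroutine and Done waits for them all, relying on the guarantee
// that Execute is never called after Done.
type goroutinePool struct {
	wg sync.WaitGroup
}

func (p *goroutinePool) Execute(fn func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		fn()
	}()
}

func (p *goroutinePool) Done() { p.wg.Wait() }

// sharedPoolOption returns a PrefetcherOption whose constructor hands the
// same pool to every subfetcher, which WithWorkerPools explicitly permits.
func sharedPoolOption() state.PrefetcherOption {
	pool := &goroutinePool{}
	return state.WithWorkerPools(func() state.WorkerPool { return pool })
}
```
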
diff --git a/core/state/trie_prefetcher_extra_test.go b/core/state/trie_prefetcher_extra_test.go
new file mode 100644
index 0000000000..5ac1d1c5c2
--- /dev/null
+++ b/core/state/trie_prefetcher_extra_test.go
@@ -0,0 +1,189 @@
+// (c) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package state
+
+import (
+ "crypto/rand"
+ "encoding/binary"
+ "fmt"
+ "os"
+ "path"
+ "strconv"
+ "testing"
+
+ "github.com/ava-labs/avalanchego/database"
+ "github.com/ava-labs/subnet-evm/core/rawdb"
+ "github.com/ava-labs/subnet-evm/core/state/snapshot"
+ "github.com/ava-labs/subnet-evm/core/types"
+ "github.com/ava-labs/subnet-evm/metrics"
+ "github.com/ava-labs/subnet-evm/triedb"
+ "github.com/ava-labs/subnet-evm/triedb/hashdb"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/stretchr/testify/require"
+)
+
+const namespace = "chain"
+
+// BenchmarkPrefetcherDatabase benchmarks the performance of the trie
+// prefetcher. By default, a state with 100k storage keys is created and stored
+// in a temporary directory. Setting the TEST_DB_KVS and TEST_DB_DIR environment
+// variables modifies the defaults. The benchmark measures the time to update
+// the trie after 100, 200, and 500 storage slot updates per iteration,
+// simulating a block with that number of storage slot updates. For performance
+// reasons, when making changes involving the trie prefetcher, this benchmark
+// should be run against a state including around 100m storage entries.
+func BenchmarkPrefetcherDatabase(b *testing.B) {
+ require := require.New(b)
+
+ dir := b.TempDir()
+ if env := os.Getenv("TEST_DB_DIR"); env != "" {
+ dir = env
+ }
+ wantKVs := 100_000
+ if env := os.Getenv("TEST_DB_KVS"); env != "" {
+ var err error
+ wantKVs, err = strconv.Atoi(env)
+ require.NoError(err)
+ }
+
+ levelDB, err := rawdb.NewLevelDBDatabase(path.Join(dir, "level.db"), 0, 0, "", false)
+ require.NoError(err)
+
+ root := types.EmptyRootHash
+ count := uint64(0)
+ block := uint64(0)
+
+ rootKey := []byte("root")
+ countKey := []byte("count")
+ blockKey := []byte("block")
+ got, err := levelDB.Get(rootKey)
+ if err == nil { // on success
+ root = common.BytesToHash(got)
+ }
+ got, err = levelDB.Get(countKey)
+ if err == nil { // on success
+ count = binary.BigEndian.Uint64(got)
+ }
+ got, err = levelDB.Get(blockKey)
+ if err == nil { // on success
+ block = binary.BigEndian.Uint64(got)
+ }
+
+ // Make a trie on the levelDB
+ address1 := common.Address{42}
+ address2 := common.Address{43}
+ addBlock := func(db Database, snaps *snapshot.Tree, kvsPerBlock int, prefetchers int) {
+ _, root, err = addKVs(db, snaps, address1, address2, root, block, kvsPerBlock, prefetchers)
+ require.NoError(err)
+ count += uint64(kvsPerBlock)
+ block++
+ }
+
+ lastCommit := block
+ commit := func(levelDB ethdb.Database, snaps *snapshot.Tree, db Database) {
+ require.NoError(db.TrieDB().Commit(root, false))
+
+ for i := lastCommit + 1; i <= block; i++ {
+ require.NoError(snaps.Flatten(fakeHash(i)))
+ }
+ lastCommit = block
+
+ // update the tracking keys
+ require.NoError(levelDB.Put(rootKey, root.Bytes()))
+ require.NoError(database.PutUInt64(levelDB, blockKey, block))
+ require.NoError(database.PutUInt64(levelDB, countKey, count))
+ }
+
+ tdbConfig := &triedb.Config{
+ HashDB: &hashdb.Config{
+ CleanCacheSize: 3 * 1024 * 1024 * 1024,
+ },
+ }
+ db := NewDatabaseWithConfig(levelDB, tdbConfig)
+ snaps := snapshot.NewTestTree(levelDB, fakeHash(block), root)
+ for count < uint64(wantKVs) {
+ previous := root
+ addBlock(db, snaps, 100_000, 0) // Note this updates root and count
+ b.Logf("Root: %v, kvs: %d, block: %d", root, count, block)
+
+ // Commit every 10 blocks or on the last iteration
+ if block%10 == 0 || count >= uint64(wantKVs) {
+ commit(levelDB, snaps, db)
+ b.Logf("Root: %v, kvs: %d, block: %d (committed)", root, count, block)
+ }
+ if previous != root {
+ require.NoError(db.TrieDB().Dereference(previous))
+ } else {
+ b.Fatal("root did not change")
+ }
+ }
+ require.NoError(levelDB.Close())
+ b.Logf("Starting benchmarks")
+ b.Logf("Root: %v, kvs: %d, block: %d", root, count, block)
+ for _, updates := range []int{100, 200, 500} {
+ for _, prefetchers := range []int{0, 1, 4, 16} {
+ b.Run(fmt.Sprintf("updates_%d_prefetchers_%d", updates, prefetchers), func(b *testing.B) {
+ startRoot, startBlock, startCount := root, block, count
+ defer func() { root, block, count = startRoot, startBlock, startCount }()
+
+ levelDB, err := rawdb.NewLevelDBDatabase(path.Join(dir, "level.db"), 0, 0, "", false)
+ require.NoError(err)
+ snaps := snapshot.NewTestTree(levelDB, fakeHash(block), root)
+ db := NewDatabaseWithConfig(levelDB, tdbConfig)
+ getMetric := func(metric string) int64 {
+ meter := metrics.GetOrRegisterMeter(triePrefetchMetricsPrefix+namespace+"/storage/"+metric, nil)
+ return meter.Snapshot().Count()
+ }
+ startLoads := getMetric("load")
+ for i := 0; i < b.N; i++ {
+ addBlock(db, snaps, updates, prefetchers)
+ }
+ require.NoError(levelDB.Close())
+ b.ReportMetric(float64(getMetric("load")-startLoads)/float64(b.N), "loads")
+ })
+ }
+ }
+}
+
+func fakeHash(block uint64) common.Hash {
+ return common.BytesToHash(binary.BigEndian.AppendUint64(nil, block))
+}
+
+// addKVs adds count random key-value pairs to the storage tries of address1
+// and address2 (count/2 each) and returns the new state db and root.
+func addKVs(
+ db Database, snaps *snapshot.Tree,
+ address1, address2 common.Address, root common.Hash, block uint64,
+ count int, prefetchers int,
+) (*StateDB, common.Hash, error) {
+ snap := snaps.Snapshot(root)
+ if snap == nil {
+ return nil, common.Hash{}, fmt.Errorf("snapshot not found")
+ }
+ statedb, err := NewWithSnapshot(root, db, snap)
+ if err != nil {
+ return nil, common.Hash{}, fmt.Errorf("creating state with snapshot: %w", err)
+ }
+ if prefetchers > 0 {
+ statedb.StartPrefetcher(namespace, WithConcurrentWorkers(prefetchers))
+ defer statedb.StopPrefetcher()
+ }
+ for _, address := range []common.Address{address1, address2} {
+ statedb.SetNonce(address, 1)
+ for i := 0; i < count/2; i++ {
+ key := make([]byte, 32)
+ value := make([]byte, 32)
+ rand.Read(key)
+ rand.Read(value)
+
+ statedb.SetState(address, common.BytesToHash(key), common.BytesToHash(value))
+ }
+ }
+ root, err = statedb.CommitWithSnap(block+1, true, snaps, fakeHash(block+1), fakeHash(block))
+ if err != nil {
+ return nil, common.Hash{}, fmt.Errorf("committing with snap: %w", err)
+ }
+ return statedb, root, nil
+}
diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go
index b8edcbb6a8..999fcfc789 100644
--- a/core/state/trie_prefetcher_test.go
+++ b/core/state/trie_prefetcher_test.go
@@ -59,7 +59,7 @@ func filledStateDB() *StateDB {
func TestCopyAndClose(t *testing.T) {
db := filledStateDB()
- prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency)
+ prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", WithConcurrentWorkers(maxConcurrency))
skey := common.HexToHash("aaa")
prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
@@ -84,7 +84,7 @@ func TestCopyAndClose(t *testing.T) {
func TestUseAfterClose(t *testing.T) {
db := filledStateDB()
- prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency)
+ prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", WithConcurrentWorkers(maxConcurrency))
skey := common.HexToHash("aaa")
prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
a := prefetcher.trie(common.Hash{}, db.originalRoot)
@@ -100,7 +100,7 @@ func TestUseAfterClose(t *testing.T) {
func TestCopyClose(t *testing.T) {
db := filledStateDB()
- prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency)
+ prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", WithConcurrentWorkers(maxConcurrency))
skey := common.HexToHash("aaa")
prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
cpy := prefetcher.copy()
diff --git a/libevm/options/options.go b/libevm/options/options.go
new file mode 100644
index 0000000000..af7bc751a9
--- /dev/null
+++ b/libevm/options/options.go
@@ -0,0 +1,42 @@
+// Copyright 2024 the libevm authors.
+//
+// The libevm additions to go-ethereum are free software: you can redistribute
+// them and/or modify them under the terms of the GNU Lesser General Public License
+// as published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The libevm additions are distributed in the hope that they will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+// Package options provides a generic mechanism for defining configuration of
+// arbitrary types.
+package options
+
+// An Option configures values of arbitrary type.
+type Option[T any] interface {
+ Configure(*T)
+}
+
+// As applies Options to a zero-value T, which it then returns.
+func As[T any](opts ...Option[T]) *T {
+ var t T
+ for _, o := range opts {
+ o.Configure(&t)
+ }
+ return &t
+}
+
+// A Func converts a function into an [Option], using itself as the Configure
+// method.
+type Func[T any] func(*T)
+
+var _ Option[struct{}] = Func[struct{}](nil)
+
+// Configure implements the [Option] interface.
+func (f Func[T]) Configure(t *T) { f(t) }
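
A minimal sketch of the intended use of this package for an arbitrary configuration type; serverConfig and its option constructors are hypothetical and mirror how prefetcherConfig is configured via options.Func and options.As:

```go
package example

import "github.com/ava-labs/subnet-evm/libevm/options"

// serverConfig exists only to illustrate the pattern.
type serverConfig struct {
	port    int
	verbose bool
}

// WithPort adapts a plain function over *serverConfig into an Option.
func WithPort(p int) options.Option[serverConfig] {
	return options.Func[serverConfig](func(c *serverConfig) { c.port = p })
}

func WithVerbose() options.Option[serverConfig] {
	return options.Func[serverConfig](func(c *serverConfig) { c.verbose = true })
}

// newServerConfig applies the options to a zero value, as the prefetcher does
// with options.As[prefetcherConfig](opts...).
func newServerConfig(opts ...options.Option[serverConfig]) *serverConfig {
	return options.As(opts...)
}
```
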
diff --git a/libevm/sync/sync.go b/libevm/sync/sync.go
new file mode 100644
index 0000000000..7621e20d79
--- /dev/null
+++ b/libevm/sync/sync.go
@@ -0,0 +1,52 @@
+// Copyright 2024 the subnet-evm authors.
+//
+// The libevm additions to go-ethereum are free software: you can redistribute
+// them and/or modify them under the terms of the GNU Lesser General Public License
+// as published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The libevm additions are distributed in the hope that they will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+// Package sync extends the standard library's sync package.
+package sync
+
+import "sync"
+
+// Aliases of stdlib sync's types to avoid having to import it alongside this
+// package.
+type (
+ Cond = sync.Cond
+ Locker = sync.Locker
+ Map = sync.Map
+ Mutex = sync.Mutex
+ Once = sync.Once
+ RWMutex = sync.RWMutex
+ WaitGroup = sync.WaitGroup
+)
+
+// A Pool is a type-safe wrapper around [sync.Pool].
+type Pool[T any] struct {
+ New func() T
+ pool sync.Pool
+ once Once
+}
+
+// Get is equivalent to [sync.Pool.Get].
+func (p *Pool[T]) Get() T {
+ p.once.Do(func() { // Do() guarantees at least once, not just only once
+ p.pool.New = func() any { return p.New() }
+ })
+ return p.pool.Get().(T) //nolint:forcetypeassert
+}
+
+// Put is equivalent to [sync.Pool.Put].
+func (p *Pool[T]) Put(t T) {
+ p.pool.Put(t)
+}
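
A minimal sketch of the generic Pool in use; the buffer pool and helper below are hypothetical:

```go
package example

import "github.com/ava-labs/subnet-evm/libevm/sync"

// bufferPool reuses byte slices; New is only consulted when the pool is
// empty, matching the semantics of the standard library's sync.Pool.
var bufferPool = sync.Pool[[]byte]{
	New: func() []byte { return make([]byte, 0, 4096) },
}

// withBuffer borrows a slice, hands it to fn, and returns it with its length
// reset so the next borrower starts from an empty slice.
func withBuffer(fn func([]byte)) {
	buf := bufferPool.Get()
	defer bufferPool.Put(buf[:0])
	fn(buf)
}
```
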
diff --git a/miner/worker.go b/miner/worker.go
index a7c2a43665..b7b5e7b9b8 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -280,14 +280,15 @@ func (w *worker) commitNewWork(predicateContext *precompileconfig.PredicateConte
}
func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.PredicateContext, parent *types.Header, header *types.Header, tstart time.Time) (*environment, error) {
- state, err := w.chain.StateAt(parent.Root)
+ currentState, err := w.chain.StateAt(parent.Root)
if err != nil {
return nil, err
}
- state.StartPrefetcher("miner", w.eth.BlockChain().CacheConfig().TriePrefetcherParallelism)
+ numPrefetchers := w.chain.CacheConfig().TriePrefetcherParallelism
+ currentState.StartPrefetcher("miner", state.WithConcurrentWorkers(numPrefetchers))
return &environment{
signer: types.MakeSigner(w.chainConfig, header.Number, header.Time),
- state: state,
+ state: currentState,
parent: parent,
header: header,
tcount: 0,