Skip to content

Commit

Permalink
Atomic trie height map repair (#413)
Browse files Browse the repository at this point in the history
* initial height map repair

* add vdb commit

* Atomic trie repair script

* clearer vdb commit

* update comment

* improve UT

* update comment

* review comments

* return int from repairAtomicTrie

* pr comments

* start the migration on VM initialize

* add comment

* fix bug

* fix bug, improve UT

* reduce logs

* pr comments

* test names

* remove nil check

* batch the sleeping

* move block parsing to init

* clarify the UT

* add err check

* pr comment

* comment

* pr comment

* remove delay as argument

* change log time
  • Loading branch information
darioush authored Dec 20, 2023
1 parent 1286d50 commit 58d524f
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 1 deletion.
8 changes: 8 additions & 0 deletions plugin/evm/atomic_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var (
_ AtomicTrie = &atomicTrie{}
lastCommittedKey = []byte("atomicTrieLastCommittedBlock")
appliedSharedMemoryCursorKey = []byte("atomicTrieLastAppliedToSharedMemory")
heightMapRepairKey = []byte("atomicTrieHeightMapRepair")
)

// AtomicTrie maintains an index of atomic operations by blockchainIDs for every block
Expand Down Expand Up @@ -82,6 +83,10 @@ type AtomicTrie interface {

// RejectTrie dereferences root from the trieDB, freeing memory.
RejectTrie(root common.Hash) error

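// RepairHeightMap repairs the height map of the atomic trie by iterating
// over all leaves in the trie and committing the trie at every commit interval
// up to and including height [to]. It returns false if the height map was
// already marked as repaired (nothing is done) and true if a repair was attempted.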
RepairHeightMap(to uint64) (bool, error)
}

// AtomicTrieIterator is a stateful iterator that iterates the leafs of an AtomicTrie
Expand All @@ -94,6 +99,9 @@ type AtomicTrieIterator interface {
// returned []byte can be freely modified
Key() []byte

// Value returns the current database value that the iterator is iterating
Value() []byte

// BlockNumber returns the current block number
BlockNumber() uint64

Expand Down
133 changes: 133 additions & 0 deletions plugin/evm/atomic_trie_height_map_repair.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// (c) 2020-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"errors"
"fmt"
"math"
"time"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/trie/trienode"
"github.com/ethereum/go-ethereum/log"
)

const (
	// repairDone is the sentinel value stored under heightMapRepairKey once
	// the height map has been repaired.
	repairDone = math.MaxUint64

	// iterationsPerDelay is the number of repair loop iterations performed
	// between two pauses of [iterationDelay].
	iterationsPerDelay = 1000

	// iterationDelay is the length of the pause taken by the repair loop
	// every [iterationsPerDelay] iterations (100ms).
	iterationDelay = time.Second / 10
)

// RepairHeightMap starts or resumes the repair of the atomic trie height map,
// covering commit heights up to and including [to].
//
// Progress is read from [heightMapRepairKey] in the metadata database:
//   - no marker: the repair starts from height 0;
//   - marker equals [repairDone]: nothing is done and (false, nil) is returned;
//   - any other marker: the repair resumes from the stored height.
//
// A failure to read the marker is returned as (false, err). Otherwise the
// returned bool is true and the error is whatever the repair itself produced.
func (a *atomicTrie) RepairHeightMap(to uint64) (bool, error) {
	resumeFrom, err := database.GetUInt64(a.metadataDB, heightMapRepairKey)
	if err != nil {
		if !errors.Is(err, database.ErrNotFound) {
			return false, err
		}
		// height map not repaired yet, start at 0
		resumeFrom = 0
	} else if resumeFrom == repairDone {
		// height map already repaired, nothing to do
		return false, nil
	}

	repairErr := a.repairHeightMap(resumeFrom, to)
	return true, repairErr
}

// repairHeightMap rebuilds the height map entries for every commit interval
// in the range (from, to].
//
// It opens a working trie at the root recorded for [from], replays the leaves
// of the last committed atomic trie starting at height from+1 into it, and
// records the working trie's root in the height map each time a commit
// interval boundary is crossed. After every such commit the resume marker
// under [heightMapRepairKey] is advanced, and once all boundaries up to [to]
// are written the marker is set to [repairDone].
func (a *atomicTrie) repairHeightMap(from, to uint64) error {
	// open the atomic trie at the last known root with correct height map
	// correspondence
	startRoot, err := getRoot(a.metadataDB, from)
	if err != nil {
		return fmt.Errorf("could not get root at height %d: %w", from, err)
	}
	hasher, err := a.OpenTrie(startRoot)
	if err != nil {
		return fmt.Errorf("could not open atomic trie at root %s: %w", startRoot, err)
	}

	// progress is logged at most once per [logEach]
	const logEach = 90 * time.Second
	lastLog := time.Now()

	// commitAt hashes the values inserted in [hasher] so far and stores the
	// resulting root in the height map at [commitHeight]. It then advances the
	// resume marker and re-opens [hasher] at the new root, since a trie must
	// not be used after it has been committed.
	commitAt := func(commitHeight uint64) error {
		committedRoot, nodeSet := hasher.Commit(false)
		if nodeSet != nil {
			if err := a.trieDB.Update(committedRoot, types.EmptyRootHash, trienode.NewWithNodeSet(nodeSet)); err != nil {
				return err
			}
			if err := a.trieDB.Commit(committedRoot, false); err != nil {
				return err
			}
		}
		if err := a.metadataDB.Put(database.PackUInt64(commitHeight), committedRoot[:]); err != nil {
			return err
		}
		if err := database.PutUInt64(a.metadataDB, heightMapRepairKey, commitHeight); err != nil {
			return err
		}
		if time.Since(lastLog) > logEach {
			log.Info("repairing atomic trie height map", "height", commitHeight, "root", committedRoot)
			lastLog = time.Now()
		}
		reopened, err := a.OpenTrie(committedRoot)
		hasher = reopened
		return err
	}

	// iterate over all leaves in the current atomic trie
	trieRoot, _ := a.LastCommitted()
	leaves, err := a.Iterator(trieRoot, database.PackUInt64(from+1))
	if err != nil {
		return fmt.Errorf("could not create iterator for atomic trie at root %s: %w", trieRoot, err)
	}

	var (
		height     uint64
		lastCommit = from
		processed  int
	)
	for leaves.Next() {
		height = leaves.BlockNumber()
		if height > to {
			break
		}

		// commit every interval boundary that lies strictly below this
		// leaf's height before inserting the leaf
		for lastCommit+a.commitInterval < height {
			boundary := lastCommit + a.commitInterval
			if err := commitAt(boundary); err != nil {
				return err
			}
			lastCommit = boundary
		}

		if err := hasher.Update(leaves.Key(), leaves.Value()); err != nil {
			return fmt.Errorf("could not update atomic trie at root %s: %w", trieRoot, err)
		}

		processed++
		if processed%iterationsPerDelay == 0 {
			time.Sleep(iterationDelay) // pause to avoid putting a spike of load on the disk
		}
	}
	if err := leaves.Error(); err != nil {
		return fmt.Errorf("error iterating atomic trie: %w", err)
	}

	// commit the remaining interval boundaries up to and including [to]
	for boundary := lastCommit + a.commitInterval; boundary <= to; boundary += a.commitInterval {
		if err := commitAt(boundary); err != nil {
			return err
		}
	}

	// mark height map as repaired
	if err := database.PutUInt64(a.metadataDB, heightMapRepairKey, repairDone); err != nil {
		return err
	}
	log.Info("atomic trie height map repair complete", "height", height, "root", trieRoot)
	return nil
}
116 changes: 116 additions & 0 deletions plugin/evm/atomic_trie_height_map_repair_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// (c) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"testing"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/database/versiondb"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)

// TestAtomicTrieRepairHeightMap runs the height map repair scenario against
// several layouts of atomic txs relative to the commit interval.
func TestAtomicTrieRepairHeightMap(t *testing.T) {
	// skipNone keeps atomic txs at every height.
	skipNone := func(height uint64) bool { return false }

	cases := map[string]testAtomicTrieRepairHeightMap{
		"last accepted after commit interval": {
			lastAccepted:  3*testCommitInterval + 5,
			skipAtomicTxs: skipNone,
		},
		"last accepted exactly a commit interval": {
			lastAccepted:  3 * testCommitInterval,
			skipAtomicTxs: skipNone,
		},
		"no atomic txs in a commit interval": {
			lastAccepted: 3 * testCommitInterval,
			skipAtomicTxs: func(height uint64) bool {
				return testCommitInterval < height && height <= 2*testCommitInterval
			},
		},
		"no atomic txs in the most recent commit intervals": {
			lastAccepted: 3 * testCommitInterval,
			skipAtomicTxs: func(height uint64) bool {
				return height > testCommitInterval+1
			},
		},
	}

	for name, tc := range cases {
		t.Run(name, tc.run)
	}
}

// testAtomicTrieRepairHeightMap describes one height map repair scenario.
type testAtomicTrieRepairHeightMap struct {
	// skipAtomicTxs reports whether the given height should be indexed
	// without any atomic txs.
	skipAtomicTxs func(height uint64) bool
	// lastAccepted is the last height indexed into the atomic trie and the
	// height the repair is run up to.
	lastAccepted uint64
}

// run executes the scenario: it indexes heights 1..lastAccepted into a fresh
// atomic trie, deletes the recorded height map entries, and checks that
// RepairHeightMap restores them. It then checks that a partially completed
// repair is resumed, and that a finished repair is not run again.
func (test testAtomicTrieRepairHeightMap) run(t *testing.T) {
	req := require.New(t)

	// set up an atomic trie on top of an in-memory versioned database
	vdb := versiondb.New(memdb.New())
	txRepo, err := NewAtomicTxRepository(vdb, testTxCodec(), 0, nil, nil, nil)
	req.NoError(err)
	backend, err := NewAtomicBackend(vdb, testSharedMemory(), nil, txRepo, 0, common.Hash{}, testCommitInterval)
	req.NoError(err)
	at := backend.AtomicTrie().(*atomicTrie)

	// index every height, recording the last committed root at each multiple
	// of the commit interval
	expectedRoots := make(map[uint64]common.Hash)
	for h := uint64(1); h <= test.lastAccepted; h++ {
		ops := testDataImportTx().mustAtomicOps()
		if test.skipAtomicTxs(h) {
			ops = nil
		}
		req.NoError(indexAtomicTxs(at, h, ops))
		if h%testCommitInterval != 0 {
			continue
		}
		committedRoot, _ := at.LastCommitted()
		expectedRoots[h] = committedRoot
	}

	// checkRoots verifies that [at] reports, for each recorded height, either
	// the expected root or (when expectZero is set) the zero value.
	checkRoots := func(expectZero bool) {
		for h, want := range expectedRoots {
			got, err := at.Root(h)
			req.NoError(err)
			if !expectZero {
				req.Equal(want, got)
				continue
			}
			req.Zero(got)
		}
	}
	checkRoots(false)

	// destroy the height map
	for h := range expectedRoots {
		req.NoError(at.metadataDB.Delete(database.PackUInt64(h)))
	}
	req.NoError(vdb.Commit())
	checkRoots(true)

	// repair the height map
	repaired, err := at.RepairHeightMap(test.lastAccepted)
	req.NoError(err)
	checkRoots(false)
	req.True(repaired)

	// partially destroy the height map: drop the newest entry and move the
	// resume marker back by one commit interval
	_, lastHeight := at.LastCommitted()
	req.NoError(at.metadataDB.Delete(database.PackUInt64(lastHeight)))
	req.NoError(at.metadataDB.Put(
		heightMapRepairKey,
		database.PackUInt64(lastHeight-testCommitInterval),
	))

	// repair the height map
	repaired, err = at.RepairHeightMap(test.lastAccepted)
	req.NoError(err)
	checkRoots(false)
	req.True(repaired)

	// try to repair the height map again
	repaired, err = at.RepairHeightMap(test.lastAccepted)
	req.NoError(err)
	req.False(repaired)
}
5 changes: 5 additions & 0 deletions plugin/evm/atomic_trie_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,8 @@ func (a *atomicTrieIterator) AtomicOps() *atomic.Requests {
func (a *atomicTrieIterator) Key() []byte {
return a.key
}

// Value returns the current database value that the iterator is iterating.
// The Value field of the underlying trie iterator is returned directly,
// without being copied.
// NOTE(review): callers should presumably treat the result as read-only — confirm.
func (a *atomicTrieIterator) Value() []byte {
return a.trieIterator.Value
}
2 changes: 1 addition & 1 deletion plugin/evm/atomic_trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func TestIndexerWriteAndRead(t *testing.T) {
assert.Len(t, blockRootMap, 3)

hash, height := atomicTrie.LastCommitted()
assert.EqualValues(t, lastCommittedBlockHeight, height, "expected %d was %d", 200, lastCommittedBlockHeight)
assert.EqualValues(t, lastCommittedBlockHeight, height)
assert.Equal(t, lastCommittedBlockHash, hash)

// Verify that [atomicTrie] can access each of the expected roots
Expand Down
8 changes: 8 additions & 0 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,14 @@ func (vm *VM) Initialize(
}
vm.atomicTrie = vm.atomicBackend.AtomicTrie()

// Run the atomic trie height map repair in the background on mainnet/fuji
// TODO: remove after DUpgrade
if vm.chainID.Cmp(params.AvalancheMainnetChainID) == 0 ||
vm.chainID.Cmp(params.AvalancheFujiChainID) == 0 {
_, lastCommitted := vm.atomicTrie.LastCommitted()
go vm.atomicTrie.RepairHeightMap(lastCommitted)
}

go vm.ctx.Log.RecoverAndPanic(vm.startContinuousProfiler)

// The Codec explicitly registers the types it requires from the secp256k1fx
Expand Down

0 comments on commit 58d524f

Please sign in to comment.