Skip to content

Commit

Permalink
fix unindexor in state sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ceyonur committed Apr 15, 2024
1 parent 2ac56a4 commit e88eab5
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 79 deletions.
57 changes: 46 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type CacheConfig struct {
PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries.
PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries.
AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call (= StateSyncEnabled)
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
SnapshotVerify bool // Verify generated snapshots
Preimages bool // Whether to store preimage of trie key to the disk
Expand Down Expand Up @@ -335,6 +335,9 @@ type BlockChain struct {

// [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs.
acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log]

// [txIndexTailLock] is used to synchronize the updating of the tx index tail.
txIndexTailLock sync.Mutex
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -444,13 +447,19 @@ func NewBlockChain(
// Warm up [hc.acceptedNumberCache] and [acceptedLogsCache]
bc.warmAcceptedCaches()

// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TxLookupLimit != 0 {
latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db)
bc.setTxIndexTail(latestStateSynced)
}

// Start processing accepted blocks effects in the background
go bc.startAcceptor()

// Start tx indexer/unindexer if required.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.wg.Add(1)
go bc.dispatchTxUnindexer()
go bc.maintainTxIndex()
}
return bc, nil
}
Expand All @@ -459,8 +468,10 @@ func NewBlockChain(
func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}) {
start := time.Now()
txLookupLimit := bc.cacheConfig.TxLookupLimit
bc.txIndexTailLock.Lock()
defer func() {
txUnindexTimer.Inc(time.Since(start).Milliseconds())
bc.txIndexTailLock.Unlock()
close(done)
}()

Expand All @@ -476,11 +487,11 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}
}
}

// dispatchTxUnindexer is responsible for the deletion of the
// transaction index.
// maintainTxIndex is responsible for the deletion of the
// transaction index. This does not support reconstruction of removed indexes.
// Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved.
// Meaning that this function should never be called.
func (bc *BlockChain) dispatchTxUnindexer() {
func (bc *BlockChain) maintainTxIndex() {
defer bc.wg.Done()
txLookupLimit := bc.cacheConfig.TxLookupLimit

Expand All @@ -506,11 +517,11 @@ func (bc *BlockChain) dispatchTxUnindexer() {
// Launch the initial processing if chain is not empty. This step is
// useful in these scenarios that chain has no progress and indexer
// is never triggered.
// if head := bc.lastAccepted; head != nil && head.NumberU64() > txLookupLimit {
// done = make(chan struct{})
// tail := rawdb.ReadTxIndexTail(bc.db)
// go bc.unindexBlocks(*tail, head.NumberU64(), done)
// }
if head := bc.CurrentBlock(); head != nil && head.Number.Uint64() > txLookupLimit {
done = make(chan struct{})
tail := rawdb.ReadTxIndexTail(bc.db)
go bc.unindexBlocks(*tail, head.Number.Uint64(), done)
}

for {
select {
Expand All @@ -530,7 +541,7 @@ func (bc *BlockChain) dispatchTxUnindexer() {
done = nil
case <-bc.quit:
if done != nil {
log.Info("Waiting background transaction indexer to exit")
log.Info("Waiting background transaction unindexer to exit")
<-done
}
return
Expand Down Expand Up @@ -2170,6 +2181,8 @@ func (bc *BlockChain) gatherBlockRootsAboveLastAccepted() map[common.Hash]struct
return blockRoots
}

// TODO: split extras to blockchain_extra.go

// ResetToStateSyncedBlock reinitializes the state of the blockchain
// to the trie represented by [block.Root()] after updating
// in-memory and on disk current block pointers to [block].
Expand All @@ -2195,6 +2208,11 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
return err
}

// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.setTxIndexTail(block.NumberU64())
}

// Update all in-memory chain markers
bc.lastAccepted = block
bc.acceptorTip = block
Expand Down Expand Up @@ -2227,3 +2245,20 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
func (bc *BlockChain) CacheConfig() *CacheConfig {
return bc.cacheConfig
}

func (bc *BlockChain) setTxIndexTail(newTail uint64) error {
bc.txIndexTailLock.Lock()
defer bc.txIndexTailLock.Unlock()

tailP := rawdb.ReadTxIndexTail(bc.db)
var tailV uint64
if tailP != nil {
tailV = *tailP
}

if newTail > tailV {
log.Info("Repairing tx index tail", "old", tailV, "new", newTail)
rawdb.WriteTxIndexTail(bc.db, newTail)
}
return nil
}
90 changes: 31 additions & 59 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,6 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
}
}

// TODO: simplify the unindexer logic and this test.
func TestTransactionIndices(t *testing.T) {
// Configure and generate a sample block chain
var (
Expand Down Expand Up @@ -567,41 +566,6 @@ func TestTransactionIndices(t *testing.T) {
})
require.NoError(t, err)

check := func(t *testing.T, tail *uint64, chain *BlockChain) {
require := require.New(t)
stored := rawdb.ReadTxIndexTail(chain.db)
var tailValue uint64
if tail == nil {
require.Nil(stored)
tailValue = 0
} else {
require.EqualValues(*tail, *stored, "expected tail %d, got %d", *tail, *stored)
tailValue = *tail
}

for i := tailValue; i <= chain.CurrentBlock().Number.Uint64(); i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.NotNilf(index, "Miss transaction indices, number %d hash %s", i, tx.Hash().Hex())
}
}

for i := uint64(0); i < tailValue; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
}
}
}

conf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
Expand All @@ -628,10 +592,11 @@ func TestTransactionIndices(t *testing.T) {
}
chain.DrainAcceptorQueue()

chain.Stop()
check(t, nil, chain) // check all indices has been indexed
lastAcceptedBlock := blocks[len(blocks)-1]
require.Equal(t, lastAcceptedBlock.Hash(), chain.CurrentHeader().Hash())

lastAcceptedHash := chain.CurrentHeader().Hash()
CheckTxIndices(t, nil, lastAcceptedBlock.NumberU64(), chain.db, false) // check all indices has been indexed
chain.Stop()

// Reconstruct a block chain which only reserves limited tx indices
// 128 blocks were previously indexed. Now we add a new block at each test step.
Expand All @@ -646,40 +611,47 @@ func TestTransactionIndices(t *testing.T) {
t.Run(fmt.Sprintf("test-%d, limit: %d", i+1, l), func(t *testing.T) {
conf.TxLookupLimit = l

chain, err := createBlockChain(chainDB, conf, gspec, lastAcceptedHash)
chain, err := createBlockChain(chainDB, conf, gspec, lastAcceptedBlock.Hash())
require.NoError(t, err)

tail := getTail(l, lastAcceptedBlock.NumberU64())
// check if startup indices are correct
CheckTxIndices(t, tail, lastAcceptedBlock.NumberU64(), chain.db, false)

newBlks := blocks2[i : i+1]
_, err = chain.InsertChain(newBlks) // Feed chain a higher block to trigger indices updater.
require.NoError(t, err)

err = chain.Accept(newBlks[0]) // Accept the block to trigger indices updater.
lastAcceptedBlock = newBlks[0]
err = chain.Accept(lastAcceptedBlock) // Accept the block to trigger indices updater.
require.NoError(t, err)

chain.DrainAcceptorQueue()
time.Sleep(50 * time.Millisecond) // Wait for indices initialisation

tail = getTail(l, lastAcceptedBlock.NumberU64())
// check if indices are updated correctly
CheckTxIndices(t, tail, lastAcceptedBlock.NumberU64(), chain.db, false)
chain.Stop()
var tail *uint64
if l == 0 {
tail = nil
} else {
var tl uint64
if chain.CurrentBlock().Number.Uint64() > l {
// tail should be the first block number which is indexed
// i.e the first block number that's in the lookup range
tl = chain.CurrentBlock().Number.Uint64() - l + 1
}
tail = &tl
}

check(t, tail, chain)

lastAcceptedHash = chain.CurrentHeader().Hash()
})
}
}

func getTail(limit uint64, lastAccepted uint64) *uint64 {
var tail *uint64
if limit == 0 {
tail = nil
} else {
var tl uint64
if lastAccepted > limit {
// tail should be the oldest block number which is indexed
// i.e the first block number that's in the lookup range
tl = lastAccepted - limit + 1
}
tail = &tl
}

return tail
}

func TestTransactionSkipIndexing(t *testing.T) {
// Configure and generate a sample block chain
require := require.New(t)
Expand Down
17 changes: 17 additions & 0 deletions core/rawdb/accessors_state_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/binary"

"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -176,3 +177,19 @@ func NewSyncPerformedIterator(db ethdb.Iteratee) ethdb.Iterator {
func UnpackSyncPerformedKey(key []byte) uint64 {
return binary.BigEndian.Uint64(key[len(syncPerformedPrefix):])
}

// GetLatestSyncPerformed returns the latest block number state synced performed to.
func GetLatestSyncPerformed(db ethdb.Iteratee) uint64 {
// XXX: assuming this is not sorted
it := rawdb.NewSyncPerformedIterator(db)
defer it.Release()

var latestSyncPerformed uint64
for it.Next() {
syncPerformed := rawdb.UnpackSyncPerformedKey(it.Key())
if syncPerformed > latestSyncPerformed {
latestSyncPerformed = syncPerformed
}
}
return latestSyncPerformed
}
38 changes: 38 additions & 0 deletions core/test_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/big"
"strings"
"testing"
"time"

"github.com/ava-labs/subnet-evm/commontype"
"github.com/ava-labs/subnet-evm/consensus/dummy"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type ChainTest struct {
Expand Down Expand Up @@ -1647,3 +1649,39 @@ func TestStatefulPrecompiles(t *testing.T, create func(db ethdb.Database, gspec
// This tests that the precompiles work as expected when they are enabled
checkBlockChainState(t, blockchain, gspec, chainDB, create, checkState)
}

func CheckTxIndices(t *testing.T, expectedTail *uint64, head uint64, db ethdb.Database, allowNilBlocks bool) {
require := require.New(t)
var tailValue uint64
if expectedTail == nil {
require.Nil(rawdb.ReadTxIndexTail(db))
tailValue = 0
} else {
tailValue = *expectedTail

require.Eventually(
func() bool {
stored := *rawdb.ReadTxIndexTail(db)
return tailValue == stored
},
10*time.Second, 100*time.Millisecond, "expected tail to be %d eventually", tailValue)
}

for i := uint64(0); i <= head; i++ {
block := rawdb.ReadBlock(db, rawdb.ReadCanonicalHash(db, i), i)
if block == nil && allowNilBlocks {
continue
}
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(db, tx.Hash())
if i < tailValue {
require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
} else {
require.NotNilf(index, "Missing transaction indices, number %d hash %s", i, tx.Hash().Hex())
}
}
}
}
2 changes: 1 addition & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Config struct {
PopulateMissingTries *uint64 // Height at which to start re-populating missing tries on startup.
PopulateMissingTriesParallelism int // Number of concurrent readers to use when re-populating missing tries on startup.
AllowMissingTries bool // Whether to allow an archival node to run with pruning enabled and corrupt a complete index.
SnapshotDelayInit bool // Whether snapshot tree should be initialized on startup or delayed until explicit call
SnapshotDelayInit bool // Whether snapshot tree should be initialized on startup or delayed until explicit call (= StateSyncEnabled)
SnapshotWait bool // Whether to wait for the initial snapshot generation
SnapshotVerify bool // Whether to verify generated snapshots
SkipSnapshotRebuild bool // Whether to skip rebuilding the snapshot in favor of returning an error (only set to true for tests)
Expand Down
Loading

0 comments on commit e88eab5

Please sign in to comment.