Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tx indexing fix #1131

Merged
merged 17 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 64 additions & 10 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type CacheConfig struct {
PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries.
PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries.
AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call (= StateSyncEnabled)
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
SnapshotVerify bool // Verify generated snapshots
Preimages bool // Whether to store preimage of trie key to the disk
Expand Down Expand Up @@ -301,6 +301,9 @@ type BlockChain struct {

// [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs.
acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log]

// [txIndexTailLock] is used to synchronize the updating of the tx index tail.
txIndexTailLock sync.Mutex
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -413,13 +416,19 @@ func NewBlockChain(
// Warm up [hc.acceptedNumberCache] and [acceptedLogsCache]
bc.warmAcceptedCaches()

// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TxLookupLimit != 0 {
latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db)
bc.setTxIndexTail(latestStateSynced)
}

// Start processing accepted blocks effects in the background
go bc.startAcceptor()

// Start tx indexer/unindexer if required.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.wg.Add(1)
go bc.dispatchTxUnindexer()
go bc.maintainTxIndex()
}
return bc, nil
}
Expand All @@ -428,8 +437,10 @@ func NewBlockChain(
func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}) {
start := time.Now()
txLookupLimit := bc.cacheConfig.TxLookupLimit
bc.txIndexTailLock.Lock()
defer func() {
txUnindexTimer.Inc(time.Since(start).Milliseconds())
bc.txIndexTailLock.Unlock()
close(done)
}()

Expand All @@ -439,11 +450,11 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}
}
}

// dispatchTxUnindexer is responsible for the deletion of the
// transaction index.
// maintainTxIndex is responsible for the deletion of the
// transaction index. This does not support reconstruction of removed indexes.
// Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved.
// Meaning that this function should never be called.
func (bc *BlockChain) dispatchTxUnindexer() {
func (bc *BlockChain) maintainTxIndex() {
defer bc.wg.Done()
txLookupLimit := bc.cacheConfig.TxLookupLimit

Expand All @@ -464,6 +475,16 @@ func (bc *BlockChain) dispatchTxUnindexer() {
return
}
defer sub.Unsubscribe()
log.Info("Initialized transaction unindexer", "limit", txLookupLimit)

// Launch the initial processing if chain is not empty. This step is
// useful in these scenarios that chain has no progress and indexer
// is never triggered.
if head := bc.CurrentBlock(); head != nil && head.Number.Uint64() > txLookupLimit {
done = make(chan struct{})
tail := rawdb.ReadTxIndexTail(bc.db)
go bc.unindexBlocks(*tail, head.Number.Uint64(), done)
}

for {
select {
Expand All @@ -483,7 +504,7 @@ func (bc *BlockChain) dispatchTxUnindexer() {
done = nil
case <-bc.quit:
if done != nil {
log.Info("Waiting background transaction indexer to exit")
log.Info("Waiting background transaction unindexer to exit")
<-done
}
return
Expand All @@ -497,15 +518,22 @@ func (bc *BlockChain) dispatchTxUnindexer() {
// - updating the acceptor tip index
func (bc *BlockChain) writeBlockAcceptedIndices(b *types.Block) error {
batch := bc.db.NewBatch()
if err := bc.batchBlockAcceptedIndices(batch, b); err != nil {
return err
}
if err := batch.Write(); err != nil {
return fmt.Errorf("%w: failed to write accepted indices entries batch", err)
}
return nil
}

func (bc *BlockChain) batchBlockAcceptedIndices(batch ethdb.Batch, b *types.Block) error {
if !bc.cacheConfig.SkipTxIndexing {
rawdb.WriteTxLookupEntriesByBlock(batch, b)
}
if err := rawdb.WriteAcceptorTip(batch, b.Hash()); err != nil {
return fmt.Errorf("%w: failed to write acceptor tip key", err)
}
if err := batch.Write(); err != nil {
return fmt.Errorf("%w: failed to write tx lookup entries batch", err)
}
return nil
}

Expand Down Expand Up @@ -2105,6 +2133,8 @@ func (bc *BlockChain) gatherBlockRootsAboveLastAccepted() map[common.Hash]struct
return blockRoots
}

// TODO: split extras to blockchain_extra.go

// ResetToStateSyncedBlock reinitializes the state of the blockchain
// to the trie represented by [block.Root()] after updating
// in-memory and on disk current block pointers to [block].
Expand All @@ -2115,7 +2145,9 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {

// Update head block and snapshot pointers on disk
batch := bc.db.NewBatch()
rawdb.WriteAcceptorTip(batch, block.Hash())
if err := bc.batchBlockAcceptedIndices(batch, block); err != nil {
return err
}
rawdb.WriteHeadBlockHash(batch, block.Hash())
rawdb.WriteHeadHeaderHash(batch, block.Hash())
rawdb.WriteSnapshotBlockHash(batch, block.Hash())
Expand All @@ -2128,6 +2160,11 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
return err
}

// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.setTxIndexTail(block.NumberU64())
}

// Update all in-memory chain markers
bc.lastAccepted = block
bc.acceptorTip = block
Expand Down Expand Up @@ -2160,3 +2197,20 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
func (bc *BlockChain) CacheConfig() *CacheConfig {
return bc.cacheConfig
}

func (bc *BlockChain) setTxIndexTail(newTail uint64) error {
bc.txIndexTailLock.Lock()
defer bc.txIndexTailLock.Unlock()

tailP := rawdb.ReadTxIndexTail(bc.db)
var tailV uint64
if tailP != nil {
tailV = *tailP
}

if newTail > tailV {
log.Info("Repairing tx index tail", "old", tailV, "new", newTail)
rawdb.WriteTxIndexTail(bc.db, newTail)
}
return nil
}
146 changes: 39 additions & 107 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,6 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
}
}

// TODO: simplify the unindexer logic and this test.
func TestTransactionIndices(t *testing.T) {
// Configure and generate a sample block chain
require := require.New(t)
Expand Down Expand Up @@ -568,40 +567,6 @@ func TestTransactionIndices(t *testing.T) {
})
require.NoError(err)

check := func(tail *uint64, chain *BlockChain) {
stored := rawdb.ReadTxIndexTail(chain.db)
var tailValue uint64
if tail == nil {
require.Nil(stored)
tailValue = 0
} else {
require.EqualValues(*tail, *stored, "expected tail %d, got %d", *tail, *stored)
tailValue = *tail
}

for i := tailValue; i <= chain.CurrentBlock().Number.Uint64(); i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.NotNilf(index, "Miss transaction indices, number %d hash %s", i, tx.Hash().Hex())
}
}

for i := uint64(0); i < tailValue; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
}
}
}

conf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
Expand All @@ -628,10 +593,11 @@ func TestTransactionIndices(t *testing.T) {
}
chain.DrainAcceptorQueue()

chain.Stop()
check(nil, chain) // check all indices has been indexed
lastAcceptedBlock := blocks[len(blocks)-1]
require.Equal(lastAcceptedBlock.Hash(), chain.CurrentHeader().Hash())

lastAcceptedHash := chain.CurrentHeader().Hash()
CheckTxIndices(t, nil, lastAcceptedBlock.NumberU64(), chain.db, false) // check all indices has been indexed
chain.Stop()

// Reconstruct a block chain which only reserves limited tx indices
// 128 blocks were previously indexed. Now we add a new block at each test step.
Expand All @@ -646,40 +612,47 @@ func TestTransactionIndices(t *testing.T) {
t.Run(fmt.Sprintf("test-%d, limit: %d", i+1, l), func(t *testing.T) {
conf.TxLookupLimit = l

chain, err := createBlockChain(chainDB, conf, gspec, lastAcceptedHash)
chain, err := createBlockChain(chainDB, conf, gspec, lastAcceptedBlock.Hash())
require.NoError(err)

tail := getTail(l, lastAcceptedBlock.NumberU64())
// check if startup indices are correct
CheckTxIndices(t, tail, lastAcceptedBlock.NumberU64(), chain.db, false)

newBlks := blocks2[i : i+1]
_, err = chain.InsertChain(newBlks) // Feed chain a higher block to trigger indices updater.
require.NoError(err)

err = chain.Accept(newBlks[0]) // Accept the block to trigger indices updater.
lastAcceptedBlock = newBlks[0]
err = chain.Accept(lastAcceptedBlock) // Accept the block to trigger indices updater.
require.NoError(err)

chain.DrainAcceptorQueue()
time.Sleep(50 * time.Millisecond) // Wait for indices initialisation

tail = getTail(l, lastAcceptedBlock.NumberU64())
// check if indices are updated correctly
CheckTxIndices(t, tail, lastAcceptedBlock.NumberU64(), chain.db, false)
chain.Stop()
var tail *uint64
if l == 0 {
tail = nil
} else {
var tl uint64
if chain.CurrentBlock().Number.Uint64() > l {
// tail should be the first block number which is indexed
// i.e the first block number that's in the lookup range
tl = chain.CurrentBlock().Number.Uint64() - l + 1
}
tail = &tl
}

check(tail, chain)

lastAcceptedHash = chain.CurrentHeader().Hash()
})
}
}

func getTail(limit uint64, lastAccepted uint64) *uint64 {
var tail *uint64
if limit == 0 {
tail = nil
} else {
var tl uint64
if lastAccepted > limit {
// tail should be the oldest block number which is indexed
// i.e the first block number that's in the lookup range
tl = lastAccepted - limit + 1
}
tail = &tl
}

return tail
ceyonur marked this conversation as resolved.
Show resolved Hide resolved
}

func TestTransactionSkipIndexing(t *testing.T) {
// Configure and generate a sample block chain
require := require.New(t)
Expand Down Expand Up @@ -709,51 +682,6 @@ func TestTransactionSkipIndexing(t *testing.T) {
})
require.NoError(err)

checkRemoved := func(tail *uint64, to uint64, chain *BlockChain) {
stored := rawdb.ReadTxIndexTail(chain.db)
var tailValue uint64
if tail == nil {
require.Nil(stored)
tailValue = 0
} else {
require.EqualValues(*tail, *stored, "expected tail %d, got %d", *tail, *stored)
tailValue = *tail
}

for i := tailValue; i < to; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.NotNilf(index, "Miss transaction indices, number %d hash %s", i, tx.Hash().Hex())
}
}

for i := uint64(0); i < tailValue; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
}
}

for i := to; i <= chain.CurrentBlock().Number.Uint64(); i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.Nilf(index, "Transaction indices should be skipped, number %d hash %s", i, tx.Hash().Hex())
}
}
}

conf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
Expand All @@ -771,22 +699,25 @@ func TestTransactionSkipIndexing(t *testing.T) {
chainDB := rawdb.NewMemoryDatabase()
chain, err := createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{})
require.NoError(err)
checkRemoved(nil, 0, chain) // check all indices has been skipped
currentBlockNumber := chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, nil, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped

// test2: specify lookuplimit with tx index skipping enabled. Blocks should not be indexed but tail should be updated.
conf.TxLookupLimit = 2
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash())
require.NoError(err)
tail := chain.CurrentBlock().Number.Uint64() - conf.TxLookupLimit + 1
checkRemoved(&tail, 0, chain)
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, &tail, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped

// test3: tx index skipping and unindexer disabled. Blocks should be indexed and tail should be updated.
conf.TxLookupLimit = 0
conf.SkipTxIndexing = false
chainDB = rawdb.NewMemoryDatabase()
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{})
require.NoError(err)
checkRemoved(nil, chain.CurrentBlock().Number.Uint64()+1, chain) // check all indices has been indexed
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, nil, 0, currentBlockNumber, currentBlockNumber, chainDB, false) // check all indices has been indexed

// now change tx index skipping to true and check that the indices are skipped for the last block
// and old indices are removed up to the tail, but [tail, current) indices are still there.
Expand All @@ -795,7 +726,8 @@ func TestTransactionSkipIndexing(t *testing.T) {
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash())
require.NoError(err)
tail = chain.CurrentBlock().Number.Uint64() - conf.TxLookupLimit + 1
checkRemoved(&tail, chain.CurrentBlock().Number.Uint64(), chain)
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, &tail, tail, currentBlockNumber-1, currentBlockNumber, chainDB, false)
}

// TestCanonicalHashMarker tests all the canonical hash markers are updated/deleted
Expand Down
Loading
Loading