diff --git a/core/blockchain.go b/core/blockchain.go index 9ead498739..77212d4131 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -428,7 +428,20 @@ func NewBlockChain( // Start tx indexer/unindexer if required. if bc.cacheConfig.TxLookupLimit != 0 { bc.wg.Add(1) - go bc.maintainTxIndex() + var ( + headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed + sub = bc.SubscribeChainAcceptedEvent(headCh) + ) + go func() { + defer bc.wg.Done() + if sub == nil { + log.Warn("could not create chain accepted subscription to unindex txs") + return + } + defer sub.Unsubscribe() + + bc.maintainTxIndex(headCh) + }() } return bc, nil } @@ -442,6 +455,7 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{} txUnindexTimer.Inc(time.Since(start).Milliseconds()) bc.txIndexTailLock.Unlock() close(done) + bc.wg.Done() }() if head-txLookupLimit+1 >= tail { @@ -454,8 +468,7 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{} // transaction index. This does not support reconstruction of removed indexes. // Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved. // Meaning that this function should never be called. -func (bc *BlockChain) maintainTxIndex() { - defer bc.wg.Done() +func (bc *BlockChain) maintainTxIndex(headCh <-chan ChainEvent) { txLookupLimit := bc.cacheConfig.TxLookupLimit // If the user just upgraded to a new version which supports transaction @@ -466,15 +479,8 @@ func (bc *BlockChain) maintainTxIndex() { // Any reindexing done, start listening to chain events and moving the index window var ( - done chan struct{} // Non-nil if background unindexing or reindexing routine is active. - headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed + done chan struct{} // Non-nil if background unindexing or reindexing routine is active. ) - sub := bc.SubscribeChainAcceptedEvent(headCh) - if sub == nil { - log.Warn("could not create chain accepted subscription to unindex txs") - return - } - defer sub.Unsubscribe() log.Info("Initialized transaction unindexer", "limit", txLookupLimit) // Launch the initial processing if chain is not empty. This step is @@ -483,6 +489,7 @@ func (bc *BlockChain) maintainTxIndex() { if head := bc.CurrentBlock(); head != nil && head.Number.Uint64() > txLookupLimit { done = make(chan struct{}) tail := rawdb.ReadTxIndexTail(bc.db) + bc.wg.Add(1) go bc.unindexBlocks(*tail, head.Number.Uint64(), done) } @@ -498,6 +505,7 @@ func (bc *BlockChain) maintainTxIndex() { done = make(chan struct{}) // Note: tail will not be nil since it is initialized in this function. tail := rawdb.ReadTxIndexTail(bc.db) + bc.wg.Add(1) go bc.unindexBlocks(*tail, headNum, done) } case <-done: diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 8d1fa20c2a..1e755c1f74 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -8,7 +8,6 @@ import ( "math/big" "os" "testing" - "time" "github.com/ava-labs/subnet-evm/consensus/dummy" "github.com/ava-labs/subnet-evm/core/rawdb" @@ -697,6 +696,7 @@ func TestTransactionSkipIndexing(t *testing.T) { require.NoError(err) currentBlockNumber := chain.CurrentBlock().Number.Uint64() checkTxIndicesHelper(t, nil, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped + chain.Stop() // test2: specify lookuplimit with tx index skipping enabled. Blocks should not be indexed but tail should be updated. conf.TxLookupLimit = 2 @@ -705,6 +705,7 @@ func TestTransactionSkipIndexing(t *testing.T) { currentBlockNumber = chain.CurrentBlock().Number.Uint64() tail := currentBlockNumber - conf.TxLookupLimit + 1 checkTxIndicesHelper(t, &tail, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped + chain.Stop() // test3: tx index skipping and unindexer disabled. Blocks should be indexed and tail should be updated. conf.TxLookupLimit = 0 @@ -714,6 +715,7 @@ func TestTransactionSkipIndexing(t *testing.T) { require.NoError(err) currentBlockNumber = chain.CurrentBlock().Number.Uint64() checkTxIndicesHelper(t, nil, 0, currentBlockNumber, currentBlockNumber, chainDB, false) // check all indices has been indexed + chain.Stop() // now change tx index skipping to true and check that the indices are skipped for the last block // and old indices are removed up to the tail, but [tail, current) indices are still there. @@ -724,6 +726,7 @@ func TestTransactionSkipIndexing(t *testing.T) { currentBlockNumber = chain.CurrentBlock().Number.Uint64() tail = currentBlockNumber - conf.TxLookupLimit + 1 checkTxIndicesHelper(t, &tail, tail, currentBlockNumber-1, currentBlockNumber, chainDB, false) + chain.Stop() } // TestCanonicalHashMarker tests all the canonical hash markers are updated/deleted @@ -1196,10 +1199,7 @@ func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Ge return nil, err } } - chain.DrainAcceptorQueue() - time.Sleep(500 * time.Millisecond) // Wait for indices initialisation - chain.Stop() return chain, nil } diff --git a/core/test_blockchain.go b/core/test_blockchain.go index cfed861d08..f7d907a0cf 100644 --- a/core/test_blockchain.go +++ b/core/test_blockchain.go @@ -1669,13 +1669,14 @@ func checkTxIndicesHelper(t *testing.T, expectedTail *uint64, indexedFrom uint64 if expectedTail == nil { require.Nil(rawdb.ReadTxIndexTail(db)) } else { + var stored uint64 tailValue := *expectedTail require.Eventually( func() bool { - stored := *rawdb.ReadTxIndexTail(db) + stored = *rawdb.ReadTxIndexTail(db) return tailValue == stored }, - 10*time.Second, 100*time.Millisecond, "expected tail to be %d eventually", tailValue) + 10*time.Second, 100*time.Millisecond, "expected tail to be %d eventually (was %d)", tailValue, stored) } for i := uint64(0); i <= head; i++ {