Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx indexer fix: avoids using sleep in test #1151

Merged
merged 8 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,20 @@ func NewBlockChain(
// Start tx indexer/unindexer if required.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.wg.Add(1)
go bc.maintainTxIndex()
var (
headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed
sub = bc.SubscribeChainAcceptedEvent(headCh)
)
go func() {
defer bc.wg.Done()
if sub == nil {
log.Warn("could not create chain accepted subscription to unindex txs")
return
}
defer sub.Unsubscribe()

bc.maintainTxIndex(headCh)
}()
}
return bc, nil
}
Expand All @@ -442,6 +455,7 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}
txUnindexTimer.Inc(time.Since(start).Milliseconds())
bc.txIndexTailLock.Unlock()
close(done)
bc.wg.Done()
}()

if head-txLookupLimit+1 >= tail {
Expand All @@ -454,8 +468,7 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}
// transaction index. This does not support reconstruction of removed indexes.
// Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved.
// Meaning that this function should never be called.
func (bc *BlockChain) maintainTxIndex() {
defer bc.wg.Done()
func (bc *BlockChain) maintainTxIndex(headCh <-chan ChainEvent) {
txLookupLimit := bc.cacheConfig.TxLookupLimit

// If the user just upgraded to a new version which supports transaction
Expand All @@ -466,15 +479,8 @@ func (bc *BlockChain) maintainTxIndex() {

// Any reindexing done, start listening to chain events and moving the index window
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
)
sub := bc.SubscribeChainAcceptedEvent(headCh)
if sub == nil {
log.Warn("could not create chain accepted subscription to unindex txs")
return
}
defer sub.Unsubscribe()
log.Info("Initialized transaction unindexer", "limit", txLookupLimit)

// Launch the initial processing if chain is not empty. This step is
Expand All @@ -483,6 +489,7 @@ func (bc *BlockChain) maintainTxIndex() {
if head := bc.CurrentBlock(); head != nil && head.Number.Uint64() > txLookupLimit {
done = make(chan struct{})
tail := rawdb.ReadTxIndexTail(bc.db)
bc.wg.Add(1)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternatively I think we can check if done != nil { ... } like its done in bc.quit, but also wg seems fine.

go bc.unindexBlocks(*tail, head.Number.Uint64(), done)
}

Expand All @@ -498,6 +505,7 @@ func (bc *BlockChain) maintainTxIndex() {
done = make(chan struct{})
// Note: tail will not be nil since it is initialized in this function.
tail := rawdb.ReadTxIndexTail(bc.db)
bc.wg.Add(1)
go bc.unindexBlocks(*tail, headNum, done)
}
case <-done:
Expand Down
8 changes: 4 additions & 4 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math/big"
"os"
"testing"
"time"

"github.com/ava-labs/subnet-evm/consensus/dummy"
"github.com/ava-labs/subnet-evm/core/rawdb"
Expand Down Expand Up @@ -697,6 +696,7 @@ func TestTransactionSkipIndexing(t *testing.T) {
require.NoError(err)
currentBlockNumber := chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, nil, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped
chain.Stop()

// test2: specify lookuplimit with tx index skipping enabled. Blocks should not be indexed but tail should be updated.
conf.TxLookupLimit = 2
Expand All @@ -705,6 +705,7 @@ func TestTransactionSkipIndexing(t *testing.T) {
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
tail := currentBlockNumber - conf.TxLookupLimit + 1
checkTxIndicesHelper(t, &tail, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped
chain.Stop()

// test3: tx index skipping and unindexer disabled. Blocks should be indexed and tail should be updated.
conf.TxLookupLimit = 0
Expand All @@ -714,6 +715,7 @@ func TestTransactionSkipIndexing(t *testing.T) {
require.NoError(err)
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, nil, 0, currentBlockNumber, currentBlockNumber, chainDB, false) // check all indices has been indexed
chain.Stop()

// now change tx index skipping to true and check that the indices are skipped for the last block
// and old indices are removed up to the tail, but [tail, current) indices are still there.
Expand All @@ -724,6 +726,7 @@ func TestTransactionSkipIndexing(t *testing.T) {
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
tail = currentBlockNumber - conf.TxLookupLimit + 1
checkTxIndicesHelper(t, &tail, tail, currentBlockNumber-1, currentBlockNumber, chainDB, false)
chain.Stop()
}

// TestCanonicalHashMarker tests all the canonical hash markers are updated/deleted
Expand Down Expand Up @@ -1196,10 +1199,7 @@ func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Ge
return nil, err
}
}

chain.DrainAcceptorQueue()
time.Sleep(500 * time.Millisecond) // Wait for indices initialisation

chain.Stop()
return chain, nil
}
5 changes: 3 additions & 2 deletions core/test_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1669,13 +1669,14 @@ func checkTxIndicesHelper(t *testing.T, expectedTail *uint64, indexedFrom uint64
if expectedTail == nil {
require.Nil(rawdb.ReadTxIndexTail(db))
} else {
var stored uint64
tailValue := *expectedTail
require.Eventually(
func() bool {
stored := *rawdb.ReadTxIndexTail(db)
stored = *rawdb.ReadTxIndexTail(db)
return tailValue == stored
},
10*time.Second, 100*time.Millisecond, "expected tail to be %d eventually", tailValue)
10*time.Second, 100*time.Millisecond, "expected tail to be %d eventually (was %d)", tailValue, stored)
}

for i := uint64(0); i <= head; i++ {
Expand Down
Loading