Skip to content

Commit

Permalink
tx indexer fix: avoids using sleep in test (#1151)
Browse files Browse the repository at this point in the history
* avoids using sleep in test

* track all goroutines

* trying harder

* add some debug information in case of fail

* Update core/blockchain.go

Signed-off-by: Darioush Jalali <darioush.jalali@avalabs.org>

* Update core/blockchain.go

Signed-off-by: Darioush Jalali <darioush.jalali@avalabs.org>

---------

Signed-off-by: Darioush Jalali <darioush.jalali@avalabs.org>
  • Loading branch information
darioush authored Apr 17, 2024
1 parent ac4e239 commit 26e6a34
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
30 changes: 19 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,20 @@ func NewBlockChain(
// Start tx indexer/unindexer if required.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.wg.Add(1)
go bc.maintainTxIndex()
var (
headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed
sub = bc.SubscribeChainAcceptedEvent(headCh)
)
go func() {
defer bc.wg.Done()
if sub == nil {
log.Warn("could not create chain accepted subscription to unindex txs")
return
}
defer sub.Unsubscribe()

bc.maintainTxIndex(headCh)
}()
}
return bc, nil
}
Expand All @@ -442,6 +455,7 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}
txUnindexTimer.Inc(time.Since(start).Milliseconds())
bc.txIndexTailLock.Unlock()
close(done)
bc.wg.Done()
}()

if head-txLookupLimit+1 >= tail {
Expand All @@ -454,8 +468,7 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}
// transaction index. This does not support reconstruction of removed indexes.
// Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved.
// Meaning that this function should never be called.
func (bc *BlockChain) maintainTxIndex() {
defer bc.wg.Done()
func (bc *BlockChain) maintainTxIndex(headCh <-chan ChainEvent) {
txLookupLimit := bc.cacheConfig.TxLookupLimit

// If the user just upgraded to a new version which supports transaction
Expand All @@ -466,15 +479,8 @@ func (bc *BlockChain) maintainTxIndex() {

// Any reindexing done, start listening to chain events and moving the index window
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
)
sub := bc.SubscribeChainAcceptedEvent(headCh)
if sub == nil {
log.Warn("could not create chain accepted subscription to unindex txs")
return
}
defer sub.Unsubscribe()
log.Info("Initialized transaction unindexer", "limit", txLookupLimit)

// Launch the initial processing if chain is not empty. This step is
Expand All @@ -483,6 +489,7 @@ func (bc *BlockChain) maintainTxIndex() {
if head := bc.CurrentBlock(); head != nil && head.Number.Uint64() > txLookupLimit {
done = make(chan struct{})
tail := rawdb.ReadTxIndexTail(bc.db)
bc.wg.Add(1)
go bc.unindexBlocks(*tail, head.Number.Uint64(), done)
}

Expand All @@ -498,6 +505,7 @@ func (bc *BlockChain) maintainTxIndex() {
done = make(chan struct{})
// Note: tail will not be nil since it is initialized in this function.
tail := rawdb.ReadTxIndexTail(bc.db)
bc.wg.Add(1)
go bc.unindexBlocks(*tail, headNum, done)
}
case <-done:
Expand Down
8 changes: 4 additions & 4 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math/big"
"os"
"testing"
"time"

"github.com/ava-labs/subnet-evm/consensus/dummy"
"github.com/ava-labs/subnet-evm/core/rawdb"
Expand Down Expand Up @@ -697,6 +696,7 @@ func TestTransactionSkipIndexing(t *testing.T) {
require.NoError(err)
currentBlockNumber := chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, nil, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped
chain.Stop()

// test2: specify lookuplimit with tx index skipping enabled. Blocks should not be indexed but tail should be updated.
conf.TxLookupLimit = 2
Expand All @@ -705,6 +705,7 @@ func TestTransactionSkipIndexing(t *testing.T) {
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
tail := currentBlockNumber - conf.TxLookupLimit + 1
checkTxIndicesHelper(t, &tail, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped
chain.Stop()

// test3: tx index skipping and unindexer disabled. Blocks should be indexed and tail should be updated.
conf.TxLookupLimit = 0
Expand All @@ -714,6 +715,7 @@ func TestTransactionSkipIndexing(t *testing.T) {
require.NoError(err)
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, nil, 0, currentBlockNumber, currentBlockNumber, chainDB, false) // check all indices has been indexed
chain.Stop()

// now change tx index skipping to true and check that the indices are skipped for the last block
// and old indices are removed up to the tail, but [tail, current) indices are still there.
Expand All @@ -724,6 +726,7 @@ func TestTransactionSkipIndexing(t *testing.T) {
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
tail = currentBlockNumber - conf.TxLookupLimit + 1
checkTxIndicesHelper(t, &tail, tail, currentBlockNumber-1, currentBlockNumber, chainDB, false)
chain.Stop()
}

// TestCanonicalHashMarker tests all the canonical hash markers are updated/deleted
Expand Down Expand Up @@ -1196,10 +1199,7 @@ func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Ge
return nil, err
}
}

chain.DrainAcceptorQueue()
time.Sleep(500 * time.Millisecond) // Wait for indices initialisation

chain.Stop()
return chain, nil
}
5 changes: 3 additions & 2 deletions core/test_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1669,13 +1669,14 @@ func checkTxIndicesHelper(t *testing.T, expectedTail *uint64, indexedFrom uint64
if expectedTail == nil {
require.Nil(rawdb.ReadTxIndexTail(db))
} else {
var stored uint64
tailValue := *expectedTail
require.Eventually(
func() bool {
stored := *rawdb.ReadTxIndexTail(db)
stored = *rawdb.ReadTxIndexTail(db)
return tailValue == stored
},
10*time.Second, 100*time.Millisecond, "expected tail to be %d eventually", tailValue)
10*time.Second, 100*time.Millisecond, "expected tail to be %d eventually (was %d)", tailValue, stored)
}

for i := uint64(0); i <= head; i++ {
Expand Down

0 comments on commit 26e6a34

Please sign in to comment.