Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tx indexing fix #1131

Merged
merged 17 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 80 additions & 18 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type CacheConfig struct {
PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries.
PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries.
AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call (= StateSyncEnabled)
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
SnapshotVerify bool // Verify generated snapshots
Preimages bool // Whether to store preimage of trie key to the disk
Expand Down Expand Up @@ -301,6 +301,9 @@ type BlockChain struct {

// [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs.
acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log]

// [txIndexTailLock] is used to synchronize the updating of the tx index tail.
txIndexTailLock sync.Mutex
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -413,13 +416,32 @@ func NewBlockChain(
// Warm up [hc.acceptedNumberCache] and [acceptedLogsCache]
bc.warmAcceptedCaches()

// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TxLookupLimit != 0 {
latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db)
bc.setTxIndexTail(latestStateSynced)
}

// Start processing accepted blocks effects in the background
go bc.startAcceptor()

// Start tx indexer/unindexer if required.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.wg.Add(1)
go bc.dispatchTxUnindexer()
var (
headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed
sub = bc.SubscribeChainAcceptedEvent(headCh)
)
go func() {
defer bc.wg.Done()
if sub == nil {
log.Warn("could not create chain accepted subscription to unindex txs")
return
}
defer sub.Unsubscribe()

bc.maintainTxIndex(headCh)
}()
}
return bc, nil
}
Expand All @@ -428,9 +450,12 @@ func NewBlockChain(
func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}) {
start := time.Now()
txLookupLimit := bc.cacheConfig.TxLookupLimit
bc.txIndexTailLock.Lock()
defer func() {
txUnindexTimer.Inc(time.Since(start).Milliseconds())
bc.txIndexTailLock.Unlock()
close(done)
bc.wg.Done()
}()

if head-txLookupLimit+1 >= tail {
Expand All @@ -439,12 +464,11 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}
}
}

// dispatchTxUnindexer is responsible for the deletion of the
// transaction index.
// maintainTxIndex is responsible for the deletion of the
// transaction index. This does not support reconstruction of removed indexes.
// Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved.
// Meaning that this function should never be called.
func (bc *BlockChain) dispatchTxUnindexer() {
defer bc.wg.Done()
func (bc *BlockChain) maintainTxIndex(headCh <-chan ChainEvent) {
txLookupLimit := bc.cacheConfig.TxLookupLimit

// If the user just upgraded to a new version which supports transaction
Expand All @@ -455,15 +479,19 @@ func (bc *BlockChain) dispatchTxUnindexer() {

// Any reindexing done, start listening to chain events and moving the index window
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
)
sub := bc.SubscribeChainAcceptedEvent(headCh)
if sub == nil {
log.Warn("could not create chain accepted subscription to unindex txs")
return
log.Info("Initialized transaction unindexer", "limit", txLookupLimit)

// Launch the initial processing if chain is not empty. This step is
// useful in these scenarios that chain has no progress and indexer
// is never triggered.
if head := bc.CurrentBlock(); head != nil && head.Number.Uint64() > txLookupLimit {
done = make(chan struct{})
tail := rawdb.ReadTxIndexTail(bc.db)
bc.wg.Add(1)
go bc.unindexBlocks(*tail, head.Number.Uint64(), done)
}
defer sub.Unsubscribe()

for {
select {
Expand All @@ -477,13 +505,14 @@ func (bc *BlockChain) dispatchTxUnindexer() {
done = make(chan struct{})
// Note: tail will not be nil since it is initialized in this function.
tail := rawdb.ReadTxIndexTail(bc.db)
bc.wg.Add(1)
go bc.unindexBlocks(*tail, headNum, done)
}
case <-done:
done = nil
case <-bc.quit:
if done != nil {
log.Info("Waiting background transaction indexer to exit")
log.Info("Waiting background transaction unindexer to exit")
<-done
}
return
Expand All @@ -497,15 +526,22 @@ func (bc *BlockChain) dispatchTxUnindexer() {
// - updating the acceptor tip index
func (bc *BlockChain) writeBlockAcceptedIndices(b *types.Block) error {
batch := bc.db.NewBatch()
if err := bc.batchBlockAcceptedIndices(batch, b); err != nil {
return err
}
if err := batch.Write(); err != nil {
return fmt.Errorf("%w: failed to write accepted indices entries batch", err)
}
return nil
}

func (bc *BlockChain) batchBlockAcceptedIndices(batch ethdb.Batch, b *types.Block) error {
if !bc.cacheConfig.SkipTxIndexing {
rawdb.WriteTxLookupEntriesByBlock(batch, b)
}
if err := rawdb.WriteAcceptorTip(batch, b.Hash()); err != nil {
return fmt.Errorf("%w: failed to write acceptor tip key", err)
}
if err := batch.Write(); err != nil {
return fmt.Errorf("%w: failed to write tx lookup entries batch", err)
}
return nil
}

Expand Down Expand Up @@ -2105,6 +2141,8 @@ func (bc *BlockChain) gatherBlockRootsAboveLastAccepted() map[common.Hash]struct
return blockRoots
}

// TODO: split extras to blockchain_extra.go

// ResetToStateSyncedBlock reinitializes the state of the blockchain
// to the trie represented by [block.Root()] after updating
// in-memory and on disk current block pointers to [block].
Expand All @@ -2115,7 +2153,9 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {

// Update head block and snapshot pointers on disk
batch := bc.db.NewBatch()
rawdb.WriteAcceptorTip(batch, block.Hash())
if err := bc.batchBlockAcceptedIndices(batch, block); err != nil {
return err
}
rawdb.WriteHeadBlockHash(batch, block.Hash())
rawdb.WriteHeadHeaderHash(batch, block.Hash())
rawdb.WriteSnapshotBlockHash(batch, block.Hash())
Expand All @@ -2128,6 +2168,11 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
return err
}

// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TxLookupLimit != 0 {
bc.setTxIndexTail(block.NumberU64())
}

// Update all in-memory chain markers
bc.lastAccepted = block
bc.acceptorTip = block
Expand Down Expand Up @@ -2160,3 +2205,20 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
func (bc *BlockChain) CacheConfig() *CacheConfig {
return bc.cacheConfig
}

func (bc *BlockChain) setTxIndexTail(newTail uint64) error {
bc.txIndexTailLock.Lock()
defer bc.txIndexTailLock.Unlock()

tailP := rawdb.ReadTxIndexTail(bc.db)
var tailV uint64
if tailP != nil {
tailV = *tailP
}

if newTail > tailV {
log.Info("Repairing tx index tail", "old", tailV, "new", newTail)
rawdb.WriteTxIndexTail(bc.db, newTail)
}
return nil
}
Loading
Loading