From c283979891346edb502f06886a1336f698d01f1f Mon Sep 17 00:00:00 2001 From: Ignacio Corderi Date: Thu, 8 Jun 2023 13:30:32 -0300 Subject: [PATCH] do not merge: prefetcher --- data/pools/transactionPool.go | 8 +++++++ data/txHandler.go | 41 +++++++++++++++++++++++++++++++++++ ledger/acctupdates.go | 21 ++++++++++++++++-- ledger/eval/eval.go | 20 +++++++++++++++++ ledger/lruaccts.go | 8 ++++++- 5 files changed, 95 insertions(+), 3 deletions(-) diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go index a03baea4f0..3a855dbd60 100644 --- a/data/pools/transactionPool.go +++ b/data/pools/transactionPool.go @@ -103,6 +103,7 @@ type BlockEvaluator interface { TestTransactionGroup(txgroup []transactions.SignedTxn) error Round() basics.Round PaySetSize() int + PrefetchTransactionGroup(txgroup []transactions.SignedTxnWithAD) TransactionGroup(txads []transactions.SignedTxnWithAD) error Transaction(txn transactions.SignedTxn, ad transactions.ApplyData) error GenerateBlock() (*ledgercore.ValidatedBlock, error) @@ -474,6 +475,13 @@ func (pool *TransactionPool) Remember(txgroup []transactions.SignedTxn) error { return nil } +// Prefetch attempts to load all things needed to remember the transaction. +// It does not actually remember the transaction. +func (pool *TransactionPool) Prefetch(txgroup []transactions.SignedTxn) { + txgroupad := transactions.WrapSignedTxnsWithAD(txgroup) + pool.pendingBlockEvaluator.PrefetchTransactionGroup(txgroupad) +} + // Lookup returns the error associated with a transaction that used // to be in the pool. 
If no status information is available (e.g., because // it was too long ago, or the transaction committed successfully), then diff --git a/data/txHandler.go b/data/txHandler.go index 74f9b07b77..57038ff0ed 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -117,6 +117,7 @@ type TxHandler struct { genesisHash crypto.Digest txVerificationPool execpool.BacklogPool backlogQueue chan *txBacklogMsg + prefetcher chan *txBacklogMsg postVerificationQueue chan *verify.VerificationResult backlogWg sync.WaitGroup net network.GossipNode @@ -165,6 +166,7 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { ledger: opts.Ledger, txVerificationPool: opts.ExecutionPool, backlogQueue: make(chan *txBacklogMsg, txBacklogSize), + prefetcher: make(chan *txBacklogMsg, txBacklogSize), postVerificationQueue: make(chan *verify.VerificationResult, txBacklogSize), net: opts.Net, streamVerifierChan: make(chan execpool.InputJob), @@ -225,6 +227,9 @@ func (handler *TxHandler) Start() { handler.backlogWg.Add(2) go handler.backlogWorker() go handler.backlogGaugeThread() + for i := 0; i < 32; i++ { + go handler.prefetcherWorker() + } handler.streamVerifier.Start(handler.ctx) if handler.erl != nil { handler.erl.Start() @@ -266,6 +271,28 @@ func (handler *TxHandler) backlogGaugeThread() { } } +// prefetcherWorker is the worker go routine that prefetches accounts, resources, etc from the DB. 
+func (handler *TxHandler) prefetcherWorker() { + for { + select { + case wi, ok := <-handler.prefetcher: + if !ok { + // this is never happening since handler.prefetcher is never closed + return + } + // nothing to prefetch if the txn is already in + if handler.checkAlreadyCommitted(wi) { + continue + } + // prefetch + txGroup := wi.unverifiedTxGroup + handler.txPool.Prefetch(txGroup) + case <-handler.ctx.Done(): + return + } + } +} + // backlogWorker is the worker go routine that process the incoming messages from the postVerificationQueue and backlogQueue channels // and dispatches them further. func (handler *TxHandler) backlogWorker() { @@ -640,6 +667,20 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net } } + // queue the addresses needed to evaluate the transaction for prefetching + select { + case handler.prefetcher <- &txBacklogMsg{ + rawmsg: &rawmsg, + unverifiedTxGroup: unverifiedTxGroup, + rawmsgDataHash: msgKey, + unverifiedTxGroupHash: canonicalKey, + capguard: capguard, + }: + default: + // prefetcher seems to be too busy, skip prefetching + } + + // queue the transaction for evaluation into the block select { case handler.backlogQueue <- &txBacklogMsg{ rawmsg: &rawmsg, diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index d2850a51a8..4b1cf522c4 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -1470,6 +1470,10 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add au.accountsMu.RUnlock() needUnlock = false } + + // at this point, we do not have the account on any of our caches + ledgerCacheMissAccountCount.Inc(nil) + // No updates of this account in the in-memory deltas; use on-disk DB. // The check in roundOffset() made sure the round is exactly the one // present in the on-disk DB. 
As an optimization, we avoid creating @@ -1482,10 +1486,22 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add if persistedData.Round == currentDbRound { if persistedData.Ref != nil { // if we read actual data return it - au.baseAccounts.writePending(persistedData) + if synchronized { + au.accountsMu.Lock() + } + au.baseAccounts.write(persistedData) + if synchronized { + au.accountsMu.Unlock() + } return persistedData.AccountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil } - au.baseAccounts.writeNotFoundPending(addr) + if synchronized { + au.accountsMu.Lock() + } + au.baseAccounts.writeNotFound(addr) + if synchronized { + au.accountsMu.Unlock() + } // otherwise return empty return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil } @@ -1989,3 +2005,4 @@ var ledgerGeneratecatchpointCount = metrics.NewCounter("ledger_generatecatchpoin var ledgerGeneratecatchpointMicros = metrics.NewCounter("ledger_generatecatchpoint_micros", "µs spent") var ledgerVacuumCount = metrics.NewCounter("ledger_vacuum_count", "calls") var ledgerVacuumMicros = metrics.NewCounter("ledger_vacuum_micros", "µs spent") +var ledgerCacheMissAccountCount = metrics.NewCounter("ledger_cache_miss_account", "misses") diff --git a/ledger/eval/eval.go b/ledger/eval/eval.go index d27c1e9b47..0940ecddd7 100644 --- a/ledger/eval/eval.go +++ b/ledger/eval/eval.go @@ -923,6 +923,26 @@ func (eval *BlockEvaluator) TestTransaction(txn transactions.SignedTxn) error { return nil } +// PrefetchTransactionGroup loads all related data needed by the transactions in the group +func (eval *BlockEvaluator) PrefetchTransactionGroup(txgroup []transactions.SignedTxnWithAD) { + cow := eval.state.child(len(txgroup)) + defer cow.recycle() + + for i := range txgroup { + txn := txgroup[i].Txn + // fetch sender + cow.lookup(txn.Sender) + + switch txn.Type { + case protocol.PaymentTx: + cow.lookup(txn.CloseRemainderTo) + // TODO: write the rest + default: + // 
unknown txn type, let's not error here + } + } +} + // Transaction tentatively adds a new transaction as part of this block evaluation. // If the transaction cannot be added to the block without violating some constraints, // an error is returned and the block evaluator state is unchanged. diff --git a/ledger/lruaccts.go b/ledger/lruaccts.go index 9351604de2..57105bec36 100644 --- a/ledger/lruaccts.go +++ b/ledger/lruaccts.go @@ -99,7 +99,7 @@ outer2: for ; pendingEntriesCount > 0; pendingEntriesCount-- { select { case addr := <-m.pendingNotFound: - m.notFound[addr] = struct{}{} + m.writeNotFound(addr) default: break outer2 } @@ -148,6 +148,12 @@ func (m *lruAccounts) write(acctData trackerdb.PersistedAccountData) { } } +// write a single account as not found on the db +// thread locking semantics : write lock +func (m *lruAccounts) writeNotFound(addr basics.Address) { + m.notFound[addr] = struct{}{} +} + // prune adjust the current size of the lruAccounts cache, by dropping the least // recently used entries. // thread locking semantics : write lock