Skip to content

Commit

Permalink
do not merge: prefetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
icorderi committed Jun 8, 2023
1 parent db8a157 commit c283979
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 3 deletions.
8 changes: 8 additions & 0 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type BlockEvaluator interface {
TestTransactionGroup(txgroup []transactions.SignedTxn) error
Round() basics.Round
PaySetSize() int
PrefetchTransactionGroup(txgroup []transactions.SignedTxnWithAD)
TransactionGroup(txads []transactions.SignedTxnWithAD) error
Transaction(txn transactions.SignedTxn, ad transactions.ApplyData) error
GenerateBlock() (*ledgercore.ValidatedBlock, error)
Expand Down Expand Up @@ -474,6 +475,13 @@ func (pool *TransactionPool) Remember(txgroup []transactions.SignedTxn) error {
return nil
}

// Prefetch attempts to load all things needed to remember the transaction.
// It does not actually remember the transaction.
func (pool *TransactionPool) Prefetch(txgroup []transactions.SignedTxn) {
txgroupad := transactions.WrapSignedTxnsWithAD(txgroup)
pool.pendingBlockEvaluator.PrefetchTransactionGroup(txgroupad)
}

// Lookup returns the error associated with a transaction that used
// to be in the pool. If no status information is available (e.g., because
// it was too long ago, or the transaction committed successfully), then
Expand Down
41 changes: 41 additions & 0 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type TxHandler struct {
genesisHash crypto.Digest
txVerificationPool execpool.BacklogPool
backlogQueue chan *txBacklogMsg
prefetcher chan *txBacklogMsg
postVerificationQueue chan *verify.VerificationResult
backlogWg sync.WaitGroup
net network.GossipNode
Expand Down Expand Up @@ -165,6 +166,7 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
ledger: opts.Ledger,
txVerificationPool: opts.ExecutionPool,
backlogQueue: make(chan *txBacklogMsg, txBacklogSize),
prefetcher: make(chan *txBacklogMsg, txBacklogSize),
postVerificationQueue: make(chan *verify.VerificationResult, txBacklogSize),
net: opts.Net,
streamVerifierChan: make(chan execpool.InputJob),
Expand Down Expand Up @@ -225,6 +227,9 @@ func (handler *TxHandler) Start() {
handler.backlogWg.Add(2)
go handler.backlogWorker()
go handler.backlogGaugeThread()
for i := 0; i < 32; i++ {
go handler.prefetcherWorker()
}
handler.streamVerifier.Start(handler.ctx)
if handler.erl != nil {
handler.erl.Start()
Expand Down Expand Up @@ -266,6 +271,28 @@ func (handler *TxHandler) backlogGaugeThread() {
}
}

// prefetcherWorker is the worker go routine that prefetches accounts, resources, etc from the DB.
func (handler *TxHandler) prefetcherWorker() {
for {
select {
case wi, ok := <-handler.prefetcher:
if !ok {
// this is never happening since handler.backlogQueue is never closed
return
}
// nothing to prefetch if the txn is already in
if handler.checkAlreadyCommitted(wi) {
continue
}
// prefetch
txGroup := wi.unverifiedTxGroup
handler.txPool.Prefetch(txGroup)
case <-handler.ctx.Done():
return
}
}
}

// backlogWorker is the worker go routine that process the incoming messages from the postVerificationQueue and backlogQueue channels
// and dispatches them further.
func (handler *TxHandler) backlogWorker() {
Expand Down Expand Up @@ -640,6 +667,20 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
}
}

// queue the addreses needed to evaluate the transaction for prefetching
select {
case handler.prefetcher <- &txBacklogMsg{
rawmsg: &rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
rawmsgDataHash: msgKey,
unverifiedTxGroupHash: canonicalKey,
capguard: capguard,
}:
default:
// prefetcher seems to be too busy, skip prefetching
}

// queue the transaction for evaluation into the block
select {
case handler.backlogQueue <- &txBacklogMsg{
rawmsg: &rawmsg,
Expand Down
21 changes: 19 additions & 2 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,10 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add
au.accountsMu.RUnlock()
needUnlock = false
}

// at this point, we do not have the account on any of our caches
ledgerCacheMissAccountCount.Inc(nil)

// No updates of this account in the in-memory deltas; use on-disk DB.
// The check in roundOffset() made sure the round is exactly the one
// present in the on-disk DB. As an optimization, we avoid creating
Expand All @@ -1482,10 +1486,22 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add
if persistedData.Round == currentDbRound {
if persistedData.Ref != nil {
// if we read actual data return it
au.baseAccounts.writePending(persistedData)
if synchronized {
au.accountsMu.Lock()
}
au.baseAccounts.write(persistedData)
if synchronized {
au.accountsMu.Unlock()
}
return persistedData.AccountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil
}
au.baseAccounts.writeNotFoundPending(addr)
if synchronized {
au.accountsMu.Lock()
}
au.baseAccounts.writeNotFound(addr)
if synchronized {
au.accountsMu.Unlock()
}
// otherwise return empty
return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil
}
Expand Down Expand Up @@ -1989,3 +2005,4 @@ var ledgerGeneratecatchpointCount = metrics.NewCounter("ledger_generatecatchpoin
var ledgerGeneratecatchpointMicros = metrics.NewCounter("ledger_generatecatchpoint_micros", "µs spent")
var ledgerVacuumCount = metrics.NewCounter("ledger_vacuum_count", "calls")
var ledgerVacuumMicros = metrics.NewCounter("ledger_vacuum_micros", "µs spent")
var ledgerCacheMissAccountCount = metrics.NewCounter("ledger_cache_miss_account", "misses")
20 changes: 20 additions & 0 deletions ledger/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,26 @@ func (eval *BlockEvaluator) TestTransaction(txn transactions.SignedTxn) error {
return nil
}

// PrefetchTransactionGroup loads all related data needed by the transactions in the group
func (eval *BlockEvaluator) PrefetchTransactionGroup(txgroup []transactions.SignedTxnWithAD) {
cow := eval.state.child(len(txgroup))
defer cow.recycle()

for i := range txgroup {
txn := txgroup[i].Txn
// fetch sender
cow.lookup(txn.Sender)

switch txn.Type {
case protocol.PaymentTx:
cow.lookup(txn.CloseRemainderTo)
// TODO: write the rest
default:
// unknown txn type, lets not error here
}
}
}

// Transaction tentatively adds a new transaction as part of this block evaluation.
// If the transaction cannot be added to the block without violating some constraints,
// an error is returned and the block evaluator state is unchanged.
Expand Down
8 changes: 7 additions & 1 deletion ledger/lruaccts.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ outer2:
for ; pendingEntriesCount > 0; pendingEntriesCount-- {
select {
case addr := <-m.pendingNotFound:
m.notFound[addr] = struct{}{}
m.writeNotFound(addr)
default:
break outer2
}
Expand Down Expand Up @@ -148,6 +148,12 @@ func (m *lruAccounts) write(acctData trackerdb.PersistedAccountData) {
}
}

// write a single account as not found on the db
// thread locking semantics : write lock
func (m *lruAccounts) writeNotFound(addr basics.Address) {
m.notFound[addr] = struct{}{}
}

// prune adjust the current size of the lruAccounts cache, by dropping the least
// recently used entries.
// thread locking semantics : write lock
Expand Down

0 comments on commit c283979

Please sign in to comment.