Skip to content

Commit

Permalink
Perf base v2 (#7)
Browse files Browse the repository at this point in the history
* opt truncatePending and truncateQueue

* downgrade the level of log in eth_sendRawTransaction

* disable pending cache of legacypool when --mine enabled

* split metrics of txpool by cases of 'reorg' and 'reset'

* add logs to trace the reset event of txpool

* add logs to trace the reset event of txpool

---------

Co-authored-by: andyzhang2023 <andyzhang2023@gmail.com>
  • Loading branch information
andyzhang2023 and andyzhang2023 authored Dec 12, 2024
1 parent 3c286cc commit e871e53
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 47 deletions.
4 changes: 4 additions & 0 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
cfg.Eth.OverrideVerkle = &v
}

if ctx.Bool(utils.MiningEnabledFlag.Name) {
cfg.Eth.TxPool.EnableCache = true
}

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Create gauge with geth system and build information
Expand Down
5 changes: 3 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
log.Info("txpool-trace send ChainHeadEvent", "hash", lastCanon.Hash(), "number", lastCanon.NumberU64())
}
}()
// Start the parallel header verifier
Expand All @@ -1723,7 +1724,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
} else {
block, err = it.next()
}
log.Info("perf-trace InsertChain it.next", "duration", common.PrettyDuration(time.Since(traceStart)), "hash", chain[0].Header().Hash(), "number", chain[0].NumberU64())
log.Info("perf-trace txpool-trace InsertChain it.next", "duration", common.PrettyDuration(time.Since(traceStart)), "hash", chain[0].Header().Hash(), "number", chain[0].NumberU64())

// Left-trim all the known blocks that don't need to build snapshot
if bc.skipBlock(err, it) {
Expand Down Expand Up @@ -2547,7 +2548,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
if timestamp := time.Unix(int64(head.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
log.Info("perf-trace Chain head was updated", context...)
log.Info("perf-trace txpool-trace Chain head was updated", context...)
return head.Hash(), nil
}

Expand Down
8 changes: 8 additions & 0 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ var (
localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil)
)

type pendingCache interface {
add(types.Transactions, types.Signer)
del(types.Transactions, types.Signer)
dump() map[common.Address]types.Transactions
markLocal(common.Address)
flattenLocals() []common.Address
}

// copy of pending transactions
type cacheForMiner struct {
txLock sync.Mutex
Expand Down
126 changes: 87 additions & 39 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,29 @@ var (
// reorg detail metrics
resetTimer = metrics.NewRegisteredTimer("txpool/resettime", nil)
reorgWaitLockTimer = metrics.NewRegisteredTimer("txpool/reorg/waittime", nil)
resetWaitLockTimer = metrics.NewRegisteredTimer("txpool/reset/waittime", nil)
promoteTimer = metrics.NewRegisteredTimer("txpool/promotetime", nil)
promoteResetTimer = metrics.NewRegisteredTimer("txpool/promotetime/reset", nil)
demoteTimer = metrics.NewRegisteredTimer("txpool/demotetime", nil)
reheapInDemoteTimer = metrics.NewRegisteredTimer("txpool/reheap/in/demotetime", nil)
reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil)
truncatePendingTimer = metrics.NewRegisteredTimer("txpool/truncate/queue/time", nil)
truncatePendingInnerTimer = metrics.NewRegisteredTimer("txpool/truncate/queue/time/inner", nil)
truncateQueueTimer = metrics.NewRegisteredTimer("txpool/truncate/pending/time", nil)
truncateQueueInnerTimer = metrics.NewRegisteredTimer("txpool/truncate/pending/time/inner", nil)
truncatePendingTimer = metrics.NewRegisteredTimer("txpool/truncate/pending/time", nil)
truncateResetPendingTimer = metrics.NewRegisteredTimer("txpool/truncate/reset/pending/time", nil)
truncatePendingInnerTimer = metrics.NewRegisteredTimer("txpool/truncate/pending/time/inner", nil)
truncateQueueTimer = metrics.NewRegisteredTimer("txpool/truncate/queue/time", nil)
truncateResetQueueTimer = metrics.NewRegisteredTimer("txpool/truncate/reset/queue/time", nil)
truncateQueueInnerTimer = metrics.NewRegisteredTimer("txpool/truncate/queue/time/inner", nil)
reorgresetNoblockingTimer = metrics.NewRegisteredTimer("txpool/noblocking/reorgresettime", nil)

// latency of accessing state objects
accountSnapReadsTimer = metrics.NewRegisteredTimer("txpool/account/snap/readtime", nil)
accountTrieReadsTimer = metrics.NewRegisteredTimer("txpool/account/trie/readtime", nil)

feedTimer = metrics.NewRegisteredTimer("txpool/feed/time", nil)
sendFeedTxCount = metrics.NewRegisteredCounter("txpool/sendfeed/tx", nil)
demoteTxCount = metrics.NewRegisteredCounter("txpool/demote/tx/count", nil)
promoteTxCount = metrics.NewRegisteredCounter("txpool/promote/tx/count", nil)
reorgCount = metrics.NewRegisteredCounter("txpool/reorg/count", nil)
reorgResetCount = metrics.NewRegisteredCounter("txpool/reorg/reset/count", nil)
feedTimer = metrics.NewRegisteredTimer("txpool/feed/time", nil)
feedResetTimer = metrics.NewRegisteredTimer("txpool/feed/reset/time", nil)
sendFeedTxCount = metrics.NewRegisteredCounter("txpool/sendfeed/tx", nil)
sendFeedResetTxCount = metrics.NewRegisteredCounter("txpool/sendfeed/reset/tx", nil)
demoteTxCount = metrics.NewRegisteredCounter("txpool/demote/tx/count", nil)
promoteTxCount = metrics.NewRegisteredCounter("txpool/promote/tx/count", nil)
promoteResetTxCount = metrics.NewRegisteredCounter("txpool/promote/reset/tx/count", nil)
reorgCount = metrics.NewRegisteredCounter("txpool/reorg/count", nil)
resetCount = metrics.NewRegisteredCounter("txpool/reorg/reset/count", nil)

loopReportTimer = metrics.NewRegisteredTimer("txpool/loop/report", nil)
)
Expand All @@ -178,6 +180,8 @@ type BlockChain interface {

// Config are the configuration parameters of the transaction pool.
type Config struct {
EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled

Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Expand Down Expand Up @@ -261,6 +265,12 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
conf.ReannounceTime = time.Minute
}
// log to inform user if the cache is enabled or not
if conf.EnableCache {
log.Info("legacytxpool Pending Cache is enabled")
} else {
log.Info("legacytxpool Pending Cache is disabled")
}
return conf
}

Expand Down Expand Up @@ -295,7 +305,10 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

pendingCache *cacheForMiner //pending list cache for miner
pendingCounter int
queueCounter int

pendingCache pendingCache //pending list cache for miner

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
Expand Down Expand Up @@ -338,6 +351,9 @@ func New(config Config, chain BlockChain) *LegacyPool {
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
}
if !config.EnableCache {
pool.pendingCache = newNoneCacheForMiner(pool)
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
Expand Down Expand Up @@ -1074,6 +1090,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
pool.queueCounter++
}
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
Expand Down Expand Up @@ -1132,6 +1149,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
pool.pendingCache.del([]*types.Transaction{old}, pool.signer)
} else {
// Nothing was replaced, bump the pending counter
pool.pendingCounter++
pendingGauge.Inc(1)
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
Expand Down Expand Up @@ -1370,6 +1388,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
// Reduce the pending counter
pool.pendingCache.del(append(invalids, tx), pool.signer)
pendingGauge.Dec(int64(1 + len(invalids)))
pool.pendingCounter -= 1 + len(invalids)
return 1 + len(invalids)
}
}
Expand All @@ -1378,6 +1397,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
if removed, _ := future.Remove(tx); removed {
// Reduce the queued counter
queuedGauge.Dec(1)
pool.queueCounter -= 1
}
if future.Empty() {
delete(pool.queue, addr)
Expand Down Expand Up @@ -1448,6 +1468,9 @@ func (pool *LegacyPool) scheduleReorgLoop() {
select {
case req := <-pool.reqResetCh:
// Reset request: update head if request is already pending.
if req.newHead != nil {
log.Info("txpool-trace receive reset request", "blockNumber", req.newHead.Number.Uint64())
}
if reset == nil {
reset = req
} else {
Expand Down Expand Up @@ -1491,20 +1514,37 @@ func (pool *LegacyPool) scheduleReorgLoop() {

// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
var reorgCost, reorgLockedCost, demoteCost, promoteCost time.Duration
var reorgCost, reorgLockedCost, demoteCost, promoteCost, waittime, truncatePending, truncateQueue, sendFeed time.Duration
var promoted []*types.Transaction
var demoted, sendFeedTxs int
var oldBlock, newBlock uint64 = 0, 0
defer func(t0 time.Time) {
reorgCost = time.Since(t0)
if reset != nil {
reorgResetCount.Inc(1)
resetCount.Inc(1)
reorgresetTimer.Update(reorgCost)
demoteTimer.Update(demoteCost)
reorgresetNoblockingTimer.Update(reorgLockedCost)
promoteResetTimer.Update(promoteCost)
promoteResetTxCount.Inc(int64(len(promoted)))
resetWaitLockTimer.Update(waittime)
demoteTxCount.Inc(int64(demoted))
truncateResetPendingTimer.Update(truncatePending)
truncateResetQueueTimer.Update(truncateQueue)
sendFeedResetTxCount.Inc(int64(sendFeedTxs))
feedResetTimer.Update(sendFeed)
log.Info("txpool-trace reset finished", "oldHead", oldBlock, "newHead", newBlock, "demoted", demoted, "sendFeedTxs", sendFeedTxs, "promoted", len(promoted))
} else {
reorgCount.Inc(1)
reorgDurationTimer.Update(reorgCost)
reorgNoBlockingDurationTimer.Update(reorgLockedCost)
promoteTimer.Update(promoteCost)
promoteTxCount.Inc(int64(len(promoted)))
reorgWaitLockTimer.Update(waittime)
truncatePendingTimer.Update(truncatePending)
truncateQueueTimer.Update(truncateQueue)
sendFeedTxCount.Inc(int64(sendFeedTxs))
feedTimer.Update(sendFeed)
}
}(time.Now())
defer close(done)
Expand All @@ -1517,9 +1557,18 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
promoteAddrs = dirtyAccounts.flatten()
}
tw := time.Now()
if reset != nil {
if reset.oldHead != nil {
oldBlock = reset.oldHead.Number.Uint64()
}
if reset.newHead != nil {
newBlock = reset.newHead.Number.Uint64()
}
log.Info("txpool-trace try to reset txpool", "oldHead", oldBlock, "newHead", newBlock)
}
pool.mu.Lock()
tl, t0 := time.Now(), time.Now()
reorgWaitLockTimer.UpdateSince(tw)
waittime = t0.Sub(tw)
if reset != nil {
// Reset from the old head to the new, rescheduling any reorged transactions
demoteAddrs = pool.reset(reset.oldHead, reset.newHead)
Expand All @@ -1540,17 +1589,15 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
// Check for pending transactions for every account that sent new ones
t0 = time.Now()
promoted := pool.promoteExecutables(promoteAddrs)
promoted = pool.promoteExecutables(promoteAddrs)
promoteCost = time.Since(t0)
promoteTxCount.Inc(int64(len(promoted)))

// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
t0 = time.Now()
if reset != nil {
demoted := pool.demoteUnexecutables(demoteAddrs)
demoteTxCount.Inc(int64(demoted))
demoted = pool.demoteUnexecutables(demoteAddrs)
var pendingBaseFee = pool.priced.urgent.baseFee
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
Expand All @@ -1570,10 +1617,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
// Ensure pool.queue and pool.pending sizes stay within the configured limits.
t0 = time.Now()
pool.truncatePending()
truncatePendingTimer.UpdateSince(t0)
t0 = time.Now()
truncatePending = time.Since(t0)
pool.truncateQueue()
truncateQueueTimer.UpdateSince(t0)
truncateQueue = time.Since(t0) - truncatePending

dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
pool.changesSinceReorg = 0 // Reset change counter
Expand All @@ -1595,18 +1641,20 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
sendFeedTxCount.Inc(int64(len(txs)))
sendFeedTxs += len(txs)
}
feedTimer.Update(time.Since(t0))
sendFeed = time.Since(t0)
}

// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []common.Address) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions
var txnum int = 0
// collect demote addresses
var collectAddr = func(txs types.Transactions) {
txnum += len(txs)
addrs := make(map[common.Address]struct{})
for _, tx := range txs {
if !pool.Filter(tx) {
Expand All @@ -1629,12 +1677,12 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []com
}
}

var depth uint64 = 1
var depth, oldNum, newNum uint64 = 1, 0, 0

if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
oldNum = oldHead.Number.Uint64()
newNum = newHead.Number.Uint64()

if depth = uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug("Skipping deep transaction reorg", "depth", depth)
Expand Down Expand Up @@ -1713,7 +1761,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []com
}
}
resetDepthMeter.Mark(int64(depth))
log.Info("reset block depth", "depth", depth)
log.Info("txpool-trace reset txpool", "depth", depth, "fromBlock", oldNum, "toBlock", newNum, "txs", txnum)
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock() // Special case during testing
Expand Down Expand Up @@ -1801,6 +1849,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
}
log.Trace("Promoted queued transactions", "count", len(promoted))
queuedGauge.Dec(int64(len(readies)))
pool.queueCounter -= len(readies)

// Drop all transactions over the allowed limit
var caps types.Transactions
Expand All @@ -1816,6 +1865,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
pool.queueCounter -= len(forwards) + len(drops) + len(caps)
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
Expand All @@ -1836,10 +1886,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// equal number for all for accounts with many pending transactions.
func (pool *LegacyPool) truncatePending() {
start := time.Now()
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
pending := uint64(pool.pendingCounter)
truncatePendingInnerTimer.UpdateSince(start)
if pending <= pool.config.GlobalSlots {
return
Expand Down Expand Up @@ -1885,6 +1932,7 @@ func (pool *LegacyPool) truncatePending() {
pool.priced.Removed(len(caps))
dropPendingCache = append(dropPendingCache, caps...)
pendingGauge.Dec(int64(len(caps)))
pool.pendingCounter -= len(caps)
if pool.locals.contains(offenders[i]) {
localGauge.Dec(int64(len(caps)))
}
Expand Down Expand Up @@ -1913,6 +1961,7 @@ func (pool *LegacyPool) truncatePending() {
dropPendingCache = append(dropPendingCache, caps...)
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
pool.pendingCounter -= len(caps)
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(caps)))
}
Expand All @@ -1927,10 +1976,7 @@ func (pool *LegacyPool) truncatePending() {
// truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit.
func (pool *LegacyPool) truncateQueue() {
start := time.Now()
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
queued := uint64(pool.queueCounter)
truncateQueueInnerTimer.UpdateSince(start)
if queued <= pool.config.GlobalQueue {
return
Expand Down Expand Up @@ -2027,6 +2073,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) (demot
dropPendingCache = append(dropPendingCache, invalids...)
dropPendingCache = append(dropPendingCache, drops...)
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
pool.pendingCounter -= len(olds) + len(drops) + len(invalids)
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
}
Expand All @@ -2042,6 +2089,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) (demot
}
dropPendingCache = append(dropPendingCache, gapped...)
pendingGauge.Dec(int64(len(gapped)))
pool.pendingCounter -= len(gapped)
}
// Delete the entire pending entry if it became empty.
if list.Empty() {
Expand Down
Loading

0 comments on commit e871e53

Please sign in to comment.