From f918f1ca3d465f8195b89d9004f73ae3e7222313 Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Mon, 4 Apr 2022 18:38:54 +0200 Subject: [PATCH 1/6] miner: do not commit interrupted blocks --- miner/worker.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index c6927a1ca1e8..38ead907bac6 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1050,7 +1050,8 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) { +// Returns whether block should be discarded. +func (w *worker) fillTransactions(interrupt *int32, env *environment) bool { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1064,15 +1065,17 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) if w.commitTransactions(env, txs, interrupt) { - return + return true } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) if w.commitTransactions(env, txs, interrupt) { - return + return true } } + + return false } // generateWork generates a sealing block based on the given parameters. @@ -1083,7 +1086,10 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - w.fillTransactions(nil, work) + if w.fillTransactions(nil, work) { + return nil, errors.New("could not populate block") + } + return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1113,8 +1119,13 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { if !noempty && atomic.LoadUint32(&w.noempty) == 0 { w.commit(work.copy(), nil, false, start) } + // Fill pending transactions from the txpool - w.fillTransactions(interrupt, work) + if w.fillTransactions(interrupt, work) { + work.discard() + return + } + w.commit(work.copy(), w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover From 2b74aa09f5b94d523f4d67da443e61288da7adce Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Tue, 5 Apr 2022 15:20:38 +0200 Subject: [PATCH 2/6] miner: refactor block building helpers to return proper errors --- miner/worker.go | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 38ead907bac6..1be3349e0342 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -77,6 +77,11 @@ const ( staleThreshold = 7 ) +var ( + ErrBlockInterruptedByNewHead = errors.New("new head arrived while building block") + ErrBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") +) + // environment is the worker's current environment and holds all // information of the sealing block generation. type environment struct { @@ -592,7 +597,11 @@ func (w *worker) mainLoop() { } txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) tcount := w.current.tcount - w.commitTransactions(w.current, txset, nil) + err := w.commitTransactions(w.current, txset, nil) + if err != nil { + // Log but update the snapshot anyway to ensure it's consistent with w.current + log.Error("unexpected error while committing transactions", "err", err) + } // Only update the snapshot if any new transactions were added // to the pending block @@ -841,7 +850,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -866,8 +875,9 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP ratio: ratio, inc: true, } + return ErrBlockInterruptedByRecommit } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + return ErrBlockInterruptedByNewHead } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { @@ -951,7 +961,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP if interrupt != nil { w.resubmitAdjustCh <- &intervalAdjust{inc: false} } - return false + return nil } // generateParams wraps various of settings for generating sealing task. @@ -1050,8 +1060,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -// Returns whether block should be discarded. -func (w *worker) fillTransactions(interrupt *int32, env *environment) bool { +func (w *worker) fillTransactions(interrupt *int32, env *environment) error { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1064,18 +1073,20 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) bool { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { - return true + err := w.commitTransactions(env, txs, interrupt) + if err != nil { + return err } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { - return true + err := w.commitTransactions(env, txs, interrupt) + if err != nil { + return err } } - return false + return nil } // generateWork generates a sealing block based on the given parameters. @@ -1086,8 +1097,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - if w.fillTransactions(nil, work) { - return nil, errors.New("could not populate block") + err = w.fillTransactions(nil, work) + if err != nil { + return nil, err } return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) @@ -1121,7 +1133,8 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { } // Fill pending transactions from the txpool - if w.fillTransactions(interrupt, work) { + err = w.fillTransactions(interrupt, work) + if err != nil && err != ErrBlockInterruptedByRecommit { work.discard() return } From b56cc4aee92dd627e71ed5cb1cf3312bb4a78341 Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Mon, 25 Apr 2022 13:36:46 +0200 Subject: [PATCH 3/6] Discard interrupt error where interrupt is not passed, use errors.Nil --- miner/worker.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 1be3349e0342..1b34efd85819 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -78,8 +78,8 @@ const ( ) var ( - ErrBlockInterruptedByNewHead = errors.New("new head arrived while building block") - ErrBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") + errBlockInterruptedByNewHead = errors.New("new head arrived while building block") + errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") ) // environment is the worker's current environment and holds all @@ -597,11 +597,7 @@ func (w *worker) mainLoop() { } txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) tcount := w.current.tcount - err := w.commitTransactions(w.current, txset, nil) - if err != nil { - // Log but update the snapshot anyway to ensure it's consistent with w.current - log.Error("unexpected error while committing transactions", "err", err) - } + w.commitTransactions(w.current, txset, nil) // Only update the snapshot if any new transactions were added // to the pending block @@ -875,9 +871,9 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP ratio: ratio, inc: true, } - return ErrBlockInterruptedByRecommit + return errBlockInterruptedByRecommit } - return ErrBlockInterruptedByNewHead + return errBlockInterruptedByNewHead } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { @@ -1097,10 +1093,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - err = w.fillTransactions(nil, work) - if err != nil { - return nil, err - } + w.fillTransactions(nil, work) return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1134,7 +1127,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { // Fill pending transactions from the txpool err = w.fillTransactions(interrupt, work) - if err != nil && err != ErrBlockInterruptedByRecommit { + if err != nil && !errors.Is(err, errBlockInterruptedByRecommit) { work.discard() return } From 95c0350b80d8430a07a9cd3e597a8e714fb76c55 Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Mon, 25 Apr 2022 15:39:27 +0200 Subject: [PATCH 4/6] Adjust syntax --- miner/worker.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 1b34efd85819..ab80bff5e189 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1069,19 +1069,16 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) error { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - err := w.commitTransactions(env, txs, interrupt) - if err != nil { + if err := w.commitTransactions(env, txs, interrupt); err != nil { return err } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - err := w.commitTransactions(env, txs, interrupt) - if err != nil { + if err := w.commitTransactions(env, txs, interrupt); err != nil { return err } } - return nil } From 1501504f842ad2c9cfacf86058edc40a1b5b2b1d Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Wed, 27 Apr 2022 12:50:48 +0200 Subject: [PATCH 5/6] Adjust flaky test --- eth/catalyst/api.go | 9 --------- eth/catalyst/api_test.go | 2 +- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 45f233df6dfa..a7add4d6c889 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" @@ -349,11 +348,3 @@ func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.Pa } return beacon.BlockToExecutableData(block), nil } - -// Used in tests to add a the list of transactions from a block to the tx pool. -func (api *ConsensusAPI) insertTransactions(txs types.Transactions) error { - for _, tx := range txs { - api.eth.TxPool().AddLocal(tx) - } - return nil -} diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index de2e58a4f1e5..bbaa8ae16bc2 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -108,7 +108,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) { api := NewConsensusAPI(ethservice) // Put the 10th block's tx in the pool and produce a new block - api.insertTransactions(blocks[9].Transactions()) + api.eth.TxPool().AddRemotesSync(blocks[9].Transactions()) blockParams := beacon.PayloadAttributesV1{ Timestamp: blocks[8].Time() + 5, } From a7ed5902002ee3b341cf6c29d1fac597d69f55ec Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Wed, 27 Apr 2022 17:19:29 +0200 Subject: [PATCH 6/6] Update miner/worker.go Co-authored-by: Martin Holst Swende --- miner/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miner/worker.go b/miner/worker.go index ab80bff5e189..31022e7e10f3 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1124,7 +1124,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { // Fill pending transactions from the txpool err = w.fillTransactions(interrupt, work) - if err != nil && !errors.Is(err, errBlockInterruptedByRecommit) { + if errors.Is(err, errBlockInterruptedByNewHead) { work.discard() return }