diff --git a/core/blockchain.go b/core/blockchain.go index 561e102abc..576dcddd58 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -242,16 +242,17 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping - lastWrite uint64 // Last block when the state was flushed - flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state - triedb *triedb.Database // The database handler for maintaining trie nodes. - stateCache state.Database // State database to reuse between imports (contains state cache) - proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent. - txIndexer *txIndexer // Transaction indexer, might be nil if not enabled + db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping + lastWrite uint64 // Last block when the state was flushed + flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state + triedb *triedb.Database // The database handler for maintaining trie nodes. + stateCache state.Database // State database to reuse between imports (contains state cache) + proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent. + txIndexer *txIndexer // Transaction indexer, might be nil if not enabled + stateRecoverStatus bool hc *HeaderChain rmLogsFeed event.Feed @@ -355,6 +356,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), engine: engine, vmConfig: vmConfig, + stateRecoverStatus: false, } bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit)) bc.forker = NewForkChoice(bc, shouldPreserve) @@ -2232,6 +2234,16 @@ func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, e // recoverAncestors is only used post-merge. // We return the hash of the latest block that we could correctly validate. func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) { + if bc.StateRecoveringStatus { + log.Warn("recover is already in progress, skipping", "block", block.Hash()) + return common.Hash{}, errors.New("state recover in progress") + } + + bc.StateRecoveringStatus = true + defer func() { + bc.StateRecoveringStatus = false + }() + // Gather all the sidechain hashes (full blocks may be memory heavy) var ( hashes []common.Hash diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 5b637901c8..8cb9f14716 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1847,7 +1847,3 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta)) d.syncLogTime = time.Now() } - -func (d *Downloader) GetAllPeers() []*peerConnection { - return d.peers.AllPeers() -} diff --git a/eth/downloader/fetchers.go b/eth/downloader/fetchers.go index 05898695ca..cc4279b0da 100644 --- a/eth/downloader/fetchers.go +++ b/eth/downloader/fetchers.go @@ -17,7 +17,6 @@ package downloader import ( - "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -114,16 +113,3 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil } } - -func (d *Downloader) GetHeaderByHashFromPeer(peer *peerConnection, blockHash common.Hash) (*types.Header, error) { - headers, _, err := d.fetchHeadersByHash(peer, blockHash, 1, 0, false) - if err != nil { - return nil, fmt.Errorf("failed to fetch header from peer: %v", err) - } - - if len(headers) == 0 { - return nil, fmt.Errorf("no headers returned for hash: %v", blockHash) - } - - return headers[0], nil -} diff --git a/miner/fix_manager.go b/miner/fix_manager.go index afb62840e6..316eb703fc 100644 --- a/miner/fix_manager.go +++ b/miner/fix_manager.go @@ -6,23 +6,18 @@ import ( "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/log" ) // StateFixManager manages the fix operation state and notification mechanism. type StateFixManager struct { - mutex sync.Mutex // Protects access to fix state - isFixInProgress bool // Tracks if a fix operation is in progress - downloader *downloader.Downloader // Used to trigger BeaconSync operations + mutex sync.Mutex // Protects access to fix state + isFixInProgress bool // Tracks if a fix operation is in progress } // NewFixManager initializes a FixManager with required dependencies -func NewFixManager(downloader *downloader.Downloader) *StateFixManager { - return &StateFixManager{ - downloader: downloader, - } +func NewFixManager() *StateFixManager { + return &StateFixManager{} } // StartFix launches a goroutine to manage the fix process and tracks the fix state. @@ -68,32 +63,3 @@ func (fm *StateFixManager) RecoverFromLocal(w *worker, blockHash common.Hash) er log.Info("Recovered states up to block", "latestValid", latestValid) return nil } - -// RecoverFromPeer attempts to retrieve the block header from peers and triggers BeaconSync if successful. -// -// blockHash: The latest header(unsafe block) hash of the block to recover. -func (fm *StateFixManager) RecoverFromPeer(blockHash common.Hash) error { - peers := fm.downloader.GetAllPeers() - if len(peers) == 0 { - return fmt.Errorf("no peers available") - } - - var header *types.Header - var err error - for _, peer := range peers { - header, err = fm.downloader.GetHeaderByHashFromPeer(peer, blockHash) - if err == nil && header != nil { - break - } - log.Warn("Failed to retrieve header from peer", "err", err) - } - - if header == nil { - return fmt.Errorf("failed to retrieve header from all valid peers") - } - - log.Info("Successfully retrieved header from peer", "blockHash", blockHash) - - fm.downloader.BeaconSync(downloader.FullSync, header, nil) - return nil -} diff --git a/miner/miner.go b/miner/miner.go index 02b2fd5699..53755ad632 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -87,7 +87,6 @@ var DefaultMevConfig = MevConfig{ type Backend interface { BlockChain() *core.BlockChain TxPool() *txpool.TxPool - Downloader() *downloader.Downloader } type BackendWithHistoricalState interface { @@ -308,11 +307,6 @@ func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) { return miner.worker.buildPayload(args) } -// Worker builds the payload according to the provided parameters. -func (miner *Miner) Worker() *worker { - return miner.worker -} - func (miner *Miner) SimulateBundle(bundle *types.Bundle) (*big.Int, error) { env, err := miner.prepareSimulationEnv() diff --git a/miner/miner_test.go b/miner/miner_test.go index e3fd39fd51..5907fb4464 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -60,10 +60,6 @@ func (m *mockBackend) TxPool() *txpool.TxPool { return m.txPool } -func (m *mockBackend) Downloader() *downloader.Downloader { - return nil -} - func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { return nil, errors.New("not supported") } diff --git a/miner/payload_building.go b/miner/payload_building.go index 7b93600bb8..457e526218 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -269,18 +269,7 @@ func (payload *Payload) stopBuilding() { } // fix attempts to recover and repair the block and its associated data (such as MPT) -// either from the local blockchain or from peers. -// -// In most cases, the block can be recovered from the local node's data. However, -// there is a corner case where this may not be possible: If the sequencer -// broadcasts a block but the local node crashes before fully writing the block to its local -// storage, the local chain might be lagging behind by one block compared to peers. -// In such cases, we need to recover the missing block data from peers. -// -// The function first tries to recover the block using the local blockchain via the -// fixManager.RecoverFromLocal method. If local recovery fails (e.g., due to the node -// missing the block), it attempts to retrieve the block header from peers and triggers -// +// from the local blockchain // blockHash: The hash of the latest block that needs to be recovered and fixed. func (w *worker) fix(blockHash common.Hash) error { log.Info("Fix operation started") @@ -288,19 +277,8 @@ func (w *worker) fix(blockHash common.Hash) error { // Try to recover from local data err := w.stateFixManager.RecoverFromLocal(w, blockHash) if err != nil { - // Only proceed to peer recovery if the error is "block not found in local chain" - if strings.Contains(err.Error(), "block not found") { - log.Warn("Local recovery failed, trying to recover from peers", "err", err) - - // Try to recover from peers - err = w.stateFixManager.RecoverFromPeer(blockHash) - if err != nil { - return err - } - } else { - log.Error("Failed to recover from local data", "err", err) - return err - } + log.Error("Failed to recover from local data", "err", err) + return err } log.Info("Fix operation completed successfully") @@ -406,10 +384,8 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { start := time.Now() // getSealingBlock is interrupted by shared interrupt r := w.getSealingBlock(fullParams) - dur := time.Since(start) // update handles error case - payload.update(r, dur, func() { w.cacheMiningBlock(r.block, r.env) }) diff --git a/miner/worker.go b/miner/worker.go index 7832c1eeb4..b8df413238 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -303,7 +303,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), bundleCache: NewBundleCache(), - stateFixManager: NewFixManager(eth.Downloader()), + stateFixManager: NewFixManager(), } // Subscribe for transaction insertion events (whether from network or resurrects) worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) diff --git a/miner/worker_test.go b/miner/worker_test.go index 035233353b..1c19e60de9 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -34,11 +36,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" - "github.com/holiman/uint256" ) const ( @@ -146,9 +146,8 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine } } -func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } -func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } -func (b *testWorkerBackend) Downloader() *downloader.Downloader { return nil } +func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } +func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { var tx *types.Transaction