diff --git a/core/blockchain.go b/core/blockchain.go index 66b6fdcddc671..26aafe39009e3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -84,6 +84,8 @@ var ( errInsertionInterrupted = errors.New("insertion is interrupted") errChainStopped = errors.New("blockchain is stopped") + errInvalidOldChain = errors.New("invalid old chain") + errInvalidNewChain = errors.New("invalid new chain") CheckpointCh = make(chan int) ) @@ -1476,7 +1478,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. if reorg { // Reorganise the chain if the parent is not the head block if block.ParentHash() != currentBlock.Hash() { - if err := bc.reorg(currentBlock, block); err != nil { + if err := bc.reorg(currentBlock.Header(), block.Header()); err != nil { return NonStatTy, err } } @@ -1491,9 +1493,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. bc.writeHeadBlock(block, false) // prepare set of masternodes for the next epoch if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) { - err := bc.UpdateM1() - if err != nil { - log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNum", block.NumberU64()) + if err := bc.UpdateM1(); err != nil { + log.Crit("Fail to update masternodes during writeBlockWithState", "number", block.Number, "hash", block.Hash().Hex(), "err", err) } } } @@ -2275,59 +2276,54 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { // reorg takes two blocks, an old chain and a new chain and will reconstruct the // blocks and inserts them to be part of the new canonical chain and accumulates // potential missing transactions and post an event about them. -func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { +func (bc *BlockChain) reorg(oldHead, newHead *types.Header) error { + log.Warn("Reorg", "OldHash", oldHead.Hash().Hex(), "OldNum", oldHead.Number, "NewHash", newHead.Hash().Hex(), "NewNum", newHead.Number) + var ( - newChain types.Blocks - oldChain types.Blocks - commonBlock *types.Block - deletedTxs types.Transactions - addedTxs types.Transactions - deletedLogs []*types.Log + newChain []*types.Header + oldChain []*types.Header + commonBlock *types.Header ) - log.Warn("Reorg", "oldBlock hash", oldBlock.Hash().Hex(), "number", oldBlock.NumberU64(), "newBlock hash", newBlock.Hash().Hex(), "number", newBlock.NumberU64()) - - // first reduce whoever is higher bound - if oldBlock.NumberU64() > newBlock.NumberU64() { - // reduce old chain - for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { - oldChain = append(oldChain, oldBlock) - deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 { - deletedLogs = append(deletedLogs, logs...) - } + + // Reduce the longer chain to the same number as the shorter one + if oldHead.Number.Uint64() > newHead.Number.Uint64() { + // Old chain is longer, gather all transactions and logs as deleted ones + for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) { + oldChain = append(oldChain, oldHead) } } else { - // reduce new chain and append new chain blocks for inserting later on - for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { - newChain = append(newChain, newBlock) + // New chain is longer, stash all blocks away for subsequent insertion + for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) { + newChain = append(newChain, newHead) } } - if oldBlock == nil { - return errors.New("invalid old chain") + if oldHead == nil { + return errInvalidOldChain } - if newBlock == nil { - return errors.New("invalid new chain") + if newHead == nil { + return errInvalidNewChain } + // Both sides of the reorg are at the same number, reduce both until the common + // ancestor is found for { - if oldBlock.Hash() == newBlock.Hash() { - commonBlock = oldBlock + // If the common ancestor was found, bail out + if oldHead.Hash() == newHead.Hash() { + commonBlock = oldHead break } + // Remove an old block as well as stash away a new block + oldChain = append(oldChain, oldHead) + newChain = append(newChain, newHead) - oldChain = append(oldChain, oldBlock) - newChain = append(newChain, newBlock) - deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 { - deletedLogs = append(deletedLogs, logs...) + // Step back with both chains + oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) + if oldHead == nil { + return errInvalidOldChain } - - oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) - if oldBlock == nil { - return errors.New("invalid old chain") - } - if newBlock == nil { - return errors.New("invalid new chain") + newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) + if newHead == nil { + return errInvalidNewChain } } @@ -2335,23 +2331,21 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if xdpos, ok := bc.Engine().(*XDPoS.XDPoS); ok { latestCommittedBlock := xdpos.EngineV2.GetLatestCommittedBlockInfo() if latestCommittedBlock != nil { - currentBlock := bc.CurrentBlock() - currentBlock.Number().Cmp(latestCommittedBlock.Number) - cmp := commonBlock.Number().Cmp(latestCommittedBlock.Number) + cmp := commonBlock.Number.Cmp(latestCommittedBlock.Number) if cmp < 0 { for _, oldBlock := range oldChain { - if oldBlock.Number().Cmp(latestCommittedBlock.Number) == 0 { + if oldBlock.Number.Cmp(latestCommittedBlock.Number) == 0 { if oldBlock.Hash() != latestCommittedBlock.Hash { - log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "committed hash", latestCommittedBlock.Hash) + log.Error("Impossible reorg, please file an issue", "OldNum", oldBlock.Number, "OldHash", oldBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex()) } else { - log.Warn("Stop reorg, blockchain is under forking attack", "old committed num", oldBlock.Number(), "old committed hash", oldBlock.Hash()) - return fmt.Errorf("stop reorg, blockchain is under forking attack. old committed num %d, hash %x", oldBlock.Number(), oldBlock.Hash()) + log.Warn("Stop reorg, blockchain is under forking attack", "OldCommittedNum", oldBlock.Number, "OldCommittedHash", oldBlock.Hash().Hex()) + return fmt.Errorf("stop reorg, blockchain is under forking attack. OldCommitted num %d, hash %s", oldBlock.Number, oldBlock.Hash().Hex()) } } } } else if cmp == 0 { if commonBlock.Hash() != latestCommittedBlock.Hash { - log.Error("Impossible reorg, please file an issue", "oldnum", commonBlock.Number(), "oldhash", commonBlock.Hash(), "committed hash", latestCommittedBlock.Hash) + log.Error("Impossible reorg, please file an issue", "OldNum", commonBlock.Number.Uint64(), "OldHash", commonBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex()) } } } @@ -2359,69 +2353,146 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Ensure the user sees large reorgs if len(oldChain) > 0 && len(newChain) > 0 { - logFn := log.Warn + logFn := log.Info + msg := "Chain reorg detected" if len(oldChain) > 63 { + msg = "Large chain reorg detected" logFn = log.Warn } - logFn("Chain split detected", "number", commonBlock.Number(), "hash", commonBlock.Hash(), - "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash()) + logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash().Hex(), + "drop", len(oldChain), "dropfrom", oldChain[0].Hash().Hex(), "add", len(newChain), "addfrom", newChain[0].Hash().Hex()) blockReorgAddMeter.Mark(int64(len(newChain))) blockReorgDropMeter.Mark(int64(len(oldChain))) blockReorgMeter.Mark(1) + } else if len(newChain) > 0 { + // Special case happens in the post merge stage that current head is + // the ancestor of new head while these two blocks are not consecutive + log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash()) + blockReorgAddMeter.Mark(int64(len(newChain))) } else { - log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash()) + // len(newChain) == 0 && len(oldChain) > 0 + // rewind the canonical chain to a lower point. + log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain)) } - // Insert the new chain(except the head block(reverse order)), - // taking care of the proper incremental order. - for i := len(newChain) - 1; i >= 0; i-- { - // insert the block in the canonical way, re-writing history - bc.writeHeadBlock(newChain[i], true) + // Acquire the tx-lookup lock before mutation. This step is essential + // as the txlookups should be changed atomically, and all subsequent + // reads should be blocked until the mutation is complete. + // bc.txLookupLock.Lock() + + // Reorg can be executed, start reducing the chain's old blocks and appending + // the new blocks + var ( + deletedTxs []common.Hash + rebirthTxs []common.Hash + + deletedLogs []*types.Log + rebirthLogs []*types.Log + ) + + // Deleted log emission on the API uses forward order, which is borked, but + // we'll leave it in for legacy reasons. + // + // TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs? + { + for i := len(oldChain) - 1; i >= 0; i-- { + block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64()) + if block == nil { + return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics + } + if logs := bc.collectLogs(block, true); len(logs) > 0 { + deletedLogs = append(deletedLogs, logs...) + } + if len(deletedLogs) > 512 { + go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) + deletedLogs = nil + } + // TODO(daniel): remove chainSideFeed, reference PR #30601 + // Also send event for blocks removed from the canon chain. + // bc.chainSideFeed.Send(ChainSideEvent{Block: block}) + } + if len(deletedLogs) > 0 { + go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) + } + } - // Collect the new added transactions. - addedTxs = append(addedTxs, newChain[i].Transactions()...) + // Undo old blocks in reverse order + for i := 0; i < len(oldChain); i++ { + // Collect all the deleted transactions + block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64()) + if block == nil { + return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics + } + for _, tx := range block.Transactions() { + deletedTxs = append(deletedTxs, tx.Hash()) + } + // Collect deleted logs and emit them for new integrations + // if logs := bc.collectLogs(block, true); len(logs) > 0 { + // slices.Reverse(logs) // Emit revertals latest first, older then + // } + } + // Apply new blocks in forward order + for i := len(newChain) - 1; i >= 0; i-- { + // Collect all the included transactions + block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) + if block == nil { + return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics + } + for _, tx := range block.Transactions() { + rebirthTxs = append(rebirthTxs, tx.Hash()) + } + // Collect inserted logs and emit them + if logs := bc.collectLogs(block, false); len(logs) > 0 { + rebirthLogs = append(rebirthLogs, logs...) + } + if len(rebirthLogs) > 512 { + bc.logsFeed.Send(rebirthLogs) + rebirthLogs = nil + } + // Update the head block + bc.writeHeadBlock(block, true) // prepare set of masternodes for the next epoch - if bc.chainConfig.XDPoS != nil && ((newChain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) { - err := bc.UpdateM1() - if err != nil { - log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNumber", newChain[i].NumberU64()) + if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) { + if err := bc.UpdateM1(); err != nil { + log.Crit("Fail to update masternodes during reorg", "number", block.Number, "hash", block.Hash().Hex(), "err", err) } } } + if len(rebirthLogs) > 0 { + bc.logsFeed.Send(rebirthLogs) + } // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. - indexesBatch := bc.db.NewBatch() - for _, tx := range types.TxDifference(deletedTxs, addedTxs) { - rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash()) + batch := bc.db.NewBatch() + for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) { + rawdb.DeleteTxLookupEntry(batch, tx) + } + // Delete all hash markers that are not part of the new canonical chain. + // Because the reorg function handles new chain head, all hash + // markers greater than new chain head should be deleted. + number := commonBlock.Number + if len(newChain) > 0 { + number = newChain[0].Number } - // Delete any canonical number assignments above the new head - number := bc.CurrentBlock().NumberU64() - for i := number + 1; ; i++ { + for i := number.Uint64() + 1; ; i++ { hash := rawdb.ReadCanonicalHash(bc.db, i) if hash == (common.Hash{}) { break } - rawdb.DeleteCanonicalHash(indexesBatch, i) + rawdb.DeleteCanonicalHash(batch, i) } - if err := indexesBatch.Write(); err != nil { + if err := batch.Write(); err != nil { log.Crit("Failed to delete useless indexes", "err", err) } - // If any logs need to be fired, do it now. In theory we could avoid creating - // this goroutine if there are no events to fire, but realistcally that only - // ever happens if we're reorging empty blocks, which will only happen on idle - // networks where performance is not an issue either way. - if len(deletedLogs) > 0 { - go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - } - if len(oldChain) > 0 { - go func() { - for i := len(oldChain) - 1; i >= 0; i-- { - bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]}) - } - }() - } + + // Reset the tx lookup cache to clear stale txlookup cache. + // bc.txLookupCache.Purge() + + // Release the tx-lookup lock after mutation. + // bc.txLookupLock.Unlock() + return nil } diff --git a/core/types/transaction.go b/core/types/transaction.go index 3054534e558de..d7cbb55a2536b 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -678,6 +678,24 @@ func TxDifference(a, b Transactions) (keep Transactions) { return keep } +// HashDifference returns a new set of hashes that are present in a but not in b. +func HashDifference(a, b []common.Hash) []common.Hash { + keep := make([]common.Hash, 0, len(a)) + + remove := make(map[common.Hash]struct{}) + for _, hash := range b { + remove[hash] = struct{}{} + } + + for _, hash := range a { + if _, ok := remove[hash]; !ok { + keep = append(keep, hash) + } + } + + return keep +} + // TxByNonce implements the sort interface to allow sorting a list of transactions // by their nonces. This is usually only useful for sorting transactions from a // single account, otherwise a nonce comparison doesn't make much sense.