From 56011b6d87ae3af84ec436174f300b4566ce3435 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Tue, 15 Oct 2024 03:27:54 +0200 Subject: [PATCH 1/7] core: try to reduce peak memory usage during reorg --- core/blockchain.go | 15 ++++++++++----- core/blockchain_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index d580d708d947..c91c72a44e2d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2186,7 +2186,7 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { var ( newChain types.Blocks - oldChain types.Blocks + oldChain []*types.Header commonBlock *types.Block deletedTxs []common.Hash @@ -2202,7 +2202,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { if oldBlock.NumberU64() > newBlock.NumberU64() { // Old chain is longer, gather all transactions and logs as deleted ones for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { - oldChain = append(oldChain, oldBlock) + oldChain = append(oldChain, oldBlock.Header()) for _, tx := range oldBlock.Transactions() { deletedTxs = append(deletedTxs, tx.Hash()) } @@ -2228,7 +2228,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { break } // Remove an old block as well as stash away a new block - oldChain = append(oldChain, oldBlock) + oldChain = append(oldChain, oldBlock.Header()) for _, tx := range oldBlock.Transactions() { deletedTxs = append(deletedTxs, tx.Hash()) } @@ -2261,7 +2261,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { } else if len(newChain) > 0 { // Special case happens in the post merge stage that current head is // the ancestor of new head while these two blocks are not consecutive - log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash()) + log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash()) blockReorgAddMeter.Mark(int64(len(newChain))) } else { // len(newChain) == 0 && len(oldChain) > 0 @@ -2273,6 +2273,10 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // reads should be blocked until the mutation is complete. bc.txLookupLock.Lock() + var stats runtime.MemStats + runtime.ReadMemStats(&stats) + panic(stats.HeapInuse) + // Insert the new chain segment in incremental order, from the old // to the new. The new chain head (newChain[0]) is not inserted here, // as it will be handled separately outside of this function @@ -2325,8 +2329,9 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // Deleted logs + blocks: var deletedLogs []*types.Log for i := len(oldChain) - 1; i >= 0; i-- { + oldBlock = bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64()) // Collect deleted logs for notification - if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 { + if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 { deletedLogs = append(deletedLogs, logs...) } if len(deletedLogs) > 512 { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 9f491e1bfd1e..ae2eedd3ddfc 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -4231,3 +4231,35 @@ func TestPragueRequests(t *testing.T) { t.Fatalf("block %d: failed to insert into chain: %v", n, err) } } + +func BenchmarkReorg(b *testing.B) { + chainLength := b.N + + dir := b.TempDir() + db, err := rawdb.NewLevelDBDatabase(dir, 128, 128, "", false) + if err != nil { + b.Fatalf("cannot create temporary database: %v", err) + } + defer db.Close() + gspec := &Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{benchRootAddr: {Balance: math.BigPow(2, 254)}}, + } + blockchain, _ := NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil) + defer blockchain.Stop() + + // Insert an easy and a difficult chain afterwards + easyBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(500)) + diffBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(500)) + + if _, err := blockchain.InsertChain(easyBlocks); err != nil { + b.Fatalf("failed to insert easy chain: %v", err) + } + b.ResetTimer() + if _, err := blockchain.InsertChain(diffBlocks); err != nil { + b.Fatalf("failed to insert difficult chain: %v", err) + } +} + +// BenchmarkReorg-8 10000 362204 ns/op 271290 B/op 1256 allocs/op 151412736 bytes of heap used +// BenchmarkReorg-8 10000 381835 ns/op 276959 B/op 1324 allocs/op 152895488 bytes of heap used From 99c0c1818a6fae462e1c10693b0df6bde119bfed Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Tue, 15 Oct 2024 11:04:05 +0200 Subject: [PATCH 2/7] core: add better benchmark numbers --- core/blockchain.go | 6 +++--- core/blockchain_test.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c91c72a44e2d..b3a9ed21a309 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2273,9 +2273,9 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // reads should be blocked until the mutation is complete. bc.txLookupLock.Lock() - var stats runtime.MemStats - runtime.ReadMemStats(&stats) - panic(stats.HeapInuse) + //var stats runtime.MemStats + //runtime.ReadMemStats(&stats) + //panic(stats.HeapInuse) // Insert the new chain segment in incremental order, from the old // to the new. The new chain head (newChain[0]) is not inserted here, diff --git a/core/blockchain_test.go b/core/blockchain_test.go index ae2eedd3ddfc..b518819a2150 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -4249,8 +4249,8 @@ func BenchmarkReorg(b *testing.B) { defer blockchain.Stop() // Insert an easy and a difficult chain afterwards - easyBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(500)) - diffBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(500)) + easyBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000)) + diffBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000)) if _, err := blockchain.InsertChain(easyBlocks); err != nil { b.Fatalf("failed to insert easy chain: %v", err) @@ -4261,5 +4261,5 @@ func BenchmarkReorg(b *testing.B) { } } -// BenchmarkReorg-8 10000 362204 ns/op 271290 B/op 1256 allocs/op 151412736 bytes of heap used -// BenchmarkReorg-8 10000 381835 ns/op 276959 B/op 1324 allocs/op 152895488 bytes of heap used +// Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used +// WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used From c7b3922ed6d1f9493f16ed713928c72db64df789 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Tue, 15 Oct 2024 11:12:00 +0200 Subject: [PATCH 3/7] core: also reduce memory of newChain --- core/blockchain.go | 16 +++++++++------- core/blockchain_test.go | 1 + 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index b3a9ed21a309..ce0f21185c76 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2185,7 +2185,7 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { // externally. func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { var ( - newChain types.Blocks + newChain []*types.Header oldChain []*types.Header commonBlock *types.Block @@ -2210,7 +2210,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { } else { // New chain is longer, stash all blocks away for subsequent insertion for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { - newChain = append(newChain, newBlock) + newChain = append(newChain, newBlock.Header()) } } if oldBlock == nil { @@ -2232,7 +2232,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { for _, tx := range oldBlock.Transactions() { deletedTxs = append(deletedTxs, tx.Hash()) } - newChain = append(newChain, newBlock) + newChain = append(newChain, newBlock.Header()) // Step back with both chains oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) @@ -2282,10 +2282,11 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // as it will be handled separately outside of this function for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history - bc.writeHeadBlock(newChain[i]) + newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) + bc.writeHeadBlock(newBlock) // Collect the new added transactions. - for _, tx := range newChain[i].Transactions() { + for _, tx := range newBlock.Transactions() { addedTxs = append(addedTxs, tx.Hash()) } } @@ -2304,7 +2305,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // markers greater than or equal to new chain head should be deleted. number := commonBlock.NumberU64() if len(newChain) > 1 { - number = newChain[1].NumberU64() + number = newChain[1].Number.Uint64() } for i := number + 1; ; i++ { hash := rawdb.ReadCanonicalHash(bc.db, i) @@ -2346,7 +2347,8 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // New logs: var rebirthLogs []*types.Log for i := len(newChain) - 1; i >= 1; i-- { - if logs := bc.collectLogs(newChain[i], false); len(logs) > 0 { + newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) + if logs := bc.collectLogs(newBlock, false); len(logs) > 0 { rebirthLogs = append(rebirthLogs, logs...) } if len(rebirthLogs) > 512 { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index b518819a2150..d8f7da0643ca 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -4263,3 +4263,4 @@ func BenchmarkReorg(b *testing.B) { // Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used // WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used +// WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op 1564 allocs/op 1171890176 bytes of heap used From 3da0fa5ec27be06989b9ee9da39432eb8df2ab5b Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Wed, 16 Oct 2024 11:21:19 +0200 Subject: [PATCH 4/7] core: reorganize the code a bit --- core/blockchain.go | 43 ++++++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index ce0f21185c76..e78ac2a0dfea 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2268,10 +2268,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // rewind the canonical chain to a lower point. log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain)) } - // Acquire the tx-lookup lock before mutation. This step is essential - // as the txlookups should be changed atomically, and all subsequent - // reads should be blocked until the mutation is complete. - bc.txLookupLock.Lock() //var stats runtime.MemStats //runtime.ReadMemStats(&stats) @@ -2280,6 +2276,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // Insert the new chain segment in incremental order, from the old // to the new. The new chain head (newChain[0]) is not inserted here, // as it will be handled separately outside of this function + var rebirthLogs []*types.Log for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) @@ -2289,8 +2286,24 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { for _, tx := range newBlock.Transactions() { addedTxs = append(addedTxs, tx.Hash()) } + // Collect the logs and send them in batches of 512. + if logs := bc.collectLogs(newBlock, false); len(logs) > 0 { + rebirthLogs = append(rebirthLogs, logs...) + } + if len(rebirthLogs) > 512 { + bc.logsFeed.Send(rebirthLogs) + rebirthLogs = nil + } + } + if len(rebirthLogs) > 0 { + bc.logsFeed.Send(rebirthLogs) } + // Acquire the tx-lookup lock before mutation. This step is essential + // as the txlookups should be changed atomically, and all subsequent + // reads should be blocked until the mutation is complete. + bc.txLookupLock.Lock() + // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. var ( @@ -2323,9 +2336,9 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // Release the tx-lookup lock after mutation. bc.txLookupLock.Unlock() - // Send out events for logs from the old canon chain, and 'reborn' - // logs from the new canon chain. The number of logs can be very - // high, so the events are sent in batches of size around 512. + // Send out events for logs from the old canon chain. + // The number of logs can be very high, + // so the events are sent in batches of size around 512. // Deleted logs + blocks: var deletedLogs []*types.Log @@ -2343,22 +2356,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { if len(deletedLogs) > 0 { bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } - - // New logs: - var rebirthLogs []*types.Log - for i := len(newChain) - 1; i >= 1; i-- { - newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) - if logs := bc.collectLogs(newBlock, false); len(logs) > 0 { - rebirthLogs = append(rebirthLogs, logs...) - } - if len(rebirthLogs) > 512 { - bc.logsFeed.Send(rebirthLogs) - rebirthLogs = nil - } - } - if len(rebirthLogs) > 0 { - bc.logsFeed.Send(rebirthLogs) - } return nil } From b0799c24e78f862effab3a9e37b4a87707fc5524 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 16 Oct 2024 16:37:57 +0300 Subject: [PATCH 5/7] core: revert log emission order --- core/blockchain.go | 47 ++++++++++++++++++++++------------------------ 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index e78ac2a0dfea..eda714cff8d3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2268,15 +2268,14 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // rewind the canonical chain to a lower point. log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain)) } - - //var stats runtime.MemStats - //runtime.ReadMemStats(&stats) - //panic(stats.HeapInuse) + // Acquire the tx-lookup lock before mutation. This step is essential + // as the txlookups should be changed atomically, and all subsequent + // reads should be blocked until the mutation is complete. + bc.txLookupLock.Lock() // Insert the new chain segment in incremental order, from the old // to the new. The new chain head (newChain[0]) is not inserted here, // as it will be handled separately outside of this function - var rebirthLogs []*types.Log for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) @@ -2286,24 +2285,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { for _, tx := range newBlock.Transactions() { addedTxs = append(addedTxs, tx.Hash()) } - // Collect the logs and send them in batches of 512. - if logs := bc.collectLogs(newBlock, false); len(logs) > 0 { - rebirthLogs = append(rebirthLogs, logs...) - } - if len(rebirthLogs) > 512 { - bc.logsFeed.Send(rebirthLogs) - rebirthLogs = nil - } } - if len(rebirthLogs) > 0 { - bc.logsFeed.Send(rebirthLogs) - } - - // Acquire the tx-lookup lock before mutation. This step is essential - // as the txlookups should be changed atomically, and all subsequent - // reads should be blocked until the mutation is complete. - bc.txLookupLock.Lock() - // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. var ( @@ -2336,9 +2318,9 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // Release the tx-lookup lock after mutation. bc.txLookupLock.Unlock() - // Send out events for logs from the old canon chain. - // The number of logs can be very high, - // so the events are sent in batches of size around 512. + // Send out events for logs from the old canon chain, and 'reborn' + // logs from the new canon chain. The number of logs can be very + // high, so the events are sent in batches of size around 512. // Deleted logs + blocks: var deletedLogs []*types.Log @@ -2356,6 +2338,21 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { if len(deletedLogs) > 0 { bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } + // New logs: + var rebirthLogs []*types.Log + for i := len(newChain) - 1; i >= 1; i-- { + newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) + if logs := bc.collectLogs(newBlock, false); len(logs) > 0 { + rebirthLogs = append(rebirthLogs, logs...) + } + if len(rebirthLogs) > 512 { + bc.logsFeed.Send(rebirthLogs) + rebirthLogs = nil + } + } + if len(rebirthLogs) > 0 { + bc.logsFeed.Send(rebirthLogs) + } return nil } From a6c4273a59e78ac1293da1cda08e3a725f6bbc67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 16 Oct 2024 18:30:20 +0300 Subject: [PATCH 6/7] core: rework reorg to use headers only; emit reverted logs in reverse --- core/blockchain.go | 188 ++++++++++++++++++++++----------------------- 1 file changed, 92 insertions(+), 96 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index eda714cff8d3..a93682d0e96f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -23,6 +23,7 @@ import ( "io" "math/big" "runtime" + "slices" "strings" "sync" "sync/atomic" @@ -1435,7 +1436,7 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e func (bc *BlockChain) writeKnownBlock(block *types.Block) error { current := bc.CurrentBlock() if block.ParentHash() != current.Hash() { - if err := bc.reorg(current, block); err != nil { + if err := bc.reorg(current, block.Header()); err != nil { return err } } @@ -1541,7 +1542,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types // Reorganise the chain if the parent is not the head block if block.ParentHash() != currentBlock.Hash() { - if err := bc.reorg(currentBlock, block); err != nil { + if err := bc.reorg(currentBlock, block.Header()); err != nil { return NonStatTy, err } } @@ -2154,8 +2155,8 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co return block.Hash(), nil } -// collectLogs collects the logs that were generated or removed during -// the processing of a block. These logs are later announced as deleted or reborn. +// collectLogs collects the logs that were generated or removed during the +// processing of a block. These logs are later announced as deleted or reborn. func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { var blobGasPrice *big.Int excessBlobGas := b.ExcessBlobGas() @@ -2181,70 +2182,60 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { // reorg takes two blocks, an old chain and a new chain and will reconstruct the // blocks and inserts them to be part of the new canonical chain and accumulates // potential missing transactions and post an event about them. +// // Note the new head block won't be processed here, callers need to handle it // externally. -func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { +func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error { var ( newChain []*types.Header oldChain []*types.Header - commonBlock *types.Block - - deletedTxs []common.Hash - addedTxs []common.Hash + commonBlock *types.Header ) - oldBlock := bc.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) - if oldBlock == nil { - return errors.New("current head block missing") + // Skip handling the newHead as it's handled outside + newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) + if newHead == nil { + return errInvalidNewChain } - newBlock := newHead - // Reduce the longer chain to the same number as the shorter one - if oldBlock.NumberU64() > newBlock.NumberU64() { + if oldHead.Number.Uint64() > newHead.Number.Uint64() { // Old chain is longer, gather all transactions and logs as deleted ones - for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { - oldChain = append(oldChain, oldBlock.Header()) - for _, tx := range oldBlock.Transactions() { - deletedTxs = append(deletedTxs, tx.Hash()) - } + for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) { + oldChain = append(oldChain, oldHead) } } else { // New chain is longer, stash all blocks away for subsequent insertion - for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { - newChain = append(newChain, newBlock.Header()) + for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) { + newChain = append(newChain, newHead) } } - if oldBlock == nil { + if oldHead == nil { return errInvalidOldChain } - if newBlock == nil { + if newHead == nil { return errInvalidNewChain } // Both sides of the reorg are at the same number, reduce both until the common // ancestor is found for { // If the common ancestor was found, bail out - if oldBlock.Hash() == newBlock.Hash() { - commonBlock = oldBlock + if oldHead.Hash() == newHead.Hash() { + commonBlock = oldHead break } // Remove an old block as well as stash away a new block - oldChain = append(oldChain, oldBlock.Header()) - for _, tx := range oldBlock.Transactions() { - deletedTxs = append(deletedTxs, tx.Hash()) - } - newChain = append(newChain, newBlock.Header()) + oldChain = append(oldChain, oldHead) + newChain = append(newChain, newHead) // Step back with both chains - oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) - if oldBlock == nil { + oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) + if oldHead == nil { return errInvalidOldChain } - newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) - if newBlock == nil { + newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) + if newHead == nil { return errInvalidNewChain } } - // Ensure the user sees large reorgs if len(oldChain) > 0 && len(newChain) > 0 { logFn := log.Info @@ -2253,7 +2244,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { msg = "Large chain reorg detected" logFn = log.Warn } - logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(), + logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash(), "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash()) blockReorgAddMeter.Mark(int64(len(newChain))) blockReorgDropMeter.Mark(int64(len(oldChain))) @@ -2266,68 +2257,35 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { } else { // len(newChain) == 0 && len(oldChain) > 0 // rewind the canonical chain to a lower point. - log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain)) + log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain)) } // Acquire the tx-lookup lock before mutation. This step is essential // as the txlookups should be changed atomically, and all subsequent // reads should be blocked until the mutation is complete. bc.txLookupLock.Lock() - // Insert the new chain segment in incremental order, from the old - // to the new. The new chain head (newChain[0]) is not inserted here, - // as it will be handled separately outside of this function - for i := len(newChain) - 1; i >= 1; i-- { - // Insert the block in the canonical way, re-writing history - newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) - bc.writeHeadBlock(newBlock) - - // Collect the new added transactions. - for _, tx := range newBlock.Transactions() { - addedTxs = append(addedTxs, tx.Hash()) - } - } - // Delete useless indexes right now which includes the non-canonical - // transaction indexes, canonical chain indexes which above the head. + // Reorg can be executed, start reducing the chain's old blocks and appending + // the new blocks var ( - indexesBatch = bc.db.NewBatch() - diffs = types.HashDifference(deletedTxs, addedTxs) + deletedTxs []common.Hash + rebirthTxs []common.Hash + + deletedLogs []*types.Log + rebirthLogs []*types.Log ) - for _, tx := range diffs { - rawdb.DeleteTxLookupEntry(indexesBatch, tx) - } - // Delete all hash markers that are not part of the new canonical chain. - // Because the reorg function does not handle new chain head, all hash - // markers greater than or equal to new chain head should be deleted. - number := commonBlock.NumberU64() - if len(newChain) > 1 { - number = newChain[1].Number.Uint64() - } - for i := number + 1; ; i++ { - hash := rawdb.ReadCanonicalHash(bc.db, i) - if hash == (common.Hash{}) { - break + // Undo old blocks in reverse order + for i := 0; i < len(oldChain); i++ { + // Collect all the deleted transactions + block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64()) + if block == nil { + return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics } - rawdb.DeleteCanonicalHash(indexesBatch, i) - } - if err := indexesBatch.Write(); err != nil { - log.Crit("Failed to delete useless indexes", "err", err) - } - // Reset the tx lookup cache to clear stale txlookup cache. - bc.txLookupCache.Purge() - - // Release the tx-lookup lock after mutation. - bc.txLookupLock.Unlock() - - // Send out events for logs from the old canon chain, and 'reborn' - // logs from the new canon chain. The number of logs can be very - // high, so the events are sent in batches of size around 512. - - // Deleted logs + blocks: - var deletedLogs []*types.Log - for i := len(oldChain) - 1; i >= 0; i-- { - oldBlock = bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64()) - // Collect deleted logs for notification - if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 { + for _, tx := range block.Transactions() { + deletedTxs = append(deletedTxs, tx.Hash()) + } + // Collect deleted logs and emit them + if logs := bc.collectLogs(block, true); len(logs) > 0 { + slices.Reverse(logs) // Emit revertals latest first, older then deletedLogs = append(deletedLogs, logs...) } if len(deletedLogs) > 512 { @@ -2338,21 +2296,59 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { if len(deletedLogs) > 0 { bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } - // New logs: - var rebirthLogs []*types.Log - for i := len(newChain) - 1; i >= 1; i-- { - newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) - if logs := bc.collectLogs(newBlock, false); len(logs) > 0 { + // Apply new blocks in forward order + for i := len(newChain) - 1; i > 0; i-- { + // Collect all the included transactions + block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) + if block == nil { + return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics + } + for _, tx := range block.Transactions() { + rebirthTxs = append(rebirthTxs, tx.Hash()) + } + // Collect inserted logs and emit them + if logs := bc.collectLogs(block, false); len(logs) > 0 { rebirthLogs = append(rebirthLogs, logs...) } if len(rebirthLogs) > 512 { bc.logsFeed.Send(rebirthLogs) rebirthLogs = nil } + // Update the head block + bc.writeHeadBlock(block) } if len(rebirthLogs) > 0 { bc.logsFeed.Send(rebirthLogs) } + // Delete useless indexes right now which includes the non-canonical + // transaction indexes, canonical chain indexes which above the head. + batch := bc.db.NewBatch() + for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) { + rawdb.DeleteTxLookupEntry(batch, tx) + } + // Delete all hash markers that are not part of the new canonical chain. + // Because the reorg function does not handle new chain head, all hash + // markers greater than or equal to new chain head should be deleted. + number := commonBlock.Number + if len(newChain) > 0 { + number = newChain[0].Number + } + for i := number.Uint64() + 1; ; i++ { + hash := rawdb.ReadCanonicalHash(bc.db, i) + if hash == (common.Hash{}) { + break + } + rawdb.DeleteCanonicalHash(batch, i) + } + if err := batch.Write(); err != nil { + log.Crit("Failed to delete useless indexes", "err", err) + } + // Reset the tx lookup cache to clear stale txlookup cache. + bc.txLookupCache.Purge() + + // Release the tx-lookup lock after mutation. + bc.txLookupLock.Unlock() + return nil } @@ -2390,7 +2386,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) { // Run the reorg if necessary and set the given block as new head. start := time.Now() if head.ParentHash() != bc.CurrentBlock().Hash() { - if err := bc.reorg(bc.CurrentBlock(), head); err != nil { + if err := bc.reorg(bc.CurrentBlock(), head.Header()); err != nil { return common.Hash{}, err } } From bcd993a13ddfb4674b02ca9a39ab9bf213286637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 16 Oct 2024 19:20:55 +0300 Subject: [PATCH 7/7] core: revert faulty API behavior, add shador code with correct behavior --- core/blockchain.go | 48 +++++++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index a93682d0e96f..02c0bbaad1cb 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2191,11 +2191,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error oldChain []*types.Header commonBlock *types.Header ) - // Skip handling the newHead as it's handled outside - newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) - if newHead == nil { - return errInvalidNewChain - } // Reduce the longer chain to the same number as the shorter one if oldHead.Number.Uint64() > newHead.Number.Uint64() { // Old chain is longer, gather all transactions and logs as deleted ones @@ -2273,6 +2268,28 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error deletedLogs []*types.Log rebirthLogs []*types.Log ) + // Deleted log emission on the API uses forward order, which is borked, but + // we'll leave it in for legacy reasons. + // + // TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs? + { + for i := len(oldChain) - 1; i >= 0; i-- { + block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64()) + if block == nil { + return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics + } + if logs := bc.collectLogs(block, true); len(logs) > 0 { + deletedLogs = append(deletedLogs, logs...) + } + if len(deletedLogs) > 512 { + bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) + deletedLogs = nil + } + } + if len(deletedLogs) > 0 { + bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) + } + } // Undo old blocks in reverse order for i := 0; i < len(oldChain); i++ { // Collect all the deleted transactions @@ -2283,21 +2300,16 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error for _, tx := range block.Transactions() { deletedTxs = append(deletedTxs, tx.Hash()) } - // Collect deleted logs and emit them + // Collect deleted logs and emit them for new integrations if logs := bc.collectLogs(block, true); len(logs) > 0 { - slices.Reverse(logs) // Emit revertals latest first, older then - deletedLogs = append(deletedLogs, logs...) - } - if len(deletedLogs) > 512 { - bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - deletedLogs = nil + // Emit revertals latest first, older then + slices.Reverse(logs) + + // TODO(karalabe): Hook into the reverse emission part } } - if len(deletedLogs) > 0 { - bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - } // Apply new blocks in forward order - for i := len(newChain) - 1; i > 0; i-- { + for i := len(newChain) - 1; i >= 1; i-- { // Collect all the included transactions block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) if block == nil { @@ -2330,8 +2342,8 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error // Because the reorg function does not handle new chain head, all hash // markers greater than or equal to new chain head should be deleted. number := commonBlock.Number - if len(newChain) > 0 { - number = newChain[0].Number + if len(newChain) > 1 { + number = newChain[1].Number } for i := number.Uint64() + 1; ; i++ { hash := rawdb.ReadCanonicalHash(bc.db, i)