Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: do verify and commit concurrently #92

Merged
merged 22 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions common/gopool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package gopool

import (
"runtime"
"time"

"github.com/panjf2000/ants/v2"
)

var (
// Init a instance pool when importing ants.
defaultPool, _ = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithExpiryDuration(10*time.Second))
minNumberPerTask = 5
)

// Submit submits a task to pool.
func Submit(task func()) error {
return defaultPool.Submit(task)
}

// Running returns the number of the currently running goroutines.
func Running() int {
return defaultPool.Running()
}

// Cap returns the capacity of this default pool.
func Cap() int {
return defaultPool.Cap()
}

// Free returns the available goroutines to work.
func Free() int {
return defaultPool.Free()
}

// Release Closes the default pool.
func Release() {
defaultPool.Release()
}

// Reboot reboots the default pool.
func Reboot() {
defaultPool.Reboot()
}

func Threads(tasks int) int {
threads := tasks / minNumberPerTask
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
} else if threads == 0 {
threads = 1
}
return threads
}
186 changes: 127 additions & 59 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -65,57 +66,97 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash {
return fmt.Errorf("uncle root hash mismatch (header value %x, calculated %x)", header.UncleHash, hash)
}
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash {
return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash)
}

// Withdrawals are present after the Shanghai fork.
if header.WithdrawalsHash != nil {
// Withdrawals list must be present in body after Shanghai.
if block.Withdrawals() == nil {
return errors.New("missing withdrawals in block body")
}
if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash {
return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash)
}
} else if block.Withdrawals() != nil {
// Withdrawals are not allowed prior to Shanghai fork
return errors.New("withdrawals present in block body")
}

// Blob transactions may be present after the Cancun fork.
var blobs int
for i, tx := range block.Transactions() {
// Count the number of blobs to validate against the header's blobGasUsed
blobs += len(tx.BlobHashes())
validateFuns := []func() error{
func() error {
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash {
return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash)
}
return nil
},
func() error {
// Withdrawals are present after the Shanghai fork.
if header.WithdrawalsHash != nil {
// Withdrawals list must be present in body after Shanghai.
if block.Withdrawals() == nil {
return errors.New("missing withdrawals in block body")
}
if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash {
return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash)
}
} else if block.Withdrawals() != nil {
// Withdrawals are not allowed prior to Shanghai fork
return errors.New("withdrawals present in block body")
}
return nil
},
func() error {
// Blob transactions may be present after the Cancun fork.
var blobs int
for i, tx := range block.Transactions() {
// Count the number of blobs to validate against the header's blobGasUsed
blobs += len(tx.BlobHashes())

// If the tx is a blob tx, it must NOT have a sidecar attached to be valid in a block.
if tx.BlobTxSidecar() != nil {
return fmt.Errorf("unexpected blob sidecar in transaction at index %d", i)
}
// If the tx is a blob tx, it must NOT have a sidecar attached to be valid in a block.
if tx.BlobTxSidecar() != nil {
return fmt.Errorf("unexpected blob sidecar in transaction at index %d", i)
}

// The individual checks for blob validity (version-check + not empty)
// happens in StateTransition.
}
// The individual checks for blob validity (version-check + not empty)
// happens in StateTransition.
}

// Check blob gas usage.
if header.BlobGasUsed != nil {
if want := *header.BlobGasUsed / params.BlobTxBlobGasPerBlob; uint64(blobs) != want { // div because the header is surely good vs the body might be bloated
return fmt.Errorf("blob gas used mismatch (header %v, calculated %v)", *header.BlobGasUsed, blobs*params.BlobTxBlobGasPerBlob)
}
} else {
if blobs > 0 {
return errors.New("data blobs present in block body")
// Check blob gas usage.
if header.BlobGasUsed != nil {
if want := *header.BlobGasUsed / params.BlobTxBlobGasPerBlob; uint64(blobs) != want { // div because the header is surely good vs the body might be bloated
return fmt.Errorf("blob gas used mismatch (header %v, calculated %v)", *header.BlobGasUsed, blobs*params.BlobTxBlobGasPerBlob)
}
} else {
if blobs > 0 {
return errors.New("data blobs present in block body")
}
}
return nil
},
func() error {
// Ancestor block must be known.
if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
}
return consensus.ErrPrunedAncestor
}
return nil
},
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
tmpFunc := f
gopool.Submit(func() {
validateRes <- tmpFunc()
})
}
errs := make([]error, 0, len(validateFuns))
for i := 0; i < len(validateFuns); i++ {
err := <-validateRes
errs = append(errs, err)
}
var ancestorErr error
for _, err := range errs {
if err != nil {
if !errors.Is(err, consensus.ErrUnknownAncestor) && !errors.Is(err, consensus.ErrPrunedAncestor) {
// Other errors are returned first.
return err
} else {
ancestorErr = err
}
}
}

// Ancestor block must be known.
if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
}
return consensus.ErrPrunedAncestor
// If there are no other errors, but an ancestorErr, return it.
if ancestorErr != nil {
return ancestorErr
}

return nil
}

Expand All @@ -126,23 +167,50 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
}
// Validate the received block's bloom with the one derived from the generated receipts.
// For valid blocks this should always validate to true.
rbloom := types.CreateBloom(receipts)
if rbloom != header.Bloom {
return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom)
}
// Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, Rn]]))
receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil))
if receiptSha != header.ReceiptHash {
return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha)

validateFuns := []func() error{
func() error {
// Validate the received block's bloom with the one derived from the generated receipts.
// For valid blocks this should always validate to true.
rbloom := types.CreateBloom(receipts)
if rbloom != header.Bloom {
return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom)
}
return nil
},
func() error {
// Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, Rn]]))
receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil))
if receiptSha != header.ReceiptHash {
return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha)
}
return nil
},
func() error {
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
return nil
},
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
tmpFunc := f
gopool.Submit(func() {
validateRes <- tmpFunc()
})
}
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())

var err error
for i := 0; i < len(validateFuns); i++ {
r := <-validateRes
if r != nil && err == nil {
err = r
}
}
return nil
return err
}

// CalcGasLimit computes the gas limit of the next block after parent. It aims
Expand Down
39 changes: 26 additions & 13 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ var (
snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil)
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)
trieCommitTimer = metrics.NewRegisteredTimer("chain/trie/commits", nil)
codeCommitTimer = metrics.NewRegisteredTimer("chain/code/commits", nil)
blockCommitTimer = metrics.NewRegisteredTimer("chain/block/commits", nil)

triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil)

Expand Down Expand Up @@ -1482,20 +1485,29 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
//
// Note all the components of block(td, hash->number map, header, body, receipts)
// should be written atomically. BlockBatch is used for containing all components.
start := time.Now()
blockBatch := bc.db.NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
rawdb.WritePreimages(blockBatch, state.Preimages())
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
blockWriteExternalTimer.UpdateSince(start)
log.Debug("blockWriteExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())
wg := sync.WaitGroup{}
wg.Add(1)
defer wg.Wait()
go func() {
if metrics.EnabledExpensive {
defer func(start time.Time) { blockCommitTimer.Update(time.Since(start)) }(time.Now())
}
defer wg.Done()
start := time.Now()
blockBatch := bc.db.NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
rawdb.WritePreimages(blockBatch, state.Preimages())
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
blockWriteExternalTimer.UpdateSince(start)
log.Debug("blockWriteExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())
}()

// Commit all cached state changes into underlying memory database.
start = time.Now()
start := time.Now()
state.SetExpectedStateRoot(block.Root())
root, err := state.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
Expand Down Expand Up @@ -1905,7 +1917,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt atomic.Bool
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
Expand Down Expand Up @@ -1979,6 +1990,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them
trieCommitTimer.Update(statedb.TrieCommits)
codeCommitTimer.Update(statedb.CodeCommits)

blockWriteTimer.UpdateSince(wstart)
blockInsertTimer.UpdateSince(start)
Expand Down
1 change: 1 addition & 0 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
if err != nil {
return err
}
statedb.SetExpectedStateRoot(block.Root())
receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
if err != nil {
blockchain.reportBlock(block, receipts, err)
Expand Down
Loading
Loading