diff --git a/beacon/engine/types.go b/beacon/engine/types.go index d1b3aa22abdf..d6781336e674 100644 --- a/beacon/engine/types.go +++ b/beacon/engine/types.go @@ -76,6 +76,7 @@ type ExecutableData struct { Withdrawals []*types.Withdrawal `json:"withdrawals"` BlobGasUsed *uint64 `json:"blobGasUsed"` ExcessBlobGas *uint64 `json:"excessBlobGas"` + Difficulty *big.Int `json:"difficulty"` } // JSON type overrides for executableData. @@ -238,7 +239,7 @@ func ExecutableDataToBlock(data ExecutableData, versionedHashes []common.Hash, b TxHash: types.DeriveSha(types.Transactions(txs), trie.NewStackTrie(nil)), ReceiptHash: data.ReceiptsRoot, Bloom: types.BytesToBloom(data.LogsBloom), - Difficulty: common.Big0, + Difficulty: data.Difficulty, Number: new(big.Int).SetUint64(data.Number), GasLimit: data.GasLimit, GasUsed: data.GasUsed, @@ -279,6 +280,7 @@ func BlockToExecutableData(block *types.Block, fees *big.Int, sidecars []*types. Withdrawals: block.Withdrawals(), BlobGasUsed: block.BlobGasUsed(), ExcessBlobGas: block.ExcessBlobGas(), + Difficulty: block.Difficulty(), } bundle := BlobsBundleV1{ Commitments: make([]hexutil.Bytes, 0), diff --git a/beacon/engine/types_qng.go b/beacon/engine/types_qng.go new file mode 100644 index 000000000000..cfea3e7b02d4 --- /dev/null +++ b/beacon/engine/types_qng.go @@ -0,0 +1,69 @@ +package engine + +import ( + "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/trie" + "math/big" +) + +func ExecutableDataToBlockQng(params ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (*types.Block, error) { + txs, err := decodeTransactions(params.Transactions) + if err != nil { + return nil, err + } + if len(params.LogsBloom) != 256 { + return nil, fmt.Errorf("invalid logsBloom length: %v", len(params.LogsBloom)) + } + // Check that baseFeePerGas is not negative or too big + if params.BaseFeePerGas != nil && (params.BaseFeePerGas.Sign() == -1 || params.BaseFeePerGas.BitLen() > 256) { + return nil, fmt.Errorf("invalid baseFeePerGas: %v", params.BaseFeePerGas) + } + var blobHashes []common.Hash + for _, tx := range txs { + blobHashes = append(blobHashes, tx.BlobHashes()...) + } + if len(blobHashes) != len(versionedHashes) { + return nil, fmt.Errorf("invalid number of versionedHashes: %v blobHashes: %v", versionedHashes, blobHashes) + } + for i := 0; i < len(blobHashes); i++ { + if blobHashes[i] != versionedHashes[i] { + return nil, fmt.Errorf("invalid versionedHash at %v: %v blobHashes: %v", i, versionedHashes, blobHashes) + } + } + // Only set withdrawalsRoot if it is non-nil. This allows CLs to use + // ExecutableData before withdrawals are enabled by marshaling + // Withdrawals as the json null value. + var withdrawalsRoot *common.Hash + if params.Withdrawals != nil { + h := types.DeriveSha(types.Withdrawals(params.Withdrawals), trie.NewStackTrie(nil)) + withdrawalsRoot = &h + } + header := &types.Header{ + ParentHash: params.ParentHash, + UncleHash: types.EmptyUncleHash, + Coinbase: params.FeeRecipient, + Root: params.StateRoot, + TxHash: types.DeriveSha(types.Transactions(txs), trie.NewStackTrie(nil)), + ReceiptHash: params.ReceiptsRoot, + Bloom: types.BytesToBloom(params.LogsBloom), + Difficulty: params.Difficulty, + Number: new(big.Int).SetUint64(params.Number), + GasLimit: params.GasLimit, + GasUsed: params.GasUsed, + Time: params.Timestamp, + BaseFee: params.BaseFeePerGas, + Extra: params.ExtraData, + MixDigest: params.Random, + WithdrawalsHash: withdrawalsRoot, + ExcessBlobGas: params.ExcessBlobGas, + BlobGasUsed: params.BlobGasUsed, + ParentBeaconRoot: beaconRoot, + } + block := types.NewBlockWithHeader(header).WithBody(types.Body{Transactions: txs, Uncles: nil, Withdrawals: params.Withdrawals}) + if block.Hash() != params.BlockHash { + return nil, fmt.Errorf("blockhash mismatch, want %x, got %x", params.BlockHash, block.Hash()) + } + return block, nil +} diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b9fa3b8b4e5a..c7ac64f20767 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2172,7 +2172,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh if err != nil { Fatalf("%v", err) } - engine, err := ethconfig.CreateConsensusEngine(config, chainDb) + engine, err := ethconfig.CreateDefaultConsensusEngine(config, chainDb) if err != nil { Fatalf("%v", err) } diff --git a/core/blockchain.go b/core/blockchain.go index 05ebfd18b830..1b87b916f6f7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -712,11 +712,14 @@ func (bc *BlockChain) rewindPathHead(head *types.Header, root common.Hash) (*typ // noState represents if the target state requested for search // is unavailable and impossible to be recovered. - noState = !bc.HasState(root) && !bc.stateRecoverable(root) + noState = root == common.Hash{} start = time.Now() // Timestamp the rewinding is restarted logged = time.Now() // Timestamp last progress log was printed ) + if !noState { + noState = !bc.HasState(root) && !bc.stateRecoverable(root) + } // Rewind the head block tag until an available state is found. for { logger := log.Trace diff --git a/core/txpool/legacypool/legacypool_qng.go b/core/txpool/legacypool/legacypool_qng.go new file mode 100644 index 000000000000..041c4036d899 --- /dev/null +++ b/core/txpool/legacypool/legacypool_qng.go @@ -0,0 +1,20 @@ +package legacypool + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +func (pool *LegacyPool) RemoveTx(hash common.Hash, outofbound bool) { + pool.mu.Lock() + defer pool.mu.Unlock() + pool.removeTx(hash, outofbound, true) +} + +func (pool *LegacyPool) All() *lookup { + return pool.all +} + +func (pool *LegacyPool) AddRemotesSync(txs []*types.Transaction) []error { + return pool.addRemotesSync(txs) +} diff --git a/core/txpool/txpool_qng.go b/core/txpool/txpool_qng.go new file mode 100644 index 000000000000..afc2aafebba4 --- /dev/null +++ b/core/txpool/txpool_qng.go @@ -0,0 +1,5 @@ +package txpool + +func (p *TxPool) Subpools() []SubPool { + return p.subpools +} diff --git a/core/types/transaction_qng.go b/core/types/transaction_qng.go new file mode 100644 index 000000000000..0f81ca08f3df --- /dev/null +++ b/core/types/transaction_qng.go @@ -0,0 +1,112 @@ +package types + +import ( + "errors" + "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "math/big" +) + +type PKSigner interface { + GetPublicKey(tx *Transaction) ([]byte, error) +} + +func NewPKSigner(chainId *big.Int) PKSigner { + return cancunSigner{londonSigner{eip2930Signer{NewEIP155Signer(chainId)}}} +} + +func (s cancunSigner) GetPublicKey(tx *Transaction) ([]byte, error) { + if tx.Type() != BlobTxType { + return s.londonSigner.GetPublicKey(tx) + } + V, R, S := tx.RawSignatureValues() + // Blob txs are defined to use 0 and 1 as their recovery + // id, add 27 to become equivalent to unprotected Homestead signatures. + V = new(big.Int).Add(V, big.NewInt(27)) + if tx.ChainId().Cmp(s.chainId) != 0 { + return nil, fmt.Errorf("%w: have %d want %d", ErrInvalidChainId, tx.ChainId(), s.chainId) + } + return recoverPlainForPubK(s.Hash(tx), R, S, V, true) +} + +func (s londonSigner) GetPublicKey(tx *Transaction) ([]byte, error) { + if tx.Type() != DynamicFeeTxType { + return s.eip2930Signer.GetPublicKey(tx) + } + V, R, S := tx.RawSignatureValues() + // DynamicFee txs are defined to use 0 and 1 as their recovery + // id, add 27 to become equivalent to unprotected Homestead signatures. + V = new(big.Int).Add(V, big.NewInt(27)) + if tx.ChainId().Cmp(s.chainId) != 0 { + return nil, fmt.Errorf("%w: have %d want %d", ErrInvalidChainId, tx.ChainId(), s.chainId) + } + return recoverPlainForPubK(s.Hash(tx), R, S, V, true) +} + +func (s eip2930Signer) GetPublicKey(tx *Transaction) ([]byte, error) { + V, R, S := tx.RawSignatureValues() + switch tx.Type() { + case LegacyTxType: + return s.EIP155Signer.GetPublicKey(tx) + case AccessListTxType: + // AL txs are defined to use 0 and 1 as their recovery + // id, add 27 to become equivalent to unprotected Homestead signatures. + V = new(big.Int).Add(V, big.NewInt(27)) + default: + return nil, ErrTxTypeNotSupported + } + if tx.ChainId().Cmp(s.chainId) != 0 { + return nil, fmt.Errorf("%w: have %d want %d", ErrInvalidChainId, tx.ChainId(), s.chainId) + } + return recoverPlainForPubK(s.Hash(tx), R, S, V, true) +} + +func (s EIP155Signer) GetPublicKey(tx *Transaction) ([]byte, error) { + if tx.Type() != LegacyTxType { + return nil, ErrTxTypeNotSupported + } + if !tx.Protected() { + return HomesteadSigner{}.GetPublicKey(tx) + } + if tx.ChainId().Cmp(s.chainId) != 0 { + return nil, fmt.Errorf("%w: have %d want %d", ErrInvalidChainId, tx.ChainId(), s.chainId) + } + V, R, S := tx.RawSignatureValues() + V = new(big.Int).Sub(V, s.chainIdMul) + V.Sub(V, big8) + return recoverPlainForPubK(s.Hash(tx), R, S, V, true) +} + +func (hs HomesteadSigner) GetPublicKey(tx *Transaction) ([]byte, error) { + if tx.Type() != LegacyTxType { + return nil, ErrTxTypeNotSupported + } + v, r, s := tx.RawSignatureValues() + return recoverPlainForPubK(hs.Hash(tx), r, s, v, true) +} + +func recoverPlainForPubK(sighash common.Hash, R, S, Vb *big.Int, homestead bool) ([]byte, error) { + if Vb.BitLen() > 8 { + return nil, ErrInvalidSig + } + V := byte(Vb.Uint64() - 27) + if !crypto.ValidateSignatureValues(V, R, S, homestead) { + return nil, ErrInvalidSig + } + // encode the signature in uncompressed format + r, s := R.Bytes(), S.Bytes() + sig := make([]byte, crypto.SignatureLength) + copy(sig[32-len(r):32], r) + copy(sig[64-len(s):64], s) + sig[64] = V + // recover the public key from the signature + pub, err := crypto.Ecrecover(sighash[:], sig) + if err != nil { + return nil, err + } + if len(pub) == 0 || pub[0] != 4 { + return nil, errors.New("invalid public key") + } + return pub, nil +} diff --git a/core/types/transaction_qng_test.go b/core/types/transaction_qng_test.go new file mode 100644 index 000000000000..c465dcc05202 --- /dev/null +++ b/core/types/transaction_qng_test.go @@ -0,0 +1,42 @@ +package types + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/crypto" +) + +func TestGetPubkeyFromTx(t *testing.T) { + key, _ := crypto.GenerateKey() + addr := crypto.PubkeyToAddress(key.PublicKey) + + signer := NewEIP155Signer(big.NewInt(18)) + tx, err := SignTx(NewTransaction(0, addr, new(big.Int), 0, new(big.Int), nil), signer, key) + if err != nil { + t.Fatal(err) + } + + from, err := Sender(signer, tx) + if err != nil { + t.Fatal(err) + } + if from != addr { + t.Errorf("expected from and address to be equal. Got %x want %x", from, addr) + } + + var gs PKSigner + gs = &signer + pkb, err := gs.GetPublicKey(tx) + if err != nil { + t.Fatal(err) + } + pk, err := crypto.UnmarshalPubkey(pkb) + if err != nil { + t.Fatal(err) + } + pka := crypto.PubkeyToAddress(*pk) + if pka != addr { + t.Errorf("expected pubkey-address and address to be equal. Got %x want %x", pka, addr) + } +} diff --git a/crypto/signature_cgo.go b/crypto/signature_cgo.go index 87289253c0ff..f359fc99ce52 100644 --- a/crypto/signature_cgo.go +++ b/crypto/signature_cgo.go @@ -24,8 +24,8 @@ import ( "errors" "fmt" + secp256k1 "github.com/Qitmeer/go-secp256k1" "github.com/ethereum/go-ethereum/common/math" - "github.com/ethereum/go-ethereum/crypto/secp256k1" ) // Ecrecover returns the uncompressed public key that created the given signature. diff --git a/eth/backend.go b/eth/backend.go index 8679018dabfc..0058d2f10b3d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -142,7 +142,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } - engine, err := ethconfig.CreateConsensusEngine(chainConfig, chainDb) + if config.ConsensusEngine == nil { + config.ConsensusEngine = ethconfig.CreateDefaultConsensusEngine + } + engine, err := config.ConsensusEngine(chainConfig, chainDb) if err != nil { return nil, err } @@ -258,6 +261,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } eth.miner = miner.New(eth, config.Miner, eth.engine) + eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil} @@ -359,7 +363,13 @@ func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } // Protocols returns all the currently configured // network protocols to start. func (s *Ethereum) Protocols() []p2p.Protocol { - protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates) + var backend eth.Backend + if s.config.Genesis != nil && params.IsAmanaNetwork(s.config.Genesis.Config.ChainID) { + backend = (*qngHandler)(s.handler) + } else { + backend = (*ethHandler)(s.handler) + } + protos := eth.MakeProtocols(backend, s.networkID, s.ethDialCandidates) if s.config.SnapshotCache > 0 { protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler), s.snapDialCandidates)...) } diff --git a/eth/catalyst/api_qng.go b/eth/catalyst/api_qng.go new file mode 100644 index 000000000000..2fabafa99a82 --- /dev/null +++ b/eth/catalyst/api_qng.go @@ -0,0 +1,277 @@ +package catalyst + +import ( + "errors" + "github.com/ethereum/go-ethereum/beacon/engine" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/miner" + "strconv" + "time" +) + +func (api *ConsensusAPI) ForkchoiceUpdatedQng(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { + api.forkchoiceLock.Lock() + defer api.forkchoiceLock.Unlock() + + payloadVersion := engine.PayloadV1 + + log.Trace("Engine API request received", "method", "ForkchoiceUpdated", "head", update.HeadBlockHash, "finalized", update.FinalizedBlockHash, "safe", update.SafeBlockHash) + if update.HeadBlockHash == (common.Hash{}) { + log.Warn("Forkchoice requested update to zero hash") + return engine.STATUS_INVALID, nil // TODO(karalabe): Why does someone send us this? + } + // Stash away the last update to warn the user if the beacon client goes offline + api.lastForkchoiceLock.Lock() + api.lastForkchoiceUpdate = time.Now() + api.lastForkchoiceLock.Unlock() + + // Check whether we have the block yet in our database or not. If not, we'll + // need to either trigger a sync, or to reject this forkchoice update for a + // reason. + block := api.eth.BlockChain().GetBlockByHash(update.HeadBlockHash) + if block == nil { + // If this block was previously invalidated, keep rejecting it here too + if res := api.checkInvalidAncestor(update.HeadBlockHash, update.HeadBlockHash); res != nil { + return engine.ForkChoiceResponse{PayloadStatus: *res, PayloadID: nil}, nil + } + // If the head hash is unknown (was not given to us in a newPayload request), + // we cannot resolve the header, so not much to do. This could be extended in + // the future to resolve from the `eth` network, but it's an unexpected case + // that should be fixed, not papered over. + header := api.remoteBlocks.get(update.HeadBlockHash) + if header == nil { + log.Warn("Forkchoice requested unknown head", "hash", update.HeadBlockHash) + return engine.STATUS_SYNCING, nil + } + // If the finalized hash is known, we can direct the downloader to move + // potentially more data to the freezer from the get go. + finalized := api.remoteBlocks.get(update.FinalizedBlockHash) + + // Header advertised via a past newPayload request. Start syncing to it. + context := []interface{}{"number", header.Number, "hash", header.Hash()} + if update.FinalizedBlockHash != (common.Hash{}) { + if finalized == nil { + context = append(context, []interface{}{"finalized", "unknown"}...) + } else { + context = append(context, []interface{}{"finalized", finalized.Number}...) + } + } + log.Info("Forkchoice requested sync to new head", context...) + if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil { + return engine.STATUS_SYNCING, err + } + return engine.STATUS_SYNCING, nil + } + valid := func(id *engine.PayloadID) engine.ForkChoiceResponse { + return engine.ForkChoiceResponse{ + PayloadStatus: engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &update.HeadBlockHash}, + PayloadID: id, + } + } + if rawdb.ReadCanonicalHash(api.eth.ChainDb(), block.NumberU64()) != update.HeadBlockHash { + // Block is not canonical, set head. + if latestValid, err := api.eth.BlockChain().SetCanonical(block); err != nil { + return engine.ForkChoiceResponse{PayloadStatus: engine.PayloadStatusV1{Status: engine.INVALID, LatestValidHash: &latestValid}}, err + } + } else if api.eth.BlockChain().CurrentBlock().Hash() == update.HeadBlockHash { + // If the specified head matches with our local head, do nothing and keep + // generating the payload. It's a special corner case that a few slots are + // missing and we are requested to generate the payload in slot. + } else { + // If the head block is already in our canonical chain, the beacon client is + // probably resyncing. Ignore the update. + log.Info("Ignoring beacon update to old head", "number", block.NumberU64(), "hash", update.HeadBlockHash, "age", common.PrettyAge(time.Unix(int64(block.Time()), 0)), "have", api.eth.BlockChain().CurrentBlock().Number) + return valid(nil), nil + } + api.eth.SetSynced() + + // If the beacon client also advertised a finalized block, mark the local + // chain final and completely in PoS mode. + if update.FinalizedBlockHash != (common.Hash{}) { + // If the finalized block is not in our canonical tree, something is wrong + finalBlock := api.eth.BlockChain().GetBlockByHash(update.FinalizedBlockHash) + if finalBlock == nil { + log.Warn("Final block not available in database", "hash", update.FinalizedBlockHash) + return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("final block not available in database")) + } else if rawdb.ReadCanonicalHash(api.eth.ChainDb(), finalBlock.NumberU64()) != update.FinalizedBlockHash { + log.Warn("Final block not in canonical chain", "number", block.NumberU64(), "hash", update.HeadBlockHash) + return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("final block not in canonical chain")) + } + // Set the finalized block + api.eth.BlockChain().SetFinalized(finalBlock.Header()) + } + // Check if the safe block hash is in our canonical tree, if not something is wrong + if update.SafeBlockHash != (common.Hash{}) { + safeBlock := api.eth.BlockChain().GetBlockByHash(update.SafeBlockHash) + if safeBlock == nil { + log.Warn("Safe block not available in database") + return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("safe block not available in database")) + } + if rawdb.ReadCanonicalHash(api.eth.ChainDb(), safeBlock.NumberU64()) != update.SafeBlockHash { + log.Warn("Safe block not in canonical chain") + return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("safe block not in canonical chain")) + } + // Set the safe block + api.eth.BlockChain().SetSafe(safeBlock.Header()) + } + // If payload generation was requested, create a new block to be potentially + // sealed by the beacon client. The payload will be requested later, and we + // will replace it arbitrarily many times in between. + if payloadAttributes != nil { + args := &miner.BuildPayloadArgs{ + Parent: update.HeadBlockHash, + Timestamp: payloadAttributes.Timestamp, + FeeRecipient: payloadAttributes.SuggestedFeeRecipient, + Random: payloadAttributes.Random, + Withdrawals: payloadAttributes.Withdrawals, + BeaconRoot: payloadAttributes.BeaconRoot, + Version: payloadVersion, + } + id := args.Id() + // If we already are busy generating this work, then we do not need + // to start a second process. + if api.localBlocks.has(id) { + return valid(&id), nil + } + payload, err := api.eth.Miner().BuildPayload(args) + if err != nil { + log.Error("Failed to build payload", "err", err) + return valid(nil), engine.InvalidPayloadAttributes.With(err) + } + api.localBlocks.put(id, payload) + return valid(&id), nil + } + return valid(nil), nil +} + +func (api *ConsensusAPI) GetPayloadQng(payloadID engine.PayloadID, full bool) (*engine.ExecutionPayloadEnvelope, error) { + log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID) + data := api.localBlocks.get(payloadID, full) + if data == nil { + return nil, engine.UnknownPayload + } + return data, nil +} + +func (api *ConsensusAPI) NewPayloadQng(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (engine.PayloadStatusV1, error) { + // The locking here is, strictly, not required. Without these locks, this can happen: + // + // 1. NewPayload( execdata-N ) is invoked from the CL. It goes all the way down to + // api.eth.BlockChain().InsertBlockWithoutSetHead, where it is blocked on + // e.g database compaction. + // 2. The call times out on the CL layer, which issues another NewPayload (execdata-N) call. + // Similarly, this also get stuck on the same place. Importantly, since the + // first call has not gone through, the early checks for "do we already have this block" + // will all return false. + // 3. When the db compaction ends, then N calls inserting the same payload are processed + // sequentially. + // Hence, we use a lock here, to be sure that the previous call has finished before we + // check whether we already have the block locally. + api.newPayloadLock.Lock() + defer api.newPayloadLock.Unlock() + + log.Trace("Engine API request received", "method", "NewPayload", "number", params.Number, "hash", params.BlockHash) + block, err := engine.ExecutableDataToBlockQng(params, versionedHashes, beaconRoot) + if err != nil { + bgu := "nil" + if params.BlobGasUsed != nil { + bgu = strconv.Itoa(int(*params.BlobGasUsed)) + } + ebg := "nil" + if params.BlobGasUsed != nil { + ebg = strconv.Itoa(int(*params.ExcessBlobGas)) + } + log.Warn("Invalid NewPayload params", + "params.Number", params.Number, + "params.ParentHash", params.ParentHash, + "params.BlockHash", params.BlockHash, + "params.StateRoot", params.StateRoot, + "params.FeeRecipient", params.FeeRecipient, + "params.LogsBloom", common.PrettyBytes(params.LogsBloom), + "params.Random", params.Random, + "params.GasLimit", params.GasLimit, + "params.GasUsed", params.GasUsed, + "params.Timestamp", params.Timestamp, + "params.ExtraData", common.PrettyBytes(params.ExtraData), + "params.BaseFeePerGas", params.BaseFeePerGas, + "params.BlobGasUsed", bgu, + "params.ExcessBlobGas", ebg, + "len(params.Transactions)", len(params.Transactions), + "len(params.Withdrawals)", len(params.Withdrawals), + "beaconRoot", beaconRoot, + "error", err) + return api.invalid(err, nil), nil + } + // Stash away the last update to warn the user if the beacon client goes offline + api.lastNewPayloadLock.Lock() + api.lastNewPayloadUpdate = time.Now() + api.lastNewPayloadLock.Unlock() + + // If we already have the block locally, ignore the entire execution and just + // return a fake success. + if block := api.eth.BlockChain().GetBlockByHash(params.BlockHash); block != nil { + log.Warn("Ignoring already known beacon payload", "number", params.Number, "hash", params.BlockHash, "age", common.PrettyAge(time.Unix(int64(block.Time()), 0))) + hash := block.Hash() + return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil + } + // If this block was rejected previously, keep rejecting it + if res := api.checkInvalidAncestor(block.Hash(), block.Hash()); res != nil { + return *res, nil + } + // If the parent is missing, we - in theory - could trigger a sync, but that + // would also entail a reorg. That is problematic if multiple sibling blocks + // are being fed to us, and even more so, if some semi-distant uncle shortens + // our live chain. As such, payload execution will not permit reorgs and thus + // will not trigger a sync cycle. That is fine though, if we get a fork choice + // update after legit payload executions. + parent := api.eth.BlockChain().GetBlock(block.ParentHash(), block.NumberU64()-1) + if parent == nil { + return api.delayPayloadImport(block), nil + } + if block.Time() <= parent.Time() { + log.Warn("Invalid timestamp", "parent", block.Time(), "block", block.Time()) + return api.invalid(errors.New("invalid timestamp"), parent.Header()), nil + } + // Another corner case: if the node is in snap sync mode, but the CL client + // tries to make it import a block. That should be denied as pushing something + // into the database directly will conflict with the assumptions of snap sync + // that it has an empty db that it can fill itself. + if api.eth.SyncMode() != downloader.FullSync { + return api.delayPayloadImport(block), nil + } + if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { + api.remoteBlocks.put(block.Hash(), block.Header()) + log.Warn("State not available, ignoring new payload") + return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil + } + log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number()) + if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil { + log.Warn("NewPayloadV1: inserting block failed", "error", err) + + api.invalidLock.Lock() + api.invalidBlocksHits[block.Hash()] = 1 + api.invalidTipsets[block.Hash()] = block.Header() + api.invalidLock.Unlock() + + return api.invalid(err, parent.Header()), nil + } + hash := block.Hash() + return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil +} + +func NewConsensusAPIQng(eth *eth.Ethereum) *ConsensusAPI { + api := &ConsensusAPI{ + eth: eth, + remoteBlocks: newHeaderQueue(), + localBlocks: newPayloadQueue(), + invalidBlocksHits: make(map[common.Hash]int), + invalidTipsets: make(map[common.Hash]*types.Header), + } + eth.Downloader().SetBadBlockCallback(api.setInvalidAncestor) + return api +} diff --git a/eth/downloader/downloader_qng.go b/eth/downloader/downloader_qng.go new file mode 100644 index 000000000000..8d8b5652eab7 --- /dev/null +++ b/eth/downloader/downloader_qng.go @@ -0,0 +1,83 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package downloader + +import ( + "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/log" + "time" +) + +func (d *Downloader) SyncQng(peerid string, mode SyncMode, hash common.Hash) error { + d.peers.lock.RLock() + var peer *peerConnection + for _, peer = range d.peers.peers { + if peer.id == peerid { + break + } + } + d.peers.lock.RUnlock() + + if peer == nil { + return errBadPeer + } + log.Info("Attempting to retrieve sync target", "peer", peer.id, "head", hash.String()) + headers, metas, err := d.fetchQngHeadersByHash(peer, hash, 1, 0, false) + if err != nil || len(headers) != 1 { + log.Warn("Failed to fetch sync target", "headers", len(headers), "err", err) + return err + } + // Head header retrieved, if the hash matches, start the actual sync + if metas[0] != hash { + log.Error("Received invalid sync target", "want", hash, "have", metas[0]) + return err + } + if d.skeleton.filler.(*beaconBackfiller).filling { + return fmt.Errorf("backfiller is filling:%s", hash.String()) + } + return d.BeaconSync(mode, headers[0], headers[0]) +} + +func (d *Downloader) fetchQngHeadersByHash(p *peerConnection, hash common.Hash, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error) { + // Create the response sink and send the network request + start := time.Now() + resCh := make(chan *eth.Response) + + req, err := p.peer.RequestHeadersByHash(hash, amount, skip, reverse, resCh) + if err != nil { + return nil, nil, err + } + defer func() { + req.Close() + }() + + // Wait until the response arrives, the request is cancelled or times out + ttl := d.peers.rates.TargetTimeout() + + timeoutTimer := time.NewTimer(ttl) + defer timeoutTimer.Stop() + + select { + case <-timeoutTimer.C: + // Header retrieval timed out, update the metrics + p.log.Debug("Header request timed out", "elapsed", ttl) + headerTimeoutMeter.Mark(1) + + return nil, nil, errTimeout + + case res := <-resCh: + // Headers successfully retrieved, update the metrics + headerReqTimer.Update(time.Since(start)) + headerInMeter.Mark(int64(len(*res.Res.(*eth.BlockHeadersRequest)))) + + // Don't reject the packet even if it turns out to be bad, downloader will + // disconnect the peer on its own terms. Simply delivery the headers to + // be processed by the caller + res.Done <- nil + + return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil + } +} diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 7453fb1efdd3..7dceb9a5f497 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -67,8 +67,11 @@ var Defaults = Config{ RPCEVMTimeout: 5 * time.Second, GPO: FullNodeGPO, RPCTxFeeCap: 1, // 1 ether + ConsensusEngine: CreateDefaultConsensusEngine, } +type CreateConsensusEngine func(*params.ChainConfig, ethdb.Database) (consensus.Engine, error) + //go:generate go run github.com/fjl/gencodec -type Config -formats toml -out gen_config.go // Config contains configuration options for ETH and LES protocols. @@ -166,12 +169,14 @@ type Config struct { // OverrideVerkle (TODO: remove after the fork) OverrideVerkle *uint64 `toml:",omitempty"` + + ConsensusEngine CreateConsensusEngine } // CreateConsensusEngine creates a consensus engine for the given chain config. // Clique is allowed for now to live standalone, but ethash is forbidden and can // only exist on already merged networks. -func CreateConsensusEngine(config *params.ChainConfig, db ethdb.Database) (consensus.Engine, error) { +func CreateDefaultConsensusEngine(config *params.ChainConfig, db ethdb.Database) (consensus.Engine, error) { // Geth v1.14.0 dropped support for non-merged networks in any consensus // mode. If such a network is requested, reject startup. if !config.TerminalTotalDifficultyPassed { diff --git a/eth/fetcher/block_fetcher_qng.go b/eth/fetcher/block_fetcher_qng.go new file mode 100644 index 000000000000..f3d6fe01d5b2 --- /dev/null +++ b/eth/fetcher/block_fetcher_qng.go @@ -0,0 +1,921 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package fetcher + +import ( + "math/rand" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/trie" +) + +const ( + lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested + arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested + gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches + fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction +) + +const ( + maxUncleDist = 7 // Maximum allowed backward distance from the chain head + maxQueueDist = 32 // Maximum allowed distance from the chain head to queue + hashLimit = 256 // Maximum number of unique blocks or headers a peer may have announced + blockLimit = 64 // Maximum number of unique blocks a peer may have delivered +) + +var ( + blockAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/in", nil) + blockAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/announces/out", nil) + blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/drop", nil) + blockAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/dos", nil) + + blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil) + blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil) + blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil) + blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil) + + headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil) + bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil) + + headerFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/in", nil) + headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/out", nil) + bodyFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/in", nil) + bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil) +) + +// HeaderRetrievalFn is a callback type for retrieving a header from the local chain. +type HeaderRetrievalFn func(common.Hash) *types.Header + +// blockRetrievalFn is a callback type for retrieving a block from the local chain. +type blockRetrievalFn func(common.Hash) *types.Block + +// headerRequesterFn is a callback type for sending a header retrieval request. +type headerRequesterFn func(common.Hash, chan *eth.Response) (*eth.Request, error) + +// bodyRequesterFn is a callback type for sending a body retrieval request. +type bodyRequesterFn func([]common.Hash, chan *eth.Response) (*eth.Request, error) + +// headerVerifierFn is a callback type to verify a block's header for fast propagation. +type headerVerifierFn func(header *types.Header) error + +// blockBroadcasterFn is a callback type for broadcasting a block to connected peers. +type blockBroadcasterFn func(block *types.Block, propagate bool) + +// chainHeightFn is a callback type to retrieve the current chain height. +type chainHeightFn func() uint64 + +// headersInsertFn is a callback type to insert a batch of headers into the local chain. +type headersInsertFn func(headers []*types.Header) (int, error) + +// chainInsertFn is a callback type to insert a batch of blocks into the local chain. +type chainInsertFn func(types.Blocks) (int, error) + +// peerDropFn is a callback type for dropping a peer detected as malicious. +type peerDropFn func(id string) + +// blockAnnounce is the hash notification of the availability of a new block in the +// network. +type blockAnnounce struct { + hash common.Hash // Hash of the block being announced + number uint64 // Number of the block being announced (0 = unknown | old protocol) + header *types.Header // Header of the block partially reassembled (new protocol) + time time.Time // Timestamp of the announcement + + origin string // Identifier of the peer originating the notification + + fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block + fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block +} + +// headerFilterTask represents a batch of headers needing fetcher filtering. +type headerFilterTask struct { + peer string // The source peer of block headers + headers []*types.Header // Collection of headers to filter + time time.Time // Arrival time of the headers +} + +// bodyFilterTask represents a batch of block bodies (transactions and uncles) +// needing fetcher filtering. +type bodyFilterTask struct { + peer string // The source peer of block bodies + transactions [][]*types.Transaction // Collection of transactions per block bodies + uncles [][]*types.Header // Collection of uncles per block bodies + time time.Time // Arrival time of the blocks' contents +} + +// blockOrHeaderInject represents a schedules import operation. +type blockOrHeaderInject struct { + origin string + + header *types.Header // Used for light mode fetcher which only cares about header. + block *types.Block // Used for normal mode fetcher which imports full block. +} + +// number returns the block number of the injected object. +func (inject *blockOrHeaderInject) number() uint64 { + if inject.header != nil { + return inject.header.Number.Uint64() + } + return inject.block.NumberU64() +} + +// number returns the block hash of the injected object. +func (inject *blockOrHeaderInject) hash() common.Hash { + if inject.header != nil { + return inject.header.Hash() + } + return inject.block.Hash() +} + +// BlockFetcher is responsible for accumulating block announcements from various peers +// and scheduling them for retrieval. +type BlockFetcher struct { + light bool // The indicator whether it's a light fetcher or normal one. + + // Various event channels + notify chan *blockAnnounce + inject chan *blockOrHeaderInject + + headerFilter chan chan *headerFilterTask + bodyFilter chan chan *bodyFilterTask + + done chan common.Hash + quit chan struct{} + + // Announce states + announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion + announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching + fetching map[common.Hash]*blockAnnounce // Announced blocks, currently fetching + fetched map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval + completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing + + // Block cache + queue *prque.Prque[int64, *blockOrHeaderInject] // Queue containing the import operations (block number sorted) + queues map[string]int // Per peer block counts to prevent memory exhaustion + queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) + + // Callbacks + getHeader HeaderRetrievalFn // Retrieves a header from the local chain + getBlock blockRetrievalFn // Retrieves a block from the local chain + verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work + broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers + chainHeight chainHeightFn // Retrieves the current chain's height + insertHeaders headersInsertFn // Injects a batch of headers into the chain + insertChain chainInsertFn // Injects a batch of blocks into the chain + dropPeer peerDropFn // Drops a peer for misbehaving + + // Testing hooks + announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list + queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue + fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch + completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) + importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62) +} + +// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements. +func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { + return &BlockFetcher{ + light: light, + notify: make(chan *blockAnnounce), + inject: make(chan *blockOrHeaderInject), + headerFilter: make(chan chan *headerFilterTask), + bodyFilter: make(chan chan *bodyFilterTask), + done: make(chan common.Hash), + quit: make(chan struct{}), + announces: make(map[string]int), + announced: make(map[common.Hash][]*blockAnnounce), + fetching: make(map[common.Hash]*blockAnnounce), + fetched: make(map[common.Hash][]*blockAnnounce), + completing: make(map[common.Hash]*blockAnnounce), + queue: prque.New[int64, *blockOrHeaderInject](nil), + queues: make(map[string]int), + queued: make(map[common.Hash]*blockOrHeaderInject), + getHeader: getHeader, + getBlock: getBlock, + verifyHeader: verifyHeader, + broadcastBlock: broadcastBlock, + chainHeight: chainHeight, + insertHeaders: insertHeaders, + insertChain: insertChain, + dropPeer: dropPeer, + } +} + +// Start boots up the announcement based synchroniser, accepting and processing +// hash notifications and block fetches until termination requested. +func (f *BlockFetcher) Start() { + go f.loop() +} + +// Stop terminates the announcement based synchroniser, canceling all pending +// operations. +func (f *BlockFetcher) Stop() { + close(f.quit) +} + +// Notify announces the fetcher of the potential availability of a new block in +// the network. +func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, + headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error { + block := &blockAnnounce{ + hash: hash, + number: number, + time: time, + origin: peer, + fetchHeader: headerFetcher, + fetchBodies: bodyFetcher, + } + select { + case f.notify <- block: + return nil + case <-f.quit: + return errTerminated + } +} + +// Enqueue tries to fill gaps the fetcher's future import queue. +func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error { + op := &blockOrHeaderInject{ + origin: peer, + block: block, + } + select { + case f.inject <- op: + return nil + case <-f.quit: + return errTerminated + } +} + +// FilterHeaders extracts all the headers that were explicitly requested by the fetcher, +// returning those that should be handled differently. +func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header { + log.Trace("Filtering headers", "peer", peer, "headers", len(headers)) + + // Send the filter channel to the fetcher + filter := make(chan *headerFilterTask) + + select { + case f.headerFilter <- filter: + case <-f.quit: + return nil + } + // Request the filtering of the header list + select { + case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}: + case <-f.quit: + return nil + } + // Retrieve the headers remaining after filtering + select { + case task := <-filter: + return task.headers + case <-f.quit: + return nil + } +} + +// FilterBodies extracts all the block bodies that were explicitly requested by +// the fetcher, returning those that should be handled differently. +func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { + log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles)) + + // Send the filter channel to the fetcher + filter := make(chan *bodyFilterTask) + + select { + case f.bodyFilter <- filter: + case <-f.quit: + return nil, nil + } + // Request the filtering of the body list + select { + case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}: + case <-f.quit: + return nil, nil + } + // Retrieve the bodies remaining after filtering + select { + case task := <-filter: + return task.transactions, task.uncles + case <-f.quit: + return nil, nil + } +} + +// Loop is the main fetcher loop, checking and processing various notification +// events. +func (f *BlockFetcher) loop() { + // Iterate the block fetching until a quit is requested + var ( + fetchTimer = time.NewTimer(0) + completeTimer = time.NewTimer(0) + ) + <-fetchTimer.C // clear out the channel + <-completeTimer.C + defer fetchTimer.Stop() + defer completeTimer.Stop() + + for { + // Clean up any expired block fetches + for hash, announce := range f.fetching { + if time.Since(announce.time) > fetchTimeout { + f.forgetHash(hash) + } + } + // Import any queued blocks that could potentially fit + height := f.chainHeight() + for !f.queue.Empty() { + op := f.queue.PopItem() + hash := op.hash() + if f.queueChangeHook != nil { + f.queueChangeHook(hash, false) + } + // If too high up the chain or phase, continue later + number := op.number() + if number > height+1 { + f.queue.Push(op, -int64(number)) + if f.queueChangeHook != nil { + f.queueChangeHook(hash, true) + } + break + } + // Otherwise if fresh and still unknown, try and import + if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { + f.forgetBlock(hash) + continue + } + if f.light { + f.importHeaders(op.origin, op.header) + } else { + f.importBlocks(op.origin, op.block) + } + } + // Wait for an outside event to occur + select { + case <-f.quit: + // BlockFetcher terminating, abort all operations + return + + case notification := <-f.notify: + // A block was announced, make sure the peer isn't DOSing us + blockAnnounceInMeter.Mark(1) + + count := f.announces[notification.origin] + 1 + if count > hashLimit { + log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit) + blockAnnounceDOSMeter.Mark(1) + break + } + if notification.number == 0 { + break + } + // If we have a valid block number, check that it's potentially useful + if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { + log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) + blockAnnounceDropMeter.Mark(1) + break + } + // All is well, schedule the announce if block's not yet downloading + if _, ok := f.fetching[notification.hash]; ok { + break + } + if _, ok := f.completing[notification.hash]; ok { + break + } + f.announces[notification.origin] = count + f.announced[notification.hash] = append(f.announced[notification.hash], notification) + if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 { + f.announceChangeHook(notification.hash, true) + } + if len(f.announced) == 1 { + f.rescheduleFetch(fetchTimer) + } + + case op := <-f.inject: + // A direct block insertion was requested, try and fill any pending gaps + blockBroadcastInMeter.Mark(1) + + // Now only direct block injection is allowed, drop the header injection + // here silently if we receive. + if f.light { + continue + } + f.enqueue(op.origin, nil, op.block) + + case hash := <-f.done: + // A pending import finished, remove all traces of the notification + f.forgetHash(hash) + f.forgetBlock(hash) + + case <-fetchTimer.C: + // At least one block's timer ran out, check for needing retrieval + request := make(map[string][]common.Hash) + + for hash, announces := range f.announced { + // In current LES protocol(les2/les3), only header announce is + // available, no need to wait too much time for header broadcast. + timeout := arriveTimeout - gatherSlack + if f.light { + timeout = 0 + } + if time.Since(announces[0].time) > timeout { + // Pick a random peer to retrieve from, reset all others + announce := announces[rand.Intn(len(announces))] + f.forgetHash(hash) + + // If the block still didn't arrive, queue for fetching + if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) { + request[announce.origin] = append(request[announce.origin], hash) + f.fetching[hash] = announce + } + } + } + // Send out all block header requests + for peer, hashes := range request { + log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes) + + // Create a closure of the fetch and schedule in on a new thread + fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes + go func(peer string) { + if f.fetchingHook != nil { + f.fetchingHook(hashes) + } + for _, hash := range hashes { + headerFetchMeter.Mark(1) + go func(hash common.Hash) { + resCh := make(chan *eth.Response) + + req, err := fetchHeader(hash, resCh) + if err != nil { + return // Legacy code, yolo + } + defer req.Close() + + timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer + defer timeout.Stop() + + select { + case res := <-resCh: + res.Done <- nil + f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersRequest), time.Now()) + + case <-timeout.C: + // The peer didn't respond in time. The request + // was already rescheduled at this point, we were + // waiting for a catchup. With an unresponsive + // peer however, it's a protocol violation. + f.dropPeer(peer) + } + }(hash) + } + }(peer) + } + // Schedule the next fetch if blocks are still pending + f.rescheduleFetch(fetchTimer) + + case <-completeTimer.C: + // At least one header's timer ran out, retrieve everything + request := make(map[string][]common.Hash) + + for hash, announces := range f.fetched { + // Pick a random peer to retrieve from, reset all others + announce := announces[rand.Intn(len(announces))] + f.forgetHash(hash) + + // If the block still didn't arrive, queue for completion + if f.getBlock(hash) == nil { + request[announce.origin] = append(request[announce.origin], hash) + f.completing[hash] = announce + } + } + // Send out all block body requests + for peer, hashes := range request { + log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes) + + // Create a closure of the fetch and schedule in on a new thread + if f.completingHook != nil { + f.completingHook(hashes) + } + fetchBodies := f.completing[hashes[0]].fetchBodies + bodyFetchMeter.Mark(int64(len(hashes))) + + go func(peer string, hashes []common.Hash) { + resCh := make(chan *eth.Response) + + req, err := fetchBodies(hashes, resCh) + if err != nil { + return // Legacy code, yolo + } + defer req.Close() + + timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer + defer timeout.Stop() + + select { + case res := <-resCh: + res.Done <- nil + // Ignoring withdrawals here, since the block fetcher is not used post-merge. + txs, uncles, _ := res.Res.(*eth.BlockBodiesResponse).Unpack() + f.FilterBodies(peer, txs, uncles, time.Now()) + + case <-timeout.C: + // The peer didn't respond in time. The request + // was already rescheduled at this point, we were + // waiting for a catchup. With an unresponsive + // peer however, it's a protocol violation. + f.dropPeer(peer) + } + }(peer, hashes) + } + // Schedule the next fetch if blocks are still pending + f.rescheduleComplete(completeTimer) + + case filter := <-f.headerFilter: + // Headers arrived from a remote peer. Extract those that were explicitly + // requested by the fetcher, and return everything else so it's delivered + // to other parts of the system. + var task *headerFilterTask + select { + case task = <-filter: + case <-f.quit: + return + } + headerFilterInMeter.Mark(int64(len(task.headers))) + + // Split the batch of headers into unknown ones (to return to the caller), + // known incomplete ones (requiring body retrievals) and completed blocks. + unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{} + for _, header := range task.headers { + hash := header.Hash() + + // Filter fetcher-requested headers from other synchronisation algorithms + if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { + // If the delivered header does not match the promised number, drop the announcer + if header.Number.Uint64() != announce.number { + log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) + f.dropPeer(announce.origin) + f.forgetHash(hash) + continue + } + // Collect all headers only if we are running in light + // mode and the headers are not imported by other means. + if f.light { + if f.getHeader(hash) == nil { + announce.header = header + lightHeaders = append(lightHeaders, announce) + } + f.forgetHash(hash) + continue + } + // Only keep if not imported by other means + if f.getBlock(hash) == nil { + announce.header = header + announce.time = task.time + + // If the block is empty (header only), short circuit into the final import queue + if header.TxHash == types.EmptyTxsHash && header.UncleHash == types.EmptyUncleHash { + log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) + + block := types.NewBlockWithHeader(header) + block.ReceivedAt = task.time + + complete = append(complete, block) + f.completing[hash] = announce + continue + } + // Otherwise add to the list of blocks needing completion + incomplete = append(incomplete, announce) + } else { + log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) + f.forgetHash(hash) + } + } else { + // BlockFetcher doesn't know about it, add to the return list + unknown = append(unknown, header) + } + } + headerFilterOutMeter.Mark(int64(len(unknown))) + select { + case filter <- &headerFilterTask{headers: unknown, time: task.time}: + case <-f.quit: + return + } + // Schedule the retrieved headers for body completion + for _, announce := range incomplete { + hash := announce.header.Hash() + if _, ok := f.completing[hash]; ok { + continue + } + f.fetched[hash] = append(f.fetched[hash], announce) + if len(f.fetched) == 1 { + f.rescheduleComplete(completeTimer) + } + } + // Schedule the header for light fetcher import + for _, announce := range lightHeaders { + f.enqueue(announce.origin, announce.header, nil) + } + // Schedule the header-only blocks for import + for _, block := range complete { + if announce := f.completing[block.Hash()]; announce != nil { + f.enqueue(announce.origin, nil, block) + } + } + + case filter := <-f.bodyFilter: + // Block bodies arrived, extract any explicitly requested blocks, return the rest + var task *bodyFilterTask + select { + case task = <-filter: + case <-f.quit: + return + } + bodyFilterInMeter.Mark(int64(len(task.transactions))) + blocks := []*types.Block{} + // abort early if there's nothing explicitly requested + if len(f.completing) > 0 { + for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ { + // Match up a body to any possible completion request + var ( + matched = false + uncleHash common.Hash // calculated lazily and reused + txnHash common.Hash // calculated lazily and reused + ) + for hash, announce := range f.completing { + if f.queued[hash] != nil || announce.origin != task.peer { + continue + } + if uncleHash == (common.Hash{}) { + uncleHash = types.CalcUncleHash(task.uncles[i]) + } + if uncleHash != announce.header.UncleHash { + continue + } + if txnHash == (common.Hash{}) { + txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), trie.NewStackTrie(nil)) + } + if txnHash != announce.header.TxHash { + continue + } + // Mark the body matched, reassemble if still unknown + matched = true + if f.getBlock(hash) == nil { + block := types.NewBlockWithHeader(announce.header).WithBody(types.Body{Transactions: task.transactions[i], Uncles: task.uncles[i]}) + block.ReceivedAt = task.time + blocks = append(blocks, block) + } else { + f.forgetHash(hash) + } + } + if matched { + task.transactions = append(task.transactions[:i], task.transactions[i+1:]...) + task.uncles = append(task.uncles[:i], task.uncles[i+1:]...) + i-- + continue + } + } + } + bodyFilterOutMeter.Mark(int64(len(task.transactions))) + select { + case filter <- task: + case <-f.quit: + return + } + // Schedule the retrieved blocks for ordered import + for _, block := range blocks { + if announce := f.completing[block.Hash()]; announce != nil { + f.enqueue(announce.origin, nil, block) + } + } + } + } +} + +// rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout. +func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) { + // Short circuit if no blocks are announced + if len(f.announced) == 0 { + return + } + // Schedule announcement retrieval quickly for light mode + // since server won't send any headers to client. + if f.light { + fetch.Reset(lightTimeout) + return + } + // Otherwise find the earliest expiring announcement + earliest := time.Now() + for _, announces := range f.announced { + if earliest.After(announces[0].time) { + earliest = announces[0].time + } + } + fetch.Reset(arriveTimeout - time.Since(earliest)) +} + +// rescheduleComplete resets the specified completion timer to the next fetch timeout. +func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) { + // Short circuit if no headers are fetched + if len(f.fetched) == 0 { + return + } + // Otherwise find the earliest expiring announcement + earliest := time.Now() + for _, announces := range f.fetched { + if earliest.After(announces[0].time) { + earliest = announces[0].time + } + } + complete.Reset(gatherSlack - time.Since(earliest)) +} + +// enqueue schedules a new header or block import operation, if the component +// to be imported has not yet been seen. +func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) { + var ( + hash common.Hash + number uint64 + ) + if header != nil { + hash, number = header.Hash(), header.Number.Uint64() + } else { + hash, number = block.Hash(), block.NumberU64() + } + // Ensure the peer isn't DOSing us + count := f.queues[peer] + 1 + if count > blockLimit { + log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit) + blockBroadcastDOSMeter.Mark(1) + f.forgetHash(hash) + return + } + // Discard any past or too distant blocks + if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { + log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist) + blockBroadcastDropMeter.Mark(1) + f.forgetHash(hash) + return + } + // Schedule the block for future importing + if _, ok := f.queued[hash]; !ok { + op := &blockOrHeaderInject{origin: peer} + if header != nil { + op.header = header + } else { + op.block = block + } + f.queues[peer] = count + f.queued[hash] = op + f.queue.Push(op, -int64(number)) + if f.queueChangeHook != nil { + f.queueChangeHook(hash, true) + } + log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size()) + } +} + +// importHeaders spawns a new goroutine to run a header insertion into the chain. +// If the header's number is at the same height as the current import phase, it +// updates the phase states accordingly. +func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { + hash := header.Hash() + log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) + + go func() { + defer func() { f.done <- hash }() + // If the parent's unknown, abort insertion + parent := f.getHeader(header.ParentHash) + if parent == nil { + log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash) + return + } + // Validate the header and if something went wrong, drop the peer + if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock { + log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) + f.dropPeer(peer) + return + } + // Run the actual import and log any issues + if _, err := f.insertHeaders([]*types.Header{header}); err != nil { + log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) + return + } + // Invoke the testing hook if needed + if f.importedHook != nil { + f.importedHook(header, nil) + } + }() +} + +// importBlocks spawns a new goroutine to run a block insertion into the chain. If the +// block's number is at the same height as the current import phase, it updates +// the phase states accordingly. +func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { + hash := block.Hash() + + // Run the import on a new thread + log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) + go func() { + defer func() { f.done <- hash }() + + // If the parent's unknown, abort insertion + parent := f.getBlock(block.ParentHash()) + if parent == nil { + log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) + return + } + // Quickly validate the header and propagate the block if it passes + switch err := f.verifyHeader(block.Header()); err { + case nil: + // All ok, quickly propagate to our peers + blockBroadcastOutTimer.UpdateSince(block.ReceivedAt) + go f.broadcastBlock(block, true) + + case consensus.ErrFutureBlock: + // Weird future block, don't fail, but neither propagate + + default: + // Something went very wrong, drop the peer + log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) + f.dropPeer(peer) + return + } + // Run the actual import and log any issues + if _, err := f.insertChain(types.Blocks{block}); err != nil { + log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) + return + } + // If import succeeded, broadcast the block + blockAnnounceOutTimer.UpdateSince(block.ReceivedAt) + go f.broadcastBlock(block, false) + + // Invoke the testing hook if needed + if f.importedHook != nil { + f.importedHook(nil, block) + } + }() +} + +// forgetHash removes all traces of a block announcement from the fetcher's +// internal state. +func (f *BlockFetcher) forgetHash(hash common.Hash) { + // Remove all pending announces and decrement DOS counters + if announceMap, ok := f.announced[hash]; ok { + for _, announce := range announceMap { + f.announces[announce.origin]-- + if f.announces[announce.origin] <= 0 { + delete(f.announces, announce.origin) + } + } + delete(f.announced, hash) + if f.announceChangeHook != nil { + f.announceChangeHook(hash, false) + } + } + // Remove any pending fetches and decrement the DOS counters + if announce := f.fetching[hash]; announce != nil { + f.announces[announce.origin]-- + if f.announces[announce.origin] <= 0 { + delete(f.announces, announce.origin) + } + delete(f.fetching, hash) + } + + // Remove any pending completion requests and decrement the DOS counters + for _, announce := range f.fetched[hash] { + f.announces[announce.origin]-- + if f.announces[announce.origin] <= 0 { + delete(f.announces, announce.origin) + } + } + delete(f.fetched, hash) + + // Remove any pending completions and decrement the DOS counters + if announce := f.completing[hash]; announce != nil { + f.announces[announce.origin]-- + if f.announces[announce.origin] <= 0 { + delete(f.announces, announce.origin) + } + delete(f.completing, hash) + } +} + +// forgetBlock removes all traces of a queued block from the fetcher's internal +// state. +func (f *BlockFetcher) forgetBlock(hash common.Hash) { + if insert := f.queued[hash]; insert != nil { + f.queues[insert.origin]-- + if f.queues[insert.origin] == 0 { + delete(f.queues, insert.origin) + } + delete(f.queued, hash) + } +} diff --git a/eth/handler.go b/eth/handler.go index d5117584c001..2092481cbae6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -125,6 +125,11 @@ type handler struct { handlerStartCh chan struct{} handlerDoneCh chan struct{} + + // qng + blockFetcher *fetcher.BlockFetcher + minedBlockSub *event.TypeMuxSubscription + chainSync *chainSyncer } // newHandler returns a handler for all Ethereum chain management protocol. @@ -193,6 +198,30 @@ func newHandler(config *handlerConfig) (*handler, error) { return h.txpool.Add(txs, false, false) } h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer) + + // qng + // Construct the fetcher (short sync) + validator := func(header *types.Header) error { + return h.chain.Engine().VerifyHeader(h.chain, header) + } + heighter := func() uint64 { + return h.chain.CurrentBlock().Number.Uint64() + } + inserter := func(blocks types.Blocks) (int, error) { + // If snap sync is running, deny importing weird blocks. This is a problematic + // clause when starting up a new network, because snap-syncing miners might not + // accept each others' blocks until a restart. Unfortunately we haven't figured + // out a way yet where nodes can decide unilaterally whether the network is new + // or not. This should be fixed if we figure out a solution. + if !h.synced.Load() { + log.Warn("Syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) + return 0, nil + } + return h.chain.InsertChain(blocks) + } + h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer) + + h.chainSync = newChainSyncer(h) return h, nil } @@ -434,9 +463,23 @@ func (h *handler) Start(maxPeers int) { // start peer handler tracker h.wg.Add(1) go h.protoTracker() + + // qng + // broadcast mined blocks + h.wg.Add(1) + h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go h.minedBroadcastLoop() + + // start sync handlers + h.wg.Add(1) + go h.chainSync.loop() } func (h *handler) Stop() { + // qng ----------------------- + h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + // --------------------------- + h.txsSub.Unsubscribe() // quits txBroadcastLoop h.txFetcher.Stop() h.downloader.Terminate() diff --git a/eth/handler_qng.go b/eth/handler_qng.go new file mode 100644 index 000000000000..4357e52e6e02 --- /dev/null +++ b/eth/handler_qng.go @@ -0,0 +1,286 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package eth + +import ( + "errors" + "fmt" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/p2p/enode" + "math" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/forkid" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" +) + +// runQngPeer registers an eth peer into the joint qng/snap peerset, adds it to +// various subsystems and starts handling messages. +func (h *handler) runQngPeer(peer *eth.Peer, handler eth.Handler) error { + if !h.incHandlers() { + return p2p.DiscQuitting + } + defer h.decHandlers() + + // If the peer has a `snap` extension, wait for it to connect so we can have + // a uniform initialization/teardown mechanism + snap, err := h.peers.waitSnapExtension(peer) + if err != nil { + peer.Log().Error("Snapshot extension barrier failed", "err", err) + return err + } + + // Execute the Ethereum handshake + var ( + genesis = h.chain.Genesis() + head = h.chain.CurrentHeader() + hash = head.Hash() + number = head.Number.Uint64() + td = h.chain.GetTd(hash, number) + ) + forkID := forkid.NewID(h.chain.Config(), genesis, number, head.Time) + if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil { + peer.Log().Debug("Ethereum handshake failed", "err", err) + return err + } + reject := false // reserved peer slots + if h.snapSync.Load() { + if snap == nil { + // If we are running snap-sync, we want to reserve roughly half the peer + // slots for peers supporting the snap protocol. + // The logic here is; we only allow up to 5 more non-snap peers than snap-peers. + if all, snp := h.peers.len(), h.peers.snapLen(); all-snp > snp+5 { + reject = true + } + } + } + // Ignore maxPeers if this is a trusted peer + if !peer.Peer.Info().Network.Trusted { + if reject || h.peers.len() >= h.maxPeers { + return p2p.DiscTooManyPeers + } + } + peer.Log().Debug("Ethereum peer connected", "name", peer.Name()) + + // Register the peer locally + if err := h.peers.registerPeer(peer, snap); err != nil { + peer.Log().Error("Ethereum peer registration failed", "err", err) + return err + } + defer h.unregisterPeer(peer.ID()) + + p := h.peers.peer(peer.ID()) + if p == nil { + return errors.New("peer dropped during handling") + } + // Register the peer in the downloader. If the downloader considers it banned, we disconnect + if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil { + peer.Log().Error("Failed to register peer in eth syncer", "err", err) + return err + } + if snap != nil { + if err := h.downloader.SnapSyncer.Register(snap); err != nil { + peer.Log().Error("Failed to register peer in snap syncer", "err", err) + return err + } + } + h.chainSync.handlePeerEvent() + + // Propagate existing transactions. new transactions appearing + // after this will be sent via broadcasts. + h.syncTransactions(peer) + + // Create a notification channel for pending requests if the peer goes down + dead := make(chan struct{}) + defer close(dead) + + // If we have any explicit peer required block hashes, request them + for number, hash := range h.requiredBlocks { + resCh := make(chan *eth.Response) + + req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh) + if err != nil { + return err + } + go func(number uint64, hash common.Hash, req *eth.Request) { + // Ensure the request gets cancelled in case of error/drop + defer req.Close() + + timeout := time.NewTimer(syncChallengeTimeout) + defer timeout.Stop() + + select { + case res := <-resCh: + headers := ([]*types.Header)(*res.Res.(*eth.BlockHeadersRequest)) + if len(headers) == 0 { + // Required blocks are allowed to be missing if the remote + // node is not yet synced + res.Done <- nil + return + } + // Validate the header and either drop the peer or continue + if len(headers) > 1 { + res.Done <- errors.New("too many headers in required block response") + return + } + if headers[0].Number.Uint64() != number || headers[0].Hash() != hash { + peer.Log().Info("Required block mismatch, dropping peer", "number", number, "hash", headers[0].Hash(), "want", hash) + res.Done <- errors.New("required block mismatch") + return + } + peer.Log().Debug("Peer required block verified", "number", number, "hash", hash) + res.Done <- nil + case <-timeout.C: + peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name()) + h.removePeer(peer.ID()) + } + }(number, hash, req) + } + // Handle incoming messages until the connection is torn down + return handler(peer) +} + +// minedBroadcastLoop sends mined blocks to connected peers. +func (h *handler) minedBroadcastLoop() { + defer h.wg.Done() + + for obj := range h.minedBlockSub.Chan() { + if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { + h.BroadcastBlock(ev.Block, true) // First propagate block to peers + h.BroadcastBlock(ev.Block, false) // Only then announce to the rest + } + } +} + +// BroadcastBlock will either propagate a block to a subset of its peers, or +// will only announce its availability (depending what's requested). +func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { + hash := block.Hash() + peers := h.peers.peersWithoutBlock(hash) + + // If propagation is requested, send to a subset of the peer + if propagate { + // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) + var td *big.Int + if parent := h.chain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { + td = new(big.Int).Add(block.Difficulty(), h.chain.GetTd(block.ParentHash(), block.NumberU64()-1)) + } else { + log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) + return + } + // Send the block to a subset of our peers + transfer := peers[:int(math.Sqrt(float64(len(peers))))] + for _, peer := range transfer { + peer.AsyncSendNewBlock(block, td) + } + log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + return + } + // Otherwise if the block is indeed in out own chain, announce it + if h.chain.HasBlock(hash, block.NumberU64()) { + for _, peer := range peers { + peer.AsyncSendNewBlockHash(block) + } + log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + } +} + +type qngHandler handler + +func (h *qngHandler) Chain() *core.BlockChain { return h.chain } +func (h *qngHandler) TxPool() eth.TxPool { return h.txpool } + +// RunPeer is invoked when a peer joins on the `eth` protocol. +func (h *qngHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error { + return (*handler)(h).runQngPeer(peer, hand) +} + +// PeerInfo retrieves all known `eth` information about a peer. +func (h *qngHandler) PeerInfo(id enode.ID) interface{} { + if p := h.peers.peer(id.String()); p != nil { + return p.info() + } + return nil +} + +// AcceptTxs retrieves whether transaction processing is enabled on the node +// or if inbound transactions should simply be dropped. +func (h *qngHandler) AcceptTxs() bool { + return h.synced.Load() +} + +// Handle is invoked from a peer's message handler when it receives a new remote +// message that the handler couldn't consume and serve itself. +func (h *qngHandler) Handle(peer *eth.Peer, packet eth.Packet) error { + // Consume any broadcasts and announces, forwarding the rest to the downloader + switch packet := packet.(type) { + case *eth.NewBlockHashesPacket: + hashes, numbers := packet.Unpack() + return h.handleBlockAnnounces(peer, hashes, numbers) + + case *eth.NewBlockPacket: + return h.handleBlockBroadcast(peer, packet.Block, packet.TD) + + case *eth.NewPooledTransactionHashesPacket: + return h.txFetcher.Notify(peer.ID(), packet.Types, packet.Sizes, packet.Hashes) + + case *eth.TransactionsPacket: + for _, tx := range *packet { + if tx.Type() == types.BlobTxType { + return errors.New("disallowed broadcast blob transaction") + } + } + return h.txFetcher.Enqueue(peer.ID(), *packet, false) + + case *eth.PooledTransactionsResponse: + return h.txFetcher.Enqueue(peer.ID(), *packet, true) + + default: + return fmt.Errorf("unexpected eth packet type: %T", packet) + } +} + +// handleBlockAnnounces is invoked from a peer's message handler when it transmits a +// batch of block announcements for the local node to process. +func (h *qngHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, numbers []uint64) error { + // Schedule all the unknown hashes for retrieval + var ( + unknownHashes = make([]common.Hash, 0, len(hashes)) + unknownNumbers = make([]uint64, 0, len(numbers)) + ) + for i := 0; i < len(hashes); i++ { + if !h.chain.HasBlock(hashes[i], numbers[i]) { + unknownHashes = append(unknownHashes, hashes[i]) + unknownNumbers = append(unknownNumbers, numbers[i]) + } + } + for i := 0; i < len(unknownHashes); i++ { + h.blockFetcher.Notify(peer.ID(), unknownHashes[i], unknownNumbers[i], time.Now(), peer.RequestOneHeader, peer.RequestBodies) + } + return nil +} + +// handleBlockBroadcast is invoked from a peer's message handler when it transmits a +// block broadcast for the local node to process. +func (h *qngHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error { + // Schedule the block for import + h.blockFetcher.Enqueue(peer.ID(), block) + + // Assuming the block is importable by the peer, but possibly not yet done so, + // calculate the head hash and TD that the peer truly must have. + var ( + trueHead = block.ParentHash() + trueTD = new(big.Int).Sub(td, block.Difficulty()) + ) + // Update the peer's total difficulty if better than the previous + if _, td := peer.Head(); trueTD.Cmp(td) > 0 { + peer.SetHead(trueHead, trueTD) + h.chainSync.handlePeerEvent() + } + return nil +} diff --git a/eth/peerset_qng.go b/eth/peerset_qng.go new file mode 100644 index 000000000000..2cbdb10a3bbd --- /dev/null +++ b/eth/peerset_qng.go @@ -0,0 +1,42 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package eth + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "math/big" +) + +// peersWithoutBlock retrieves a list of peers that do not have a given block in +// their set of known hashes so it might be propagated to them. +func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*ethPeer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.KnownBlock(hash) { + list = append(list, p) + } + } + return list +} + +// peerWithHighestTD retrieves the known peer with the currently highest total +// difficulty, but below the given PoS switchover threshold. +func (ps *peerSet) peerWithHighestTD() *eth.Peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + var ( + bestPeer *eth.Peer + bestTd *big.Int + ) + for _, p := range ps.peers { + if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { + bestPeer, bestTd = p.Peer, td + } + } + return bestPeer +} diff --git a/eth/protocols/eth/broadcast_qng.go b/eth/protocols/eth/broadcast_qng.go new file mode 100644 index 000000000000..243c675cc6f3 --- /dev/null +++ b/eth/protocols/eth/broadcast_qng.go @@ -0,0 +1,41 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package eth + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// blockPropagation is a block propagation event, waiting for its turn in the +// broadcast queue. +type blockPropagation struct { + block *types.Block + td *big.Int +} + +// broadcastBlocks is a write loop that multiplexes blocks and block announcements +// to the remote peer. The goal is to have an async writer that does not lock up +// node internals and at the same time rate limits queued data. +func (p *Peer) broadcastBlocks() { + for { + select { + case prop := <-p.queuedBlocks: + if err := p.SendNewBlock(prop.block, prop.td); err != nil { + return + } + p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + + case block := <-p.queuedBlockAnns: + if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { + return + } + p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) + + case <-p.term: + return + } + } +} diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index bdc630a9f467..085f0f67eb21 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -18,7 +18,6 @@ package eth import ( "encoding/json" - "errors" "fmt" "github.com/ethereum/go-ethereum/common" @@ -274,14 +273,6 @@ func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) [ return receipts } -func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { - return errors.New("block announcements disallowed") // We dropped support for non-merge networks -} - -func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { - return errors.New("block broadcasts disallowed") // We dropped support for non-merge networks -} - func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error { // A batch of headers arrived to one of our previous requests res := new(BlockHeadersPacket) diff --git a/eth/protocols/eth/handlers_qng.go b/eth/protocols/eth/handlers_qng.go new file mode 100644 index 000000000000..df8f866dcade --- /dev/null +++ b/eth/protocols/eth/handlers_qng.go @@ -0,0 +1,50 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package eth + +import ( + "fmt" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie" +) + +func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { + // A batch of new block announcements just arrived + ann := new(NewBlockHashesPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + // Mark the hashes as present at the remote node + for _, block := range *ann { + peer.markBlock(block.Hash) + } + // Deliver them all to the backend for queuing + return backend.Handle(peer, ann) +} + +func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { + // Retrieve and decode the propagated block + ann := new(NewBlockPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + if err := ann.sanityCheck(); err != nil { + return err + } + if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() { + log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash()) + return nil // TODO(karalabe): return error eventually, but wait a few releases + } + if hash := types.DeriveSha(ann.Block.Transactions(), trie.NewStackTrie(nil)); hash != ann.Block.TxHash() { + log.Warn("Propagated block has invalid body", "have", hash, "exp", ann.Block.TxHash()) + return nil // TODO(karalabe): return error eventually, but wait a few releases + } + ann.Block.ReceivedAt = msg.Time() + ann.Block.ReceivedFrom = peer + + // Mark the peer as owning the block + peer.markBlock(ann.Block.Hash()) + + return backend.Handle(peer, ann) +} diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index f53782a05318..d65e5c6735ce 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -64,6 +64,11 @@ type Peer struct { term chan struct{} // Termination channel to stop the broadcasters lock sync.RWMutex // Mutex protecting the internal fields + + // qng + knownBlocks *knownCache // Set of block hashes known to be known by this peer + queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer + queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer } // NewPeer creates a wrapper for a network connection and negotiated protocol @@ -88,6 +93,12 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe go peer.announceTransactions() go peer.dispatcher() + // qng + peer.knownBlocks = newKnownCache(maxKnownBlocks) + peer.queuedBlocks = make(chan *blockPropagation, maxQueuedBlocks) + peer.queuedBlockAnns = make(chan *types.Block, maxQueuedBlockAnns) + go peer.broadcastBlocks() + return peer } diff --git a/eth/protocols/eth/peer_qng.go b/eth/protocols/eth/peer_qng.go new file mode 100644 index 000000000000..30d7c2ce552e --- /dev/null +++ b/eth/protocols/eth/peer_qng.go @@ -0,0 +1,95 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package eth + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/p2p" + "math/big" +) + +const ( + // maxKnownBlocks is the maximum block hashes to keep in the known list + // before starting to randomly evict them. + maxKnownBlocks = 1024 + + // maxQueuedBlocks is the maximum number of block propagations to queue up before + // dropping broadcasts. There's not much point in queueing stale blocks, so a few + // that might cover uncles should be enough. + maxQueuedBlocks = 4 + + // maxQueuedBlockAnns is the maximum number of block announcements to queue up before + // dropping broadcasts. Similarly to block propagations, there's no point to queue + // above some healthy uncle limit, so use that. + maxQueuedBlockAnns = 4 +) + +// max is a helper function which returns the larger of the two given integers. +func max(a, b int) int { + if a > b { + return a + } + return b +} + +// KnownBlock returns whether peer is known to already have a block. +func (p *Peer) KnownBlock(hash common.Hash) bool { + return p.knownBlocks.Contains(hash) +} + +// markBlock marks a block as known for the peer, ensuring that the block will +// never be propagated to this particular peer. +func (p *Peer) markBlock(hash common.Hash) { + // If we reached the memory allowance, drop a previously known block hash + p.knownBlocks.Add(hash) +} + +// SendNewBlockHashes announces the availability of a number of blocks through +// a hash notification. +func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { + // Mark all the block hashes as known, but ensure we don't overflow our limits + p.knownBlocks.Add(hashes...) + + request := make(NewBlockHashesPacket, len(hashes)) + for i := 0; i < len(hashes); i++ { + request[i].Hash = hashes[i] + request[i].Number = numbers[i] + } + return p2p.Send(p.rw, NewBlockHashesMsg, request) +} + +// AsyncSendNewBlockHash queues the availability of a block for propagation to a +// remote peer. If the peer's broadcast queue is full, the event is silently +// dropped. +func (p *Peer) AsyncSendNewBlockHash(block *types.Block) { + select { + case p.queuedBlockAnns <- block: + // Mark all the block hash as known, but ensure we don't overflow our limits + p.knownBlocks.Add(block.Hash()) + default: + p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) + } +} + +// SendNewBlock propagates an entire block to a remote peer. +func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error { + // Mark all the block hash as known, but ensure we don't overflow our limits + p.knownBlocks.Add(block.Hash()) + return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{ + Block: block, + TD: td, + }) +} + +// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If +// the peer's broadcast queue is full, the event is silently dropped. +func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { + select { + case p.queuedBlocks <- &blockPropagation{block: block, td: td}: + // Mark all the block hash as known, but ensure we don't overflow our limits + p.knownBlocks.Add(block.Hash()) + default: + p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) + } +} diff --git a/eth/protocols/eth/protocol_qng.go b/eth/protocols/eth/protocol_qng.go new file mode 100644 index 000000000000..dee5727d4369 --- /dev/null +++ b/eth/protocols/eth/protocol_qng.go @@ -0,0 +1,11 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package eth + +// sanityCheck verifies that the values are reasonable, as a DoS protection +func (request *NewBlockPacket) sanityCheck() error { + if err := request.Block.SanityCheck(); err != nil { + return err + } + return nil +} diff --git a/eth/sync_qng.go b/eth/sync_qng.go new file mode 100644 index 000000000000..b1f40b3d0d89 --- /dev/null +++ b/eth/sync_qng.go @@ -0,0 +1,203 @@ +// Copyright (c) 2017-2024 The qitmeer developers + +package eth + +import ( + "github.com/ethereum/go-ethereum/params" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/log" +) + +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + defaultMinSyncPeers = 5 // Amount of peers desired to start syncing +) + +// chainSyncer coordinates blockchain sync components. +type chainSyncer struct { + handler *handler + force *time.Timer + forced bool // true when force timer fired + warned time.Time + peerEventCh chan struct{} + doneCh chan error // non-nil when sync is running +} + +// chainSyncOp is a scheduled sync operation. +type chainSyncOp struct { + mode downloader.SyncMode + peer *eth.Peer + td *big.Int + head common.Hash +} + +// newChainSyncer creates a chainSyncer. +func newChainSyncer(handler *handler) *chainSyncer { + return &chainSyncer{ + handler: handler, + peerEventCh: make(chan struct{}), + } +} + +// handlePeerEvent notifies the syncer about a change in the peer set. +// This is called for new peers and every time a peer announces a new +// chain head. +func (cs *chainSyncer) handlePeerEvent() bool { + if !params.IsAmanaNetwork(cs.handler.chain.Config().ChainID) { + return false + } + select { + case cs.peerEventCh <- struct{}{}: + return true + case <-cs.handler.quitSync: + return false + } +} + +// loop runs in its own goroutine and launches the sync when necessary. +func (cs *chainSyncer) loop() { + defer cs.handler.wg.Done() + + if !params.IsAmanaNetwork(cs.handler.chain.Config().ChainID) { + return + } + cs.handler.blockFetcher.Start() + defer cs.handler.blockFetcher.Stop() + + // The force timer lowers the peer count threshold down to one when it fires. + // This ensures we'll always start sync even if there aren't enough peers. + cs.force = time.NewTimer(forceSyncCycle) + defer cs.force.Stop() + + for { + if op := cs.nextSyncOp(); op != nil { + cs.startSync(op) + } + select { + case <-cs.peerEventCh: + // Peer information changed, recheck. + case err := <-cs.doneCh: + cs.doneCh = nil + cs.force.Reset(forceSyncCycle) + cs.forced = false + + if err != nil && time.Since(cs.warned) > 10*time.Second { + log.Warn(err.Error()) + cs.warned = time.Now() + } + case <-cs.force.C: + cs.forced = true + + case <-cs.handler.quitSync: + // Disable all insertion on the blockchain. This needs to happen before + // terminating the downloader because the downloader waits for blockchain + // inserts, and these can take a long time to finish. + cs.handler.chain.StopInsert() + if cs.doneCh != nil { + <-cs.doneCh + } + return + } + } +} + +// nextSyncOp determines whether sync is required at this time. +func (cs *chainSyncer) nextSyncOp() *chainSyncOp { + if cs.doneCh != nil { + return nil // Sync already running + } + // Ensure we're at minimum peer count. + minPeers := defaultMinSyncPeers + if cs.forced { + minPeers = 1 + } else if minPeers > cs.handler.maxPeers { + minPeers = cs.handler.maxPeers + } + if cs.handler.peers.len() < minPeers { + return nil + } + // We have enough peers, pick the one with the highest TD, but avoid going + // over the terminal total difficulty. Above that we expect the consensus + // clients to direct the chain head to sync to. + peer := cs.handler.peers.peerWithHighestTD() + if peer == nil { + return nil + } + mode, ourTD := cs.modeAndLocalHead() + op := peerToSyncOp(mode, peer) + if op.td.Cmp(ourTD) <= 0 { + return nil // We're in sync + } + return op +} + +func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp { + peerHead, peerTD := p.Head() + return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead} +} + +func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { + // If we're in snap sync mode, return that directly + if cs.handler.snapSync.Load() { + block := cs.handler.chain.CurrentSnapBlock() + td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) + return downloader.SnapSync, td + } + // We are probably in full sync, but we might have rewound to before the + // snap sync pivot, check if we should re-enable snap sync. + head := cs.handler.chain.CurrentBlock() + if pivot := rawdb.ReadLastPivotNumber(cs.handler.database); pivot != nil { + if head.Number.Uint64() < *pivot { + block := cs.handler.chain.CurrentSnapBlock() + td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) + return downloader.SnapSync, td + } + } + // We are in a full sync, but the associated head state is missing. To complete + // the head state, forcefully rerun the snap sync. Note it doesn't mean the + // persistent state is corrupted, just mismatch with the head block. + if !cs.handler.chain.HasState(head.Root) { + block := cs.handler.chain.CurrentSnapBlock() + td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) + log.Info("Reenabled snap sync as chain is stateless") + return downloader.SnapSync, td + } + // Nope, we're really full syncing + td := cs.handler.chain.GetTd(head.Hash(), head.Number.Uint64()) + return downloader.FullSync, td +} + +// startSync launches doSync in a new goroutine. +func (cs *chainSyncer) startSync(op *chainSyncOp) { + cs.doneCh = make(chan error, 1) + go func() { cs.doneCh <- cs.handler.doSync(op) }() +} + +// doSync synchronizes the local blockchain with a remote peer. +func (h *handler) doSync(op *chainSyncOp) error { + err := h.downloader.SyncQng(op.peer.ID(), op.mode, op.head) + if err != nil { + return err + } + h.enableSyncedFeatures() + + head := h.chain.CurrentBlock() + if head.Number.Uint64() > 0 { + // We've completed a sync cycle, notify all peers of new state. This path is + // essential in star-topology networks where a gateway node needs to notify + // all its out-of-date peers of the availability of a new block. This failure + // scenario will most often crop up in private and hackathon networks with + // degenerate connectivity, but it should be healthy for the mainnet too to + // more reliably update peers or the local TD state. + if block := h.chain.GetBlock(head.Hash(), head.Number.Uint64()); block != nil { + h.BroadcastBlock(block, false) + } + } + return nil +} diff --git a/expose.go b/expose.go new file mode 100644 index 000000000000..d86ef4d4c87b --- /dev/null +++ b/expose.go @@ -0,0 +1,16 @@ +// Copyright (c) 2017-2022 The qitmeer developers + +package ethereum + +import ( + "github.com/ethereum/go-ethereum/internal/flags" + "github.com/ethereum/go-ethereum/internal/web3ext" + "github.com/urfave/cli/v2" + "math/big" +) + +var Modules = web3ext.Modules + +func GlobalBig(ctx *cli.Context, name string) *big.Int { + return flags.GlobalBig(ctx, name) +} diff --git a/go.mod b/go.mod index 763210decd0b..3f264727f997 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,11 @@ module github.com/ethereum/go-ethereum -go 1.21 +go 1.22.3 require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 github.com/Microsoft/go-winio v0.6.2 + github.com/Qitmeer/go-secp256k1 v0.5.1 github.com/VictoriaMetrics/fastcache v1.12.2 github.com/aws/aws-sdk-go-v2 v1.21.2 github.com/aws/aws-sdk-go-v2/config v1.18.45 diff --git a/go.sum b/go.sum index 562362dee6fd..51542f177ae0 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/Qitmeer/go-secp256k1 v0.5.1 h1:t3u4VMSz7LRn9Ki1lecG7tyTNwbLGILt1CwdOAA5N7M= +github.com/Qitmeer/go-secp256k1 v0.5.1/go.mod h1:AkrSQDAMwdV4+pXUQnEAPvPbUiIN4DAHMAm0ueneJB8= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= diff --git a/miner/miner_qng.go b/miner/miner_qng.go new file mode 100644 index 000000000000..e3843a1f93c7 --- /dev/null +++ b/miner/miner_qng.go @@ -0,0 +1,29 @@ +package miner + +import ( + "github.com/ethereum/go-ethereum/core/types" +) + +func (payload *Payload) ResolveFullBlock() *types.Block { + payload.lock.Lock() + defer payload.lock.Unlock() + + if payload.full == nil { + select { + case <-payload.stop: + return nil + default: + } + // Wait the full payload construction. Note it might block + // forever if Resolve is called in the meantime which + // terminates the background construction process. + payload.cond.Wait() + } + // Terminate the background payload construction + select { + case <-payload.stop: + default: + close(payload.stop) + } + return payload.full +} diff --git a/params/config.go b/params/config.go index 871782399d14..7be48e99047e 100644 --- a/params/config.go +++ b/params/config.go @@ -400,6 +400,18 @@ func (c *ChainConfig) Description() string { } banner += fmt.Sprintf("Chain ID: %v (%s)\n", c.ChainID, network) switch { + case IsQngNetwork(c.ChainID): + banner += "Consensus: MeerDAG (proof-of-work)\n" + return QngEIPsBanner(banner, c) + case IsAmanaNetwork(c.ChainID): + banner += "Consensus: Amana (proof-of-authority)\n" + return QngEIPsBanner(banner, c) + case IsFlanaNetwork(c.ChainID): + banner += "Consensus: Flana (rollup)\n" + return QngEIPsBanner(banner, c) + case IsMizanaNetwork(c.ChainID): + banner += "Consensus: Mizana (ZK rollup)\n" + return QngEIPsBanner(banner, c) case c.Ethash != nil: if c.TerminalTotalDifficulty == nil { banner += "Consensus: Ethash (proof-of-work)\n" diff --git a/params/config_qng.go b/params/config_qng.go new file mode 100644 index 000000000000..99ded40db7eb --- /dev/null +++ b/params/config_qng.go @@ -0,0 +1,187 @@ +package params + +import ( + "fmt" + "math/big" +) + +type MeerChainConfig struct { + ChainID *big.Int // chainId identifies the current chain and is used for replay protection +} + +var ( + QngMainnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(813), + } + QngTestnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(8131), + } + QngMixnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(8132), + } + QngPrivnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(8133), + } + + AmanaChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(8134), + } + AmanaTestnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81341), + } + AmanaMixnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81342), + } + AmanaPrivnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81343), + } + + FlanaChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(8135), + } + FlanaTestnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81351), + } + FlanaMixnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81352), + } + FlanaPrivnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81353), + } + + MizanaChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(8136), + } + MizanaTestnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81361), + } + MizanaMixnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81362), + } + MizanaPrivnetChainConfig = &MeerChainConfig{ + ChainID: big.NewInt(81363), + } +) + +func init() { + NetworkNames[QngMainnetChainConfig.ChainID.String()] = "qng" + NetworkNames[QngTestnetChainConfig.ChainID.String()] = "qng-test" + NetworkNames[QngMixnetChainConfig.ChainID.String()] = "qng-mix" + NetworkNames[QngPrivnetChainConfig.ChainID.String()] = "qng-priv" + + NetworkNames[AmanaChainConfig.ChainID.String()] = "amana" + NetworkNames[AmanaTestnetChainConfig.ChainID.String()] = "amana-test" + NetworkNames[AmanaMixnetChainConfig.ChainID.String()] = "amana-mix" + NetworkNames[AmanaPrivnetChainConfig.ChainID.String()] = "amana-priv" + + NetworkNames[FlanaChainConfig.ChainID.String()] = "flana" + NetworkNames[FlanaTestnetChainConfig.ChainID.String()] = "flana-test" + NetworkNames[FlanaMixnetChainConfig.ChainID.String()] = "flana-mix" + NetworkNames[FlanaPrivnetChainConfig.ChainID.String()] = "flana-priv" + + NetworkNames[MizanaChainConfig.ChainID.String()] = "mizana" + NetworkNames[MizanaTestnetChainConfig.ChainID.String()] = "mizana-test" + NetworkNames[MizanaMixnetChainConfig.ChainID.String()] = "mizana-mix" + NetworkNames[MizanaPrivnetChainConfig.ChainID.String()] = "mizana-priv" +} + +func IsQngNetwork(chainID *big.Int) bool { + if chainID.Cmp(QngMainnetChainConfig.ChainID) == 0 || + chainID.Cmp(QngTestnetChainConfig.ChainID) == 0 || + chainID.Cmp(QngMixnetChainConfig.ChainID) == 0 || + chainID.Cmp(QngPrivnetChainConfig.ChainID) == 0 { + return true + } + return false +} + +func IsAmanaNetwork(chainID *big.Int) bool { + if chainID.Cmp(AmanaChainConfig.ChainID) == 0 || + chainID.Cmp(AmanaTestnetChainConfig.ChainID) == 0 || + chainID.Cmp(AmanaMixnetChainConfig.ChainID) == 0 || + chainID.Cmp(AmanaPrivnetChainConfig.ChainID) == 0 { + return true + } + return false +} + +func IsFlanaNetwork(chainID *big.Int) bool { + if chainID.Cmp(FlanaChainConfig.ChainID) == 0 || + chainID.Cmp(FlanaTestnetChainConfig.ChainID) == 0 || + chainID.Cmp(FlanaMixnetChainConfig.ChainID) == 0 || + chainID.Cmp(FlanaPrivnetChainConfig.ChainID) == 0 { + return true + } + return false +} + +func IsMizanaNetwork(chainID *big.Int) bool { + if chainID.Cmp(MizanaChainConfig.ChainID) == 0 || + chainID.Cmp(MizanaTestnetChainConfig.ChainID) == 0 || + chainID.Cmp(MizanaMixnetChainConfig.ChainID) == 0 || + chainID.Cmp(MizanaPrivnetChainConfig.ChainID) == 0 { + return true + } + return false +} + +func QngEIPsBanner(banner string, c *ChainConfig) string { + banner += "\n" + + // Create a list of forks with a short description of them. Forks that only + // makes sense for mainnet should be optional at printing to avoid bloating + // the output for testnets and private networks. + banner += "Pre-Merge hard forks (block based):\n" + banner += fmt.Sprintf(" - Homestead: #%-8v (EIP-606 Requires:EIP-2, EIP-7, EIP-8)\n", c.HomesteadBlock) + if c.DAOForkBlock != nil { + banner += fmt.Sprintf(" - DAO Fork: #%-8v (EIP-779 Requires:EIP-606)\n", c.DAOForkBlock) + } + banner += fmt.Sprintf(" - Tangerine Whistle (EIP 150): #%-8v (EIP-608 Requires:EIP-150, EIP-779)\n", c.EIP150Block) + banner += fmt.Sprintf(" - Spurious Dragon/1 (EIP 155): #%-8v (EIP-607 Requires:EIP-155, EIP-160, EIP-161, EIP-170, EIP-608)\n", c.EIP155Block) + banner += fmt.Sprintf(" - Spurious Dragon/2 (EIP 158): #%-8v (EIP-607 Requires:EIP-155, EIP-160, EIP-161, EIP-170, EIP-608)\n", c.EIP155Block) + banner += fmt.Sprintf(" - Byzantium: #%-8v (EIP-609 Requires:EIP-100, EIP-140, EIP-196, EIP-197, EIP-198, EIP-211, EIP-214, EIP-607, EIP-649, EIP-658)\n", c.ByzantiumBlock) + banner += fmt.Sprintf(" - Constantinople: #%-8v (EIP-1013 Requires:EIP-145, EIP-609, EIP-1014, EIP-1052, EIP-1234, EIP-1283)\n", c.ConstantinopleBlock) + banner += fmt.Sprintf(" - Petersburg: #%-8v (EIP-1716 Requires:EIP-1013, EIP-1283)\n", c.PetersburgBlock) + banner += fmt.Sprintf(" - Istanbul: #%-8v (EIP-1679 Requires:EIP-152, EIP-1108, EIP-1344, EIP-1716, EIP-1884, EIP-2028, EIP-2200)\n", c.IstanbulBlock) + if c.MuirGlacierBlock != nil { + banner += fmt.Sprintf(" - Muir Glacier: #%-8v (EIP-2387 Requires:EIP-1679, EIP-2384)\n", c.MuirGlacierBlock) + } + banner += fmt.Sprintf(" - Berlin: #%-8v (EIP-2565 EIP-2929 EIP-2718 EIP-2930 Requires:EIP-198)\n", c.BerlinBlock) + banner += fmt.Sprintf(" - London: #%-8v (EIP-1559 EIP-3198 EIP-3529 EIP-3541 EIP-3554 Requires:EIP-2718, EIP-2930, EIP-2200, EIP-2929, EIP-2930)\n", c.LondonBlock) + if c.ArrowGlacierBlock != nil { + banner += fmt.Sprintf(" - Arrow Glacier: #%-8v (EIP-4345)\n", c.ArrowGlacierBlock) + } + if c.GrayGlacierBlock != nil { + banner += fmt.Sprintf(" - Gray Glacier: #%-8v (EIP-5133)\n", c.GrayGlacierBlock) + } + banner += "\n" + + // Add a special section for the merge as it's non-obvious + if c.TerminalTotalDifficulty == nil { + banner += "The Merge is not yet available for this network!\n" + banner += " - Hard-fork specification: EIP-3675 EIP-4399 Requires:EIP-2124\n" + } else { + banner += "Merge configured:\n" + banner += " - Hard-fork specification: EIP-3675 EIP-4399 Requires:EIP-2124\n" + banner += fmt.Sprintf(" - Network known to be merged: %v\n", c.TerminalTotalDifficultyPassed) + banner += fmt.Sprintf(" - Total terminal difficulty: %v\n", c.TerminalTotalDifficulty) + if c.MergeNetsplitBlock != nil { + banner += fmt.Sprintf(" - Merge netsplit block: #%-8v\n", c.MergeNetsplitBlock) + } + } + banner += "\n" + + // Create a list of forks post-merge + banner += "Post-Merge hard forks (timestamp based):\n" + if c.ShanghaiTime != nil { + banner += fmt.Sprintf(" - Shanghai: @%-10v (EIP-3651 EIP-3855 EIP-3860 EIP-4895 EIP-6049 Requires:EIP-2929, EIP-170)\n", *c.ShanghaiTime) + } + if c.CancunTime != nil { + banner += fmt.Sprintf(" - Cancun: @%-10v\n", *c.CancunTime) + } + if c.PragueTime != nil { + banner += fmt.Sprintf(" - Prague: @%-10v\n", *c.PragueTime) + } + + return banner +} diff --git a/tests/fuzzers/secp256k1/secp_test.go b/tests/fuzzers/secp256k1/secp_test.go index ca3039764b42..374a992a34dc 100644 --- a/tests/fuzzers/secp256k1/secp_test.go +++ b/tests/fuzzers/secp256k1/secp_test.go @@ -20,8 +20,8 @@ import ( "fmt" "testing" + secp256k1 "github.com/Qitmeer/go-secp256k1" "github.com/btcsuite/btcd/btcec/v2" - "github.com/ethereum/go-ethereum/crypto/secp256k1" ) func TestFuzzer(t *testing.T) {