diff --git a/accounts/keystore/account_cache.go b/accounts/keystore/account_cache.go index a3ec6e9c56..9276b3cfca 100644 --- a/accounts/keystore/account_cache.go +++ b/accounts/keystore/account_cache.go @@ -27,7 +27,7 @@ import ( "sync" "time" - mapset "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -79,7 +79,7 @@ func newAccountCache(keydir string) (*accountCache, chan struct{}) { keydir: keydir, byAddr: make(map[common.Address][]accounts.Account), notify: make(chan struct{}, 1), - fileC: fileCache{all: mapset.NewThreadUnsafeSet()}, + fileC: fileCache{all: mapset.NewThreadUnsafeSet[string]()}, } ac.watcher = newWatcher(ac) return ac, ac.notify @@ -275,16 +275,15 @@ func (ac *accountCache) scanAccounts() error { // Process all the file diffs start := time.Now() - for _, p := range creates.ToSlice() { - if a := readAccount(p.(string)); a != nil { + for _, path := range creates.ToSlice() { + if a := readAccount(path); a != nil { ac.add(*a) } } - for _, p := range deletes.ToSlice() { - ac.deleteByFile(p.(string)) + for _, path := range deletes.ToSlice() { + ac.deleteByFile(path) } - for _, p := range updates.ToSlice() { - path := p.(string) + for _, path := range updates.ToSlice() { ac.deleteByFile(path) if a := readAccount(path); a != nil { ac.add(*a) diff --git a/accounts/keystore/file_cache.go b/accounts/keystore/file_cache.go index 55a63df991..c603b3bd3d 100644 --- a/accounts/keystore/file_cache.go +++ b/accounts/keystore/file_cache.go @@ -23,20 +23,20 @@ import ( "sync" "time" - mapset "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/log" ) // fileCache is a cache of files seen during scan of keystore. type fileCache struct { - all mapset.Set // Set of all files from the keystore folder - lastMod time.Time // Last time instance when a file was modified + all mapset.Set[string] // Set of all files from the keystore folder + lastMod time.Time // Last time instance when a file was modified mu sync.Mutex } // scan performs a new scan on the given directory, compares against the already // cached filenames, and returns file sets: creates, deletes, updates. -func (fc *fileCache) scan(keyDir string) (mapset.Set, mapset.Set, mapset.Set, error) { +func (fc *fileCache) scan(keyDir string) (mapset.Set[string], mapset.Set[string], mapset.Set[string], error) { t0 := time.Now() // List all the failes from the keystore folder @@ -50,8 +50,8 @@ func (fc *fileCache) scan(keyDir string) (mapset.Set, mapset.Set, mapset.Set, er defer fc.mu.Unlock() // Iterate all the files and gather their metadata - all := mapset.NewThreadUnsafeSet() - mods := mapset.NewThreadUnsafeSet() + all := mapset.NewThreadUnsafeSet[string]() + mods := mapset.NewThreadUnsafeSet[string]() var newLastMod time.Time for _, f := range files { diff --git a/cmd/clef/main.go b/cmd/clef/main.go index 52845ad2ca..a7abec6c14 100644 --- a/cmd/clef/main.go +++ b/cmd/clef/main.go @@ -656,6 +656,7 @@ func signer(c *cli.Context) error { cors := utils.SplitAndTrim(c.GlobalString(utils.HTTPCORSDomainFlag.Name)) srv := rpc.NewServer() + srv.SetBatchLimits(node.DefaultConfig.BatchRequestLimit, node.DefaultConfig.BatchResponseMaxSize) err := node.RegisterApis(rpcAPI, []string{"account"}, srv, false) if err != nil { utils.Fatalf("Could not register API: %w", err) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 468fbf35f6..c3536f14e3 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -198,6 +198,8 @@ var ( utils.RPCGlobalEVMTimeoutFlag, utils.RPCGlobalTxFeeCapFlag, utils.AllowUnprotectedTxs, + utils.BatchRequestLimit, + utils.BatchResponseMaxSize, } metricsFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d4b702b68f..c8409ab400 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -697,6 +697,16 @@ var ( Name: "rpc.allow-unprotected-txs", Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC", } + BatchRequestLimit = &cli.IntFlag{ + Name: "rpc.batch-request-limit", + Usage: "Maximum number of requests in a batch", + Value: node.DefaultConfig.BatchRequestLimit, + } + BatchResponseMaxSize = &cli.IntFlag{ + Name: "rpc.batch-response-max-size", + Usage: "Maximum number of bytes returned from a batched call", + Value: node.DefaultConfig.BatchResponseMaxSize, + } // Network Settings MaxPeersFlag = cli.IntFlag{ @@ -1060,6 +1070,12 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) { if ctx.GlobalIsSet(AllowUnprotectedTxs.Name) { cfg.AllowUnprotectedTxs = ctx.GlobalBool(AllowUnprotectedTxs.Name) } + if ctx.IsSet(BatchRequestLimit.Name) { + cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name) + } + if ctx.IsSet(BatchResponseMaxSize.Name) { + cfg.BatchResponseMaxSize = ctx.Int(BatchResponseMaxSize.Name) + } } // setGraphQL creates the GraphQL listener interface string from the set diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 12a69c127a..62d79bec6f 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -24,7 +24,7 @@ import ( "runtime" "time" - mapset "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/math" @@ -210,7 +210,7 @@ func (ethash *Ethash) VerifyUncles(chain consensus.ChainReader, block *types.Blo return nil } // Gather the set of past uncles and ancestors - uncles, ancestors := mapset.NewSet(), make(map[common.Hash]*types.Header) + uncles, ancestors := mapset.NewSet[common.Hash](), make(map[common.Hash]*types.Header) number, parent := block.NumberU64()-1, block.ParentHash() for i := 0; i < 7; i++ { diff --git a/core/beacon/errors.go b/core/beacon/errors.go index 5b95c38a23..77ad43fe38 100644 --- a/core/beacon/errors.go +++ b/core/beacon/errors.go @@ -16,14 +16,12 @@ package beacon -import "github.com/ethereum/go-ethereum/rpc" - var ( VALID = GenericStringResponse{"VALID"} SUCCESS = GenericStringResponse{"SUCCESS"} INVALID = ForkChoiceResponse{Status: "INVALID", PayloadID: nil} SYNCING = ForkChoiceResponse{Status: "SYNCING", PayloadID: nil} - GenericServerError = rpc.CustomError{Code: -32000, ValidationError: "Server error"} - UnknownPayload = rpc.CustomError{Code: -32001, ValidationError: "Unknown payload"} - InvalidTB = rpc.CustomError{Code: -32002, ValidationError: "Invalid terminal block"} + GenericServerError = CustomError{Code: -32000, ValidationError: "Server error"} + UnknownPayload = CustomError{Code: -32001, ValidationError: "Unknown payload"} + InvalidTB = CustomError{Code: -32002, ValidationError: "Invalid terminal block"} ) diff --git a/core/beacon/types.go b/core/beacon/types.go index e79d3f5b6d..86a077f2f3 100644 --- a/core/beacon/types.go +++ b/core/beacon/types.go @@ -84,6 +84,15 @@ type GenericStringResponse struct { Status string `json:"status"` } +type CustomError struct { + Code int + ValidationError string +} + +func (e *CustomError) ErrorCode() int { return e.Code } + +func (e *CustomError) Error() string { return e.ValidationError } + type ExecutePayloadResponse struct { Status string `json:"status"` LatestValidHash common.Hash `json:"latestValidHash"` diff --git a/eth/api_backend.go b/eth/api_backend.go index 8c228fc73d..3184240bf9 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -297,7 +297,7 @@ func (b *EthAPIBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) return b.gpo.SuggestTipCap(ctx) } -func (b *EthAPIBackend) FeeHistory(ctx context.Context, blockCount int, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (firstBlock *big.Int, reward [][]*big.Int, baseFee []*big.Int, gasUsedRatio []float64, err error) { +func (b *EthAPIBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (firstBlock *big.Int, reward [][]*big.Int, baseFee []*big.Int, gasUsedRatio []float64, err error) { return b.gpo.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) } diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 307df5f1e5..fc100fb4a2 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -24,7 +24,7 @@ import ( "sort" "time" - mapset "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" @@ -148,7 +148,7 @@ type TxFetcher struct { drop chan *txDrop quit chan struct{} - underpriced mapset.Set // Transactions discarded as too cheap (don't re-fetch) + underpriced mapset.Set[common.Hash] // Transactions discarded as too cheap (don't re-fetch) // Stage 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. @@ -202,7 +202,7 @@ func NewTxFetcherForTests( fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), alternates: make(map[common.Hash]map[string]struct{}), - underpriced: mapset.NewSet(), + underpriced: mapset.NewSet[common.Hash](), hasTx: hasTx, addTxs: addTxs, fetchTxs: fetchTxs, diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go index 02229a7549..822bc826f6 100644 --- a/eth/filters/api_test.go +++ b/eth/filters/api_test.go @@ -56,7 +56,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { // from, to block number var test1 FilterCriteria - vector := fmt.Sprintf(`{"fromBlock":"0x%x","toBlock":"0x%x"}`, fromBlock, toBlock) + vector := fmt.Sprintf(`{"fromBlock":"%v","toBlock":"%v"}`, fromBlock, toBlock) if err := json.Unmarshal([]byte(vector), &test1); err != nil { t.Fatal(err) } diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 61a7153173..b684ebb091 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -151,7 +151,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { var ( head = header.Number.Uint64() end = uint64(f.end) - pending = f.end == rpc.PendingBlockNumber.Int64() + pending = f.end == rpc.LatestBlockNumber.Int64() ) if f.begin == rpc.LatestBlockNumber.Int64() { f.begin = int64(head) diff --git a/eth/gasprice/feehistory.go b/eth/gasprice/feehistory.go index 78bade5ae8..47b87e8c7a 100644 --- a/eth/gasprice/feehistory.go +++ b/eth/gasprice/feehistory.go @@ -137,7 +137,7 @@ func (oracle *Oracle) processBlock(bf *blockFees, percentiles []float64) { // also returned if requested and available. // Note: an error is only returned if retrieving the head header has failed. If there are no // retrievable blocks in the specified range then zero block count is returned with no error. -func (oracle *Oracle) resolveBlockRange(ctx context.Context, lastBlock rpc.BlockNumber, blocks int) (*types.Block, []*types.Receipt, uint64, int, error) { +func (oracle *Oracle) resolveBlockRange(ctx context.Context, lastBlock rpc.BlockNumber, blocks uint64) (*types.Block, []*types.Receipt, uint64, uint64, error) { var ( headBlock rpc.BlockNumber pendingBlock *types.Block @@ -172,7 +172,7 @@ func (oracle *Oracle) resolveBlockRange(ctx context.Context, lastBlock rpc.Block } // ensure not trying to retrieve before genesis if rpc.BlockNumber(blocks) > lastBlock+1 { - blocks = int(lastBlock + 1) + blocks = uint64(lastBlock + 1) } return pendingBlock, pendingReceipts, uint64(lastBlock), blocks, nil } @@ -191,7 +191,7 @@ func (oracle *Oracle) resolveBlockRange(ctx context.Context, lastBlock rpc.Block // // Note: baseFee includes the next block after the newest of the returned range, because this // value can be derived from the newest block. -func (oracle *Oracle) FeeHistory(ctx context.Context, blocks int, unresolvedLastBlock rpc.BlockNumber, rewardPercentiles []float64) (*big.Int, [][]*big.Int, []*big.Int, []float64, error) { +func (oracle *Oracle) FeeHistory(ctx context.Context, blocks uint64, unresolvedLastBlock rpc.BlockNumber, rewardPercentiles []float64) (*big.Int, [][]*big.Int, []*big.Int, []float64, error) { if blocks < 1 { return common.Big0, nil, nil, nil, nil // returning with no data and no error means there are no retrievable blocks } @@ -230,7 +230,7 @@ func (oracle *Oracle) FeeHistory(ctx context.Context, blocks int, unresolvedLast for i, p := range rewardPercentiles { binary.LittleEndian.PutUint64(percentileKey[i*8:(i+1)*8], math.Float64bits(p)) } - for i := 0; i < maxBlockFetchers && i < blocks; i++ { + for i := uint64(0); i < maxBlockFetchers && i < blocks; i++ { go func() { for { // Retrieve the next block number to fetch with this goroutine @@ -288,7 +288,7 @@ func (oracle *Oracle) FeeHistory(ctx context.Context, blocks int, unresolvedLast if fees.err != nil { return common.Big0, nil, nil, nil, fees.err } - i := int(fees.blockNumber - oldestBlock) + i := fees.blockNumber - oldestBlock if fees.results.baseFee != nil { reward[i], baseFee[i], baseFee[i+1], gasUsedRatio[i] = fees.results.reward, fees.results.baseFee, fees.results.nextBaseFee, fees.results.gasUsedRatio } else { diff --git a/eth/gasprice/feehistory_test.go b/eth/gasprice/feehistory_test.go index c259eb0acf..6095180b86 100644 --- a/eth/gasprice/feehistory_test.go +++ b/eth/gasprice/feehistory_test.go @@ -28,8 +28,8 @@ import ( func TestFeeHistory(t *testing.T) { var cases = []struct { pending bool - maxHeader, maxBlock int - count int + maxHeader, maxBlock uint64 + count uint64 last rpc.BlockNumber percent []float64 expFirst uint64 diff --git a/eth/gasprice/gasprice.go b/eth/gasprice/gasprice.go index 0af8a70ea8..12739ccac3 100644 --- a/eth/gasprice/gasprice.go +++ b/eth/gasprice/gasprice.go @@ -42,8 +42,8 @@ var ( type Config struct { Blocks int Percentile int - MaxHeaderHistory int - MaxBlockHistory int + MaxHeaderHistory uint64 + MaxBlockHistory uint64 Default *big.Int `toml:",omitempty"` MaxPrice *big.Int `toml:",omitempty"` IgnorePrice *big.Int `toml:",omitempty"` @@ -74,7 +74,7 @@ type Oracle struct { defaultPrice *big.Int checkBlocks, percentile int sampleTxThreshold int - maxHeaderHistory, maxBlockHistory int + maxHeaderHistory, maxBlockHistory uint64 historyCache *lru.Cache } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 8bf973f9c3..c2f6281f19 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -21,7 +21,7 @@ import ( "math/rand" "sync" - mapset "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -517,7 +517,7 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error { // knownCache is a cache for known hashes. type knownCache struct { - hashes mapset.Set + hashes mapset.Set[common.Hash] max int } @@ -525,7 +525,7 @@ type knownCache struct { func newKnownCache(max int) *knownCache { return &knownCache{ max: max, - hashes: mapset.NewSet(), + hashes: mapset.NewSet[common.Hash](), } } diff --git a/go.mod b/go.mod index 10353f6fac..8c1305b482 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/Azure/azure-storage-blob-go v0.7.0 + github.com/Microsoft/go-winio v0.6.1 github.com/VictoriaMetrics/fastcache v1.6.0 github.com/aws/aws-sdk-go-v2 v1.17.4 github.com/aws/aws-sdk-go-v2/config v1.18.12 @@ -14,7 +15,7 @@ require ( github.com/cloudflare/cloudflare-go v0.14.0 github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f github.com/davecgh/go-spew v1.1.1 - github.com/deckarep/golang-set v1.8.0 + github.com/deckarep/golang-set/v2 v2.1.0 github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/docker/docker v20.10.24+incompatible github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf @@ -65,7 +66,6 @@ require ( golang.org/x/text v0.13.0 golang.org/x/time v0.3.0 golang.org/x/tools v0.6.0 - gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce gopkg.in/urfave/cli.v1 v1.20.0 ) diff --git a/go.sum b/go.sum index c6a85eed3b..ed1f0c173e 100644 --- a/go.sum +++ b/go.sum @@ -203,7 +203,10 @@ github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U= github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4= github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= +github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= +github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= +github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 93af9af929..eafbd88b68 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -92,8 +92,8 @@ type feeHistoryResult struct { GasUsedRatio []float64 `json:"gasUsedRatio"` } -func (s *PublicEthereumAPI) FeeHistory(ctx context.Context, blockCount rpc.DecimalOrHex, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (*feeHistoryResult, error) { - oldest, reward, baseFee, gasUsed, err := s.b.FeeHistory(ctx, int(blockCount), lastBlock, rewardPercentiles) +func (s *PublicEthereumAPI) FeeHistory(ctx context.Context, blockCount math.HexOrDecimal64, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (*feeHistoryResult, error) { + oldest, reward, baseFee, gasUsed, err := s.b.FeeHistory(ctx, uint64(blockCount), lastBlock, rewardPercentiles) if err != nil { return nil, err } @@ -765,13 +765,6 @@ func (s *PublicBlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Ha return nil, err } -func (s *PublicBlockChainAPI) Health() bool { - if rpc.RpcServingTimer != nil { - return rpc.RpcServingTimer.Percentile(0.75) < float64(UnHealthyTimeout) - } - return true -} - // GetUncleByBlockNumberAndIndex returns the uncle block for the given block hash and index. func (s *PublicBlockChainAPI) GetUncleByBlockNumberAndIndex(ctx context.Context, blockNr rpc.BlockNumber, index hexutil.Uint) (map[string]interface{}, error) { block, err := s.b.BlockByNumber(ctx, blockNr) diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 801483b8c3..6f3131f463 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -44,7 +44,7 @@ type Backend interface { SyncProgress() ethereum.SyncProgress SuggestGasTipCap(ctx context.Context) (*big.Int, error) - FeeHistory(ctx context.Context, blockCount int, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (*big.Int, [][]*big.Int, []*big.Int, []float64, error) + FeeHistory(ctx context.Context, blockCount uint64, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (*big.Int, [][]*big.Int, []*big.Int, []float64, error) Chain() *core.BlockChain ChainDb() ethdb.Database diff --git a/les/api_backend.go b/les/api_backend.go index a68ae1dee1..aeb0268da8 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -270,7 +270,7 @@ func (b *LesApiBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) return b.gpo.SuggestTipCap(ctx) } -func (b *LesApiBackend) FeeHistory(ctx context.Context, blockCount int, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (firstBlock *big.Int, reward [][]*big.Int, baseFee []*big.Int, gasUsedRatio []float64, err error) { +func (b *LesApiBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (firstBlock *big.Int, reward [][]*big.Int, baseFee []*big.Int, gasUsedRatio []float64, err error) { return b.gpo.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) } diff --git a/light/postprocess.go b/light/postprocess.go index ce38d091e8..feb4781710 100644 --- a/light/postprocess.go +++ b/light/postprocess.go @@ -25,7 +25,7 @@ import ( "math/big" "time" - mapset "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/bitutil" "github.com/ethereum/go-ethereum/core" @@ -134,7 +134,7 @@ type ChtIndexerBackend struct { diskdb, trieTable ethdb.Database odr OdrBackend triedb *trie.Database - trieset mapset.Set + trieset mapset.Set[common.Hash] section, sectionSize uint64 lastHash common.Hash trie *trie.Trie @@ -148,7 +148,7 @@ func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64, dis odr: odr, trieTable: trieTable, triedb: trie.NewDatabaseWithConfig(trieTable, &trie.Config{Cache: 1}), // Use a tiny cache only to keep memory down - trieset: mapset.NewSet(), + trieset: mapset.NewSet[common.Hash](), sectionSize: size, disablePruning: disablePruning, } @@ -323,7 +323,7 @@ type BloomTrieIndexerBackend struct { disablePruning bool diskdb, trieTable ethdb.Database triedb *trie.Database - trieset mapset.Set + trieset mapset.Set[common.Hash] odr OdrBackend section uint64 parentSize uint64 @@ -341,7 +341,7 @@ func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uin odr: odr, trieTable: trieTable, triedb: trie.NewDatabaseWithConfig(trieTable, &trie.Config{Cache: 1}), // Use a tiny cache only to keep memory down - trieset: mapset.NewSet(), + trieset: mapset.NewSet[common.Hash](), parentSize: parentSize, size: size, disablePruning: disablePruning, diff --git a/miner/worker.go b/miner/worker.go index 44573e1dad..5acfbc23c9 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -24,7 +24,7 @@ import ( "sync/atomic" "time" - mapset "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc" @@ -86,11 +86,11 @@ var ( type environment struct { signer types.Signer - state *state.StateDB // apply state changes here - ancestors mapset.Set // ancestor set (used for checking uncle parent validity) - family mapset.Set // family set (used for checking uncle invalidity) - tcount int // tx count in cycle - gasPool *core.GasPool // available gas used to pack transactions + state *state.StateDB // apply state changes here + ancestors mapset.Set[common.Hash] // ancestor set (used for checking uncle parent validity) + family mapset.Set[common.Hash] // family set (used for checking uncle invalidity) + tcount int // tx count in cycle + gasPool *core.GasPool // available gas used to pack transactions coinbase common.Address header *types.Header @@ -711,8 +711,8 @@ func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase com signer: types.MakeSigner(w.chainConfig, header.Number), state: state, coinbase: coinbase, - ancestors: mapset.NewSet(), - family: mapset.NewSet(), + ancestors: mapset.NewSet[common.Hash](), + family: mapset.NewSet[common.Hash](), header: header, uncles: make(map[common.Hash]*types.Header), } diff --git a/node/api.go b/node/api.go index a685ecd6b3..21af1390d7 100644 --- a/node/api.go +++ b/node/api.go @@ -185,6 +185,10 @@ func (api *privateAdminAPI) StartHTTP(host *string, port *int, cors *string, api CorsAllowedOrigins: api.node.config.HTTPCors, Vhosts: api.node.config.HTTPVirtualHosts, Modules: api.node.config.HTTPModules, + rpcEndpointConfig: rpcEndpointConfig{ + batchItemLimit: api.node.config.BatchRequestLimit, + batchResponseSizeLimit: api.node.config.BatchResponseMaxSize, + }, } if cors != nil { config.CorsAllowedOrigins = nil @@ -259,6 +263,10 @@ func (api *privateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str Modules: api.node.config.WSModules, Origins: api.node.config.WSOrigins, // ExposeAll: api.node.config.WSExposeAll, + rpcEndpointConfig: rpcEndpointConfig{ + batchItemLimit: api.node.config.BatchRequestLimit, + batchResponseSizeLimit: api.node.config.BatchResponseMaxSize, + }, } if apis != nil { config.Modules = nil diff --git a/node/config.go b/node/config.go index 6745f70fe9..39e83cc734 100644 --- a/node/config.go +++ b/node/config.go @@ -201,6 +201,12 @@ type Config struct { // AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC. AllowUnprotectedTxs bool `toml:",omitempty"` + // BatchRequestLimit is the maximum number of requests in a batch. + BatchRequestLimit int `toml:",omitempty"` + + // BatchResponseMaxSize is the maximum number of bytes returned from a batched rpc call. + BatchResponseMaxSize int `toml:",omitempty"` + // EnableDoubleSignMonitor is a flag that whether to enable the double signature checker EnableDoubleSignMonitor bool `toml:",omitempty"` } diff --git a/node/defaults.go b/node/defaults.go index c685dde5d1..8eb6f4428c 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -38,14 +38,16 @@ const ( // DefaultConfig contains reasonable default settings. var DefaultConfig = Config{ - DataDir: DefaultDataDir(), - HTTPPort: DefaultHTTPPort, - HTTPModules: []string{"net", "web3"}, - HTTPVirtualHosts: []string{"localhost"}, - HTTPTimeouts: rpc.DefaultHTTPTimeouts, - WSPort: DefaultWSPort, - WSModules: []string{"net", "web3"}, - GraphQLVirtualHosts: []string{"localhost"}, + DataDir: DefaultDataDir(), + HTTPPort: DefaultHTTPPort, + HTTPModules: []string{"net", "web3"}, + HTTPVirtualHosts: []string{"localhost"}, + HTTPTimeouts: rpc.DefaultHTTPTimeouts, + WSPort: DefaultWSPort, + WSModules: []string{"net", "web3"}, + BatchRequestLimit: 1, + BatchResponseMaxSize: 4 * 1024 * 1024, + GraphQLVirtualHosts: []string{"localhost"}, P2P: p2p.Config{ ListenAddr: ":30303", MaxPeers: 50, diff --git a/node/node.go b/node/node.go index ef256f77e7..b2841df062 100644 --- a/node/node.go +++ b/node/node.go @@ -122,9 +122,11 @@ func New(conf *Config) (*Node, error) { return nil, errors.New(`Config.Name cannot end in ".ipc"`) } + server := rpc.NewServer() + server.SetBatchLimits(conf.BatchRequestLimit, conf.BatchResponseMaxSize) node := &Node{ config: conf, - inprocHandler: rpc.NewServer(), + inprocHandler: server, eventmux: new(event.TypeMux), log: conf.Logger, stop: make(chan struct{}), @@ -376,6 +378,11 @@ func (n *Node) startRPC() error { } } + rpcConfig := rpcEndpointConfig{ + batchItemLimit: n.config.BatchRequestLimit, + batchResponseSizeLimit: n.config.BatchResponseMaxSize, + } + // Configure HTTP. if n.config.HTTPHost != "" { config := httpConfig{ @@ -383,6 +390,7 @@ func (n *Node) startRPC() error { Vhosts: n.config.HTTPVirtualHosts, Modules: n.config.HTTPModules, prefix: n.config.HTTPPathPrefix, + rpcEndpointConfig: rpcConfig, } if err := n.http.setListenAddr(n.config.HTTPHost, n.config.HTTPPort); err != nil { return err @@ -396,9 +404,10 @@ func (n *Node) startRPC() error { if n.config.WSHost != "" { server := n.wsServerForPort(n.config.WSPort) config := wsConfig{ - Modules: n.config.WSModules, - Origins: n.config.WSOrigins, - prefix: n.config.WSPathPrefix, + Modules: n.config.WSModules, + Origins: n.config.WSOrigins, + prefix: n.config.WSPathPrefix, + rpcEndpointConfig: rpcConfig, } if err := server.setListenAddr(n.config.WSHost, n.config.WSPort); err != nil { return err diff --git a/node/rpcstack.go b/node/rpcstack.go index 186b0f4c42..1a020bb607 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -40,6 +40,7 @@ type httpConfig struct { CorsAllowedOrigins []string Vhosts []string prefix string // path prefix on which to mount http handler + rpcEndpointConfig } // wsConfig is the JSON-RPC/Websocket configuration @@ -47,6 +48,12 @@ type wsConfig struct { Origins []string Modules []string prefix string // path prefix on which to mount ws handler + rpcEndpointConfig +} + +type rpcEndpointConfig struct { + batchItemLimit int + batchResponseSizeLimit int } type rpcHandler struct { @@ -290,6 +297,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { // Create RPC server and handler. srv := rpc.NewServer() + srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit) if err := RegisterApis(apis, config.Modules, srv, false); err != nil { return err } @@ -322,6 +330,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error { // Create RPC server and handler. srv := rpc.NewServer() + srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit) if err := RegisterApis(apis, config.Modules, srv, false); err != nil { return err } diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 1cb26a8ea0..36b5286517 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -206,7 +206,7 @@ func (sn *SimNode) ServeRPC(conn *websocket.Conn) error { if err != nil { return err } - codec := rpc.NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON) + codec := rpc.NewFuncCodec(conn, func(v any, _ bool) error { return conn.WriteJSON(v) }, conn.ReadJSON) handler.ServeCodec(codec, 0) return nil } diff --git a/rpc/client.go b/rpc/client.go index a89f8ba18c..2b0016db8f 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "net/url" + "os" "reflect" "strconv" "sync/atomic" @@ -33,16 +34,17 @@ import ( var ( ErrBadResult = errors.New("bad result in JSON-RPC response") ErrClientQuit = errors.New("client is closed") - ErrNoResult = errors.New("no result in JSON-RPC response") + ErrNoResult = errors.New("JSON-RPC response has no result") + ErrMissingBatchResponse = errors.New("response batch did not contain a response to this call") ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") errClientReconnected = errors.New("client reconnected") errDead = errors.New("connection lost") ) +// Timeouts const ( - // Timeouts defaultDialTimeout = 10 * time.Second // used if context has no deadline - subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls + subscribeTimeout = 10 * time.Second // overall timeout eth_subscribe, rpc_modules calls ) const ( @@ -78,11 +80,15 @@ type Client struct { isHTTP bool // connection type: http, ws or ipc services *serviceRegistry - idCounter uint32 + idCounter atomic.Uint32 // This function, if non-nil, is called when the connection is lost. reconnectFunc reconnectFunc + // config fields + batchItemLimit int + batchResponseMaxSize int + // writeConn is used for writing to the connection on the caller's goroutine. It should // only be accessed outside of dispatch, with the write lock held. The write lock is // taken by sending on reqInit and released by sending on reqSent. @@ -100,7 +106,7 @@ type Client struct { reqTimeout chan *requestOp // removes response IDs when call timeout expires } -type reconnectFunc func(ctx context.Context) (ServerCodec, error) +type reconnectFunc func(context.Context) (ServerCodec, error) type clientContextKey struct{} @@ -113,7 +119,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.Background() ctx = context.WithValue(ctx, clientContextKey{}, c) ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo()) - handler := newHandler(ctx, conn, c.idgen, c.services) + handler := newHandler(ctx, conn, c.idgen, c.services, c.batchItemLimit, c.batchResponseMaxSize) return &clientConn{conn, handler} } @@ -127,14 +133,17 @@ type readOp struct { batch bool } +// requestOp represents a pending request. This is used for both batch and non-batch +// requests. type requestOp struct { - ids []json.RawMessage - err error - resp chan *jsonrpcMessage // receives up to len(ids) responses - sub *ClientSubscription // only set for EthSubscribe requests + ids []json.RawMessage + err error + resp chan []*jsonrpcMessage // the response goes here + sub *ClientSubscription // set for Subscribe requests. + hadResponse bool // true when the request was responded to } -func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) { +func (op *requestOp) wait(ctx context.Context, c *Client) ([]*jsonrpcMessage, error) { select { case <-ctx.Done(): // Send the timeout to dispatch so it can remove the request IDs. @@ -154,14 +163,16 @@ func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, erro // // The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a // file name with no URL scheme, a local socket connection is established using UNIX -// domain sockets on supported platforms and named pipes on Windows. If you want to -// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead. +// domain sockets on supported platforms and named pipes on Windows. +// +// If you want to further configure the transport, use DialOptions instead of this +// function. // // For websocket connections, the origin is set to the local host name. // -// The client reconnects automatically if the connection is lost. +// The client reconnects automatically when the connection is lost. func Dial(rawurl string) (*Client, error) { - return DialContext(context.Background(), rawurl) + return DialOptions(context.Background(), rawurl) } // DialContext creates a new RPC client, just like Dial. @@ -169,58 +180,91 @@ func Dial(rawurl string) (*Client, error) { // The context is used to cancel or time out the initial connection establishment. It does // not affect subsequent interactions with the client. func DialContext(ctx context.Context, rawurl string) (*Client, error) { + return DialOptions(ctx, rawurl) +} + +// DialOptions creates a new RPC client for the given URL. You can supply any of the +// pre-defined client options to configure the underlying transport. +// +// The context is used to cancel or time out the initial connection establishment. It does +// not affect subsequent interactions with the client. +// +// The client reconnects automatically when the connection is lost. +func DialOptions(ctx context.Context, rawurl string, options ...ClientOption) (*Client, error) { u, err := url.Parse(rawurl) if err != nil { return nil, err } + + cfg := new(clientConfig) + for _, opt := range options { + opt.applyOption(cfg) + } + + var reconnect reconnectFunc switch u.Scheme { case "http", "https": - return DialHTTP(rawurl) + reconnect = newClientTransportHTTP(rawurl, cfg) case "ws", "wss": - return DialWebsocket(ctx, rawurl, "") + rc, err := newClientTransportWS(rawurl, cfg) + if err != nil { + return nil, err + } + reconnect = rc case "stdio": - return DialStdIO(ctx) + reconnect = newClientTransportIO(os.Stdin, os.Stdout) case "": - return DialIPC(ctx, rawurl) + reconnect = newClientTransportIPC(rawurl) default: return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) } + + return newClient(ctx, cfg, reconnect) } -// Client retrieves the client from the context, if any. This can be used to perform +// ClientFromContext retrieves the client from the context, if any. This can be used to perform // 'reverse calls' in a handler method. func ClientFromContext(ctx context.Context) (*Client, bool) { client, ok := ctx.Value(clientContextKey{}).(*Client) return client, ok } -func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { +func newClient(initctx context.Context, cfg *clientConfig, connect reconnectFunc) (*Client, error) { conn, err := connect(initctx) if err != nil { return nil, err } - c := initClient(conn, randomIDGenerator(), new(serviceRegistry)) + c := initClient(conn, new(serviceRegistry), cfg) c.reconnectFunc = connect return c, nil } -func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { +func initClient(conn ServerCodec, services *serviceRegistry, cfg *clientConfig) *Client { _, isHTTP := conn.(*httpConn) c := &Client{ - isHTTP: isHTTP, - idgen: idgen, - services: services, - writeConn: conn, - close: make(chan struct{}), - closing: make(chan struct{}), - didClose: make(chan struct{}), - reconnected: make(chan ServerCodec), - readOp: make(chan readOp), - readErr: make(chan error), - reqInit: make(chan *requestOp), - reqSent: make(chan error, 1), - reqTimeout: make(chan *requestOp), - } + isHTTP: isHTTP, + services: services, + idgen: cfg.idgen, + batchItemLimit: cfg.batchItemLimit, + batchResponseMaxSize: cfg.batchResponseLimit, + writeConn: conn, + close: make(chan struct{}), + closing: make(chan struct{}), + didClose: make(chan struct{}), + reconnected: make(chan ServerCodec), + readOp: make(chan readOp), + readErr: make(chan error), + reqInit: make(chan *requestOp), + reqSent: make(chan error, 1), + reqTimeout: make(chan *requestOp), + } + + // Set defaults. + if c.idgen == nil { + c.idgen = randomIDGenerator() + } + + // Launch the main loop. if !isHTTP { go c.dispatch(conn) } @@ -236,7 +280,7 @@ func (c *Client) RegisterName(name string, receiver interface{}) error { } func (c *Client) nextID() json.RawMessage { - id := atomic.AddUint32(&c.idCounter, 1) + id := c.idCounter.Add(1) return strconv.AppendUint(nil, uint64(id), 10) } @@ -298,7 +342,10 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str if err != nil { return err } - op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} + op := &requestOp{ + ids: []json.RawMessage{msg.ID}, + resp: make(chan []*jsonrpcMessage, 1), + } if c.isHTTP { err = c.sendHTTP(ctx, op, msg) @@ -310,15 +357,21 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str } // dispatch has accepted the request and will close the channel when it quits. - switch resp, err := op.wait(ctx, c); { - case err != nil: + batchresp, err := op.wait(ctx, c) + if err != nil { return err + } + resp := batchresp[0] + switch { case resp.Error != nil: return resp.Error case len(resp.Result) == 0: return ErrNoResult default: - return json.Unmarshal(resp.Result, &result) + if result == nil { + return nil + } + return json.Unmarshal(resp.Result, result) } } @@ -334,7 +387,7 @@ func (c *Client) BatchCall(b []BatchElem) error { return c.BatchCallContext(ctx, b) } -// BatchCall sends all given requests as a single batch and waits for the server +// BatchCallContext sends all given requests as a single batch and waits for the server // to return a response for all of them. The wait duration is bounded by the // context's deadline. // @@ -350,7 +403,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { ) op := &requestOp{ ids: make([]json.RawMessage, len(b)), - resp: make(chan *jsonrpcMessage, len(b)), + resp: make(chan []*jsonrpcMessage, 1), } for i, elem := range b { msg, err := c.newMessage(elem.Method, elem.Args...) @@ -368,28 +421,48 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { } else { err = c.send(ctx, op, msgs) } + if err != nil { + return err + } + + batchresp, err := op.wait(ctx, c) + if err != nil { + return err + } // Wait for all responses to come back. - for n := 0; n < len(b) && err == nil; n++ { - var resp *jsonrpcMessage - resp, err = op.wait(ctx, c) - if err != nil { - break + for n := 0; n < len(batchresp) && err == nil; n++ { + resp := batchresp[n] + if resp == nil { + // Ignore null responses. These can happen for batches sent via HTTP. + continue } + // Find the element corresponding to this response. - // The element is guaranteed to be present because dispatch - // only sends valid IDs to our channel. - elem := &b[byID[string(resp.ID)]] - if resp.Error != nil { - elem.Error = resp.Error + index, ok := byID[string(resp.ID)] + if !ok { continue } - if len(resp.Result) == 0 { + delete(byID, string(resp.ID)) + + // Assign result and error. + elem := &b[index] + switch { + case resp.Error != nil: + elem.Error = resp.Error + case resp.Result == nil: elem.Error = ErrNoResult - continue + default: + elem.Error = json.Unmarshal(resp.Result, elem.Result) } - elem.Error = json.Unmarshal(resp.Result, elem.Result) } + + // Check that all expected responses have been received. + for _, index := range byID { + elem := &b[index] + elem.Error = ErrMissingBatchResponse + } + return err } @@ -450,7 +523,7 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf } op := &requestOp{ ids: []json.RawMessage{msg.ID}, - resp: make(chan *jsonrpcMessage), + resp: make(chan []*jsonrpcMessage, 1), sub: newClientSubscription(c, namespace, chanVal), } @@ -465,6 +538,13 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf return op.sub, nil } +// SupportsSubscriptions reports whether subscriptions are supported by the client +// transport. When this returns false, Subscribe and related methods will return +// ErrNotificationsUnsupported. +func (c *Client) SupportsSubscriptions() bool { + return !c.isHTTP +} + func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method} if paramsIn != nil { // prevent sending "params":null @@ -500,7 +580,7 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error { return err } } - err := c.writeConn.writeJSON(ctx, msg) + err := c.writeConn.writeJSON(ctx, msg, false) if err != nil { c.writeConn = nil if !retry { @@ -565,9 +645,9 @@ func (c *Client) dispatch(codec ServerCodec) { // Read path: case op := <-c.readOp: if op.batch { - conn.handler.handleBatch(context.Background(), op.msgs) + conn.handler.handleBatch(op.msgs) } else { - conn.handler.handleMsg(context.Background(), op.msgs[0]) + conn.handler.handleMsg(op.msgs[0]) } case err := <-c.readErr: @@ -633,7 +713,8 @@ func (c *Client) read(codec ServerCodec) { for { msgs, batch, err := codec.readBatch() if _, ok := err.(*json.SyntaxError); ok { - codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()})) + msg := errorMessage(&parseError{err.Error()}) + codec.writeJSON(context.Background(), msg, true) } if err != nil { c.readErr <- err diff --git a/rpc/client_opt.go b/rpc/client_opt.go new file mode 100644 index 0000000000..3fa045a9b9 --- /dev/null +++ b/rpc/client_opt.go @@ -0,0 +1,144 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "net/http" + + "github.com/gorilla/websocket" +) + +// ClientOption is a configuration option for the RPC client. +type ClientOption interface { + applyOption(*clientConfig) +} + +type clientConfig struct { + // HTTP settings + httpClient *http.Client + httpHeaders http.Header + httpAuth HTTPAuth + + // WebSocket options + wsDialer *websocket.Dialer + wsMessageSizeLimit *int64 // wsMessageSizeLimit nil = default, 0 = no limit + + // RPC handler options + idgen func() ID + batchItemLimit int + batchResponseLimit int +} + +func (cfg *clientConfig) initHeaders() { + if cfg.httpHeaders == nil { + cfg.httpHeaders = make(http.Header) + } +} + +func (cfg *clientConfig) setHeader(key, value string) { + cfg.initHeaders() + cfg.httpHeaders.Set(key, value) +} + +type optionFunc func(*clientConfig) + +func (fn optionFunc) applyOption(opt *clientConfig) { + fn(opt) +} + +// WithWebsocketDialer configures the websocket.Dialer used by the RPC client. +func WithWebsocketDialer(dialer websocket.Dialer) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.wsDialer = &dialer + }) +} + +// WithWebsocketMessageSizeLimit configures the websocket message size limit used by the RPC +// client. Passing a limit of 0 means no limit. +func WithWebsocketMessageSizeLimit(messageSizeLimit int64) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.wsMessageSizeLimit = &messageSizeLimit + }) +} + +// WithHeader configures HTTP headers set by the RPC client. Headers set using this option +// will be used for both HTTP and WebSocket connections. +func WithHeader(key, value string) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.initHeaders() + cfg.httpHeaders.Set(key, value) + }) +} + +// WithHeaders configures HTTP headers set by the RPC client. Headers set using this +// option will be used for both HTTP and WebSocket connections. +func WithHeaders(headers http.Header) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.initHeaders() + for k, vs := range headers { + cfg.httpHeaders[k] = vs + } + }) +} + +// WithHTTPClient configures the http.Client used by the RPC client. +func WithHTTPClient(c *http.Client) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.httpClient = c + }) +} + +// WithHTTPAuth configures HTTP request authentication. The given provider will be called +// whenever a request is made. Note that only one authentication provider can be active at +// any time. +func WithHTTPAuth(a HTTPAuth) ClientOption { + if a == nil { + panic("nil auth") + } + return optionFunc(func(cfg *clientConfig) { + cfg.httpAuth = a + }) +} + +// A HTTPAuth function is called by the client whenever a HTTP request is sent. +// The function must be safe for concurrent use. +// +// Usually, HTTPAuth functions will call h.Set("authorization", "...") to add +// auth information to the request. +type HTTPAuth func(h http.Header) error + +// WithBatchItemLimit changes the maximum number of items allowed in batch requests. +// +// Note: this option applies when processing incoming batch requests. It does not affect +// batch requests sent by the client. +func WithBatchItemLimit(limit int) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.batchItemLimit = limit + }) +} + +// WithBatchResponseSizeLimit changes the maximum number of response bytes that can be +// generated for batch requests. When this limit is reached, further calls in the batch +// will not be processed. +// +// Note: this option applies when processing incoming batch requests. It does not affect +// batch requests sent by the client. +func WithBatchResponseSizeLimit(sizeLimit int) ClientOption { + return optionFunc(func(cfg *clientConfig) { + cfg.batchResponseLimit = sizeLimit + }) +} diff --git a/rpc/client_opt_test.go b/rpc/client_opt_test.go new file mode 100644 index 0000000000..d7cc2572a7 --- /dev/null +++ b/rpc/client_opt_test.go @@ -0,0 +1,25 @@ +package rpc_test + +import ( + "context" + "net/http" + "time" + + "github.com/ethereum/go-ethereum/rpc" +) + +// This example configures a HTTP-based RPC client with two options - one setting the +// overall request timeout, the other adding a custom HTTP header to all requests. +func ExampleDialOptions() { + tokenHeader := rpc.WithHeader("x-token", "foo") + httpClient := rpc.WithHTTPClient(&http.Client{ + Timeout: 10 * time.Second, + }) + + ctx := context.Background() + c, err := rpc.DialOptions(ctx, "http://rpc.example.com", httpClient, tokenHeader) + if err != nil { + panic(err) + } + c.Close() +} diff --git a/rpc/client_test.go b/rpc/client_test.go index c20dc38b4f..7c96b2d666 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -69,6 +69,26 @@ func TestClientResponseType(t *testing.T) { } } +// This test checks calling a method that returns 'null'. +func TestClientNullResponse(t *testing.T) { + server := newTestServer() + defer server.Stop() + + client := DialInProc(server) + defer client.Close() + + var result json.RawMessage + if err := client.Call(&result, "test_null"); err != nil { + t.Fatal(err) + } + if result == nil { + t.Fatal("Expected non-nil result") + } + if !reflect.DeepEqual(result, json.RawMessage("null")) { + t.Errorf("Expected null, got %s", result) + } +} + // This test checks that server-returned errors with code and data come out of Client.Call. func TestClientErrorData(t *testing.T) { server := newTestServer() @@ -83,11 +103,15 @@ func TestClientErrorData(t *testing.T) { } // Check code. + // The method handler returns an error value which implements the rpc.Error + // interface, i.e. it has a custom error code. The server returns this error code. + expectedCode := testError{}.ErrorCode() if e, ok := err.(Error); !ok { t.Fatalf("client did not return rpc.Error, got %#v", e) - } else if e.ErrorCode() != (testError{}.ErrorCode()) { - t.Fatalf("wrong error code %d, want %d", e.ErrorCode(), testError{}.ErrorCode()) + } else if e.ErrorCode() != expectedCode { + t.Fatalf("wrong error code %d, want %d", e.ErrorCode(), expectedCode) } + // Check data. if e, ok := err.(DataError); !ok { t.Fatalf("client did not return rpc.DataError, got %#v", e) @@ -145,10 +169,12 @@ func TestClientBatchRequest(t *testing.T) { } } +// This checks that, for HTTP connections, the length of batch responses is validated to +// match the request exactly. func TestClientBatchRequest_len(t *testing.T) { b, err := json.Marshal([]jsonrpcMessage{ - {Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)}, - {Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)}, + {Version: "2.0", ID: json.RawMessage("1"), Result: json.RawMessage(`"0x1"`)}, + {Version: "2.0", ID: json.RawMessage("2"), Result: json.RawMessage(`"0x2"`)}, }) if err != nil { t.Fatal("failed to encode jsonrpc message:", err) @@ -161,37 +187,102 @@ func TestClientBatchRequest_len(t *testing.T) { })) t.Cleanup(s.Close) - client, err := Dial(s.URL) - if err != nil { - t.Fatal("failed to dial test server:", err) - } - defer client.Close() - t.Run("too-few", func(t *testing.T) { + client, err := Dial(s.URL) + if err != nil { + t.Fatal("failed to dial test server:", err) + } + defer client.Close() + batch := []BatchElem{ - {Method: "foo"}, - {Method: "bar"}, - {Method: "baz"}, + {Method: "foo", Result: new(string)}, + {Method: "bar", Result: new(string)}, + {Method: "baz", Result: new(string)}, } ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) defer cancelFn() - if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { - t.Errorf("expected %q but got: %v", ErrBadResult, err) + + if err := client.BatchCallContext(ctx, batch); err != nil { + t.Fatal("error:", err) + } + for i, elem := range batch[:2] { + if elem.Error != nil { + t.Errorf("expected no error for batch element %d, got %q", i, elem.Error) + } + } + for i, elem := range batch[2:] { + if elem.Error != ErrMissingBatchResponse { + t.Errorf("wrong error %q for batch element %d", elem.Error, i+2) + } } }) t.Run("too-many", func(t *testing.T) { + client, err := Dial(s.URL) + if err != nil { + t.Fatal("failed to dial test server:", err) + } + defer client.Close() + batch := []BatchElem{ - {Method: "foo"}, + {Method: "foo", Result: new(string)}, } ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) defer cancelFn() - if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { - t.Errorf("expected %q but got: %v", ErrBadResult, err) + + if err := client.BatchCallContext(ctx, batch); err != nil { + t.Fatal("error:", err) + } + for i, elem := range batch[:1] { + if elem.Error != nil { + t.Errorf("expected no error for batch element %d, got %q", i, elem.Error) + } + } + for i, elem := range batch[1:] { + if elem.Error != ErrMissingBatchResponse { + t.Errorf("wrong error %q for batch element %d", elem.Error, i+2) + } } }) } +// This checks that the client can handle the case where the server doesn't +// respond to all requests in a batch. +func TestClientBatchRequestLimit(t *testing.T) { + server := newTestServer() + defer server.Stop() + server.SetBatchLimits(2, 100000) + client := DialInProc(server) + + batch := []BatchElem{ + {Method: "foo"}, + {Method: "bar"}, + {Method: "baz"}, + } + err := client.BatchCall(batch) + if err != nil { + t.Fatal("unexpected error:", err) + } + + // Check that the first response indicates an error with batch size. + var err0 Error + if !errors.As(batch[0].Error, &err0) { + t.Log("error zero:", batch[0].Error) + t.Fatalf("batch elem 0 has wrong error type: %T", batch[0].Error) + } else { + if err0.ErrorCode() != -32600 || err0.Error() != errMsgBatchTooLarge { + t.Fatalf("wrong error on batch elem zero: %v", err0) + } + } + + // Check that remaining response batch elements are reported as absent. + for i, elem := range batch[1:] { + if elem.Error != ErrMissingBatchResponse { + t.Fatalf("batch elem %d has unexpected error: %v", i+1, elem.Error) + } + } +} + func TestClientNotify(t *testing.T) { server := newTestServer() defer server.Stop() @@ -286,7 +377,7 @@ func testClientCancel(transport string, t *testing.T) { _, hasDeadline := ctx.Deadline() t.Errorf("no error for call with %v wait time (deadline: %v)", timeout, hasDeadline) // default: - // t.Logf("got expected error with %v wait time: %v", timeout, err) + // t.Logf("got expected error with %v wait time: %v", timeout, err) } cancel() } @@ -463,7 +554,8 @@ func TestClientSubscriptionUnsubscribeServer(t *testing.T) { defer srv.Stop() // Create the client on the other end of the pipe. - client, _ := newClient(context.Background(), func(context.Context) (ServerCodec, error) { + cfg := new(clientConfig) + client, _ := newClient(context.Background(), cfg, func(context.Context) (ServerCodec, error) { return NewCodec(p2), nil }) defer client.Close() @@ -663,10 +755,10 @@ func TestClientReconnect(t *testing.T) { // Start a server and corresponding client. s1, l1 := startServer("127.0.0.1:0") client, err := DialContext(ctx, "ws://"+l1.Addr().String()) - defer client.Close() if err != nil { t.Fatal("can't dial", err) } + defer client.Close() // Perform a call. This should work because the server is up. var resp echoResult diff --git a/rpc/constants_unix.go b/rpc/constants_unix.go deleted file mode 100644 index 1f04d15d7f..0000000000 --- a/rpc/constants_unix.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -//go:build darwin || dragonfly || freebsd || linux || nacl || netbsd || openbsd || solaris -// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris - -package rpc - -/* -#include - -int max_socket_path_size() { -struct sockaddr_un s; -return sizeof(s.sun_path); -} -*/ -import "C" - -var ( - max_path_size = C.max_socket_path_size() -) diff --git a/rpc/constants_unix_nocgo.go b/rpc/constants_unix_nocgo.go deleted file mode 100644 index a62e4ee529..0000000000 --- a/rpc/constants_unix_nocgo.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -//go:build !cgo && !windows -// +build !cgo,!windows - -package rpc - -var ( - // On Linux, sun_path is 108 bytes in size - // see http://man7.org/linux/man-pages/man7/unix.7.html - max_path_size = 108 -) diff --git a/rpc/context_headers.go b/rpc/context_headers.go new file mode 100644 index 0000000000..29a58150e3 --- /dev/null +++ b/rpc/context_headers.go @@ -0,0 +1,56 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "context" + "net/http" +) + +type mdHeaderKey struct{} + +// NewContextWithHeaders wraps the given context, adding HTTP headers. These headers will +// be applied by Client when making a request using the returned context. +func NewContextWithHeaders(ctx context.Context, h http.Header) context.Context { + if len(h) == 0 { + // This check ensures the header map set in context will never be nil. + return ctx + } + + var ctxh http.Header + prev, ok := ctx.Value(mdHeaderKey{}).(http.Header) + if ok { + ctxh = setHeaders(prev.Clone(), h) + } else { + ctxh = h.Clone() + } + return context.WithValue(ctx, mdHeaderKey{}, ctxh) +} + +// headersFromContext is used to extract http.Header from context. +func headersFromContext(ctx context.Context) http.Header { + source, _ := ctx.Value(mdHeaderKey{}).(http.Header) + return source +} + +// setHeaders sets all headers from src in dst. +func setHeaders(dst http.Header, src http.Header) http.Header { + for key, values := range src { + dst[http.CanonicalHeaderKey(key)] = values + } + return dst +} diff --git a/rpc/errors.go b/rpc/errors.go index 75425b925a..438aff218c 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -54,10 +54,24 @@ var ( _ Error = new(invalidRequestError) _ Error = new(invalidMessageError) _ Error = new(invalidParamsError) - _ Error = new(CustomError) + _ Error = new(internalServerError) ) -const defaultErrorCode = -32000 +const ( + errcodeDefault = -32000 + errcodeTimeout = -32002 + errcodeResponseTooLarge = -32003 + errcodePanic = -32603 + errcodeMarshalError = -32603 + + legacyErrcodeNotificationsUnsupported = -32001 +) + +const ( + errMsgTimeout = "request timed out" + errMsgResponseTooLarge = "response too large" + errMsgBatchTooLarge = "batch too large" +) type methodNotFoundError struct{ method string } @@ -67,6 +81,34 @@ func (e *methodNotFoundError) Error() string { return fmt.Sprintf("the method %s does not exist/is not available", e.method) } +type notificationsUnsupportedError struct{} + +func (e notificationsUnsupportedError) Error() string { + return "notifications not supported" +} + +func (e notificationsUnsupportedError) ErrorCode() int { return -32601 } + +// Is checks for equivalence to another error. Here we define that all errors with code +// -32601 (method not found) are equivalent to notificationsUnsupportedError. This is +// done to enable the following pattern: +// +// sub, err := client.Subscribe(...) +// if errors.Is(err, rpc.ErrNotificationsUnsupported) { +// // server doesn't support subscriptions +// } +func (e notificationsUnsupportedError) Is(other error) bool { + if other == (notificationsUnsupportedError{}) { + return true + } + rpcErr, ok := other.(Error) + if ok { + code := rpcErr.ErrorCode() + return code == -32601 || code == legacyErrcodeNotificationsUnsupported + } + return false +} + type subscriptionNotFoundError struct{ namespace, subscription string } func (e *subscriptionNotFoundError) ErrorCode() int { return -32601 } @@ -103,11 +145,12 @@ func (e *invalidParamsError) ErrorCode() int { return -32602 } func (e *invalidParamsError) Error() string { return e.message } -type CustomError struct { - Code int - ValidationError string +// internalServerError is used for server errors during request processing. +type internalServerError struct { + code int + message string } -func (e *CustomError) ErrorCode() int { return e.Code } +func (e *internalServerError) ErrorCode() int { return e.code } -func (e *CustomError) Error() string { return e.ValidationError } +func (e *internalServerError) Error() string { return e.message } diff --git a/rpc/handler.go b/rpc/handler.go index 7abc387fdb..f44e4d7b01 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -46,20 +46,22 @@ import ( // Now send the request, then wait for the reply to be delivered through handleMsg: // // if err := op.wait(...); err != nil { -// h.removeRequestOp(op) // timeout, etc. +// h.removeRequestOp(op) // timeout, etc. // } type handler struct { - reg *serviceRegistry - unsubscribeCb *callback - idgen func() ID // subscription ID generator - respWait map[string]*requestOp // active client requests - clientSubs map[string]*ClientSubscription // active client subscriptions - callWG sync.WaitGroup // pending call goroutines - rootCtx context.Context // canceled by close() - cancelRoot func() // cancel function for rootCtx - conn jsonWriter // where responses will be sent - log log.Logger - allowSubscribe bool + reg *serviceRegistry + unsubscribeCb *callback + idgen func() ID // subscription ID generator + respWait map[string]*requestOp // active client requests + clientSubs map[string]*ClientSubscription // active client subscriptions + callWG sync.WaitGroup // pending call goroutines + rootCtx context.Context // canceled by close() + cancelRoot func() // cancel function for rootCtx + conn jsonWriter // where responses will be sent + log log.Logger + allowSubscribe bool + batchRequestLimit int + batchResponseMaxSize int subLock sync.Mutex serverSubs map[ID]*Subscription @@ -70,19 +72,21 @@ type callProc struct { notifiers []*Notifier } -func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { +func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, batchRequestLimit, batchResponseMaxSize int) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ - reg: reg, - idgen: idgen, - conn: conn, - respWait: make(map[string]*requestOp), - clientSubs: make(map[string]*ClientSubscription), - rootCtx: rootCtx, - cancelRoot: cancelRoot, - allowSubscribe: true, - serverSubs: make(map[ID]*Subscription), - log: log.Root(), + reg: reg, + idgen: idgen, + conn: conn, + respWait: make(map[string]*requestOp), + clientSubs: make(map[string]*ClientSubscription), + rootCtx: rootCtx, + cancelRoot: cancelRoot, + allowSubscribe: true, + serverSubs: make(map[ID]*Subscription), + log: log.Root(), + batchRequestLimit: batchRequestLimit, + batchResponseMaxSize: batchResponseMaxSize, } if conn.remoteAddr() != "" { h.log = h.log.New("conn", conn.remoteAddr()) @@ -91,61 +95,219 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg * return h } +// batchCallBuffer manages in progress call messages and their responses during a batch +// call. Calls need to be synchronized between the processing and timeout-triggering +// goroutines. +type batchCallBuffer struct { + mutex sync.Mutex + calls []*jsonrpcMessage + resp []*jsonrpcMessage + wrote bool +} + +// nextCall returns the next unprocessed message. +func (b *batchCallBuffer) nextCall() *jsonrpcMessage { + b.mutex.Lock() + defer b.mutex.Unlock() + + if len(b.calls) == 0 { + return nil + } + // The popping happens in `pushAnswer`. The in progress call is kept + // so we can return an error for it in case of timeout. + msg := b.calls[0] + return msg +} + +// pushResponse adds the response to last call returned by nextCall. +func (b *batchCallBuffer) pushResponse(answer *jsonrpcMessage) { + b.mutex.Lock() + defer b.mutex.Unlock() + + if answer != nil { + b.resp = append(b.resp, answer) + } + b.calls = b.calls[1:] +} + +// write sends the responses. +func (b *batchCallBuffer) write(ctx context.Context, conn jsonWriter) { + b.mutex.Lock() + defer b.mutex.Unlock() + + b.doWrite(ctx, conn, false) +} + +// respondWithError sends the responses added so far. For the remaining unanswered call +// messages, it responds with the given error. +func (b *batchCallBuffer) respondWithError(ctx context.Context, conn jsonWriter, err error) { + b.mutex.Lock() + defer b.mutex.Unlock() + + for _, msg := range b.calls { + if !msg.isNotification() { + b.resp = append(b.resp, msg.errorResponse(err)) + } + } + b.doWrite(ctx, conn, true) +} + +// doWrite actually writes the response. +// This assumes b.mutex is held. +func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorResponse bool) { + if b.wrote { + return + } + b.wrote = true // can only write once + if len(b.resp) > 0 { + conn.writeJSON(ctx, b.resp, isErrorResponse) + } +} + // handleBatch executes all messages in a batch and returns the responses. -func (h *handler) handleBatch(ctx context.Context, msgs []*jsonrpcMessage) { +func (h *handler) handleBatch(msgs []*jsonrpcMessage) { // Emit error response for empty batches: if len(msgs) == 0 { h.startCallProc(func(cp *callProc) { - h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) + resp := errorMessage(&invalidRequestError{"empty batch"}) + h.conn.writeJSON(cp.ctx, resp, true) + }) + return + } + // Apply limit on total number of requests. + if h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit { + h.startCallProc(func(cp *callProc) { + h.respondWithBatchTooLarge(cp, msgs) }) return } - // Handle non-call messages first: + // Handle non-call messages first. + // Here we need to find the requestOp that sent the request batch. calls := make([]*jsonrpcMessage, 0, len(msgs)) - for _, msg := range msgs { - if handled := h.handleImmediate(msg); !handled { - calls = append(calls, msg) - } - } + h.handleResponses(msgs, func(msg *jsonrpcMessage) { + calls = append(calls, msg) + }) if len(calls) == 0 { return } + // Process calls on a goroutine because they may block indefinitely: h.startCallProc(func(cp *callProc) { - answers := make([]*jsonrpcMessage, 0, len(msgs)) - for _, msg := range calls { - if answer := h.handleCallMsg(cp, ctx, msg); answer != nil { - answers = append(answers, answer) + var ( + timer *time.Timer + cancel context.CancelFunc + callBuffer = &batchCallBuffer{calls: calls, resp: make([]*jsonrpcMessage, 0, len(calls))} + ) + + cp.ctx, cancel = context.WithCancel(cp.ctx) + defer cancel() + + // Cancel the request context after timeout and send an error response. Since the + // currently-running method might not return immediately on timeout, we must wait + // for the timeout concurrently with processing the request. + if timeout, ok := ContextRequestTimeout(cp.ctx); ok { + timer = time.AfterFunc(timeout, func() { + cancel() + err := &internalServerError{errcodeTimeout, errMsgTimeout} + callBuffer.respondWithError(cp.ctx, h.conn, err) + }) + } + + responseBytes := 0 + for { + // No need to handle rest of calls if timed out. + if cp.ctx.Err() != nil { + break + } + msg := callBuffer.nextCall() + if msg == nil { + break + } + resp := h.handleCallMsg(cp, msg) + callBuffer.pushResponse(resp) + if resp != nil && h.batchResponseMaxSize != 0 { + responseBytes += len(resp.Result) + if responseBytes > h.batchResponseMaxSize { + err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge} + callBuffer.respondWithError(cp.ctx, h.conn, err) + break + } } } - h.addSubscriptions(cp.notifiers) - if len(answers) > 0 { - h.conn.writeJSON(cp.ctx, answers) + if timer != nil { + timer.Stop() } + + h.addSubscriptions(cp.notifiers) + callBuffer.write(cp.ctx, h.conn) for _, n := range cp.notifiers { n.activate() } }) } -// handleMsg handles a single message. -func (h *handler) handleMsg(ctx context.Context, msg *jsonrpcMessage) { - if ok := h.handleImmediate(msg); ok { - return - } - h.startCallProc(func(cp *callProc) { - answer := h.handleCallMsg(cp, ctx, msg) - h.addSubscriptions(cp.notifiers) - if answer != nil { - h.conn.writeJSON(cp.ctx, answer) - } - for _, n := range cp.notifiers { - n.activate() +func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage) { + resp := errorMessage(&invalidRequestError{errMsgBatchTooLarge}) + // Find the first call and add its "id" field to the error. + // This is the best we can do, given that the protocol doesn't have a way + // of reporting an error for the entire batch. + for _, msg := range batch { + if msg.isCall() { + resp.ID = msg.ID + break } + } + h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}, true) +} + +// handleMsg handles a single non-batch message. +func (h *handler) handleMsg(msg *jsonrpcMessage) { + msgs := []*jsonrpcMessage{msg} + h.handleResponses(msgs, func(msg *jsonrpcMessage) { + h.startCallProc(func(cp *callProc) { + h.handleNonBatchCall(cp, msg) + }) }) } +func (h *handler) handleNonBatchCall(cp *callProc, msg *jsonrpcMessage) { + var ( + responded sync.Once + timer *time.Timer + cancel context.CancelFunc + ) + cp.ctx, cancel = context.WithCancel(cp.ctx) + defer cancel() + + // Cancel the request context after timeout and send an error response. Since the + // running method might not return immediately on timeout, we must wait for the + // timeout concurrently with processing the request. + if timeout, ok := ContextRequestTimeout(cp.ctx); ok { + timer = time.AfterFunc(timeout, func() { + cancel() + responded.Do(func() { + resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout}) + h.conn.writeJSON(cp.ctx, resp, true) + }) + }) + } + + answer := h.handleCallMsg(cp, msg) + if timer != nil { + timer.Stop() + } + h.addSubscriptions(cp.notifiers) + if answer != nil { + responded.Do(func() { + h.conn.writeJSON(cp.ctx, answer, false) + }) + } + for _, n := range cp.notifiers { + n.activate() + } +} + // close cancels all requests except for inflightReq and waits for // call goroutines to shut down. func (h *handler) close(err error, inflightReq *requestOp) { @@ -226,23 +388,60 @@ func (h *handler) startCallProc(fn func(*callProc)) { }() } -// handleImmediate executes non-call messages. It returns false if the message is a -// call or requires a reply. -func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { - start := time.Now() - switch { - case msg.isNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { - h.handleSubscriptionResult(msg) - return true +// handleResponse processes method call responses. +func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*jsonrpcMessage)) { + var resolvedops []*requestOp + handleResp := func(msg *jsonrpcMessage) { + op := h.respWait[string(msg.ID)] + if op == nil { + h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) + return + } + resolvedops = append(resolvedops, op) + delete(h.respWait, string(msg.ID)) + + // For subscription responses, start the subscription if the server + // indicates success. EthSubscribe gets unblocked in either case through + // the op.resp channel. + if op.sub != nil { + if msg.Error != nil { + op.err = msg.Error + } else { + op.err = json.Unmarshal(msg.Result, &op.sub.subid) + if op.err == nil { + go op.sub.run() + h.clientSubs[op.sub.subid] = op.sub + } + } + } + + if !op.hadResponse { + op.hadResponse = true + op.resp <- batch } - return false - case msg.isResponse(): - h.handleResponse(msg) - h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "duration", time.Since(start)) - return true - default: - return false + } + + for _, msg := range batch { + start := time.Now() + switch { + case msg.isResponse(): + handleResp(msg) + h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "duration", time.Since(start)) + + case msg.isNotification(): + if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + h.handleSubscriptionResult(msg) + continue + } + handleCall(msg) + + default: + handleCall(msg) + } + } + + for _, op := range resolvedops { + h.removeRequestOp(op) } } @@ -258,49 +457,20 @@ func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { } } -// handleResponse processes method call responses. -func (h *handler) handleResponse(msg *jsonrpcMessage) { - op := h.respWait[string(msg.ID)] - if op == nil { - h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) - return - } - delete(h.respWait, string(msg.ID)) - // For normal responses, just forward the reply to Call/BatchCall. - if op.sub == nil { - op.resp <- msg - return - } - // For subscription responses, start the subscription if the server - // indicates success. EthSubscribe gets unblocked in either case through - // the op.resp channel. - defer close(op.resp) - if msg.Error != nil { - op.err = msg.Error - return - } - if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { - go op.sub.run() - h.clientSubs[op.sub.subid] = op.sub - } -} - // handleCallMsg executes a call message and returns the answer. -func (h *handler) handleCallMsg(ctx *callProc, reqCtx context.Context, msg *jsonrpcMessage) *jsonrpcMessage { +func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { start := time.Now() switch { case msg.isNotification(): h.handleCall(ctx, msg) h.log.Debug("Served "+msg.Method, "duration", time.Since(start)) return nil + case msg.isCall(): resp := h.handleCall(ctx, msg) var ctx []interface{} ctx = append(ctx, "reqid", idForLog{msg.ID}, "duration", time.Since(start)) if resp.Error != nil { - xForward := reqCtx.Value("X-Forwarded-For") - h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message, "X-Forwarded-For", xForward) - ctx = append(ctx, "err", resp.Error.Message) if resp.Error.Data != nil { ctx = append(ctx, "errdata", resp.Error.Data) @@ -310,8 +480,10 @@ func (h *handler) handleCallMsg(ctx *callProc, reqCtx context.Context, msg *json h.log.Debug("Served "+msg.Method, ctx...) } return resp + case msg.hasValidID(): return msg.errorResponse(&invalidRequestError{"invalid request"}) + default: return errorMessage(&invalidRequestError{"invalid request"}) } @@ -331,6 +503,7 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage if callb == nil { return msg.errorResponse(&methodNotFoundError{method: msg.Method}) } + args, err := parsePositionalArguments(msg.Params, callb.argTypes) if err != nil { return msg.errorResponse(&invalidParamsError{err.Error()}) @@ -343,14 +516,14 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage if callb != h.unsubscribeCb { rpcRequestGauge.Inc(1) if answer.Error != nil { - failedReqeustGauge.Inc(1) + failedRequestGauge.Inc(1) } else { successfulRequestGauge.Inc(1) } - RpcServingTimer.UpdateSince(start) - newRPCRequestGauge(msg.Method).Inc(1) - newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start) + rpcServingTimer.UpdateSince(start) + updateServeTimeHistogram(msg.Method, answer.Error == nil, time.Since(start)) } + return answer } diff --git a/rpc/http.go b/rpc/http.go index 5b522fa557..741fa1c0eb 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -23,9 +23,11 @@ import ( "errors" "fmt" "io" + "math" "mime" "net/http" "net/url" + "strconv" "sync" "time" ) @@ -45,13 +47,14 @@ type httpConn struct { closeCh chan interface{} mu sync.Mutex // protects headers headers http.Header + auth HTTPAuth } // httpConn implements ServerCodec, but it is treated specially by Client // and some methods don't work. The panic() stubs here exist to ensure // this special treatment is correct. -func (hc *httpConn) writeJSON(context.Context, interface{}) error { +func (hc *httpConn) writeJSON(context.Context, interface{}, bool) error { panic("writeJSON called on httpConn") } @@ -87,6 +90,14 @@ type HTTPTimeouts struct { // ReadHeaderTimeout. It is valid to use them both. ReadTimeout time.Duration + // ReadHeaderTimeout is the amount of time allowed to read + // request headers. The connection's read deadline is reset + // after reading the headers and the Handler can decide what + // is considered too slow for the body. If ReadHeaderTimeout + // is zero, the value of ReadTimeout is used. If both are + // zero, there is no timeout. + ReadHeaderTimeout time.Duration + // WriteTimeout is the maximum duration before timing out // writes of the response. It is reset whenever a new // request's header is read. Like ReadTimeout, it does not @@ -103,13 +114,21 @@ type HTTPTimeouts struct { // DefaultHTTPTimeouts represents the default timeout values used if further // configuration is not provided. var DefaultHTTPTimeouts = HTTPTimeouts{ - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 120 * time.Second, + ReadTimeout: 30 * time.Second, + ReadHeaderTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 120 * time.Second, +} + +// DialHTTP creates a new RPC client that connects to an RPC server over HTTP. +func DialHTTP(endpoint string) (*Client, error) { + return DialHTTPWithClient(endpoint, new(http.Client)) } // DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP // using the provided HTTP Client. +// +// Deprecated: use DialOptions and the WithHTTPClient option. func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { // Sanity check URL so we don't end up with a client that will fail every request. _, err := url.Parse(endpoint) @@ -117,24 +136,36 @@ func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { return nil, err } - initctx := context.Background() - headers := make(http.Header, 2) + var cfg clientConfig + cfg.httpClient = client + fn := newClientTransportHTTP(endpoint, &cfg) + return newClient(context.Background(), &cfg, fn) +} + +func newClientTransportHTTP(endpoint string, cfg *clientConfig) reconnectFunc { + headers := make(http.Header, 2+len(cfg.httpHeaders)) headers.Set("accept", contentType) headers.Set("content-type", contentType) - return newClient(initctx, func(context.Context) (ServerCodec, error) { - hc := &httpConn{ - client: client, - headers: headers, - url: endpoint, - closeCh: make(chan interface{}), - } - return hc, nil - }) -} + for key, values := range cfg.httpHeaders { + headers[key] = values + } -// DialHTTP creates a new RPC client that connects to an RPC server over HTTP. -func DialHTTP(endpoint string) (*Client, error) { - return DialHTTPWithClient(endpoint, new(http.Client)) + client := cfg.httpClient + if client == nil { + client = new(http.Client) + } + + hc := &httpConn{ + client: client, + headers: headers, + url: endpoint, + auth: cfg.httpAuth, + closeCh: make(chan interface{}), + } + + return func(ctx context.Context) (ServerCodec, error) { + return hc, nil + } } func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { @@ -145,11 +176,12 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e } defer respBody.Close() - var respmsg jsonrpcMessage - if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { + var resp jsonrpcMessage + batch := [1]*jsonrpcMessage{&resp} + if err := json.NewDecoder(respBody).Decode(&resp); err != nil { return err } - op.resp <- &respmsg + op.resp <- batch[:] return nil } @@ -160,16 +192,12 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr return err } defer respBody.Close() - var respmsgs []jsonrpcMessage + + var respmsgs []*jsonrpcMessage if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } - if len(respmsgs) != len(msgs) { - return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult) - } - for i := 0; i < len(respmsgs); i++ { - op.resp <- &respmsgs[i] - } + op.resp <- respmsgs return nil } @@ -178,7 +206,7 @@ func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadClos if err != nil { return nil, err } - req, err := http.NewRequestWithContext(ctx, "POST", hc.url, io.NopCloser(bytes.NewReader(body))) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, hc.url, io.NopCloser(bytes.NewReader(body))) if err != nil { return nil, err } @@ -189,6 +217,13 @@ func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadClos hc.mu.Lock() req.Header = hc.headers.Clone() hc.mu.Unlock() + setHeaders(req.Header, headersFromContext(ctx)) + + if hc.auth != nil { + if err := hc.auth(req.Header); err != nil { + return nil, err + } + } // do request resp, err := hc.client.Do(req) @@ -221,7 +256,42 @@ type httpServerConn struct { func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec { body := io.LimitReader(r.Body, maxRequestContentLength) conn := &httpServerConn{Reader: body, Writer: w, r: r} - return NewCodec(conn) + + encoder := func(v any, isErrorResponse bool) error { + if !isErrorResponse { + return json.NewEncoder(conn).Encode(v) + } + + // It's an error response and requires special treatment. + // + // In case of a timeout error, the response must be written before the HTTP + // server's write timeout occurs. So we need to flush the response. The + // Content-Length header also needs to be set to ensure the client knows + // when it has the full response. + encdata, err := json.Marshal(v) + if err != nil { + return err + } + w.Header().Set("content-length", strconv.Itoa(len(encdata))) + + // If this request is wrapped in a handler that might remove Content-Length (such + // as the automatic gzip we do in package node), we need to ensure the HTTP server + // doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked + // encoding might not be finished correctly, and some clients do not like it when + // the final chunk is missing. + w.Header().Set("transfer-encoding", "identity") + + _, err = w.Write(encdata) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + return err + } + + dec := json.NewDecoder(conn) + dec.UseNumber() + + return NewFuncCodec(conn, encoder, dec.Decode) } // Close does nothing and always returns nil. @@ -259,19 +329,6 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // All checks passed, create a codec that reads directly from the request body // until EOF, writes the response to w, and orders the server to process a // single request. - ctx = context.WithValue(ctx, "remote", r.RemoteAddr) - ctx = context.WithValue(ctx, "scheme", r.Proto) - ctx = context.WithValue(ctx, "local", r.Host) - if ua := r.Header.Get("User-Agent"); ua != "" { - ctx = context.WithValue(ctx, "User-Agent", ua) - } - if origin := r.Header.Get("Origin"); origin != "" { - ctx = context.WithValue(ctx, "Origin", origin) - } - if xForward := r.Header.Get("X-Forwarded-For"); xForward != "" { - ctx = context.WithValue(ctx, "X-Forwarded-For", xForward) - } - w.Header().Set("content-type", contentType) codec := newHTTPServerConn(r, w) defer codec.close() @@ -304,3 +361,35 @@ func validateRequest(r *http.Request) (int, error) { err := fmt.Errorf("invalid content type, only %s is supported", contentType) return http.StatusUnsupportedMediaType, err } + +// ContextRequestTimeout returns the request timeout derived from the given context. +func ContextRequestTimeout(ctx context.Context) (time.Duration, bool) { + timeout := time.Duration(math.MaxInt64) + hasTimeout := false + setTimeout := func(d time.Duration) { + if d < timeout { + timeout = d + hasTimeout = true + } + } + + if deadline, ok := ctx.Deadline(); ok { + setTimeout(time.Until(deadline)) + } + + // If the context is an HTTP request context, use the server's WriteTimeout. + httpSrv, ok := ctx.Value(http.ServerContextKey).(*http.Server) + if ok && httpSrv.WriteTimeout > 0 { + wt := httpSrv.WriteTimeout + // When a write timeout is configured, we need to send the response message before + // the HTTP server cuts connection. So our internal timeout must be earlier than + // the server's true timeout. + // + // Note: Timeouts are sanitized to be a minimum of 1 second. + // Also see issue: https://github.com/golang/go/issues/47229 + wt -= 100 * time.Millisecond + setTimeout(wt) + } + + return timeout, hasTimeout +} diff --git a/rpc/http_test.go b/rpc/http_test.go index c84d7705f2..584842a9aa 100644 --- a/rpc/http_test.go +++ b/rpc/http_test.go @@ -17,6 +17,8 @@ package rpc import ( + "context" + "fmt" "net/http" "net/http/httptest" "strings" @@ -92,6 +94,7 @@ func confirmHTTPRequestYieldsStatusCode(t *testing.T, method, contentType, body if err != nil { t.Fatalf("request failed: %v", err) } + resp.Body.Close() confirmStatusCode(t, resp.StatusCode, expectedStatusCode) } @@ -198,3 +201,43 @@ func TestHTTPPeerInfo(t *testing.T) { t.Errorf("wrong HTTP.Origin %q", info.HTTP.UserAgent) } } + +func TestNewContextWithHeaders(t *testing.T) { + expectedHeaders := 0 + server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + for i := 0; i < expectedHeaders; i++ { + key, want := fmt.Sprintf("key-%d", i), fmt.Sprintf("val-%d", i) + if have := request.Header.Get(key); have != want { + t.Errorf("wrong request headers for %s, want: %s, have: %s", key, want, have) + } + } + writer.WriteHeader(http.StatusOK) + _, _ = writer.Write([]byte(`{}`)) + })) + defer server.Close() + + client, err := Dial(server.URL) + if err != nil { + t.Fatalf("failed to dial: %s", err) + } + defer client.Close() + + newHdr := func(k, v string) http.Header { + header := http.Header{} + header.Set(k, v) + return header + } + ctx1 := NewContextWithHeaders(context.Background(), newHdr("key-0", "val-0")) + ctx2 := NewContextWithHeaders(ctx1, newHdr("key-1", "val-1")) + ctx3 := NewContextWithHeaders(ctx2, newHdr("key-2", "val-2")) + + expectedHeaders = 3 + if err := client.CallContext(ctx3, nil, "test"); err != ErrNoResult { + t.Error("call failed", err) + } + + expectedHeaders = 2 + if err := client.CallContext(ctx2, nil, "test"); err != ErrNoResult { + t.Error("call failed:", err) + } +} diff --git a/rpc/inproc.go b/rpc/inproc.go index fbe9a40cec..306974e04b 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -24,7 +24,8 @@ import ( // DialInProc attaches an in-process connection to the given RPC server. func DialInProc(handler *Server) *Client { initctx := context.Background() - c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { + cfg := new(clientConfig) + c, _ := newClient(initctx, cfg, func(context.Context) (ServerCodec, error) { p1, p2 := net.Pipe() go handler.ServeCodec(NewCodec(p1), 0) return NewCodec(p2), nil diff --git a/rpc/ipc.go b/rpc/ipc.go index 07a211c627..a08245b270 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -46,11 +46,16 @@ func (s *Server) ServeListener(l net.Listener) error { // The context is used for the initial connection establishment. It does not // affect subsequent interactions with the client. func DialIPC(ctx context.Context, endpoint string) (*Client, error) { - return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { + cfg := new(clientConfig) + return newClient(ctx, cfg, newClientTransportIPC(endpoint)) +} + +func newClientTransportIPC(endpoint string) reconnectFunc { + return func(ctx context.Context) (ServerCodec, error) { conn, err := newIPCConnection(ctx, endpoint) if err != nil { return nil, err } return NewCodec(conn), err - }) + } } diff --git a/rpc/ipc_unix.go b/rpc/ipc_unix.go index 249a9cf044..33c1cad549 100644 --- a/rpc/ipc_unix.go +++ b/rpc/ipc_unix.go @@ -29,10 +29,17 @@ import ( "github.com/ethereum/go-ethereum/log" ) +const ( + // On Linux, sun_path is 108 bytes in size + // see http://man7.org/linux/man-pages/man7/unix.7.html + maxPathSize = int(108) +) + // ipcListen will create a Unix socket on the given endpoint. func ipcListen(endpoint string) (net.Listener, error) { - if len(endpoint) > int(max_path_size) { - log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", max_path_size), + // account for null-terminator too + if len(endpoint)+1 > maxPathSize { + log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", maxPathSize-1), "endpoint", endpoint) } diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go index adb1826f0c..efec38cf37 100644 --- a/rpc/ipc_windows.go +++ b/rpc/ipc_windows.go @@ -24,7 +24,7 @@ import ( "net" "time" - "gopkg.in/natefinch/npipe.v2" + "github.com/Microsoft/go-winio" ) // This is used if the dialing context has no deadline. It is much smaller than the @@ -33,17 +33,12 @@ const defaultPipeDialTimeout = 2 * time.Second // ipcListen will create a named pipe on the given endpoint. func ipcListen(endpoint string) (net.Listener, error) { - return npipe.Listen(endpoint) + return winio.ListenPipe(endpoint, nil) } // newIPCConnection will connect to a named pipe with the given endpoint as name. func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { - timeout := defaultPipeDialTimeout - if deadline, ok := ctx.Deadline(); ok { - timeout = deadline.Sub(time.Now()) - if timeout < 0 { - timeout = 0 - } - } - return npipe.DialTimeout(endpoint, timeout) + ctx, cancel := context.WithTimeout(ctx, defaultPipeDialTimeout) + defer cancel() + return winio.DialPipeContext(ctx, endpoint) } diff --git a/rpc/json.go b/rpc/json.go index 6024f1e7dc..8a3b162cab 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -58,21 +58,25 @@ type jsonrpcMessage struct { } func (msg *jsonrpcMessage) isNotification() bool { - return msg.ID == nil && msg.Method != "" + return msg.hasValidVersion() && msg.ID == nil && msg.Method != "" } func (msg *jsonrpcMessage) isCall() bool { - return msg.hasValidID() && msg.Method != "" + return msg.hasValidVersion() && msg.hasValidID() && msg.Method != "" } func (msg *jsonrpcMessage) isResponse() bool { - return msg.hasValidID() && msg.Method == "" && msg.Params == nil && (msg.Result != nil || msg.Error != nil) + return msg.hasValidVersion() && msg.hasValidID() && msg.Method == "" && msg.Params == nil && (msg.Result != nil || msg.Error != nil) } func (msg *jsonrpcMessage) hasValidID() bool { return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '[' } +func (msg *jsonrpcMessage) hasValidVersion() bool { + return msg.Version == vsn +} + func (msg *jsonrpcMessage) isSubscribe() bool { return strings.HasSuffix(msg.Method, subscribeMethodSuffix) } @@ -100,15 +104,14 @@ func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage { enc, err := json.Marshal(result) if err != nil { - // TODO: wrap with 'internal server error' - return msg.errorResponse(err) + return msg.errorResponse(&internalServerError{errcodeMarshalError, err.Error()}) } return &jsonrpcMessage{Version: vsn, ID: msg.ID, Result: enc} } func errorMessage(err error) *jsonrpcMessage { msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{ - Code: defaultErrorCode, + Code: errcodeDefault, Message: err.Error(), }} ec, ok := err.(Error) @@ -165,18 +168,22 @@ type ConnRemoteAddr interface { // support for parsing arguments and serializing (result) objects. type jsonCodec struct { remote string - closer sync.Once // close closed channel once - closeCh chan interface{} // closed on Close - decode func(v interface{}) error // decoder to allow multiple transports - encMu sync.Mutex // guards the encoder - encode func(v interface{}) error // encoder to allow multiple transports + closer sync.Once // close closed channel once + closeCh chan interface{} // closed on Close + decode decodeFunc // decoder to allow multiple transports + encMu sync.Mutex // guards the encoder + encode encodeFunc // encoder to allow multiple transports conn deadlineCloser } +type encodeFunc = func(v interface{}, isErrorResponse bool) error + +type decodeFunc = func(v interface{}) error + // NewFuncCodec creates a codec which uses the given functions to read and write. If conn // implements ConnRemoteAddr, log messages will use it to include the remote address of // the connection. -func NewFuncCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec { +func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) ServerCodec { codec := &jsonCodec{ closeCh: make(chan interface{}), encode: encode, @@ -195,7 +202,11 @@ func NewCodec(conn Conn) ServerCodec { enc := json.NewEncoder(conn) dec := json.NewDecoder(conn) dec.UseNumber() - return NewFuncCodec(conn, enc.Encode, dec.Decode) + + encode := func(v interface{}, isErrorResponse bool) error { + return enc.Encode(v) + } + return NewFuncCodec(conn, encode, dec.Decode) } func (c *jsonCodec) peerInfo() PeerInfo { @@ -225,7 +236,7 @@ func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err err return messages, batch, nil } -func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error { +func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorResponse bool) error { c.encMu.Lock() defer c.encMu.Unlock() @@ -234,7 +245,7 @@ func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error { deadline = time.Now().Add(defaultWriteTimeout) } c.conn.SetWriteDeadline(deadline) - return c.encode(v) + return c.encode(v, isErrorResponse) } func (c *jsonCodec) close() { diff --git a/rpc/metrics.go b/rpc/metrics.go index ecd2e0e00f..b1f1284535 100644 --- a/rpc/metrics.go +++ b/rpc/metrics.go @@ -18,6 +18,7 @@ package rpc import ( "fmt" + "time" "github.com/ethereum/go-ethereum/metrics" ) @@ -25,20 +26,25 @@ import ( var ( rpcRequestGauge = metrics.NewRegisteredGauge("rpc/requests", nil) successfulRequestGauge = metrics.NewRegisteredGauge("rpc/success", nil) - failedReqeustGauge = metrics.NewRegisteredGauge("rpc/failure", nil) - RpcServingTimer = metrics.NewRegisteredTimer("rpc/duration/all", nil) + failedRequestGauge = metrics.NewRegisteredGauge("rpc/failure", nil) + + // serveTimeHistName is the prefix of the per-request serving time histograms. + serveTimeHistName = "rpc/duration" + + rpcServingTimer = metrics.NewRegisteredTimer("rpc/duration/all", nil) ) -func newRPCServingTimer(method string, valid bool) metrics.Timer { - flag := "success" - if !valid { - flag = "failure" +// updateServeTimeHistogram tracks the serving time of a remote RPC call. +func updateServeTimeHistogram(method string, success bool, elapsed time.Duration) { + note := "success" + if !success { + note = "failure" } - m := fmt.Sprintf("rpc/duration/%s/%s", method, flag) - return metrics.GetOrRegisterTimer(m, nil) -} - -func newRPCRequestGauge(method string) metrics.Gauge { - m := fmt.Sprintf("rpc/count/%s", method) - return metrics.GetOrRegisterGauge(m, nil) + h := fmt.Sprintf("%s/%s/%s", serveTimeHistName, method, note) + sampler := func() metrics.Sample { + return metrics.ResettingSample( + metrics.NewExpDecaySample(1028, 0.015), + ) + } + metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(elapsed.Microseconds()) } diff --git a/rpc/server.go b/rpc/server.go index 9010c4adb9..2742adf07b 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -19,13 +19,14 @@ package rpc import ( "context" "io" + "sync" "sync/atomic" - mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/log" ) const MetadataApi = "rpc" +const EngineApi = "engine" // CodecOption specifies which type of messages a codec supports. // @@ -44,13 +45,21 @@ const ( type Server struct { services serviceRegistry idgen func() ID - run int32 - codecs mapset.Set + + mutex sync.Mutex + codecs map[ServerCodec]struct{} + run atomic.Bool + batchItemLimit int + batchResponseLimit int } // NewServer creates a new server instance with no registered handlers. func NewServer() *Server { - server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1} + server := &Server{ + idgen: randomIDGenerator(), + codecs: make(map[ServerCodec]struct{}), + } + server.run.Store(true) // Register the default service providing meta information about the RPC service such // as the services and methods it offers. rpcService := &RPCService{server} @@ -58,6 +67,17 @@ func NewServer() *Server { return server } +// SetBatchLimits sets limits applied to batch requests. There are two limits: 'itemLimit' +// is the maximum number of items in a batch. 'maxResponseSize' is the maximum number of +// response bytes across all requests in a batch. +// +// This method should be called before processing any requests via ServeCodec, ServeHTTP, +// ServeListener etc. +func (s *Server) SetBatchLimits(itemLimit, maxResponseSize int) { + s.batchItemLimit = itemLimit + s.batchResponseLimit = maxResponseSize +} + // RegisterName creates a service for the given receiver type under the given name. When no // methods on the given receiver match the criteria to be either a RPC method or a // subscription an error is returned. Otherwise a new service is created and added to the @@ -74,44 +94,64 @@ func (s *Server) RegisterName(name string, receiver interface{}) error { func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { defer codec.close() - // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { + if !s.trackCodec(codec) { return } + defer s.untrackCodec(codec) - // Add the codec to the set so it can be closed by Stop. - s.codecs.Add(codec) - defer s.codecs.Remove(codec) - - c := initClient(codec, s.idgen, &s.services) + cfg := &clientConfig{ + idgen: s.idgen, + batchItemLimit: s.batchItemLimit, + batchResponseLimit: s.batchResponseLimit, + } + c := initClient(codec, &s.services, cfg) <-codec.closed() c.Close() } +func (s *Server) trackCodec(codec ServerCodec) bool { + s.mutex.Lock() + defer s.mutex.Unlock() + + if !s.run.Load() { + return false // Don't serve if server is stopped. + } + s.codecs[codec] = struct{}{} + return true +} + +func (s *Server) untrackCodec(codec ServerCodec) { + s.mutex.Lock() + defer s.mutex.Unlock() + + delete(s.codecs, codec) +} + // serveSingleRequest reads and processes a single RPC request from the given codec. This // is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in // this mode. func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { + if !s.run.Load() { return } - h := newHandler(ctx, codec, s.idgen, &s.services) + h := newHandler(ctx, codec, s.idgen, &s.services, s.batchItemLimit, s.batchResponseLimit) h.allowSubscribe = false defer h.close(io.EOF, nil) reqs, batch, err := codec.readBatch() if err != nil { if err != io.EOF { - codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) + resp := errorMessage(&invalidMessageError{"parse error"}) + codec.writeJSON(ctx, resp, true) } return } if batch { - h.handleBatch(ctx, reqs) + h.handleBatch(reqs) } else { - h.handleMsg(ctx, reqs[0]) + h.handleMsg(reqs[0]) } } @@ -119,12 +159,14 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { // requests to finish, then closes all codecs which will cancel pending requests and // subscriptions. func (s *Server) Stop() { - if atomic.CompareAndSwapInt32(&s.run, 1, 0) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.run.CompareAndSwap(true, false) { log.Debug("RPC server shutting down") - s.codecs.Each(func(c interface{}) bool { - c.(ServerCodec).close() - return true - }) + for codec := range s.codecs { + codec.close() + } } } @@ -159,7 +201,7 @@ type PeerInfo struct { // Address of client. This will usually contain the IP address and port. RemoteAddr string - // Addditional information for HTTP and WebSocket connections. + // Additional information for HTTP and WebSocket connections. HTTP struct { // Protocol version, i.e. "HTTP/1.1". This is not set for WebSocket. Version string diff --git a/rpc/server_test.go b/rpc/server_test.go index d09d31634b..9d1c7fb5f0 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -32,7 +32,8 @@ func TestServerRegisterName(t *testing.T) { server := NewServer() service := new(testService) - if err := server.RegisterName("test", service); err != nil { + svcName := "test" + if err := server.RegisterName(svcName, service); err != nil { t.Fatalf("%v", err) } @@ -40,12 +41,12 @@ func TestServerRegisterName(t *testing.T) { t.Fatalf("Expected 2 service entries, got %d", len(server.services.services)) } - svc, ok := server.services.services["test"] + svc, ok := server.services.services[svcName] if !ok { - t.Fatalf("Expected service calc to be registered") + t.Fatalf("Expected service %s to be registered", svcName) } - wantCallbacks := 10 + wantCallbacks := 14 if len(svc.callbacks) != wantCallbacks { t.Errorf("Expected %d callbacks for service 'service', got %d", wantCallbacks, len(svc.callbacks)) } @@ -70,6 +71,7 @@ func TestServer(t *testing.T) { func runTestScript(t *testing.T, file string) { server := newTestServer() + server.SetBatchLimits(4, 100000) content, err := os.ReadFile(file) if err != nil { t.Fatal(err) @@ -152,3 +154,41 @@ func TestServerShortLivedConn(t *testing.T) { } } } + +func TestServerBatchResponseSizeLimit(t *testing.T) { + server := newTestServer() + defer server.Stop() + server.SetBatchLimits(100, 60) + var ( + batch []BatchElem + client = DialInProc(server) + ) + for i := 0; i < 5; i++ { + batch = append(batch, BatchElem{ + Method: "test_echo", + Args: []any{"x", 1}, + Result: new(echoResult), + }) + } + if err := client.BatchCall(batch); err != nil { + t.Fatal("error sending batch:", err) + } + for i := range batch { + // We expect the first two queries to be ok, but after that the size limit takes effect. + if i < 2 { + if batch[i].Error != nil { + t.Fatalf("batch elem %d has unexpected error: %v", i, batch[i].Error) + } + continue + } + // After two, we expect an error. + re, ok := batch[i].Error.(Error) + if !ok { + t.Fatalf("batch elem %d has wrong error: %v", i, batch[i].Error) + } + wantedCode := errcodeResponseTooLarge + if re.ErrorCode() != wantedCode { + t.Errorf("batch elem %d wrong error code, have %d want %d", i, re.ErrorCode(), wantedCode) + } + } +} diff --git a/rpc/service.go b/rpc/service.go index bef891ea11..8485cab3aa 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -18,7 +18,6 @@ package rpc import ( "context" - "errors" "fmt" "reflect" "runtime" @@ -199,7 +198,7 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf)) - errRes = errors.New("method handler crashed") + errRes = &internalServerError{errcodePanic, "method handler crashed"} } }() // Run the callback. @@ -215,19 +214,8 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value return results[0].Interface(), nil } -// Is t context.Context or *context.Context? -func isContextType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t == contextType -} - // Does t satisfy the error interface? func isErrorType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } return t.Implements(errorType) } @@ -246,7 +234,7 @@ func isPubSub(methodType reflect.Type) bool { if methodType.NumIn() < 2 || methodType.NumOut() != 2 { return false } - return isContextType(methodType.In(1)) && + return methodType.In(1) == contextType && isSubscriptionType(methodType.Out(0)) && isErrorType(methodType.Out(1)) } diff --git a/rpc/stdio.go b/rpc/stdio.go index be2bab1c98..084e5f0700 100644 --- a/rpc/stdio.go +++ b/rpc/stdio.go @@ -32,12 +32,17 @@ func DialStdIO(ctx context.Context) (*Client, error) { // DialIO creates a client which uses the given IO channels func DialIO(ctx context.Context, in io.Reader, out io.Writer) (*Client, error) { - return newClient(ctx, func(_ context.Context) (ServerCodec, error) { + cfg := new(clientConfig) + return newClient(ctx, cfg, newClientTransportIO(in, out)) +} + +func newClientTransportIO(in io.Reader, out io.Writer) reconnectFunc { + return func(context.Context) (ServerCodec, error) { return NewCodec(stdioConn{ in: in, out: out, }), nil - }) + } } type stdioConn struct { diff --git a/rpc/subscription.go b/rpc/subscription.go index 942e764e5d..3231c2ceec 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -32,9 +32,18 @@ import ( ) var ( - // ErrNotificationsUnsupported is returned when the connection doesn't support notifications - ErrNotificationsUnsupported = errors.New("notifications not supported") - // ErrNotificationNotFound is returned when the notification for the given id is not found + // ErrNotificationsUnsupported is returned by the client when the connection doesn't + // support notifications. You can use this error value to check for subscription + // support like this: + // + // sub, err := client.EthSubscribe(ctx, channel, "newHeads", true) + // if errors.Is(err, rpc.ErrNotificationsUnsupported) { + // // Server does not support subscriptions, fall back to polling. + // } + // + ErrNotificationsUnsupported = notificationsUnsupportedError{} + + // ErrSubscriptionNotFound is returned when the notification for the given id is not found ErrSubscriptionNotFound = errors.New("subscription not found") ) @@ -175,11 +184,13 @@ func (n *Notifier) activate() error { func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) ctx := context.Background() - return n.h.conn.writeJSON(ctx, &jsonrpcMessage{ + + msg := &jsonrpcMessage{ Version: vsn, Method: n.namespace + notificationMethodSuffix, Params: params, - }) + } + return n.h.conn.writeJSON(ctx, msg, false) } // A Subscription is created by a notifier and tied to that notifier. The client can use diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index a920205c00..b270457829 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -79,7 +79,7 @@ func TestSubscriptions(t *testing.T) { request := map[string]interface{}{ "id": i, "method": fmt.Sprintf("%s_subscribe", namespace), - "version": "2.0", + "jsonrpc": "2.0", "params": []interface{}{"someSubscription", notificationCount, i}, } if err := out.Encode(&request); err != nil { diff --git a/rpc/testdata/internal-error.js b/rpc/testdata/internal-error.js new file mode 100644 index 0000000000..2ba387401f --- /dev/null +++ b/rpc/testdata/internal-error.js @@ -0,0 +1,7 @@ +// These tests trigger various 'internal error' conditions. + +--> {"jsonrpc":"2.0","id":1,"method":"test_marshalError","params": []} +<-- {"jsonrpc":"2.0","id":1,"error":{"code":-32603,"message":"json: error calling MarshalText for type *rpc.MarshalErrObj: marshal error"}} + +--> {"jsonrpc":"2.0","id":2,"method":"test_panic","params": []} +<-- {"jsonrpc":"2.0","id":2,"error":{"code":-32603,"message":"method handler crashed"}} diff --git a/rpc/testdata/invalid-badversion.js b/rpc/testdata/invalid-badversion.js new file mode 100644 index 0000000000..75b5291dc3 --- /dev/null +++ b/rpc/testdata/invalid-badversion.js @@ -0,0 +1,19 @@ +// This test checks processing of messages with invalid Version. + +--> {"jsonrpc":"2.0","id":1,"method":"test_echo","params":["x", 3]} +<-- {"jsonrpc":"2.0","id":1,"result":{"String":"x","Int":3,"Args":null}} + +--> {"jsonrpc":"2.1","id":1,"method":"test_echo","params":["x", 3]} +<-- {"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"invalid request"}} + +--> {"jsonrpc":"go-ethereum","id":1,"method":"test_echo","params":["x", 3]} +<-- {"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"invalid request"}} + +--> {"jsonrpc":1,"id":1,"method":"test_echo","params":["x", 3]} +<-- {"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"invalid request"}} + +--> {"jsonrpc":2.0,"id":1,"method":"test_echo","params":["x", 3]} +<-- {"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"invalid request"}} + +--> {"id":1,"method":"test_echo","params":["x", 3]} +<-- {"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"invalid request"}} diff --git a/rpc/testdata/invalid-batch-toolarge.js b/rpc/testdata/invalid-batch-toolarge.js new file mode 100644 index 0000000000..218fea58aa --- /dev/null +++ b/rpc/testdata/invalid-batch-toolarge.js @@ -0,0 +1,13 @@ +// This file checks the behavior of the batch item limit code. +// In tests, the batch item limit is set to 4. So to trigger the error, +// all batches in this file have 5 elements. + +// For batches that do not contain any calls, a response message with "id" == null +// is returned. + +--> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}] +<-- [{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"batch too large"}}] + +// For batches with at least one call, the call's "id" is used. +--> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","id":3,"method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}] +<-- [{"jsonrpc":"2.0","id":3,"error":{"code":-32600,"message":"batch too large"}}] diff --git a/rpc/testservice_test.go b/rpc/testservice_test.go index 253e263289..7d873af667 100644 --- a/rpc/testservice_test.go +++ b/rpc/testservice_test.go @@ -70,8 +70,18 @@ func (testError) Error() string { return "testError" } func (testError) ErrorCode() int { return 444 } func (testError) ErrorData() interface{} { return "testError data" } +type MarshalErrObj struct{} + +func (o *MarshalErrObj) MarshalText() ([]byte, error) { + return nil, errors.New("marshal error") +} + func (s *testService) NoArgsRets() {} +func (s *testService) Null() any { + return nil +} + func (s *testService) Echo(str string, i int, args *echoArgs) echoResult { return echoResult{str, i, args} } @@ -80,6 +90,10 @@ func (s *testService) EchoWithCtx(ctx context.Context, str string, i int, args * return echoResult{str, i, args} } +func (s *testService) Repeat(msg string, i int) string { + return strings.Repeat(msg, i) +} + func (s *testService) PeerInfo(ctx context.Context) PeerInfo { return PeerInfoFromContext(ctx) } @@ -114,6 +128,14 @@ func (s *testService) ReturnError() error { return testError{} } +func (s *testService) MarshalError() *MarshalErrObj { + return &MarshalErrObj{} +} + +func (s *testService) Panic() string { + panic("service panic") +} + func (s *testService) CallMeBack(ctx context.Context, method string, args []interface{}) (interface{}, error) { c, ok := ClientFromContext(ctx) if !ok { diff --git a/rpc/types.go b/rpc/types.go index 959e383723..6eeab73381 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "math" - "strconv" "strings" "github.com/ethereum/go-ethereum/common" @@ -30,10 +29,11 @@ import ( // API describes the set of methods offered over the RPC interface type API struct { - Namespace string // namespace under which the rpc methods of Service are exposed - Version string // api version for DApp's - Service interface{} // receiver instance which holds the methods - Public bool // indication if the methods must be considered safe for public use + Namespace string // namespace under which the rpc methods of Service are exposed + Version string // deprecated - this field is no longer used, but retained for compatibility + Service interface{} // receiver instance which holds the methods + Public bool // deprecated - this field is no longer used, but retained for compatibility + Authenticated bool // whether the api should only be available behind authentication. } // ServerCodec implements reading, parsing and writing RPC messages for the server side of @@ -50,7 +50,9 @@ type ServerCodec interface { // jsonWriter can write JSON messages to its underlying connection. // Implementations must be safe for concurrent use. type jsonWriter interface { - writeJSON(context.Context, interface{}) error + // writeJSON writes a message to the connection. + writeJSON(ctx context.Context, msg interface{}, isError bool) error + // Closed returns a channel which is closed when the connection is closed. closed() <-chan interface{} // RemoteAddr returns the peer address of the connection. @@ -60,13 +62,13 @@ type jsonWriter interface { type BlockNumber int64 const ( - PendingBlockNumber = BlockNumber(-2) - LatestBlockNumber = BlockNumber(-1) + LatestBlockNumber = BlockNumber(-2) + PendingBlockNumber = BlockNumber(-1) EarliestBlockNumber = BlockNumber(0) ) // UnmarshalJSON parses the given JSON fragment into a BlockNumber. It supports: -// - "latest", "earliest" or "pending" as string arguments +// - "safe", "finalized", "latest", "earliest" or "pending" as string arguments // - the block number // Returned errors: // - an invalid block number error when the given argument isn't a known strings @@ -100,26 +102,34 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error { return nil } +// Int64 returns the block number as int64. +func (bn BlockNumber) Int64() int64 { + return (int64)(bn) +} + // MarshalText implements encoding.TextMarshaler. It marshals: -// - "latest", "earliest" or "pending" as strings +// - "safe", "finalized", "latest", "earliest" or "pending" as strings // - other numbers as hex func (bn BlockNumber) MarshalText() ([]byte, error) { + return []byte(bn.String()), nil +} + +func (bn BlockNumber) String() string { switch bn { case EarliestBlockNumber: - return []byte("earliest"), nil + return "earliest" case LatestBlockNumber: - return []byte("latest"), nil + return "latest" case PendingBlockNumber: - return []byte("pending"), nil + return "pending" default: - return hexutil.Uint64(bn).MarshalText() + if bn < 0 { + return fmt.Sprintf("", bn) + } + return hexutil.Uint64(bn).String() } } -func (bn BlockNumber) Int64() int64 { - return (int64)(bn) -} - type BlockNumberOrHash struct { BlockNumber *BlockNumber `json:"blockNumber,omitempty"` BlockHash *common.Hash `json:"blockHash,omitempty"` @@ -190,7 +200,7 @@ func (bnh *BlockNumberOrHash) Number() (BlockNumber, bool) { func (bnh *BlockNumberOrHash) String() string { if bnh.BlockNumber != nil { - return strconv.Itoa(int(*bnh.BlockNumber)) + return bnh.BlockNumber.String() } if bnh.BlockHash != nil { return bnh.BlockHash.String() @@ -220,24 +230,3 @@ func BlockNumberOrHashWithHash(hash common.Hash, canonical bool) BlockNumberOrHa RequireCanonical: canonical, } } - -// DecimalOrHex unmarshals a non-negative decimal or hex parameter into a uint64. -type DecimalOrHex uint64 - -// UnmarshalJSON implements json.Unmarshaler. -func (dh *DecimalOrHex) UnmarshalJSON(data []byte) error { - input := strings.TrimSpace(string(data)) - if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' { - input = input[1 : len(input)-1] - } - - value, err := strconv.ParseUint(input, 10, 64) - if err != nil { - value, err = hexutil.DecodeUint64(input) - } - if err != nil { - return err - } - *dh = DecimalOrHex(value) - return nil -} diff --git a/rpc/types_test.go b/rpc/types_test.go index f110dee7c6..617f441d91 100644 --- a/rpc/types_test.go +++ b/rpc/types_test.go @@ -153,3 +153,24 @@ func TestBlockNumberOrHash_WithNumber_MarshalAndUnmarshal(t *testing.T) { }) } } + +func TestBlockNumberOrHash_StringAndUnmarshal(t *testing.T) { + tests := []BlockNumberOrHash{ + BlockNumberOrHashWithNumber(math.MaxInt64), + BlockNumberOrHashWithNumber(PendingBlockNumber), + BlockNumberOrHashWithNumber(LatestBlockNumber), + BlockNumberOrHashWithNumber(EarliestBlockNumber), + BlockNumberOrHashWithNumber(32), + BlockNumberOrHashWithHash(common.Hash{0xaa}, false), + } + for _, want := range tests { + marshalled, _ := json.Marshal(want.String()) + var have BlockNumberOrHash + if err := json.Unmarshal(marshalled, &have); err != nil { + t.Fatalf("cannot unmarshal (%v): %v", string(marshalled), err) + } + if !reflect.DeepEqual(want, have) { + t.Fatalf("wrong result: have %v, want %v", have, want) + } + } +} diff --git a/rpc/websocket.go b/rpc/websocket.go index 28380d8aa4..538e53a31b 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -27,7 +27,7 @@ import ( "sync" "time" - mapset "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/log" "github.com/gorilla/websocket" ) @@ -35,10 +35,10 @@ import ( const ( wsReadBuffer = 1024 wsWriteBuffer = 1024 - wsPingInterval = 60 * time.Second + wsPingInterval = 30 * time.Second wsPingWriteTimeout = 5 * time.Second wsPongTimeout = 30 * time.Second - wsMessageSizeLimit = 15 * 1024 * 1024 + wsDefaultReadLimit = 32 * 1024 * 1024 ) var wsBufferPool = new(sync.Pool) @@ -60,7 +60,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { log.Debug("WebSocket upgrade failed", "err", err) return } - codec := newWebsocketCodec(conn, r.Host, r.Header) + codec := newWebsocketCodec(conn, r.Host, r.Header, wsDefaultReadLimit) s.ServeCodec(codec, 0) }) } @@ -69,7 +69,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { // websocket upgrade process. When a '*' is specified as an allowed origins all // connections are accepted. func wsHandshakeValidator(allowedOrigins []string) func(*http.Request) bool { - origins := mapset.NewSet() + origins := mapset.NewSet[string]() allowAllOrigins := false for _, origin := range allowedOrigins { @@ -122,10 +122,10 @@ func (e wsHandshakeError) Error() string { return s } -func originIsAllowed(allowedOrigins mapset.Set, browserOrigin string) bool { +func originIsAllowed(allowedOrigins mapset.Set[string], browserOrigin string) bool { it := allowedOrigins.Iterator() for origin := range it.C { - if ruleAllowsOrigin(origin.(string), browserOrigin) { + if ruleAllowsOrigin(origin, browserOrigin) { return true } } @@ -181,24 +181,23 @@ func parseOriginURL(origin string) (string, string, string, error) { return scheme, hostname, port, nil } -// DialWebsocketWithDialer creates a new RPC client that communicates with a JSON-RPC server -// that is listening on the given endpoint using the provided dialer. +// DialWebsocketWithDialer creates a new RPC client using WebSocket. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +// +// Deprecated: use DialOptions and the WithWebsocketDialer option. func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, dialer websocket.Dialer) (*Client, error) { - endpoint, header, err := wsClientHeaders(endpoint, origin) + cfg := new(clientConfig) + cfg.wsDialer = &dialer + if origin != "" { + cfg.setHeader("origin", origin) + } + connect, err := newClientTransportWS(endpoint, cfg) if err != nil { return nil, err } - return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { - conn, resp, err := dialer.DialContext(ctx, endpoint, header) - if err != nil { - hErr := wsHandshakeError{err: err} - if resp != nil { - hErr.status = resp.Status - } - return nil, hErr - } - return newWebsocketCodec(conn, endpoint, header), nil - }) + return newClient(ctx, cfg, connect) } // DialWebsocket creates a new RPC client that communicates with a JSON-RPC server @@ -207,12 +206,58 @@ func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, diale // The context is used for the initial connection establishment. It does not // affect subsequent interactions with the client. func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { - dialer := websocket.Dialer{ - ReadBufferSize: wsReadBuffer, - WriteBufferSize: wsWriteBuffer, - WriteBufferPool: wsBufferPool, + cfg := new(clientConfig) + if origin != "" { + cfg.setHeader("origin", origin) + } + connect, err := newClientTransportWS(endpoint, cfg) + if err != nil { + return nil, err } - return DialWebsocketWithDialer(ctx, endpoint, origin, dialer) + return newClient(ctx, cfg, connect) +} + +func newClientTransportWS(endpoint string, cfg *clientConfig) (reconnectFunc, error) { + dialer := cfg.wsDialer + if dialer == nil { + dialer = &websocket.Dialer{ + ReadBufferSize: wsReadBuffer, + WriteBufferSize: wsWriteBuffer, + WriteBufferPool: wsBufferPool, + Proxy: http.ProxyFromEnvironment, + } + } + + dialURL, header, err := wsClientHeaders(endpoint, "") + if err != nil { + return nil, err + } + for key, values := range cfg.httpHeaders { + header[key] = values + } + + connect := func(ctx context.Context) (ServerCodec, error) { + header := header.Clone() + if cfg.httpAuth != nil { + if err := cfg.httpAuth(header); err != nil { + return nil, err + } + } + conn, resp, err := dialer.DialContext(ctx, dialURL, header) + if err != nil { + hErr := wsHandshakeError{err: err} + if resp != nil { + hErr.status = resp.Status + } + return nil, hErr + } + messageSizeLimit := int64(wsDefaultReadLimit) + if cfg.wsMessageSizeLimit != nil && *cfg.wsMessageSizeLimit >= 0 { + messageSizeLimit = *cfg.wsMessageSizeLimit + } + return newWebsocketCodec(conn, dialURL, header, messageSizeLimit), nil + } + return connect, nil } func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { @@ -237,20 +282,21 @@ type websocketCodec struct { conn *websocket.Conn info PeerInfo - wg sync.WaitGroup - pingReset chan struct{} + wg sync.WaitGroup + pingReset chan struct{} + pongReceived chan struct{} } -func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header) ServerCodec { - conn.SetReadLimit(wsMessageSizeLimit) - conn.SetPongHandler(func(appData string) error { - conn.SetReadDeadline(time.Time{}) - return nil - }) +func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec { + conn.SetReadLimit(readLimit) + encode := func(v interface{}, isErrorResponse bool) error { + return conn.WriteJSON(v) + } wc := &websocketCodec{ - jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec), - conn: conn, - pingReset: make(chan struct{}, 1), + jsonCodec: NewFuncCodec(conn, encode, conn.ReadJSON).(*jsonCodec), + conn: conn, + pingReset: make(chan struct{}, 1), + pongReceived: make(chan struct{}), info: PeerInfo{ Transport: "ws", RemoteAddr: conn.RemoteAddr().String(), @@ -261,6 +307,13 @@ func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header) Serve wc.info.HTTP.Origin = req.Get("Origin") wc.info.HTTP.UserAgent = req.Get("User-Agent") // Start pinger. + conn.SetPongHandler(func(appData string) error { + select { + case wc.pongReceived <- struct{}{}: + case <-wc.closed(): + } + return nil + }) wc.wg.Add(1) go wc.pingLoop() return wc @@ -275,8 +328,8 @@ func (wc *websocketCodec) peerInfo() PeerInfo { return wc.info } -func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error { - err := wc.jsonCodec.writeJSON(ctx, v) +func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError bool) error { + err := wc.jsonCodec.writeJSON(ctx, v, isError) if err == nil { // Notify pingLoop to delay the next idle ping. select { @@ -289,26 +342,31 @@ func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error { // pingLoop sends periodic ping frames when the connection is idle. func (wc *websocketCodec) pingLoop() { - var timer = time.NewTimer(wsPingInterval) + var pingTimer = time.NewTimer(wsPingInterval) defer wc.wg.Done() - defer timer.Stop() + defer pingTimer.Stop() for { select { case <-wc.closed(): return + case <-wc.pingReset: - if !timer.Stop() { - <-timer.C + if !pingTimer.Stop() { + <-pingTimer.C } - timer.Reset(wsPingInterval) - case <-timer.C: + pingTimer.Reset(wsPingInterval) + + case <-pingTimer.C: wc.jsonCodec.encMu.Lock() wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout)) wc.conn.WriteMessage(websocket.PingMessage, nil) wc.conn.SetReadDeadline(time.Now().Add(wsPongTimeout)) wc.jsonCodec.encMu.Unlock() - timer.Reset(wsPingInterval) + pingTimer.Reset(wsPingInterval) + + case <-wc.pongReceived: + wc.conn.SetReadDeadline(time.Time{}) } } } diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go index 8659f798e4..d3e15d94c9 100644 --- a/rpc/websocket_test.go +++ b/rpc/websocket_test.go @@ -19,14 +19,10 @@ package rpc import ( "context" "errors" - "io" "net" "net/http" "net/http/httptest" - "net/http/httputil" - "net/url" "strings" - "sync/atomic" "testing" "time" @@ -76,7 +72,7 @@ func TestWebsocketOriginCheck(t *testing.T) { // Connections without origin header should work. client, err = DialWebsocket(context.Background(), wsURL, "") if err != nil { - t.Fatal("error for empty origin") + t.Fatalf("error for empty origin: %v", err) } client.Close() } @@ -117,6 +113,66 @@ func TestWebsocketLargeCall(t *testing.T) { } } +// This test checks whether the wsMessageSizeLimit option is obeyed. +func TestWebsocketLargeRead(t *testing.T) { + t.Parallel() + + var ( + srv = newTestServer() + httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"})) + wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") + ) + defer srv.Stop() + defer httpsrv.Close() + + testLimit := func(limit *int64) { + opts := []ClientOption{} + expLimit := int64(wsDefaultReadLimit) + if limit != nil && *limit >= 0 { + opts = append(opts, WithWebsocketMessageSizeLimit(*limit)) + if *limit > 0 { + expLimit = *limit // 0 means infinite + } + } + client, err := DialOptions(context.Background(), wsURL, opts...) + if err != nil { + t.Fatalf("can't dial: %v", err) + } + defer client.Close() + // Remove some bytes for json encoding overhead. + underLimit := int(expLimit - 128) + overLimit := expLimit + 1 + if expLimit == wsDefaultReadLimit { + // No point trying the full 32MB in tests. Just sanity-check that + // it's not obviously limited. + underLimit = 1024 + overLimit = -1 + } + var res string + // Check under limit + if err = client.Call(&res, "test_repeat", "A", underLimit); err != nil { + t.Fatalf("unexpected error with limit %d: %v", expLimit, err) + } + if len(res) != underLimit || strings.Count(res, "A") != underLimit { + t.Fatal("incorrect data") + } + // Check over limit + if overLimit > 0 { + err = client.Call(&res, "test_repeat", "A", expLimit+1) + if err == nil || err != websocket.ErrReadLimit { + t.Fatalf("wrong error with limit %d: %v expecting %v", expLimit, err, websocket.ErrReadLimit) + } + } + } + ptr := func(v int64) *int64 { return &v } + + testLimit(ptr(-1)) // Should be ignored (use default) + testLimit(ptr(0)) // Should be ignored (use default) + testLimit(nil) // Should be ignored (use default) + testLimit(ptr(200)) + testLimit(ptr(wsDefaultReadLimit * 2)) +} + func TestWebsocketPeerInfo(t *testing.T) { var ( s = newTestServer() @@ -159,7 +215,7 @@ func TestClientWebsocketPing(t *testing.T) { var ( sendPing = make(chan struct{}) server = wsPingTestServer(t, sendPing) - ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) ) defer cancel() defer server.Shutdown(ctx) @@ -210,7 +266,7 @@ func TestClientWebsocketLargeMessage(t *testing.T) { defer srv.Stop() defer httpsrv.Close() - respLength := wsMessageSizeLimit - 50 + respLength := wsDefaultReadLimit - 50 srv.RegisterName("test", largeRespService{respLength}) c, err := DialWebsocket(context.Background(), wsURL, "") @@ -227,63 +283,6 @@ func TestClientWebsocketLargeMessage(t *testing.T) { } } -func TestClientWebsocketSevered(t *testing.T) { - t.Parallel() - - var ( - server = wsPingTestServer(t, nil) - ctx = context.Background() - ) - defer server.Shutdown(ctx) - - u, err := url.Parse("http://" + server.Addr) - if err != nil { - t.Fatal(err) - } - rproxy := httputil.NewSingleHostReverseProxy(u) - var severable *severableReadWriteCloser - rproxy.ModifyResponse = func(response *http.Response) error { - severable = &severableReadWriteCloser{ReadWriteCloser: response.Body.(io.ReadWriteCloser)} - response.Body = severable - return nil - } - frontendProxy := httptest.NewServer(rproxy) - defer frontendProxy.Close() - - wsURL := "ws:" + strings.TrimPrefix(frontendProxy.URL, "http:") - client, err := DialWebsocket(ctx, wsURL, "") - if err != nil { - t.Fatalf("client dial error: %v", err) - } - defer client.Close() - - resultChan := make(chan int) - sub, err := client.EthSubscribe(ctx, resultChan, "foo") - if err != nil { - t.Fatalf("client subscribe error: %v", err) - } - - // sever the connection - severable.Sever() - - // Wait for subscription error. - timeout := time.NewTimer(3 * wsPingInterval) - defer timeout.Stop() - for { - select { - case err := <-sub.Err(): - t.Log("client subscription error:", err) - return - case result := <-resultChan: - t.Error("unexpected result:", result) - return - case <-timeout.C: - t.Error("didn't get any error within the test timeout") - return - } - } -} - // wsPingTestServer runs a WebSocket server which accepts a single subscription request. // When a value arrives on sendPing, the server sends a ping frame, waits for a matching // pong and finally delivers a single subscription result. @@ -386,31 +385,3 @@ func wsPingTestHandler(t *testing.T, conn *websocket.Conn, shutdown, sendPing <- } } } - -// severableReadWriteCloser wraps an io.ReadWriteCloser and provides a Sever() method to drop writes and read empty. -type severableReadWriteCloser struct { - io.ReadWriteCloser - severed int32 // atomic -} - -func (s *severableReadWriteCloser) Sever() { - atomic.StoreInt32(&s.severed, 1) -} - -func (s *severableReadWriteCloser) Read(p []byte) (n int, err error) { - if atomic.LoadInt32(&s.severed) > 0 { - return 0, nil - } - return s.ReadWriteCloser.Read(p) -} - -func (s *severableReadWriteCloser) Write(p []byte) (n int, err error) { - if atomic.LoadInt32(&s.severed) > 0 { - return len(p), nil - } - return s.ReadWriteCloser.Write(p) -} - -func (s *severableReadWriteCloser) Close() error { - return s.ReadWriteCloser.Close() -} diff --git a/signer/core/stdioui.go b/signer/core/stdioui.go index 6963a89122..a0ce684417 100644 --- a/signer/core/stdioui.go +++ b/signer/core/stdioui.go @@ -25,7 +25,7 @@ import ( ) type StdIOUI struct { - client rpc.Client + client *rpc.Client } func NewStdIOUI() *StdIOUI { @@ -33,7 +33,7 @@ func NewStdIOUI() *StdIOUI { if err != nil { log.Crit("Could not create stdio client", "err", err) } - ui := &StdIOUI{client: *client} + ui := &StdIOUI{client: client} return ui }