From 6f1c1411a0013129b52dcff3b68150d05bf8ef10 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 23 Jan 2024 10:22:29 -0800 Subject: [PATCH] Fix large block requests in state sync (#1057) * Fix large block requests in state sync * fixes * fix --- plugin/evm/message/codec.go | 2 +- plugin/evm/message/message_test.go | 2 +- sync/handlers/block_request.go | 17 ++- sync/handlers/block_request_test.go | 201 +++++++++++++++++++--------- 4 files changed, 151 insertions(+), 71 deletions(-) diff --git a/plugin/evm/message/codec.go b/plugin/evm/message/codec.go index 6939ca4978..48b1436f7d 100644 --- a/plugin/evm/message/codec.go +++ b/plugin/evm/message/codec.go @@ -14,7 +14,7 @@ import ( const ( Version = uint16(0) - maxMessageSize = 1 * units.MiB + maxMessageSize = 2*units.MiB - 64*units.KiB // Subtract 64 KiB from p2p network cap to leave room for encoding overhead from AvalancheGo ) var ( diff --git a/plugin/evm/message/message_test.go b/plugin/evm/message/message_test.go index 0a18fde784..89357a8d04 100644 --- a/plugin/evm/message/message_test.go +++ b/plugin/evm/message/message_test.go @@ -40,7 +40,7 @@ func TestTxsTooLarge(t *testing.T) { assert := assert.New(t) builtMsg := EthTxsGossip{ - Txs: utils.RandomBytes(1024 * units.KiB), + Txs: utils.RandomBytes(maxMessageSize), } _, err := BuildGossipMessage(Codec, builtMsg) assert.Error(err) diff --git a/sync/handlers/block_request.go b/sync/handlers/block_request.go index 76203b78bc..a8fc070eb0 100644 --- a/sync/handlers/block_request.go +++ b/sync/handlers/block_request.go @@ -10,6 +10,7 @@ import ( "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/units" "github.com/ava-labs/subnet-evm/plugin/evm/message" "github.com/ava-labs/subnet-evm/sync/handlers/stats" @@ -17,9 +18,12 @@ import ( "github.com/ethereum/go-ethereum/log" ) -// parentLimit specifies how many parents to retrieve and send given a starting hash -// This value overrides any specified limit in blockRequest.Parents if it is greater than this value -const parentLimit = uint16(64) +const ( + // parentLimit specifies how many parents to retrieve and send given a starting hash + // This value overrides any specified limit in blockRequest.Parents if it is greater than this value + parentLimit = uint16(64) + targetMessageByteSize = units.MiB - units.KiB // Target total block bytes slightly under original network codec max size of 1MB +) // BlockRequestHandler is a peer.RequestHandler for message.BlockRequest // serving requested blocks starting at specified hash @@ -52,6 +56,7 @@ func (b *BlockRequestHandler) OnBlockRequest(ctx context.Context, nodeID ids.Nod parents = parentLimit } blocks := make([][]byte, 0, parents) + totalBytes := 0 // ensure metrics are captured properly on all return paths defer func() { @@ -84,7 +89,13 @@ func (b *BlockRequestHandler) OnBlockRequest(ctx context.Context, nodeID ids.Nod return nil, nil } + if buf.Len()+totalBytes > targetMessageByteSize && len(blocks) > 0 { + log.Debug("Skipping block due to max total bytes size", "totalBlockDataSize", totalBytes, "blockSize", buf.Len(), "maxTotalBytesSize", targetMessageByteSize) + break + } + blocks = append(blocks, buf.Bytes()) + totalBytes += buf.Len() hash = block.ParentHash() height-- } diff --git a/sync/handlers/block_request_test.go b/sync/handlers/block_request_test.go index c3886ce78a..9acb4d9c80 100644 --- a/sync/handlers/block_request_test.go +++ b/sync/handlers/block_request_test.go @@ -5,42 +5,47 @@ package handlers import ( "context" + "math/big" "testing" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/units" "github.com/ava-labs/subnet-evm/consensus/dummy" "github.com/ava-labs/subnet-evm/core" + "github.com/ava-labs/subnet-evm/core/rawdb" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/ethdb/memorydb" "github.com/ava-labs/subnet-evm/params" "github.com/ava-labs/subnet-evm/plugin/evm/message" "github.com/ava-labs/subnet-evm/sync/handlers/stats" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" "github.com/stretchr/testify/assert" ) -func TestBlockRequestHandler(t *testing.T) { - var gspec = &core.Genesis{ - Config: params.TestChainConfig, - } - memdb := memorydb.New() - genesis := gspec.MustCommit(memdb) - engine := dummy.NewETHFaker() - blocks, _, err := core.GenerateChain(params.TestChainConfig, genesis, engine, memdb, 96, 0, func(i int, b *core.BlockGen) {}) - if err != nil { - t.Fatal("unexpected error when generating test blockchain", err) - } +type blockRequestTest struct { + name string - assert.Len(t, blocks, 96) + // starting block, specify either Index or (hash+height) + startBlockIndex int + startBlockHash common.Hash + startBlockHeight uint64 + + requestedParents uint16 + expectedBlocks int + expectNilResponse bool + assertResponse func(t testing.TB, stats *stats.MockHandlerStats, b []byte) +} + +func executeBlockRequestTest(t testing.TB, test blockRequestTest, blocks []*types.Block) { + mockHandlerStats := &stats.MockHandlerStats{} // convert into map blocksDB := make(map[common.Hash]*types.Block, len(blocks)) for _, blk := range blocks { blocksDB[blk.Hash()] = blk } - - mockHandlerStats := &stats.MockHandlerStats{} blockProvider := &TestBlockProvider{ GetBlockFn: func(hash common.Hash, height uint64) *types.Block { blk, ok := blocksDB[hash] @@ -52,19 +57,64 @@ func TestBlockRequestHandler(t *testing.T) { } blockRequestHandler := NewBlockRequestHandler(blockProvider, message.Codec, mockHandlerStats) - tests := []struct { - name string + var blockRequest message.BlockRequest + if test.startBlockHash != (common.Hash{}) { + blockRequest.Hash = test.startBlockHash + blockRequest.Height = test.startBlockHeight + } else { + startingBlock := blocks[test.startBlockIndex] + blockRequest.Hash = startingBlock.Hash() + blockRequest.Height = startingBlock.NumberU64() + } + blockRequest.Parents = test.requestedParents - // starting block, specify either Index or (hash+height) - startBlockIndex int - startBlockHash common.Hash - startBlockHeight uint64 + responseBytes, err := blockRequestHandler.OnBlockRequest(context.Background(), ids.GenerateTestNodeID(), 1, blockRequest) + if err != nil { + t.Fatal("unexpected error during block request", err) + } + if test.assertResponse != nil { + test.assertResponse(t, mockHandlerStats, responseBytes) + } - requestedParents uint16 - expectedBlocks int - expectNilResponse bool - assertResponse func(t *testing.T, response []byte) - }{ + if test.expectNilResponse { + assert.Nil(t, responseBytes) + return + } + + assert.NotEmpty(t, responseBytes) + + var response message.BlockResponse + if _, err = message.Codec.Unmarshal(responseBytes, &response); err != nil { + t.Fatal("error unmarshalling", err) + } + assert.Len(t, response.Blocks, test.expectedBlocks) + + for _, blockBytes := range response.Blocks { + block := new(types.Block) + if err := rlp.DecodeBytes(blockBytes, block); err != nil { + t.Fatal("could not parse block", err) + } + assert.GreaterOrEqual(t, test.startBlockIndex, 0) + assert.Equal(t, blocks[test.startBlockIndex].Hash(), block.Hash()) + test.startBlockIndex-- + } + mockHandlerStats.Reset() +} + +func TestBlockRequestHandler(t *testing.T) { + var gspec = &core.Genesis{ + Config: params.TestChainConfig, + } + memdb := memorydb.New() + genesis := gspec.MustCommit(memdb) + engine := dummy.NewETHFaker() + blocks, _, err := core.GenerateChain(params.TestChainConfig, genesis, engine, memdb, 96, 0, func(i int, b *core.BlockGen) {}) + if err != nil { + t.Fatal("unexpected error when generating test blockchain", err) + } + assert.Len(t, blocks, 96) + + tests := []blockRequestTest{ { name: "handler_returns_blocks_as_requested", startBlockIndex: 64, @@ -89,55 +139,74 @@ func TestBlockRequestHandler(t *testing.T) { startBlockHeight: 1_000_000, requestedParents: 64, expectNilResponse: true, - assertResponse: func(t *testing.T, _ []byte) { + assertResponse: func(t testing.TB, mockHandlerStats *stats.MockHandlerStats, _ []byte) { assert.Equal(t, uint32(1), mockHandlerStats.MissingBlockHashCount) }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - var blockRequest message.BlockRequest - if test.startBlockHash != (common.Hash{}) { - blockRequest.Hash = test.startBlockHash - blockRequest.Height = test.startBlockHeight - } else { - startingBlock := blocks[test.startBlockIndex] - blockRequest.Hash = startingBlock.Hash() - blockRequest.Height = startingBlock.NumberU64() - } - blockRequest.Parents = test.requestedParents - - responseBytes, err := blockRequestHandler.OnBlockRequest(context.Background(), ids.GenerateTestNodeID(), 1, blockRequest) - if err != nil { - t.Fatal("unexpected error during block request", err) - } - if test.assertResponse != nil { - test.assertResponse(t, responseBytes) - } - - if test.expectNilResponse { - assert.Nil(t, responseBytes) - return - } + executeBlockRequestTest(t, test, blocks) + }) + } +} - assert.NotEmpty(t, responseBytes) +func TestBlockRequestHandlerLargeBlocks(t *testing.T) { + var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + funds = big.NewInt(1000000000000000000) + gspec = &core.Genesis{ + Config: ¶ms.ChainConfig{HomesteadBlock: new(big.Int)}, + Alloc: core.GenesisAlloc{addr1: {Balance: funds}}, + } + signer = types.LatestSigner(gspec.Config) + ) + memdb := rawdb.NewMemoryDatabase() + genesis := gspec.MustCommit(memdb) + engine := dummy.NewETHFaker() + blocks, _, err := core.GenerateChain(gspec.Config, genesis, engine, memdb, 96, 0, func(i int, b *core.BlockGen) { + var data []byte + switch { + case i <= 32: + data = make([]byte, units.MiB) + default: + data = make([]byte, units.MiB/16) + } + tx, err := types.SignTx(types.NewTransaction(b.TxNonce(addr1), addr1, big.NewInt(10000), 4_215_304, nil, data), signer, key1) + if err != nil { + t.Fatal(err) + } + b.AddTx(tx) + }) + if err != nil { + t.Fatal("unexpected error when generating test blockchain", err) + } + assert.Len(t, blocks, 96) - var response message.BlockResponse - if _, err = message.Codec.Unmarshal(responseBytes, &response); err != nil { - t.Fatal("error unmarshalling", err) - } - assert.Len(t, response.Blocks, test.expectedBlocks) - - for _, blockBytes := range response.Blocks { - block := new(types.Block) - if err := rlp.DecodeBytes(blockBytes, block); err != nil { - t.Fatal("could not parse block", err) - } - assert.GreaterOrEqual(t, test.startBlockIndex, 0) - assert.Equal(t, blocks[test.startBlockIndex].Hash(), block.Hash()) - test.startBlockIndex-- - } - mockHandlerStats.Reset() + tests := []blockRequestTest{ + { + name: "handler_returns_blocks_as_requested", + startBlockIndex: 64, + requestedParents: 10, + expectedBlocks: 10, + }, + { + name: "handler_caps_blocks_size_limit", + startBlockIndex: 64, + requestedParents: 16, + expectedBlocks: 15, + }, + { + name: "handler_caps_blocks_size_limit_on_first_block", + startBlockIndex: 32, + requestedParents: 10, + expectedBlocks: 1, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + executeBlockRequestTest(t, test, blocks) }) } }