Skip to content

Commit

Permalink
Fix large block requests in state sync (ava-labs#1057)
Browse files Browse the repository at this point in the history
* Fix large block requests in state sync

* fixes

* fix
  • Loading branch information
darioush authored Jan 23, 2024
1 parent 60356fe commit 6f1c141
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 71 deletions.
2 changes: 1 addition & 1 deletion plugin/evm/message/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

const (
Version = uint16(0)
maxMessageSize = 1 * units.MiB
maxMessageSize = 2*units.MiB - 64*units.KiB // Subtract 64 KiB from p2p network cap to leave room for encoding overhead from AvalancheGo
)

var (
Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestTxsTooLarge(t *testing.T) {
assert := assert.New(t)

builtMsg := EthTxsGossip{
Txs: utils.RandomBytes(1024 * units.KiB),
Txs: utils.RandomBytes(maxMessageSize),
}
_, err := BuildGossipMessage(Codec, builtMsg)
assert.Error(err)
Expand Down
17 changes: 14 additions & 3 deletions sync/handlers/block_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ import (

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/units"

"github.com/ava-labs/subnet-evm/plugin/evm/message"
"github.com/ava-labs/subnet-evm/sync/handlers/stats"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

// parentLimit specifies how many parents to retrieve and send given a starting hash
// This value overrides any specified limit in blockRequest.Parents if it is greater than this value
const parentLimit = uint16(64)
const (
// parentLimit specifies how many parents to retrieve and send given a starting hash
// This value overrides any specified limit in blockRequest.Parents if it is greater than this value
parentLimit = uint16(64)
targetMessageByteSize = units.MiB - units.KiB // Target total block bytes slightly under original network codec max size of 1MB
)

// BlockRequestHandler is a peer.RequestHandler for message.BlockRequest
// serving requested blocks starting at specified hash
Expand Down Expand Up @@ -52,6 +56,7 @@ func (b *BlockRequestHandler) OnBlockRequest(ctx context.Context, nodeID ids.Nod
parents = parentLimit
}
blocks := make([][]byte, 0, parents)
totalBytes := 0

// ensure metrics are captured properly on all return paths
defer func() {
Expand Down Expand Up @@ -84,7 +89,13 @@ func (b *BlockRequestHandler) OnBlockRequest(ctx context.Context, nodeID ids.Nod
return nil, nil
}

if buf.Len()+totalBytes > targetMessageByteSize && len(blocks) > 0 {
log.Debug("Skipping block due to max total bytes size", "totalBlockDataSize", totalBytes, "blockSize", buf.Len(), "maxTotalBytesSize", targetMessageByteSize)
break
}

blocks = append(blocks, buf.Bytes())
totalBytes += buf.Len()
hash = block.ParentHash()
height--
}
Expand Down
201 changes: 135 additions & 66 deletions sync/handlers/block_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,47 @@ package handlers

import (
"context"
"math/big"
"testing"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/units"
"github.com/ava-labs/subnet-evm/consensus/dummy"
"github.com/ava-labs/subnet-evm/core"
"github.com/ava-labs/subnet-evm/core/rawdb"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethdb/memorydb"
"github.com/ava-labs/subnet-evm/params"
"github.com/ava-labs/subnet-evm/plugin/evm/message"
"github.com/ava-labs/subnet-evm/sync/handlers/stats"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/assert"
)

func TestBlockRequestHandler(t *testing.T) {
var gspec = &core.Genesis{
Config: params.TestChainConfig,
}
memdb := memorydb.New()
genesis := gspec.MustCommit(memdb)
engine := dummy.NewETHFaker()
blocks, _, err := core.GenerateChain(params.TestChainConfig, genesis, engine, memdb, 96, 0, func(i int, b *core.BlockGen) {})
if err != nil {
t.Fatal("unexpected error when generating test blockchain", err)
}
type blockRequestTest struct {
name string

assert.Len(t, blocks, 96)
// starting block, specify either Index or (hash+height)
startBlockIndex int
startBlockHash common.Hash
startBlockHeight uint64

requestedParents uint16
expectedBlocks int
expectNilResponse bool
assertResponse func(t testing.TB, stats *stats.MockHandlerStats, b []byte)
}

func executeBlockRequestTest(t testing.TB, test blockRequestTest, blocks []*types.Block) {
mockHandlerStats := &stats.MockHandlerStats{}

// convert into map
blocksDB := make(map[common.Hash]*types.Block, len(blocks))
for _, blk := range blocks {
blocksDB[blk.Hash()] = blk
}

mockHandlerStats := &stats.MockHandlerStats{}
blockProvider := &TestBlockProvider{
GetBlockFn: func(hash common.Hash, height uint64) *types.Block {
blk, ok := blocksDB[hash]
Expand All @@ -52,19 +57,64 @@ func TestBlockRequestHandler(t *testing.T) {
}
blockRequestHandler := NewBlockRequestHandler(blockProvider, message.Codec, mockHandlerStats)

tests := []struct {
name string
var blockRequest message.BlockRequest
if test.startBlockHash != (common.Hash{}) {
blockRequest.Hash = test.startBlockHash
blockRequest.Height = test.startBlockHeight
} else {
startingBlock := blocks[test.startBlockIndex]
blockRequest.Hash = startingBlock.Hash()
blockRequest.Height = startingBlock.NumberU64()
}
blockRequest.Parents = test.requestedParents

// starting block, specify either Index or (hash+height)
startBlockIndex int
startBlockHash common.Hash
startBlockHeight uint64
responseBytes, err := blockRequestHandler.OnBlockRequest(context.Background(), ids.GenerateTestNodeID(), 1, blockRequest)
if err != nil {
t.Fatal("unexpected error during block request", err)
}
if test.assertResponse != nil {
test.assertResponse(t, mockHandlerStats, responseBytes)
}

requestedParents uint16
expectedBlocks int
expectNilResponse bool
assertResponse func(t *testing.T, response []byte)
}{
if test.expectNilResponse {
assert.Nil(t, responseBytes)
return
}

assert.NotEmpty(t, responseBytes)

var response message.BlockResponse
if _, err = message.Codec.Unmarshal(responseBytes, &response); err != nil {
t.Fatal("error unmarshalling", err)
}
assert.Len(t, response.Blocks, test.expectedBlocks)

for _, blockBytes := range response.Blocks {
block := new(types.Block)
if err := rlp.DecodeBytes(blockBytes, block); err != nil {
t.Fatal("could not parse block", err)
}
assert.GreaterOrEqual(t, test.startBlockIndex, 0)
assert.Equal(t, blocks[test.startBlockIndex].Hash(), block.Hash())
test.startBlockIndex--
}
mockHandlerStats.Reset()
}

func TestBlockRequestHandler(t *testing.T) {
var gspec = &core.Genesis{
Config: params.TestChainConfig,
}
memdb := memorydb.New()
genesis := gspec.MustCommit(memdb)
engine := dummy.NewETHFaker()
blocks, _, err := core.GenerateChain(params.TestChainConfig, genesis, engine, memdb, 96, 0, func(i int, b *core.BlockGen) {})
if err != nil {
t.Fatal("unexpected error when generating test blockchain", err)
}
assert.Len(t, blocks, 96)

tests := []blockRequestTest{
{
name: "handler_returns_blocks_as_requested",
startBlockIndex: 64,
Expand All @@ -89,55 +139,74 @@ func TestBlockRequestHandler(t *testing.T) {
startBlockHeight: 1_000_000,
requestedParents: 64,
expectNilResponse: true,
assertResponse: func(t *testing.T, _ []byte) {
assertResponse: func(t testing.TB, mockHandlerStats *stats.MockHandlerStats, _ []byte) {
assert.Equal(t, uint32(1), mockHandlerStats.MissingBlockHashCount)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var blockRequest message.BlockRequest
if test.startBlockHash != (common.Hash{}) {
blockRequest.Hash = test.startBlockHash
blockRequest.Height = test.startBlockHeight
} else {
startingBlock := blocks[test.startBlockIndex]
blockRequest.Hash = startingBlock.Hash()
blockRequest.Height = startingBlock.NumberU64()
}
blockRequest.Parents = test.requestedParents

responseBytes, err := blockRequestHandler.OnBlockRequest(context.Background(), ids.GenerateTestNodeID(), 1, blockRequest)
if err != nil {
t.Fatal("unexpected error during block request", err)
}
if test.assertResponse != nil {
test.assertResponse(t, responseBytes)
}

if test.expectNilResponse {
assert.Nil(t, responseBytes)
return
}
executeBlockRequestTest(t, test, blocks)
})
}
}

assert.NotEmpty(t, responseBytes)
func TestBlockRequestHandlerLargeBlocks(t *testing.T) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
funds = big.NewInt(1000000000000000000)
gspec = &core.Genesis{
Config: &params.ChainConfig{HomesteadBlock: new(big.Int)},
Alloc: core.GenesisAlloc{addr1: {Balance: funds}},
}
signer = types.LatestSigner(gspec.Config)
)
memdb := rawdb.NewMemoryDatabase()
genesis := gspec.MustCommit(memdb)
engine := dummy.NewETHFaker()
blocks, _, err := core.GenerateChain(gspec.Config, genesis, engine, memdb, 96, 0, func(i int, b *core.BlockGen) {
var data []byte
switch {
case i <= 32:
data = make([]byte, units.MiB)
default:
data = make([]byte, units.MiB/16)
}
tx, err := types.SignTx(types.NewTransaction(b.TxNonce(addr1), addr1, big.NewInt(10000), 4_215_304, nil, data), signer, key1)
if err != nil {
t.Fatal(err)
}
b.AddTx(tx)
})
if err != nil {
t.Fatal("unexpected error when generating test blockchain", err)
}
assert.Len(t, blocks, 96)

var response message.BlockResponse
if _, err = message.Codec.Unmarshal(responseBytes, &response); err != nil {
t.Fatal("error unmarshalling", err)
}
assert.Len(t, response.Blocks, test.expectedBlocks)

for _, blockBytes := range response.Blocks {
block := new(types.Block)
if err := rlp.DecodeBytes(blockBytes, block); err != nil {
t.Fatal("could not parse block", err)
}
assert.GreaterOrEqual(t, test.startBlockIndex, 0)
assert.Equal(t, blocks[test.startBlockIndex].Hash(), block.Hash())
test.startBlockIndex--
}
mockHandlerStats.Reset()
tests := []blockRequestTest{
{
name: "handler_returns_blocks_as_requested",
startBlockIndex: 64,
requestedParents: 10,
expectedBlocks: 10,
},
{
name: "handler_caps_blocks_size_limit",
startBlockIndex: 64,
requestedParents: 16,
expectedBlocks: 15,
},
{
name: "handler_caps_blocks_size_limit_on_first_block",
startBlockIndex: 32,
requestedParents: 10,
expectedBlocks: 1,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
executeBlockRequestTest(t, test, blocks)
})
}
}
Expand Down

0 comments on commit 6f1c141

Please sign in to comment.