From 9c85de5df2b273828f4bfdefcf50cb5dd2e5bfbc Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Wed, 13 Dec 2023 11:34:22 -0500 Subject: [PATCH] Refactor p2p unit tests (#425) --- plugin/evm/tx_gossip_test.go | 240 ++++++++++++++++++++--------------- plugin/evm/vm.go | 142 +++++++++++---------- plugin/evm/vm_test.go | 18 +++ 3 files changed, 234 insertions(+), 166 deletions(-) diff --git a/plugin/evm/tx_gossip_test.go b/plugin/evm/tx_gossip_test.go index cb7c094222..cc41ad9ad5 100644 --- a/plugin/evm/tx_gossip_test.go +++ b/plugin/evm/tx_gossip_test.go @@ -10,60 +10,90 @@ import ( "testing" "time" + "github.com/ava-labs/avalanchego/chains/atomic" + "github.com/ava-labs/avalanchego/database/memdb" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/vms/components/avax" + "github.com/ava-labs/avalanchego/vms/secp256k1fx" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - "github.com/ava-labs/coreth/core" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/params" ) func TestEthTxGossip(t *testing.T) { require := require.New(t) + ctx := context.Background() + snowCtx := snow.DefaultContextTest() + validatorState := &validators.TestState{} + snowCtx.ValidatorState = validatorState - // set up prefunded address - importAmount := uint64(1_000_000_000) - issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONLatest, "", "", map[ids.ShortID]uint64{ - testShortIDAddrs[0]: importAmount, - }) - defer func() { - require.NoError(vm.Shutdown(context.Background())) - }() - - txPoolNewHeads := make(chan core.NewTxPoolHeadEvent) - vm.txPool.SubscribeNewHeadEvent(txPoolNewHeads) - - importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]}) + pk, err := secp256k1.NewPrivateKey() require.NoError(err) - require.NoError(vm.mempool.AddLocalTx(importTx)) - <-issuer - - blk, err := vm.BuildBlock(context.Background()) + address := GetEthAddress(pk) + genesis := newPrefundedGenesis(100_000_000_000_000_000, address) + genesisBytes, err := genesis.MarshalJSON() require.NoError(err) - require.NoError(blk.Verify(context.Background())) - require.NoError(vm.SetPreference(context.Background(), blk.ID())) - require.NoError(blk.Accept(context.Background())) - <-txPoolNewHeads + responseSender := &common.FakeSender{ + SentAppResponse: make(chan []byte, 1), + } + vm := &VM{ + p2pSender: responseSender, + atomicTxGossipHandler: &p2p.NoOpHandler{}, + atomicTxGossiper: &gossip.NoOpGossiper{}, + } + + require.NoError(vm.Initialize( + ctx, + snowCtx, + memdb.New(), + genesisBytes, + nil, + nil, + make(chan common.Message), + nil, + &common.SenderTest{}, + )) + require.NoError(vm.SetState(ctx, snow.NormalOp)) + + defer func() { + require.NoError(vm.Shutdown(ctx)) + }() // sender for the peer requesting gossip from [vm] - peerSender := &common.SenderTest{} + peerSender := &common.FakeSender{ + SentAppRequest: make(chan []byte, 1), + } + network, err := p2p.NewNetwork(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") require.NoError(err) client := network.NewClient(ethTxGossipProtocol) + // we only accept gossip requests from validators + requestingNodeID := ids.GenerateTestNodeID() + require.NoError(vm.Network.Connected(ctx, requestingNodeID, nil)) + validatorState.GetCurrentHeightF = func(context.Context) (uint64, error) { + return 0, nil + } + validatorState.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil + } + + // Ask the VM for any new transactions. We should get nothing at first. emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) require.NoError(err) emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary() @@ -77,34 +107,6 @@ func TestEthTxGossip(t *testing.T) { require.NoError(err) wg := &sync.WaitGroup{} - - requestingNodeID := ids.GenerateTestNodeID() - peerSender.SendAppRequestF = func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error { - go func() { - require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) - }() - return nil - } - - sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { - go func() { - require.NoError(network.AppResponse(ctx, nodeID, requestID, appResponseBytes)) - }() - return nil - } - - // we only accept gossip requests from validators - require.NoError(vm.Network.Connected(context.Background(), requestingNodeID, nil)) - mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) - require.True(ok) - mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { - return 0, nil - } - mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { - return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil - } - - // Ask the VM for any new transactions. We should get nothing at first. wg.Add(1) onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) { require.NoError(err) @@ -114,14 +116,14 @@ func TestEthTxGossip(t *testing.T) { require.Empty(response.Gossip) wg.Done() } - require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse)) + require.NoError(vm.AppRequest(ctx, requestingNodeID, 1, time.Time{}, <-peerSender.SentAppRequest)) + require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 1, <-responseSender.SentAppResponse)) wg.Wait() // Issue a tx to the VM - address := testEthAddrs[0] - key := testKeys[0].ToECDSA() tx := types.NewTransaction(0, address, big.NewInt(10), 100_000, big.NewInt(params.LaunchMinGasPrice), nil) - signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), key) + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), pk.ToECDSA()) require.NoError(err) errs := vm.txPool.AddLocals([]*types.Transaction{signedTx}) @@ -146,68 +148,92 @@ func TestEthTxGossip(t *testing.T) { wg.Done() } - require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse)) + require.NoError(vm.AppRequest(ctx, requestingNodeID, 3, time.Time{}, <-peerSender.SentAppRequest)) + require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 3, <-responseSender.SentAppResponse)) wg.Wait() } func TestAtomicTxGossip(t *testing.T) { require := require.New(t) + ctx := context.Background() + snowCtx := snow.DefaultContextTest() + snowCtx.AVAXAssetID = ids.GenerateTestID() + snowCtx.XChainID = ids.GenerateTestID() + validatorState := &validators.TestState{ + GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) { + return ids.Empty, nil + }, + } + snowCtx.ValidatorState = validatorState + memory := atomic.NewMemory(memdb.New()) + snowCtx.SharedMemory = memory.NewSharedMemory(ids.Empty) - // set up prefunded address - importAmount := uint64(1_000_000_000) - issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONApricotPhase0, "", "", map[ids.ShortID]uint64{ - testShortIDAddrs[0]: importAmount, - }) + pk, err := secp256k1.NewPrivateKey() + require.NoError(err) + address := GetEthAddress(pk) + genesis := newPrefundedGenesis(100_000_000_000_000_000, address) + genesisBytes, err := genesis.MarshalJSON() + require.NoError(err) + + responseSender := &common.FakeSender{ + SentAppResponse: make(chan []byte, 1), + } + vm := &VM{ + p2pSender: responseSender, + ethTxGossipHandler: &p2p.NoOpHandler{}, + ethTxGossiper: &gossip.NoOpGossiper{}, + } + + require.NoError(vm.Initialize( + ctx, + snowCtx, + memdb.New(), + genesisBytes, + nil, + nil, + make(chan common.Message), + nil, + &common.SenderTest{}, + )) + require.NoError(vm.SetState(ctx, snow.NormalOp)) defer func() { - require.NoError(vm.Shutdown(context.Background())) + require.NoError(vm.Shutdown(ctx)) }() // sender for the peer requesting gossip from [vm] - peerSender := &common.SenderTest{} + peerSender := &common.FakeSender{ + SentAppRequest: make(chan []byte, 1), + } network, err := p2p.NewNetwork(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") require.NoError(err) client := network.NewClient(atomicTxGossipProtocol) + // we only accept gossip requests from validators + requestingNodeID := ids.GenerateTestNodeID() + require.NoError(vm.Network.Connected(ctx, requestingNodeID, nil)) + validatorState.GetCurrentHeightF = func(context.Context) (uint64, error) { + return 0, nil + } + validatorState.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil + } + + // Ask the VM for any new transactions. We should get nothing at first. emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) require.NoError(err) - bloomBytes, err := emptyBloomFilter.Bloom.MarshalBinary() + emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary() require.NoError(err) request := &sdk.PullGossipRequest{ - Filter: bloomBytes, - Salt: emptyBloomFilter.Salt[:], + Filter: emptyBloomFilterBytes, + Salt: utils.RandomBytes(32), } + requestBytes, err := proto.Marshal(request) require.NoError(err) - requestingNodeID := ids.GenerateTestNodeID() wg := &sync.WaitGroup{} - peerSender.SendAppRequestF = func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error { - go func() { - require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) - }() - return nil - } - - sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { - go func() { - require.NoError(network.AppResponse(ctx, nodeID, requestID, appResponseBytes)) - }() - return nil - } - - // we only accept gossip requests from validators - require.NoError(vm.Network.Connected(context.Background(), requestingNodeID, nil)) - mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) - require.True(ok) - mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { - return 0, nil - } - mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { - return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil - } - - // Ask the VM for any new transactions. We should get nothing at first. wg.Add(1) onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) { require.NoError(err) @@ -217,15 +243,25 @@ func TestAtomicTxGossip(t *testing.T) { require.Empty(response.Gossip) wg.Done() } - require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse)) + require.NoError(vm.AppRequest(ctx, requestingNodeID, 1, time.Time{}, <-peerSender.SentAppRequest)) + require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 1, <-responseSender.SentAppResponse)) wg.Wait() - // issue a new tx to the vm - importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]}) + // Issue a tx to the VM + utxo, err := addUTXO( + memory, + snowCtx, + ids.GenerateTestID(), + 0, + snowCtx.AVAXAssetID, + 100_000_000_000, + pk.PublicKey().Address(), + ) require.NoError(err) - - require.NoError(vm.mempool.AddLocalTx(importTx)) - <-issuer + tx, err := vm.newImportTxWithUTXOs(vm.ctx.XChainID, address, initialBaseFee, secp256k1fx.NewKeychain(pk), []*avax.UTXO{utxo}) + require.NoError(err) + require.NoError(vm.mempool.AddLocalTx(tx)) // wait so we aren't throttled by the vm time.Sleep(5 * time.Second) @@ -241,10 +277,12 @@ func TestAtomicTxGossip(t *testing.T) { gotTx := &GossipAtomicTx{} require.NoError(gotTx.Unmarshal(response.Gossip[0])) - require.Equal(importTx.InputUTXOs(), gotTx.Tx.InputUTXOs()) + require.Equal(tx.ID(), gotTx.GetID()) wg.Done() } - require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse)) + require.NoError(vm.AppRequest(ctx, requestingNodeID, 3, time.Time{}, <-peerSender.SentAppRequest)) + require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 3, <-responseSender.SentAppResponse)) wg.Wait() } diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 2bd2bf766b..4ae8bea834 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -343,6 +343,13 @@ type VM struct { // Avalanche Warp Messaging backend // Used to serve BLS signatures of warp messages over RPC warpBackend warp.Backend + + // Initialize only sets these if nil so they can be overridden in tests + p2pSender commonEng.AppSender + ethTxGossipHandler p2p.Handler + atomicTxGossipHandler p2p.Handler + ethTxGossiper gossip.Gossiper + atomicTxGossiper gossip.Gossiper } // Codec implements the secp256k1fx interface @@ -584,8 +591,11 @@ func (vm *VM) Initialize( } // initialize peer network - var p2pNetwork *p2p.Network - p2pNetwork, err = p2p.NewNetwork(vm.ctx.Log, appSender, vm.sdkMetrics, "p2p") + if vm.p2pSender == nil { + vm.p2pSender = appSender + } + + p2pNetwork, err := p2p.NewNetwork(vm.ctx.Log, vm.p2pSender, vm.sdkMetrics, "p2p") if err != nil { return fmt.Errorf("failed to initialize p2p network: %w", err) } @@ -1072,93 +1082,95 @@ func (vm *VM) initBlockBuilding() error { vm.shutdownWg.Done() }() - var ( - ethTxGossipHandler p2p.Handler - atomicTxGossipHandler p2p.Handler - ) - - ethTxGossipHandler, err = gossip.NewHandler[*GossipEthTx](ethTxPool, ethTxGossipHandlerConfig, vm.sdkMetrics) - if err != nil { - return err - } - - ethTxGossipHandler = p2p.NewThrottlerHandler( - ethTxGossipHandler, - p2p.NewSlidingWindowThrottler(throttlingPeriod, throttlingLimit), - vm.ctx.Log, - ) + if vm.ethTxGossipHandler == nil { + var ethTxGossipHandler p2p.Handler + ethTxGossipHandler, err = gossip.NewHandler[*GossipEthTx](ethTxPool, ethTxGossipHandlerConfig, vm.sdkMetrics) + if err != nil { + return err + } - ethTxGossipHandler = p2p.NewValidatorHandler(ethTxGossipHandler, vm.validators, vm.ctx.Log) + ethTxGossipHandler = p2p.NewThrottlerHandler( + ethTxGossipHandler, + p2p.NewSlidingWindowThrottler(throttlingPeriod, throttlingLimit), + vm.ctx.Log, + ) - if err := vm.Network.AddHandler(ethTxGossipProtocol, ethTxGossipHandler); err != nil { - return err + vm.ethTxGossipHandler = p2p.NewValidatorHandler(ethTxGossipHandler, vm.validators, vm.ctx.Log) } - atomicTxGossipHandler, err = gossip.NewHandler[*GossipAtomicTx](vm.mempool, atomicTxGossipHandlerConfig, vm.sdkMetrics) - if err != nil { + if err := vm.Network.AddHandler(ethTxGossipProtocol, vm.ethTxGossipHandler); err != nil { return err } - atomicTxGossipHandler = p2p.NewThrottlerHandler( - atomicTxGossipHandler, - p2p.NewSlidingWindowThrottler(throttlingLimit, throttlingLimit), - vm.ctx.Log, - ) + if vm.atomicTxGossipHandler == nil { + var atomicTxGossipHandler p2p.Handler + atomicTxGossipHandler, err = gossip.NewHandler[*GossipAtomicTx](vm.mempool, atomicTxGossipHandlerConfig, vm.sdkMetrics) + if err != nil { + return err + } - atomicTxGossipHandler = p2p.NewValidatorHandler(atomicTxGossipHandler, vm.validators, vm.ctx.Log) + atomicTxGossipHandler = p2p.NewThrottlerHandler( + atomicTxGossipHandler, + p2p.NewSlidingWindowThrottler(throttlingLimit, throttlingLimit), + vm.ctx.Log, + ) - if err := vm.Network.AddHandler(atomicTxGossipProtocol, atomicTxGossipHandler); err != nil { - return err + vm.atomicTxGossipHandler = p2p.NewValidatorHandler(atomicTxGossipHandler, vm.validators, vm.ctx.Log) } - var ( - ethTxGossiper gossip.Gossiper - atomicTxGossiper gossip.Gossiper - ) - - ethTxGossipClient := vm.Network.NewClient(ethTxGossipProtocol, p2p.WithValidatorSampling(vm.validators)) - ethTxGossiper, err = gossip.NewPullGossiper[GossipEthTx, *GossipEthTx]( - ethTxGossipConfig, - vm.ctx.Log, - ethTxPool, - ethTxGossipClient, - vm.sdkMetrics, - ) - if err != nil { + if err := vm.Network.AddHandler(atomicTxGossipProtocol, vm.atomicTxGossipHandler); err != nil { return err } - ethTxGossiper = gossip.ValidatorGossiper{ - Gossiper: ethTxGossiper, - NodeID: vm.ctx.NodeID, - Validators: vm.validators, + + if vm.ethTxGossiper == nil { + ethTxGossipClient := vm.Network.NewClient(ethTxGossipProtocol, p2p.WithValidatorSampling(vm.validators)) + ethTxGossiper, err := gossip.NewPullGossiper[GossipEthTx, *GossipEthTx]( + ethTxGossipConfig, + vm.ctx.Log, + ethTxPool, + ethTxGossipClient, + vm.sdkMetrics, + ) + if err != nil { + return err + } + + vm.ethTxGossiper = &gossip.ValidatorGossiper{ + Gossiper: ethTxGossiper, + NodeID: vm.ctx.NodeID, + Validators: vm.validators, + } } vm.shutdownWg.Add(1) go func() { - gossip.Every(ctx, vm.ctx.Log, ethTxGossiper, gossipFrequency) + gossip.Every(ctx, vm.ctx.Log, vm.ethTxGossiper, gossipFrequency) vm.shutdownWg.Done() }() - atomicTxGossipClient := vm.Network.NewClient(atomicTxGossipProtocol, p2p.WithValidatorSampling(vm.validators)) - atomicTxGossiper, err = gossip.NewPullGossiper[GossipAtomicTx, *GossipAtomicTx]( - atomicTxGossipConfig, - vm.ctx.Log, - vm.mempool, - atomicTxGossipClient, - vm.sdkMetrics, - ) - if err != nil { - return err - } - atomicTxGossiper = gossip.ValidatorGossiper{ - Gossiper: atomicTxGossiper, - NodeID: vm.ctx.NodeID, - Validators: vm.validators, + if vm.atomicTxGossiper == nil { + atomicTxGossipClient := vm.Network.NewClient(atomicTxGossipProtocol, p2p.WithValidatorSampling(vm.validators)) + atomicTxGossiper, err := gossip.NewPullGossiper[GossipAtomicTx, *GossipAtomicTx]( + atomicTxGossipConfig, + vm.ctx.Log, + vm.mempool, + atomicTxGossipClient, + vm.sdkMetrics, + ) + if err != nil { + return err + } + + vm.atomicTxGossiper = &gossip.ValidatorGossiper{ + Gossiper: atomicTxGossiper, + NodeID: vm.ctx.NodeID, + Validators: vm.validators, + } } vm.shutdownWg.Add(1) go func() { - gossip.Every(ctx, vm.ctx.Log, atomicTxGossiper, gossipFrequency) + gossip.Every(ctx, vm.ctx.Log, vm.atomicTxGossiper, gossipFrequency) vm.shutdownWg.Done() }() diff --git a/plugin/evm/vm_test.go b/plugin/evm/vm_test.go index 0ba3b17c87..0f4dea63e9 100644 --- a/plugin/evm/vm_test.go +++ b/plugin/evm/vm_test.go @@ -126,6 +126,24 @@ func init() { } } +func newPrefundedGenesis( + balance int, + addresses ...common.Address, +) *core.Genesis { + alloc := core.GenesisAlloc{} + for _, address := range addresses { + alloc[address] = core.GenesisAccount{ + Balance: big.NewInt(int64(balance)), + } + } + + return &core.Genesis{ + Config: params.TestChainConfig, + Difficulty: big.NewInt(0), + Alloc: alloc, + } +} + // BuildGenesisTest returns the genesis bytes for Coreth VM to be used in testing func BuildGenesisTest(t *testing.T, genesisJSON string) []byte { ss := StaticService{}