Skip to content

Commit

Permalink
Refactor p2p unit tests (#425)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim authored Dec 13, 2023
1 parent 056800c commit 9c85de5
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 166 deletions.
240 changes: 139 additions & 101 deletions plugin/evm/tx_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,60 +10,90 @@ import (
"testing"
"time"

"github.com/ava-labs/avalanchego/chains/atomic"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/components/avax"
"github.com/ava-labs/avalanchego/vms/secp256k1fx"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"google.golang.org/protobuf/proto"

"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/params"
)

func TestEthTxGossip(t *testing.T) {
require := require.New(t)
ctx := context.Background()
snowCtx := snow.DefaultContextTest()
validatorState := &validators.TestState{}
snowCtx.ValidatorState = validatorState

// set up prefunded address
importAmount := uint64(1_000_000_000)
issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONLatest, "", "", map[ids.ShortID]uint64{
testShortIDAddrs[0]: importAmount,
})
defer func() {
require.NoError(vm.Shutdown(context.Background()))
}()

txPoolNewHeads := make(chan core.NewTxPoolHeadEvent)
vm.txPool.SubscribeNewHeadEvent(txPoolNewHeads)

importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
pk, err := secp256k1.NewPrivateKey()
require.NoError(err)
require.NoError(vm.mempool.AddLocalTx(importTx))
<-issuer

blk, err := vm.BuildBlock(context.Background())
address := GetEthAddress(pk)
genesis := newPrefundedGenesis(100_000_000_000_000_000, address)
genesisBytes, err := genesis.MarshalJSON()
require.NoError(err)

require.NoError(blk.Verify(context.Background()))
require.NoError(vm.SetPreference(context.Background(), blk.ID()))
require.NoError(blk.Accept(context.Background()))
<-txPoolNewHeads
responseSender := &common.FakeSender{
SentAppResponse: make(chan []byte, 1),
}
vm := &VM{
p2pSender: responseSender,
atomicTxGossipHandler: &p2p.NoOpHandler{},
atomicTxGossiper: &gossip.NoOpGossiper{},
}

require.NoError(vm.Initialize(
ctx,
snowCtx,
memdb.New(),
genesisBytes,
nil,
nil,
make(chan common.Message),
nil,
&common.SenderTest{},
))
require.NoError(vm.SetState(ctx, snow.NormalOp))

defer func() {
require.NoError(vm.Shutdown(ctx))
}()

// sender for the peer requesting gossip from [vm]
peerSender := &common.SenderTest{}
peerSender := &common.FakeSender{
SentAppRequest: make(chan []byte, 1),
}

network, err := p2p.NewNetwork(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "")
require.NoError(err)
client := network.NewClient(ethTxGossipProtocol)

// we only accept gossip requests from validators
requestingNodeID := ids.GenerateTestNodeID()
require.NoError(vm.Network.Connected(ctx, requestingNodeID, nil))
validatorState.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
validatorState.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
require.NoError(err)
emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
Expand All @@ -77,34 +107,6 @@ func TestEthTxGossip(t *testing.T) {
require.NoError(err)

wg := &sync.WaitGroup{}

requestingNodeID := ids.GenerateTestNodeID()
peerSender.SendAppRequestF = func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error {
go func() {
require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes))
}()
return nil
}

sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error {
go func() {
require.NoError(network.AppResponse(ctx, nodeID, requestID, appResponseBytes))
}()
return nil
}

// we only accept gossip requests from validators
require.NoError(vm.Network.Connected(context.Background(), requestingNodeID, nil))
mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState)
require.True(ok)
mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
wg.Add(1)
onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)
Expand All @@ -114,14 +116,14 @@ func TestEthTxGossip(t *testing.T) {
require.Empty(response.Gossip)
wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
require.NoError(vm.AppRequest(ctx, requestingNodeID, 1, time.Time{}, <-peerSender.SentAppRequest))
require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 1, <-responseSender.SentAppResponse))
wg.Wait()

// Issue a tx to the VM
address := testEthAddrs[0]
key := testKeys[0].ToECDSA()
tx := types.NewTransaction(0, address, big.NewInt(10), 100_000, big.NewInt(params.LaunchMinGasPrice), nil)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), key)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), pk.ToECDSA())
require.NoError(err)

errs := vm.txPool.AddLocals([]*types.Transaction{signedTx})
Expand All @@ -146,68 +148,92 @@ func TestEthTxGossip(t *testing.T) {

wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
require.NoError(vm.AppRequest(ctx, requestingNodeID, 3, time.Time{}, <-peerSender.SentAppRequest))
require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 3, <-responseSender.SentAppResponse))
wg.Wait()
}

func TestAtomicTxGossip(t *testing.T) {
require := require.New(t)
ctx := context.Background()
snowCtx := snow.DefaultContextTest()
snowCtx.AVAXAssetID = ids.GenerateTestID()
snowCtx.XChainID = ids.GenerateTestID()
validatorState := &validators.TestState{
GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) {
return ids.Empty, nil
},
}
snowCtx.ValidatorState = validatorState
memory := atomic.NewMemory(memdb.New())
snowCtx.SharedMemory = memory.NewSharedMemory(ids.Empty)

// set up prefunded address
importAmount := uint64(1_000_000_000)
issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONApricotPhase0, "", "", map[ids.ShortID]uint64{
testShortIDAddrs[0]: importAmount,
})
pk, err := secp256k1.NewPrivateKey()
require.NoError(err)
address := GetEthAddress(pk)
genesis := newPrefundedGenesis(100_000_000_000_000_000, address)
genesisBytes, err := genesis.MarshalJSON()
require.NoError(err)

responseSender := &common.FakeSender{
SentAppResponse: make(chan []byte, 1),
}
vm := &VM{
p2pSender: responseSender,
ethTxGossipHandler: &p2p.NoOpHandler{},
ethTxGossiper: &gossip.NoOpGossiper{},
}

require.NoError(vm.Initialize(
ctx,
snowCtx,
memdb.New(),
genesisBytes,
nil,
nil,
make(chan common.Message),
nil,
&common.SenderTest{},
))
require.NoError(vm.SetState(ctx, snow.NormalOp))

defer func() {
require.NoError(vm.Shutdown(context.Background()))
require.NoError(vm.Shutdown(ctx))
}()

// sender for the peer requesting gossip from [vm]
peerSender := &common.SenderTest{}
peerSender := &common.FakeSender{
SentAppRequest: make(chan []byte, 1),
}
network, err := p2p.NewNetwork(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "")
require.NoError(err)
client := network.NewClient(atomicTxGossipProtocol)

// we only accept gossip requests from validators
requestingNodeID := ids.GenerateTestNodeID()
require.NoError(vm.Network.Connected(ctx, requestingNodeID, nil))
validatorState.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
validatorState.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
require.NoError(err)
bloomBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
require.NoError(err)
request := &sdk.PullGossipRequest{
Filter: bloomBytes,
Salt: emptyBloomFilter.Salt[:],
Filter: emptyBloomFilterBytes,
Salt: utils.RandomBytes(32),
}

requestBytes, err := proto.Marshal(request)
require.NoError(err)

requestingNodeID := ids.GenerateTestNodeID()
wg := &sync.WaitGroup{}
peerSender.SendAppRequestF = func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error {
go func() {
require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes))
}()
return nil
}

sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error {
go func() {
require.NoError(network.AppResponse(ctx, nodeID, requestID, appResponseBytes))
}()
return nil
}

// we only accept gossip requests from validators
require.NoError(vm.Network.Connected(context.Background(), requestingNodeID, nil))
mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState)
require.True(ok)
mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
wg.Add(1)
onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)
Expand All @@ -217,15 +243,25 @@ func TestAtomicTxGossip(t *testing.T) {
require.Empty(response.Gossip)
wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
require.NoError(vm.AppRequest(ctx, requestingNodeID, 1, time.Time{}, <-peerSender.SentAppRequest))
require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 1, <-responseSender.SentAppResponse))
wg.Wait()

// issue a new tx to the vm
importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
// Issue a tx to the VM
utxo, err := addUTXO(
memory,
snowCtx,
ids.GenerateTestID(),
0,
snowCtx.AVAXAssetID,
100_000_000_000,
pk.PublicKey().Address(),
)
require.NoError(err)

require.NoError(vm.mempool.AddLocalTx(importTx))
<-issuer
tx, err := vm.newImportTxWithUTXOs(vm.ctx.XChainID, address, initialBaseFee, secp256k1fx.NewKeychain(pk), []*avax.UTXO{utxo})
require.NoError(err)
require.NoError(vm.mempool.AddLocalTx(tx))

// wait so we aren't throttled by the vm
time.Sleep(5 * time.Second)
Expand All @@ -241,10 +277,12 @@ func TestAtomicTxGossip(t *testing.T) {

gotTx := &GossipAtomicTx{}
require.NoError(gotTx.Unmarshal(response.Gossip[0]))
require.Equal(importTx.InputUTXOs(), gotTx.Tx.InputUTXOs())
require.Equal(tx.ID(), gotTx.GetID())

wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
require.NoError(vm.AppRequest(ctx, requestingNodeID, 3, time.Time{}, <-peerSender.SentAppRequest))
require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 3, <-responseSender.SentAppResponse))
wg.Wait()
}
Loading

0 comments on commit 9c85de5

Please sign in to comment.