Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor p2p unit tests #425

Merged
merged 12 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 139 additions & 101 deletions plugin/evm/tx_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,60 +10,90 @@ import (
"testing"
"time"

"github.com/ava-labs/avalanchego/chains/atomic"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/components/avax"
"github.com/ava-labs/avalanchego/vms/secp256k1fx"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"google.golang.org/protobuf/proto"

"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/params"
)

func TestEthTxGossip(t *testing.T) {
require := require.New(t)
ctx := context.Background()
snowCtx := snow.DefaultContextTest()
validatorState := &validators.TestState{}
snowCtx.ValidatorState = validatorState

// set up prefunded address
importAmount := uint64(1_000_000_000)
issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONLatest, "", "", map[ids.ShortID]uint64{
testShortIDAddrs[0]: importAmount,
})
defer func() {
require.NoError(vm.Shutdown(context.Background()))
}()

txPoolNewHeads := make(chan core.NewTxPoolHeadEvent)
vm.txPool.SubscribeNewHeadEvent(txPoolNewHeads)

importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
pk, err := secp256k1.NewPrivateKey()
require.NoError(err)
require.NoError(vm.mempool.AddLocalTx(importTx))
<-issuer

blk, err := vm.BuildBlock(context.Background())
address := GetEthAddress(pk)
genesis := newPrefundedGenesis(100_000_000_000_000_000, address)
genesisBytes, err := genesis.MarshalJSON()
require.NoError(err)

require.NoError(blk.Verify(context.Background()))
require.NoError(vm.SetPreference(context.Background(), blk.ID()))
require.NoError(blk.Accept(context.Background()))
<-txPoolNewHeads
responseSender := &common.FakeSender{
SentAppResponse: make(chan []byte, 1),
}
vm := &VM{
p2pSender: responseSender,
atomicTxGossipHandler: &p2p.NoOpHandler{},
atomicTxGossiper: &gossip.NoOpGossiper{},
}

require.NoError(vm.Initialize(
ctx,
snowCtx,
memdb.New(),
genesisBytes,
nil,
nil,
make(chan common.Message),
nil,
&common.SenderTest{},
))
require.NoError(vm.SetState(ctx, snow.NormalOp))

defer func() {
require.NoError(vm.Shutdown(ctx))
}()

// sender for the peer requesting gossip from [vm]
peerSender := &common.SenderTest{}
peerSender := &common.FakeSender{
SentAppRequest: make(chan []byte, 1),
}

network, err := p2p.NewNetwork(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "")
require.NoError(err)
client := network.NewClient(ethTxGossipProtocol)

// we only accept gossip requests from validators
requestingNodeID := ids.GenerateTestNodeID()
require.NoError(vm.Network.Connected(ctx, requestingNodeID, nil))
validatorState.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
validatorState.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
require.NoError(err)
emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
Expand All @@ -77,34 +107,6 @@ func TestEthTxGossip(t *testing.T) {
require.NoError(err)

wg := &sync.WaitGroup{}

requestingNodeID := ids.GenerateTestNodeID()
peerSender.SendAppRequestF = func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error {
go func() {
require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes))
}()
return nil
}

sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error {
go func() {
require.NoError(network.AppResponse(ctx, nodeID, requestID, appResponseBytes))
}()
return nil
}

// we only accept gossip requests from validators
require.NoError(vm.Network.Connected(context.Background(), requestingNodeID, nil))
mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState)
require.True(ok)
mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
wg.Add(1)
onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)
Expand All @@ -114,14 +116,14 @@ func TestEthTxGossip(t *testing.T) {
require.Empty(response.Gossip)
wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
require.NoError(vm.AppRequest(ctx, requestingNodeID, 1, time.Time{}, <-peerSender.SentAppRequest))
require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 1, <-responseSender.SentAppResponse))
wg.Wait()

// Issue a tx to the VM
address := testEthAddrs[0]
key := testKeys[0].ToECDSA()
tx := types.NewTransaction(0, address, big.NewInt(10), 100_000, big.NewInt(params.LaunchMinGasPrice), nil)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), key)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), pk.ToECDSA())
require.NoError(err)

errs := vm.txPool.AddLocals([]*types.Transaction{signedTx})
Expand All @@ -146,68 +148,92 @@ func TestEthTxGossip(t *testing.T) {

wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
require.NoError(vm.AppRequest(ctx, requestingNodeID, 3, time.Time{}, <-peerSender.SentAppRequest))
require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 3, <-responseSender.SentAppResponse))
wg.Wait()
}

func TestAtomicTxGossip(t *testing.T) {
require := require.New(t)
ctx := context.Background()
snowCtx := snow.DefaultContextTest()
snowCtx.AVAXAssetID = ids.GenerateTestID()
snowCtx.XChainID = ids.GenerateTestID()
validatorState := &validators.TestState{
GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) {
return ids.Empty, nil
},
}
snowCtx.ValidatorState = validatorState
memory := atomic.NewMemory(memdb.New())
snowCtx.SharedMemory = memory.NewSharedMemory(ids.Empty)

// set up prefunded address
importAmount := uint64(1_000_000_000)
issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONApricotPhase0, "", "", map[ids.ShortID]uint64{
testShortIDAddrs[0]: importAmount,
})
pk, err := secp256k1.NewPrivateKey()
require.NoError(err)
address := GetEthAddress(pk)
genesis := newPrefundedGenesis(100_000_000_000_000_000, address)
genesisBytes, err := genesis.MarshalJSON()
require.NoError(err)

responseSender := &common.FakeSender{
SentAppResponse: make(chan []byte, 1),
}
vm := &VM{
p2pSender: responseSender,
ethTxGossipHandler: &p2p.NoOpHandler{},
ethTxGossiper: &gossip.NoOpGossiper{},
}

require.NoError(vm.Initialize(
ctx,
snowCtx,
memdb.New(),
genesisBytes,
nil,
nil,
make(chan common.Message),
nil,
&common.SenderTest{},
))
require.NoError(vm.SetState(ctx, snow.NormalOp))

defer func() {
require.NoError(vm.Shutdown(context.Background()))
require.NoError(vm.Shutdown(ctx))
}()

// sender for the peer requesting gossip from [vm]
peerSender := &common.SenderTest{}
peerSender := &common.FakeSender{
SentAppRequest: make(chan []byte, 1),
}
network, err := p2p.NewNetwork(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "")
require.NoError(err)
client := network.NewClient(atomicTxGossipProtocol)

// we only accept gossip requests from validators
requestingNodeID := ids.GenerateTestNodeID()
require.NoError(vm.Network.Connected(ctx, requestingNodeID, nil))
validatorState.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
validatorState.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
require.NoError(err)
bloomBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
require.NoError(err)
request := &sdk.PullGossipRequest{
Filter: bloomBytes,
Salt: emptyBloomFilter.Salt[:],
Filter: emptyBloomFilterBytes,
Salt: utils.RandomBytes(32),
}

requestBytes, err := proto.Marshal(request)
require.NoError(err)

requestingNodeID := ids.GenerateTestNodeID()
wg := &sync.WaitGroup{}
peerSender.SendAppRequestF = func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error {
go func() {
require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes))
}()
return nil
}

sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error {
go func() {
require.NoError(network.AppResponse(ctx, nodeID, requestID, appResponseBytes))
}()
return nil
}

// we only accept gossip requests from validators
require.NoError(vm.Network.Connected(context.Background(), requestingNodeID, nil))
mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState)
require.True(ok)
mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) {
return 0, nil
}
mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) {
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil
}

// Ask the VM for any new transactions. We should get nothing at first.
wg.Add(1)
onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)
Expand All @@ -217,15 +243,25 @@ func TestAtomicTxGossip(t *testing.T) {
require.Empty(response.Gossip)
wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
require.NoError(vm.AppRequest(ctx, requestingNodeID, 1, time.Time{}, <-peerSender.SentAppRequest))
require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 1, <-responseSender.SentAppResponse))
wg.Wait()

// issue a new tx to the vm
importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
// Issue a tx to the VM
utxo, err := addUTXO(
memory,
snowCtx,
ids.GenerateTestID(),
0,
snowCtx.AVAXAssetID,
100_000_000_000,
pk.PublicKey().Address(),
)
require.NoError(err)

require.NoError(vm.mempool.AddLocalTx(importTx))
<-issuer
tx, err := vm.newImportTxWithUTXOs(vm.ctx.XChainID, address, initialBaseFee, secp256k1fx.NewKeychain(pk), []*avax.UTXO{utxo})
require.NoError(err)
require.NoError(vm.mempool.AddLocalTx(tx))

// wait so we aren't throttled by the vm
time.Sleep(5 * time.Second)
Expand All @@ -241,10 +277,12 @@ func TestAtomicTxGossip(t *testing.T) {

gotTx := &GossipAtomicTx{}
require.NoError(gotTx.Unmarshal(response.Gossip[0]))
require.Equal(importTx.InputUTXOs(), gotTx.Tx.InputUTXOs())
require.Equal(tx.ID(), gotTx.GetID())

wg.Done()
}
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse))
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
require.NoError(vm.AppRequest(ctx, requestingNodeID, 3, time.Time{}, <-peerSender.SentAppRequest))
require.NoError(network.AppResponse(ctx, snowCtx.NodeID, 3, <-responseSender.SentAppResponse))
wg.Wait()
}
Loading