From 5392db34000ba9c291caa82b6200fac48eb7296c Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Wed, 21 Sep 2022 17:14:46 +0300 Subject: [PATCH 1/2] moved some integration tests to elrond-go-p2p --- .../peerDisconnecting_test.go | 158 ------------- .../seedersDisconnecting_test.go | 173 -------------- .../kadDht/peerDiscovery_test.go | 218 ------------------ .../p2p/peerDiscovery/messageProcessor.go | 51 ---- .../peerDiscovery/simpleMessageProcessor.go | 36 --- .../p2p/peerDiscovery/testRunnner.go | 78 ------- .../p2p/pubsub/messageProcessor.go | 56 ----- .../p2p/pubsub/peerReceivingMessages_test.go | 187 --------------- integrationTests/p2p/pubsub/unjoin_test.go | 71 ------ integrationTests/testInitializer.go | 39 ---- p2p/p2p.go | 12 - 11 files changed, 1079 deletions(-) delete mode 100644 integrationTests/p2p/peerDisconnecting/peerDisconnecting_test.go delete mode 100644 integrationTests/p2p/peerDisconnecting/seedersDisconnecting_test.go delete mode 100644 integrationTests/p2p/peerDiscovery/kadDht/peerDiscovery_test.go delete mode 100644 integrationTests/p2p/peerDiscovery/messageProcessor.go delete mode 100644 integrationTests/p2p/peerDiscovery/simpleMessageProcessor.go delete mode 100644 integrationTests/p2p/peerDiscovery/testRunnner.go delete mode 100644 integrationTests/p2p/pubsub/messageProcessor.go delete mode 100644 integrationTests/p2p/pubsub/peerReceivingMessages_test.go delete mode 100644 integrationTests/p2p/pubsub/unjoin_test.go diff --git a/integrationTests/p2p/peerDisconnecting/peerDisconnecting_test.go b/integrationTests/p2p/peerDisconnecting/peerDisconnecting_test.go deleted file mode 100644 index 8d531219c2f..00000000000 --- a/integrationTests/p2p/peerDisconnecting/peerDisconnecting_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package peerDisconnecting - -import ( - "fmt" - "testing" - - "github.com/ElrondNetwork/elrond-go/integrationTests" - "github.com/ElrondNetwork/elrond-go/p2p" - p2pConfig "github.com/ElrondNetwork/elrond-go/p2p/config" - "github.com/ElrondNetwork/elrond-go/testscommon" - "github.com/ElrondNetwork/elrond-go/testscommon/p2pmocks" - "github.com/libp2p/go-libp2p-core/peer" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func createDefaultConfig() p2pConfig.P2PConfig { - return p2pConfig.P2PConfig{ - Node: p2pConfig.NodeConfig{}, - KadDhtPeerDiscovery: p2pConfig.KadDhtPeerDiscoveryConfig{ - Enabled: true, - Type: "optimized", - RefreshIntervalInSec: 1, - RoutingTableRefreshIntervalInSec: 1, - ProtocolID: "/erd/kad/1.0.0", - InitialPeerList: nil, - BucketSize: 100, - }, - } -} - -func TestPeerDisconnectionWithOneAdvertiserWithShardingWithLists(t *testing.T) { - p2pCfg := createDefaultConfig() - p2pCfg.Sharding = p2pConfig.ShardingConfig{ - TargetPeerCount: 100, - MaxIntraShardValidators: 40, - MaxCrossShardValidators: 40, - MaxIntraShardObservers: 1, - MaxCrossShardObservers: 1, - MaxSeeders: 1, - Type: p2p.ListsSharder, - AdditionalConnections: p2pConfig.AdditionalConnectionsConfig{ - MaxFullHistoryObservers: 1, - }, - } - p2pCfg.Node.ThresholdMinConnectedPeers = 3 - - testPeerDisconnectionWithOneAdvertiser(t, p2pCfg) -} - -func testPeerDisconnectionWithOneAdvertiser(t *testing.T, p2pConfig p2pConfig.P2PConfig) { - if testing.Short() { - t.Skip("this is not a short test") - } - - numOfPeers := 20 - netw := mocknet.New() - - p2pConfigSeeder := p2pConfig - argSeeder := p2p.ArgsNetworkMessenger{ - ListenAddress: p2p.ListenLocalhostAddrWithIp4AndTcp, - P2pConfig: p2pConfigSeeder, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - NodeOperationMode: p2p.NormalOperation, - Marshalizer: &testscommon.MarshalizerMock{}, - SyncTimer: &testscommon.SyncTimerStub{}, - PeersRatingHandler: &p2pmocks.PeersRatingHandlerStub{}, - ConnectionWatcherType: p2p.ConnectionWatcherTypePrint, - } - // Step 1. Create advertiser - advertiser, err := p2p.NewMockMessenger(argSeeder, netw) - require.Nil(t, err) - p2pConfig.KadDhtPeerDiscovery.InitialPeerList = []string{integrationTests.GetConnectableAddress(advertiser)} - - // Step 2. Create noOfPeers instances of messenger type and call bootstrap - peers := make([]p2p.Messenger, numOfPeers) - for i := 0; i < numOfPeers; i++ { - arg := p2p.ArgsNetworkMessenger{ - ListenAddress: p2p.ListenLocalhostAddrWithIp4AndTcp, - P2pConfig: p2pConfig, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - NodeOperationMode: p2p.NormalOperation, - Marshalizer: &testscommon.MarshalizerMock{}, - SyncTimer: &testscommon.SyncTimerStub{}, - PeersRatingHandler: &p2pmocks.PeersRatingHandlerStub{}, - ConnectionWatcherType: p2p.ConnectionWatcherTypePrint, - } - node, errCreate := p2p.NewMockMessenger(arg, netw) - require.Nil(t, errCreate) - peers[i] = node - } - - // cleanup function that closes all messengers - defer func() { - for i := 0; i < numOfPeers; i++ { - if peers[i] != nil { - _ = peers[i].Close() - } - } - - if advertiser != nil { - _ = advertiser.Close() - } - }() - - // link all peers so they can connect to each other - _ = netw.LinkAll() - - // Step 3. Call bootstrap on all peers - _ = advertiser.Bootstrap() - for _, p := range peers { - _ = p.Bootstrap() - } - integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay) - - // Step 4. Disconnect one peer - disconnectedPeer := peers[5] - fmt.Printf("--- Diconnecting peer: %v ---\n", disconnectedPeer.ID().Pretty()) - _ = netw.UnlinkPeers(getPeerId(advertiser), getPeerId(disconnectedPeer)) - _ = netw.DisconnectPeers(getPeerId(advertiser), getPeerId(disconnectedPeer)) - _ = netw.DisconnectPeers(getPeerId(disconnectedPeer), getPeerId(advertiser)) - for _, p := range peers { - if p != disconnectedPeer { - _ = netw.UnlinkPeers(getPeerId(p), getPeerId(disconnectedPeer)) - _ = netw.DisconnectPeers(getPeerId(p), getPeerId(disconnectedPeer)) - _ = netw.DisconnectPeers(getPeerId(disconnectedPeer), getPeerId(p)) - } - } - for i := 0; i < 5; i++ { - integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay) - } - - // Step 4.1. Test that the peer is disconnected - for _, p := range peers { - if p != disconnectedPeer { - assert.Equal(t, numOfPeers-1, len(p.ConnectedPeers())) - } else { - assert.Equal(t, 0, len(p.ConnectedPeers())) - } - } - - // Step 5. Re-link and test connections - fmt.Println("--- Re-linking ---") - _ = netw.LinkAll() - for i := 0; i < 5; i++ { - integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay) - } - - // Step 5.1. Test that the peer is reconnected - for _, p := range peers { - assert.Equal(t, numOfPeers, len(p.ConnectedPeers())) - } -} - -func getPeerId(netMessenger p2p.Messenger) peer.ID { - return peer.ID(netMessenger.ID().Bytes()) -} diff --git a/integrationTests/p2p/peerDisconnecting/seedersDisconnecting_test.go b/integrationTests/p2p/peerDisconnecting/seedersDisconnecting_test.go deleted file mode 100644 index 955402aec9c..00000000000 --- a/integrationTests/p2p/peerDisconnecting/seedersDisconnecting_test.go +++ /dev/null @@ -1,173 +0,0 @@ -package peerDisconnecting - -import ( - "testing" - - logger "github.com/ElrondNetwork/elrond-go-logger" - "github.com/ElrondNetwork/elrond-go/integrationTests" - "github.com/ElrondNetwork/elrond-go/p2p" - p2pConfig "github.com/ElrondNetwork/elrond-go/p2p/config" - "github.com/ElrondNetwork/elrond-go/testscommon" - "github.com/ElrondNetwork/elrond-go/testscommon/p2pmocks" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var log = logger.GetOrCreate("integrationtests/p2p/peerdisconnecting") - -func TestSeedersDisconnectionWith2AdvertiserAnd3Peers(t *testing.T) { - if testing.Short() { - t.Skip("this is not a short test") - } - - netw := mocknet.New() - p2pCfg := createDefaultConfig() - p2pCfg.KadDhtPeerDiscovery.RefreshIntervalInSec = 1 - - p2pCfg.Sharding = p2pConfig.ShardingConfig{ - TargetPeerCount: 100, - MaxIntraShardValidators: 40, - MaxCrossShardValidators: 40, - MaxIntraShardObservers: 1, - MaxCrossShardObservers: 1, - MaxSeeders: 3, - Type: p2p.ListsSharder, - AdditionalConnections: p2pConfig.AdditionalConnectionsConfig{ - MaxFullHistoryObservers: 0, - }, - } - p2pCfg.Node.ThresholdMinConnectedPeers = 3 - - numOfPeers := 3 - seeders, seedersList := createBootstrappedSeeders(p2pCfg, 2, netw) - - integrationTests.WaitForBootstrapAndShowConnected(seeders, integrationTests.P2pBootstrapDelay) - - // Step 2. Create noOfPeers instances of messenger type and call bootstrap - p2pCfg.KadDhtPeerDiscovery.InitialPeerList = seedersList - peers := make([]p2p.Messenger, numOfPeers) - for i := 0; i < numOfPeers; i++ { - arg := p2p.ArgsNetworkMessenger{ - ListenAddress: p2p.ListenLocalhostAddrWithIp4AndTcp, - P2pConfig: p2pCfg, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - NodeOperationMode: p2p.NormalOperation, - Marshalizer: &testscommon.MarshalizerMock{}, - SyncTimer: &testscommon.SyncTimerStub{}, - PeersRatingHandler: &p2pmocks.PeersRatingHandlerStub{}, - ConnectionWatcherType: p2p.ConnectionWatcherTypePrint, - } - node, err := p2p.NewMockMessenger(arg, netw) - require.Nil(t, err) - peers[i] = node - } - - // cleanup function that closes all messengers - defer func() { - for i := 0; i < numOfPeers; i++ { - if peers[i] != nil { - _ = peers[i].Close() - } - } - - for i := 0; i < len(seeders); i++ { - if seeders[i] != nil { - _ = seeders[i].Close() - } - } - }() - - // link all peers so they can connect to each other - _ = netw.LinkAll() - - // Step 3. Call bootstrap on all peers - for _, p := range peers { - _ = p.Bootstrap() - } - integrationTests.WaitForBootstrapAndShowConnected(append(seeders, peers...), integrationTests.P2pBootstrapDelay) - - // Step 4. Disconnect the seeders - log.Info("--- Disconnecting seeders: %v ---\n", seeders) - disconnectSeedersFromPeers(seeders, peers, netw) - - for i := 0; i < 2; i++ { - integrationTests.WaitForBootstrapAndShowConnected(append(seeders, peers...), integrationTests.P2pBootstrapDelay) - } - - // Step 4.1. Test that the peers are disconnected - for _, p := range peers { - assert.Equal(t, numOfPeers-1, len(p.ConnectedPeers())) - } - - for _, s := range seeders { - assert.Equal(t, len(seeders)-1, len(s.ConnectedPeers())) - } - - // Step 5. Re-link and test connections - log.Info("--- Re-linking ---") - _ = netw.LinkAll() - for i := 0; i < 2; i++ { - integrationTests.WaitForBootstrapAndShowConnected(append(seeders, peers...), integrationTests.P2pBootstrapDelay) - } - - // Step 5.1. Test that the peers got reconnected - for _, p := range append(peers, seeders...) { - assert.Equal(t, numOfPeers+len(seeders)-1, len(p.ConnectedPeers())) - } -} - -func createBootstrappedSeeders(baseP2PConfig p2pConfig.P2PConfig, numSeeders int, netw mocknet.Mocknet) ([]p2p.Messenger, []string) { - seeders := make([]p2p.Messenger, numSeeders) - seedersAddresses := make([]string, numSeeders) - - p2pConfigSeeder := baseP2PConfig - argSeeder := p2p.ArgsNetworkMessenger{ - ListenAddress: p2p.ListenLocalhostAddrWithIp4AndTcp, - P2pConfig: p2pConfigSeeder, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - NodeOperationMode: p2p.NormalOperation, - Marshalizer: &testscommon.MarshalizerMock{}, - SyncTimer: &testscommon.SyncTimerStub{}, - PeersRatingHandler: &p2pmocks.PeersRatingHandlerStub{}, - ConnectionWatcherType: p2p.ConnectionWatcherTypePrint, - } - seeders[0], _ = p2p.NewMockMessenger(argSeeder, netw) - _ = seeders[0].Bootstrap() - seedersAddresses[0] = integrationTests.GetConnectableAddress(seeders[0]) - - for i := 1; i < numSeeders; i++ { - p2pConfigSeeder = baseP2PConfig - p2pConfigSeeder.KadDhtPeerDiscovery.InitialPeerList = []string{integrationTests.GetConnectableAddress(seeders[0])} - argSeeder = p2p.ArgsNetworkMessenger{ - ListenAddress: p2p.ListenLocalhostAddrWithIp4AndTcp, - P2pConfig: p2pConfigSeeder, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - NodeOperationMode: p2p.NormalOperation, - Marshalizer: &testscommon.MarshalizerMock{}, - SyncTimer: &testscommon.SyncTimerStub{}, - PeersRatingHandler: &p2pmocks.PeersRatingHandlerStub{}, - ConnectionWatcherType: p2p.ConnectionWatcherTypePrint, - } - seeders[i], _ = p2p.NewMockMessenger(argSeeder, netw) - _ = netw.LinkAll() - _ = seeders[i].Bootstrap() - seedersAddresses[i] = integrationTests.GetConnectableAddress(seeders[i]) - } - - return seeders, seedersAddresses -} - -func disconnectSeedersFromPeers(seeders []p2p.Messenger, peers []p2p.Messenger, netw mocknet.Mocknet) { - for _, p := range peers { - for _, s := range seeders { - disconnectPeers(p, s, netw) - } - } -} - -func disconnectPeers(peer1 p2p.Messenger, peer2 p2p.Messenger, netw mocknet.Mocknet) { - _ = netw.UnlinkPeers(getPeerId(peer1), getPeerId(peer2)) - _ = netw.DisconnectPeers(getPeerId(peer1), getPeerId(peer2)) - _ = netw.DisconnectPeers(getPeerId(peer2), getPeerId(peer1)) -} diff --git a/integrationTests/p2p/peerDiscovery/kadDht/peerDiscovery_test.go b/integrationTests/p2p/peerDiscovery/kadDht/peerDiscovery_test.go deleted file mode 100644 index f5926aae166..00000000000 --- a/integrationTests/p2p/peerDiscovery/kadDht/peerDiscovery_test.go +++ /dev/null @@ -1,218 +0,0 @@ -package kadDht - -import ( - "fmt" - "testing" - "time" - - "github.com/ElrondNetwork/elrond-go/integrationTests" - "github.com/ElrondNetwork/elrond-go/integrationTests/p2p/peerDiscovery" - "github.com/ElrondNetwork/elrond-go/p2p" - "github.com/stretchr/testify/assert" -) - -var durationTopicAnnounceTime = 2 * time.Second - -func TestPeerDiscoveryAndMessageSendingWithOneAdvertiser(t *testing.T) { - if testing.Short() { - t.Skip("this is not a short test") - } - - numOfPeers := 20 - - //Step 1. Create advertiser - advertiser := integrationTests.CreateMessengerWithKadDht("") - _ = advertiser.Bootstrap() - - //Step 2. Create numOfPeers instances of messenger type and call bootstrap - peers := make([]p2p.Messenger, numOfPeers) - - for i := 0; i < numOfPeers; i++ { - peers[i] = integrationTests.CreateMessengerWithKadDht(integrationTests.GetConnectableAddress(advertiser)) - - _ = peers[i].Bootstrap() - } - - //cleanup function that closes all messengers - defer func() { - for i := 0; i < numOfPeers; i++ { - if peers[i] != nil { - _ = peers[i].Close() - } - } - - if advertiser != nil { - _ = advertiser.Close() - } - }() - - integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay) - - //Step 3. Create a test topic, add receiving handlers - createTestTopicAndWaitForAnnouncements(t, peers) - - //Step 4. run the test for a couple of times as peer discovering and topic announcing - // are not deterministic nor instant processes - - numOfTests := 5 - for i := 0; i < numOfTests; i++ { - testResult := peerDiscovery.RunTest(peers, i, "test topic") - - if testResult { - return - } - } - - assert.Fail(t, "test failed. Discovery/message passing are not validated") -} - -func TestPeerDiscoveryAndMessageSendingWithThreeAdvertisers(t *testing.T) { - if testing.Short() { - t.Skip("this is not a short test") - } - - numOfPeers := 20 - numOfAdvertisers := 3 - - //Step 1. Create 3 advertisers and connect them together - advertisers := make([]p2p.Messenger, numOfAdvertisers) - advertisers[0] = integrationTests.CreateMessengerWithKadDht("") - _ = advertisers[0].Bootstrap() - - for idx := 1; idx < numOfAdvertisers; idx++ { - advertisers[idx] = integrationTests.CreateMessengerWithKadDht(integrationTests.GetConnectableAddress(advertisers[0])) - _ = advertisers[idx].Bootstrap() - } - - //Step 2. Create numOfPeers instances of messenger type and call bootstrap - peers := make([]p2p.Messenger, numOfPeers) - - for i := 0; i < numOfPeers; i++ { - peers[i] = integrationTests.CreateMessengerWithKadDht(integrationTests.GetConnectableAddress(advertisers[i%numOfAdvertisers])) - _ = peers[i].Bootstrap() - } - - //cleanup function that closes all messengers - defer func() { - for i := 0; i < numOfPeers; i++ { - if peers[i] != nil { - _ = peers[i].Close() - } - } - - for i := 0; i < numOfAdvertisers; i++ { - if advertisers[i] != nil { - _ = advertisers[i].Close() - } - } - }() - - integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay) - - //Step 3. Create a test topic, add receiving handlers - createTestTopicAndWaitForAnnouncements(t, peers) - - //Step 4. run the test for a couple of times as peer discovering and topic announcing - // are not deterministic nor instant processes - - noOfTests := 5 - for i := 0; i < noOfTests; i++ { - testResult := peerDiscovery.RunTest(peers, i, "test topic") - - if testResult { - return - } - } - - assert.Fail(t, "test failed. Discovery/message passing are not validated") -} - -func TestPeerDiscoveryAndMessageSendingWithOneAdvertiserAndProtocolID(t *testing.T) { - if testing.Short() { - t.Skip("this is not a short test") - } - - advertiser := integrationTests.CreateMessengerWithKadDht("") - _ = advertiser.Bootstrap() - - protocolID1 := "/erd/kad/1.0.0" - protocolID2 := "/amony/kad/0.0.0" - - peer1 := integrationTests.CreateMessengerWithKadDhtAndProtocolID( - integrationTests.GetConnectableAddress(advertiser), - protocolID1, - ) - peer2 := integrationTests.CreateMessengerWithKadDhtAndProtocolID( - integrationTests.GetConnectableAddress(advertiser), - protocolID1, - ) - peer3 := integrationTests.CreateMessengerWithKadDhtAndProtocolID( - integrationTests.GetConnectableAddress(advertiser), - protocolID2, - ) - - peers := []p2p.Messenger{peer1, peer2, peer3} - - for _, peer := range peers { - _ = peer.Bootstrap() - } - - //cleanup function that closes all messengers - defer func() { - for i := 0; i < len(peers); i++ { - if peers[i] != nil { - _ = peers[i].Close() - } - } - - if advertiser != nil { - _ = advertiser.Close() - } - }() - - integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay) - - createTestTopicAndWaitForAnnouncements(t, peers) - - topic := "test topic" - message := []byte("message") - messageProcessors := assignProcessors(peers, topic) - - peer1.Broadcast(topic, message) - time.Sleep(time.Second * 2) - - assert.Equal(t, message, messageProcessors[0].GetLastMessage()) - assert.Equal(t, message, messageProcessors[1].GetLastMessage()) - assert.Nil(t, messageProcessors[2].GetLastMessage()) - - assert.Equal(t, 2, len(peer1.ConnectedPeers())) - assert.Equal(t, 2, len(peer2.ConnectedPeers())) - assert.Equal(t, 1, len(peer3.ConnectedPeers())) -} - -func assignProcessors(peers []p2p.Messenger, topic string) []*peerDiscovery.SimpleMessageProcessor { - processors := make([]*peerDiscovery.SimpleMessageProcessor, 0, len(peers)) - for _, peer := range peers { - proc := &peerDiscovery.SimpleMessageProcessor{} - processors = append(processors, proc) - - err := peer.RegisterMessageProcessor(topic, "test", proc) - if err != nil { - fmt.Println(err.Error()) - } - } - - return processors -} - -func createTestTopicAndWaitForAnnouncements(t *testing.T, peers []p2p.Messenger) { - for _, peer := range peers { - err := peer.CreateTopic("test topic", true) - if err != nil { - assert.Fail(t, "test fail while creating topic") - } - } - - fmt.Printf("Waiting %v for topic announcement...\n", durationTopicAnnounceTime) - time.Sleep(durationTopicAnnounceTime) -} diff --git a/integrationTests/p2p/peerDiscovery/messageProcessor.go b/integrationTests/p2p/peerDiscovery/messageProcessor.go deleted file mode 100644 index a9e8f342b04..00000000000 --- a/integrationTests/p2p/peerDiscovery/messageProcessor.go +++ /dev/null @@ -1,51 +0,0 @@ -package peerDiscovery - -import ( - "bytes" - "sync" - - "github.com/ElrondNetwork/elrond-go-core/core" - "github.com/ElrondNetwork/elrond-go/p2p" -) - -// MessageProcesssor - -type MessageProcesssor struct { - RequiredValue []byte - chanDone chan struct{} - mutDataReceived sync.Mutex - wasDataReceived bool -} - -// NewMessageProcessor - -func NewMessageProcessor(chanDone chan struct{}, requiredVal []byte) *MessageProcesssor { - return &MessageProcesssor{ - RequiredValue: requiredVal, - chanDone: chanDone, - } -} - -// ProcessReceivedMessage - -func (mp *MessageProcesssor) ProcessReceivedMessage(message p2p.MessageP2P, _ core.PeerID) error { - if bytes.Equal(mp.RequiredValue, message.Data()) { - mp.mutDataReceived.Lock() - mp.wasDataReceived = true - mp.mutDataReceived.Unlock() - - mp.chanDone <- struct{}{} - } - - return nil -} - -// WasDataReceived - -func (mp *MessageProcesssor) WasDataReceived() bool { - mp.mutDataReceived.Lock() - defer mp.mutDataReceived.Unlock() - - return mp.wasDataReceived -} - -// IsInterfaceNil returns true if there is no value under the interface -func (mp *MessageProcesssor) IsInterfaceNil() bool { - return mp == nil -} diff --git a/integrationTests/p2p/peerDiscovery/simpleMessageProcessor.go b/integrationTests/p2p/peerDiscovery/simpleMessageProcessor.go deleted file mode 100644 index fe54584e675..00000000000 --- a/integrationTests/p2p/peerDiscovery/simpleMessageProcessor.go +++ /dev/null @@ -1,36 +0,0 @@ -package peerDiscovery - -import ( - "sync" - - "github.com/ElrondNetwork/elrond-go-core/core" - "github.com/ElrondNetwork/elrond-go/p2p" -) - -// SimpleMessageProcessor records the last received message -type SimpleMessageProcessor struct { - mutMessage sync.RWMutex - message []byte -} - -// ProcessReceivedMessage records the message -func (smp *SimpleMessageProcessor) ProcessReceivedMessage(message p2p.MessageP2P, _ core.PeerID) error { - smp.mutMessage.Lock() - smp.message = message.Data() - smp.mutMessage.Unlock() - - return nil -} - -// GetLastMessage returns the last message received -func (smp *SimpleMessageProcessor) GetLastMessage() []byte { - smp.mutMessage.RLock() - defer smp.mutMessage.RUnlock() - - return smp.message -} - -// IsInterfaceNil returns true if there is no value under the interface -func (smp *SimpleMessageProcessor) IsInterfaceNil() bool { - return smp == nil -} diff --git a/integrationTests/p2p/peerDiscovery/testRunnner.go b/integrationTests/p2p/peerDiscovery/testRunnner.go deleted file mode 100644 index a75f2b4311d..00000000000 --- a/integrationTests/p2p/peerDiscovery/testRunnner.go +++ /dev/null @@ -1,78 +0,0 @@ -package peerDiscovery - -import ( - "fmt" - "strconv" - "sync/atomic" - "time" - - "github.com/ElrondNetwork/elrond-go/p2p" -) - -var durationMsgReceived = 2 * time.Second - -// RunTest will test if all the peers receive a message -func RunTest(peers []p2p.Messenger, testIndex int, topic string) bool { - fmt.Printf("Running test %v\n", testIndex) - - testMessage := "test " + strconv.Itoa(testIndex) - messageProcessors := make([]*MessageProcesssor, len(peers)) - - chanDone := make(chan struct{}) - chanMessageProcessor := make(chan struct{}, len(peers)) - - //add a new message processor for each messenger - for i, peer := range peers { - mp := NewMessageProcessor(chanMessageProcessor, []byte(testMessage)) - - messageProcessors[i] = mp - err := peer.RegisterMessageProcessor(topic, "test", mp) - if err != nil { - fmt.Println(err.Error()) - return false - } - } - - var msgReceived int32 = 0 - - go func() { - - for { - <-chanMessageProcessor - - completelyRecv := true - - atomic.StoreInt32(&msgReceived, 0) - - //to be 100% all peers received the messages, iterate all message processors and check received flag - for _, mp := range messageProcessors { - if !mp.WasDataReceived() { - completelyRecv = false - continue - } - - atomic.AddInt32(&msgReceived, 1) - } - - if !completelyRecv { - continue - } - - //all messengers got the message - chanDone <- struct{}{} - return - } - }() - - //write the message on topic - peers[0].Broadcast(topic, []byte(testMessage)) - - select { - case <-chanDone: - return true - case <-time.After(durationMsgReceived): - fmt.Printf("timeout fetching all messages. Got %d from %d\n", - atomic.LoadInt32(&msgReceived), len(peers)) - return false - } -} diff --git a/integrationTests/p2p/pubsub/messageProcessor.go b/integrationTests/p2p/pubsub/messageProcessor.go deleted file mode 100644 index b27bd690ad3..00000000000 --- a/integrationTests/p2p/pubsub/messageProcessor.go +++ /dev/null @@ -1,56 +0,0 @@ -package peerDisconnecting - -import ( - "sync" - - "github.com/ElrondNetwork/elrond-go-core/core" - "github.com/ElrondNetwork/elrond-go/p2p" -) - -type messageProcessor struct { - mutMessages sync.Mutex - messages map[core.PeerID][]p2p.MessageP2P -} - -func newMessageProcessor() *messageProcessor { - return &messageProcessor{ - messages: make(map[core.PeerID][]p2p.MessageP2P), - } -} - -// ProcessReceivedMessage - -func (mp *messageProcessor) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { - mp.mutMessages.Lock() - defer mp.mutMessages.Unlock() - - mp.messages[fromConnectedPeer] = append(mp.messages[fromConnectedPeer], message) - - return nil -} - -// Messages - -func (mp *messageProcessor) Messages(pid core.PeerID) []p2p.MessageP2P { - mp.mutMessages.Lock() - defer mp.mutMessages.Unlock() - - return mp.messages[pid] -} - -// AllMessages - -func (mp *messageProcessor) AllMessages() []p2p.MessageP2P { - result := make([]p2p.MessageP2P, 0) - - mp.mutMessages.Lock() - defer mp.mutMessages.Unlock() - - for _, messages := range mp.messages { - result = append(result, messages...) - } - - return result -} - -// IsInterfaceNil - -func (mp *messageProcessor) IsInterfaceNil() bool { - return mp == nil -} diff --git a/integrationTests/p2p/pubsub/peerReceivingMessages_test.go b/integrationTests/p2p/pubsub/peerReceivingMessages_test.go deleted file mode 100644 index c93ea3dce11..00000000000 --- a/integrationTests/p2p/pubsub/peerReceivingMessages_test.go +++ /dev/null @@ -1,187 +0,0 @@ -package peerDisconnecting - -import ( - "encoding/hex" - "fmt" - "sync" - "testing" - "time" - - "github.com/ElrondNetwork/elrond-go-core/core" - "github.com/ElrondNetwork/elrond-go/integrationTests" - "github.com/ElrondNetwork/elrond-go/p2p" - "github.com/stretchr/testify/assert" -) - -var durationTest = 30 * time.Second - -type messageProcessorStub struct { - ProcessReceivedMessageCalled func(message p2p.MessageP2P) error -} - -// ProcessReceivedMessage - -func (mps *messageProcessorStub) ProcessReceivedMessage(message p2p.MessageP2P, _ core.PeerID) error { - return mps.ProcessReceivedMessageCalled(message) -} - -// IsInterfaceNil returns true if there is no value under the interface -func (mps *messageProcessorStub) IsInterfaceNil() bool { - return mps == nil -} - -func TestPeerReceivesTheSameMessageMultipleTimesShouldNotHappen(t *testing.T) { - if testing.Short() { - t.Skip("this is not a short test") - } - - numOfPeers := 20 - - //Step 1. Create advertiser - advertiser := integrationTests.CreateMessengerWithKadDht("") - - //Step 2. Create numOfPeers instances of messenger type and call bootstrap - peers := make([]p2p.Messenger, numOfPeers) - for i := 0; i < numOfPeers; i++ { - node := integrationTests.CreateMessengerWithKadDht(integrationTests.GetConnectableAddress(advertiser)) - peers[i] = node - } - - //cleanup function that closes all messengers - defer func() { - for i := 0; i < numOfPeers; i++ { - if peers[i] != nil { - _ = peers[i].Close() - } - } - - if advertiser != nil { - _ = advertiser.Close() - } - }() - - chanStop := make(chan struct{}) - - //Step 3. Register pubsub validators - mutMapMessages := sync.Mutex{} - mapMessages := make(map[int]map[string]struct{}) - testTopic := "test" - - for i := 0; i < numOfPeers; i++ { - idx := i - mapMessages[idx] = make(map[string]struct{}) - err := peers[idx].CreateTopic(testTopic, true) - if err != nil { - fmt.Println("CreateTopic failed:", err.Error()) - continue - } - - err = peers[idx].RegisterMessageProcessor(testTopic, "test", &messageProcessorStub{ - ProcessReceivedMessageCalled: func(message p2p.MessageP2P) error { - time.Sleep(time.Second) - - mutMapMessages.Lock() - defer mutMapMessages.Unlock() - - msgId := "peer: " + message.Peer().Pretty() + " - seqNo: 0x" + hex.EncodeToString(message.SeqNo()) - _, ok := mapMessages[idx][msgId] - if ok { - assert.Fail(t, "message %s received twice", msgId) - chanStop <- struct{}{} - } - - mapMessages[idx][msgId] = struct{}{} - return nil - }, - }) - if err != nil { - fmt.Println("RegisterMessageProcessor:", err.Error()) - } - } - - //Step 4. Call bootstrap on all peers - err := advertiser.Bootstrap() - if err != nil { - fmt.Println("Bootstrap failed:", err.Error()) - } - for _, p := range peers { - err = p.Bootstrap() - if err != nil { - fmt.Printf("Bootstrap() for peer id %s failed:%s\n", p.ID(), err.Error()) - } - } - integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay) - - //Step 5. Continuously send messages from one peer - for timeStart := time.Now(); timeStart.Add(durationTest).Unix() > time.Now().Unix(); { - peers[0].Broadcast(testTopic, []byte("test buff")) - select { - case <-chanStop: - return - default: - } - time.Sleep(time.Millisecond) - } -} - -// TestBroadcastMessageComesFormTheConnectedPeers tests what happens in a network when a message comes through pubsub -// The receiving peer should get the message only from one of the connected peers -func TestBroadcastMessageComesFormTheConnectedPeers(t *testing.T) { - if testing.Short() { - t.Skip("this is not a short test") - } - - topic := "test_topic" - broadcastMessageDuration := time.Second * 2 - peers, err := integrationTests.CreateFixedNetworkOf8Peers() - assert.Nil(t, err) - - defer func() { - integrationTests.ClosePeers(peers) - }() - - //node 0 is connected only to 1 and 3 (check integrationTests.CreateFixedNetworkOf7Peers function) - //a broadcast message from 6 should be received on node 0 only through peers 1 and 3 - - interceptors, err := createTopicsAndMockInterceptors(peers, topic) - assert.Nil(t, err) - - fmt.Println("bootstrapping nodes") - time.Sleep(integrationTests.P2pBootstrapDelay) - - broadcastIdx := 6 - receiverIdx := 0 - shouldReceiveFrom := []int{1, 3} - - broadcastPeer := peers[broadcastIdx] - fmt.Printf("broadcasting message from pid %s\n", broadcastPeer.ID().Pretty()) - broadcastPeer.Broadcast(topic, []byte("dummy")) - time.Sleep(broadcastMessageDuration) - - countReceivedMessages := 0 - receiverInterceptor := interceptors[receiverIdx] - for _, idx := range shouldReceiveFrom { - connectedPid := peers[idx].ID() - countReceivedMessages += len(receiverInterceptor.Messages(connectedPid)) - } - - assert.Equal(t, 1, countReceivedMessages) -} - -func createTopicsAndMockInterceptors(peers []p2p.Messenger, topic string) ([]*messageProcessor, error) { - interceptors := make([]*messageProcessor, len(peers)) - - for idx, p := range peers { - err := p.CreateTopic(topic, true) - if err != nil { - return nil, fmt.Errorf("%w, pid: %s", err, p.ID()) - } - - interceptors[idx] = newMessageProcessor() - err = p.RegisterMessageProcessor(topic, "test", interceptors[idx]) - if err != nil { - return nil, fmt.Errorf("%w, pid: %s", err, p.ID()) - } - } - - return interceptors, nil -} diff --git a/integrationTests/p2p/pubsub/unjoin_test.go b/integrationTests/p2p/pubsub/unjoin_test.go deleted file mode 100644 index a9ca66ed5aa..00000000000 --- a/integrationTests/p2p/pubsub/unjoin_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package peerDisconnecting - -import ( - "fmt" - "testing" - "time" - - "github.com/ElrondNetwork/elrond-go/integrationTests" - "github.com/stretchr/testify/assert" -) - -const durationBootstrapping = time.Second * 2 -const durationTraverseNetwork = time.Second * 2 -const durationUnjoin = time.Second * 2 - -func TestPubsubUnjoinShouldWork(t *testing.T) { - if testing.Short() { - t.Skip("this is not a short test") - } - - peers, _ := integrationTests.CreateFixedNetworkOf8Peers() - defer func() { - integrationTests.ClosePeers(peers) - }() - - topic := "test_topic" - processors := make([]*messageProcessor, 0, len(peers)) - for idx, p := range peers { - _ = p.CreateTopic(topic, true) - processors = append(processors, newMessageProcessor()) - _ = p.RegisterMessageProcessor(topic, "test", processors[idx]) - } - - fmt.Println("bootstrapping nodes") - time.Sleep(durationBootstrapping) - - //a message should traverse the network - fmt.Println("sending the message that should traverse the whole network") - sender := peers[4] - sender.Broadcast(topic, []byte("message 1")) - - time.Sleep(durationTraverseNetwork) - - for _, mp := range processors { - assert.Equal(t, 1, len(mp.AllMessages())) - } - - blockedIdxs := []int{3, 6, 2, 5} - //node 3 unjoins the topic, which should prevent the propagation of the messages on peers 3, 6, 2 and 5 - err := peers[3].UnregisterAllMessageProcessors() - assert.Nil(t, err) - - err = peers[3].UnjoinAllTopics() - assert.Nil(t, err) - - time.Sleep(durationUnjoin) - - fmt.Println("sending the message that should traverse half the network") - sender.Broadcast(topic, []byte("message 2")) - - time.Sleep(durationTraverseNetwork) - - for idx, mp := range processors { - if integrationTests.IsIntInSlice(idx, blockedIdxs) { - assert.Equal(t, 1, len(mp.AllMessages())) - continue - } - - assert.Equal(t, 2, len(mp.AllMessages())) - } -} diff --git a/integrationTests/testInitializer.go b/integrationTests/testInitializer.go index d45425c7f56..69076c0f955 100644 --- a/integrationTests/testInitializer.go +++ b/integrationTests/testInitializer.go @@ -164,31 +164,6 @@ func CreateMessengerWithKadDht(initialAddr string) p2p.Messenger { return libP2PMes } -// CreateMessengerWithKadDhtAndProtocolID creates a new libp2p messenger with kad-dht peer discovery and peer ID -func CreateMessengerWithKadDhtAndProtocolID(initialAddr string, protocolID string) p2p.Messenger { - initialAddresses := make([]string, 0) - if len(initialAddr) > 0 { - initialAddresses = append(initialAddresses, initialAddr) - } - p2pCfg := createP2PConfig(initialAddresses) - p2pCfg.KadDhtPeerDiscovery.ProtocolID = protocolID - arg := p2p.ArgsNetworkMessenger{ - Marshalizer: TestMarshalizer, - ListenAddress: p2p.ListenLocalhostAddrWithIp4AndTcp, - P2pConfig: p2pCfg, - SyncTimer: &p2p.LocalSyncTimer{}, - PreferredPeersHolder: &p2pmocks.PeersHolderStub{}, - NodeOperationMode: p2p.NormalOperation, - PeersRatingHandler: &p2pmocks.PeersRatingHandlerStub{}, - ConnectionWatcherType: p2p.ConnectionWatcherTypePrint, - } - - libP2PMes, err := p2p.NewNetworkMessenger(arg) - log.LogIfError(err) - - return libP2PMes -} - // CreateMessengerFromConfig creates a new libp2p messenger with provided configuration func CreateMessengerFromConfig(p2pConfig p2pConfig.P2PConfig) p2p.Messenger { arg := p2p.ArgsNetworkMessenger{ @@ -2164,20 +2139,6 @@ func ProposeAndSyncOneBlock( return round, nonce } -// WaitForBootstrapAndShowConnected will delay a given duration in order to wait for bootstraping and print the -// number of peers that each node is connected to -func WaitForBootstrapAndShowConnected(peers []p2p.Messenger, durationBootstrapingTime time.Duration) { - log.Info("Waiting for peer discovery...", "time", durationBootstrapingTime) - time.Sleep(durationBootstrapingTime) - - strs := []string{"Connected peers:"} - for _, peer := range peers { - strs = append(strs, fmt.Sprintf("Peer %s is connected to %d peers", peer.ID().Pretty(), len(peer.ConnectedPeers()))) - } - - log.Info(strings.Join(strs, "\n")) -} - // PubKeysMapFromKeysMap returns a map of public keys per shard from the key pairs per shard map. func PubKeysMapFromKeysMap(keyPairMap map[uint32][]*TestKeyPair) map[uint32][]string { keysMap := make(map[uint32][]string) diff --git a/p2p/p2p.go b/p2p/p2p.go index 9406a96e633..711152ef680 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -6,7 +6,6 @@ import ( "github.com/ElrondNetwork/elrond-go-p2p/message" "github.com/ElrondNetwork/elrond-go-p2p/peersHolder" "github.com/ElrondNetwork/elrond-go-p2p/rating" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" ) // ArgsNetworkMessenger defines the options used to create a p2p wrapper @@ -17,17 +16,6 @@ func NewNetworkMessenger(args ArgsNetworkMessenger) (p2p.Messenger, error) { return libp2p.NewNetworkMessenger(args) } -// NewMockMessenger creates a new sandbox testable instance of libP2P messenger -// It should not open ports on current machine -// Should be used only in testing! -// TODO: (next PR) remove this and move integration tests to elrond-go-p2p -func NewMockMessenger( - args ArgsNetworkMessenger, - mockNet mocknet.Mocknet, -) (p2p.Messenger, error) { - return libp2p.NewMockMessenger(args, mockNet) -} - // LocalSyncTimer uses the local system to provide the current time type LocalSyncTimer = libp2p.LocalSyncTimer From 5e91f25858eac6eda9ef1620af4ac3ac8396fa8f Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Wed, 21 Sep 2022 17:15:18 +0300 Subject: [PATCH 2/2] updated go.mod --- go.mod | 4 +--- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 661ac3a2a10..387b9a9279b 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ElrondNetwork/elrond-go-core v1.1.19 github.com/ElrondNetwork/elrond-go-crypto v1.0.1 github.com/ElrondNetwork/elrond-go-logger v1.0.7 - github.com/ElrondNetwork/elrond-go-p2p v0.0.0-20220919155246-aaae1065ad8b + github.com/ElrondNetwork/elrond-go-p2p v0.0.0-20220921135804-d7d1df508584 github.com/ElrondNetwork/elrond-go-storage v1.0.1 github.com/ElrondNetwork/elrond-vm-common v1.3.16 github.com/beevik/ntp v0.3.0 @@ -24,8 +24,6 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/google/gops v0.3.18 github.com/gorilla/websocket v1.5.0 - github.com/libp2p/go-libp2p v0.19.3 - github.com/libp2p/go-libp2p-core v0.15.1 github.com/mitchellh/mapstructure v1.5.0 github.com/pelletier/go-toml v1.9.3 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 5b8c2965fae..365079c297b 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,8 @@ github.com/ElrondNetwork/elrond-go-logger v1.0.4/go.mod h1:e5D+c97lKUfFdAzFX7rrI github.com/ElrondNetwork/elrond-go-logger v1.0.5/go.mod h1:cBfgx0ST/CJx8jrxJSC5aiSrvkGzcnF7sK06RD8mFxQ= github.com/ElrondNetwork/elrond-go-logger v1.0.7 h1:Ldl1rVS0RGKc1IsW8jIaGCb6Zwei04gsMvyjL05X6mE= github.com/ElrondNetwork/elrond-go-logger v1.0.7/go.mod h1:cBfgx0ST/CJx8jrxJSC5aiSrvkGzcnF7sK06RD8mFxQ= -github.com/ElrondNetwork/elrond-go-p2p v0.0.0-20220919155246-aaae1065ad8b h1:m4caDchgW0mVtA2M+Yd2P39KtMvAhV8FaCgKC/sCaMA= -github.com/ElrondNetwork/elrond-go-p2p v0.0.0-20220919155246-aaae1065ad8b/go.mod h1:Mp0l+8w2tuhJEuyHo0MFAhhqvo/uTgAliFQJ+NIv4qE= +github.com/ElrondNetwork/elrond-go-p2p v0.0.0-20220921135804-d7d1df508584 h1:8cPa6OJpH6ALf9CV4oXkwhXxAshANjdf4MfZa3RblFE= +github.com/ElrondNetwork/elrond-go-p2p v0.0.0-20220921135804-d7d1df508584/go.mod h1:Mp0l+8w2tuhJEuyHo0MFAhhqvo/uTgAliFQJ+NIv4qE= github.com/ElrondNetwork/elrond-go-storage v1.0.1 h1:T5pmTAu97aFNbUPpqxJprBEOs+uWsTaJSbCwY9xWPRA= github.com/ElrondNetwork/elrond-go-storage v1.0.1/go.mod h1:Dht8Vt0BJvyUrr+mDSjYo2pu2fIMZfmUa0yznPG9zGw= github.com/ElrondNetwork/elrond-vm-common v1.1.0/go.mod h1:w3i6f8uiuRkE68Ie/gebRcLgTuHqvruJSYrFyZWuLrE=