diff --git a/integrationTests/peerDisconnecting/peerDisconnecting_test.go b/integrationTests/peerDisconnecting/peerDisconnecting_test.go
new file mode 100644
index 00000000..ac3bccc4
--- /dev/null
+++ b/integrationTests/peerDisconnecting/peerDisconnecting_test.go
@@ -0,0 +1,158 @@
+package peerDisconnecting
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/ElrondNetwork/elrond-go-p2p"
+	"github.com/ElrondNetwork/elrond-go-p2p/config"
+	"github.com/ElrondNetwork/elrond-go-p2p/integrationTests"
+	"github.com/ElrondNetwork/elrond-go-p2p/libp2p"
+	"github.com/ElrondNetwork/elrond-go-p2p/mock"
+	"github.com/libp2p/go-libp2p-core/peer"
+	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func createDefaultConfig() config.P2PConfig {
+	return config.P2PConfig{
+		Node: config.NodeConfig{},
+		KadDhtPeerDiscovery: config.KadDhtPeerDiscoveryConfig{
+			Enabled:                          true,
+			Type:                             "optimized",
+			RefreshIntervalInSec:             1,
+			RoutingTableRefreshIntervalInSec: 1,
+			ProtocolID:                       "/erd/kad/1.0.0",
+			InitialPeerList:                  nil,
+			BucketSize:                       100,
+		},
+	}
+}
+
+func TestPeerDisconnectionWithOneAdvertiserWithShardingWithLists(t *testing.T) {
+	p2pCfg := createDefaultConfig()
+	p2pCfg.Sharding = config.ShardingConfig{
+		TargetPeerCount:         100,
+		MaxIntraShardValidators: 40,
+		MaxCrossShardValidators: 40,
+		MaxIntraShardObservers:  1,
+		MaxCrossShardObservers:  1,
+		MaxSeeders:              1,
+		Type:                    p2p.ListsSharder,
+		AdditionalConnections: config.AdditionalConnectionsConfig{
+			MaxFullHistoryObservers: 1,
+		},
+	}
+	p2pCfg.Node.ThresholdMinConnectedPeers = 3
+
+	testPeerDisconnectionWithOneAdvertiser(t, p2pCfg)
+}
+
+func testPeerDisconnectionWithOneAdvertiser(t *testing.T, p2pConfig config.P2PConfig) {
+	if testing.Short() {
+		t.Skip("this is not a short test")
+	}
+
+	numOfPeers := 20
+	netw := mocknet.New()
+
+	p2pConfigSeeder := p2pConfig
+	argSeeder := libp2p.ArgsNetworkMessenger{
+		ListenAddress:         libp2p.ListenLocalhostAddrWithIp4AndTcp,
+		P2pConfig:             p2pConfigSeeder,
+		PreferredPeersHolder:  &mock.PeersHolderStub{},
+		NodeOperationMode:     p2p.NormalOperation,
+		Marshalizer:           &mock.MarshallerMock{},
+		SyncTimer:             &mock.SyncTimerStub{},
+		PeersRatingHandler:    &mock.PeersRatingHandlerStub{},
+		ConnectionWatcherType: p2p.ConnectionWatcherTypePrint,
+	}
+	// Step 1. Create advertiser
+	advertiser, err := libp2p.NewMockMessenger(argSeeder, netw)
+	require.Nil(t, err)
+	p2pConfig.KadDhtPeerDiscovery.InitialPeerList = []string{integrationTests.GetConnectableAddress(advertiser)}
+
+	// Step 2. Create numOfPeers instances of messenger type
+	peers := make([]p2p.Messenger, numOfPeers)
+	for i := 0; i < numOfPeers; i++ {
+		arg := libp2p.ArgsNetworkMessenger{
+			ListenAddress:         libp2p.ListenLocalhostAddrWithIp4AndTcp,
+			P2pConfig:             p2pConfig,
+			PreferredPeersHolder:  &mock.PeersHolderStub{},
+			NodeOperationMode:     p2p.NormalOperation,
+			Marshalizer:           &mock.MarshallerMock{},
+			SyncTimer:             &mock.SyncTimerStub{},
+			PeersRatingHandler:    &mock.PeersRatingHandlerStub{},
+			ConnectionWatcherType: p2p.ConnectionWatcherTypePrint,
+		}
+		node, errCreate := libp2p.NewMockMessenger(arg, netw)
+		require.Nil(t, errCreate)
+		peers[i] = node
+	}
+
+	// cleanup function that closes all messengers
+	defer func() {
+		for i := 0; i < numOfPeers; i++ {
+			if peers[i] != nil {
+				_ = peers[i].Close()
+			}
+		}
+
+		if advertiser != nil {
+			_ = advertiser.Close()
+		}
+	}()
+
+	// link all peers so they can connect to each other
+	_ = netw.LinkAll()
+
+	// Step 3. Call bootstrap on all peers
+	_ = advertiser.Bootstrap()
+	for _, p := range peers {
+		_ = p.Bootstrap()
+	}
+	integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay)
+
+	// Step 4. Disconnect one peer
+	disconnectedPeer := peers[5]
+	fmt.Printf("--- Disconnecting peer: %v ---\n", disconnectedPeer.ID().Pretty())
+	_ = netw.UnlinkPeers(getPeerId(advertiser), getPeerId(disconnectedPeer))
+	_ = netw.DisconnectPeers(getPeerId(advertiser), getPeerId(disconnectedPeer))
+	_ = netw.DisconnectPeers(getPeerId(disconnectedPeer), getPeerId(advertiser))
+	for _, p := range peers {
+		if p != disconnectedPeer {
+			_ = netw.UnlinkPeers(getPeerId(p), getPeerId(disconnectedPeer))
+			_ = netw.DisconnectPeers(getPeerId(p), getPeerId(disconnectedPeer))
+			_ = netw.DisconnectPeers(getPeerId(disconnectedPeer), getPeerId(p))
+		}
+	}
+	for i := 0; i < 5; i++ {
+		integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay)
+	}
+
+	// Step 4.1. Test that the peer is disconnected
+	for _, p := range peers {
+		if p != disconnectedPeer {
+			assert.Equal(t, numOfPeers-1, len(p.ConnectedPeers()))
+		} else {
+			assert.Equal(t, 0, len(p.ConnectedPeers()))
+		}
+	}
+
+	// Step 5. Re-link and test connections
+	fmt.Println("--- Re-linking ---")
+	_ = netw.LinkAll()
+	for i := 0; i < 5; i++ {
+		integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay)
+	}
+
+	// Step 5.1. Test that the peer is reconnected
+	for _, p := range peers {
+		assert.Equal(t, numOfPeers, len(p.ConnectedPeers()))
+	}
+}
+
+func getPeerId(netMessenger p2p.Messenger) peer.ID {
+	return peer.ID(netMessenger.ID().Bytes())
+}
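Step 4 above repeats the same unlink/disconnect-both-ways calls for the advertiser and for every remaining peer. A minimal sketch of how that pattern could be factored out, assuming it lives in the same `peerDisconnecting` package and reuses its imports; `isolatePeer` is a hypothetical name, not part of this diff:

```go
// isolatePeer removes every mocknet link to `target` and drops the existing
// connections in both directions, mirroring what Step 4 of
// testPeerDisconnectionWithOneAdvertiser does inline.
func isolatePeer(netw mocknet.Mocknet, target p2p.Messenger, others []p2p.Messenger) {
	for _, other := range others {
		if other == target {
			continue
		}
		_ = netw.UnlinkPeers(getPeerId(other), getPeerId(target))
		_ = netw.DisconnectPeers(getPeerId(other), getPeerId(target))
		_ = netw.DisconnectPeers(getPeerId(target), getPeerId(other))
	}
}
```

With such a helper, the disconnection step would reduce to one call over `append([]p2p.Messenger{advertiser}, peers...)`.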
diff --git a/integrationTests/peerDisconnecting/seedersDisconnecting_test.go b/integrationTests/peerDisconnecting/seedersDisconnecting_test.go
new file mode 100644
index 00000000..8f8d8856
--- /dev/null
+++ b/integrationTests/peerDisconnecting/seedersDisconnecting_test.go
@@ -0,0 +1,173 @@
+package peerDisconnecting
+
+import (
+	"testing"
+
+	logger "github.com/ElrondNetwork/elrond-go-logger"
+	p2p "github.com/ElrondNetwork/elrond-go-p2p"
+	"github.com/ElrondNetwork/elrond-go-p2p/config"
+	"github.com/ElrondNetwork/elrond-go-p2p/integrationTests"
+	"github.com/ElrondNetwork/elrond-go-p2p/libp2p"
+	"github.com/ElrondNetwork/elrond-go-p2p/mock"
+	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+var log = logger.GetOrCreate("integrationtests/p2p/peerdisconnecting")
+
+func TestSeedersDisconnectionWith2AdvertiserAnd3Peers(t *testing.T) {
+	if testing.Short() {
+		t.Skip("this is not a short test")
+	}
+
+	netw := mocknet.New()
+	p2pCfg := createDefaultConfig()
+	p2pCfg.KadDhtPeerDiscovery.RefreshIntervalInSec = 1
+
+	p2pCfg.Sharding = config.ShardingConfig{
+		TargetPeerCount:         100,
+		MaxIntraShardValidators: 40,
+		MaxCrossShardValidators: 40,
+		MaxIntraShardObservers:  1,
+		MaxCrossShardObservers:  1,
+		MaxSeeders:              3,
+		Type:                    p2p.ListsSharder,
+		AdditionalConnections: config.AdditionalConnectionsConfig{
+			MaxFullHistoryObservers: 0,
+		},
+	}
+	p2pCfg.Node.ThresholdMinConnectedPeers = 3
+
+	numOfPeers := 3
+	seeders, seedersList := createBootstrappedSeeders(p2pCfg, 2, netw)
+
+	integrationTests.WaitForBootstrapAndShowConnected(seeders, integrationTests.P2pBootstrapDelay)
+
+	// Step 2. Create numOfPeers instances of messenger type and call bootstrap
+	p2pCfg.KadDhtPeerDiscovery.InitialPeerList = seedersList
+	peers := make([]p2p.Messenger, numOfPeers)
+	for i := 0; i < numOfPeers; i++ {
+		arg := libp2p.ArgsNetworkMessenger{
+			ListenAddress:         libp2p.ListenLocalhostAddrWithIp4AndTcp,
+			P2pConfig:             p2pCfg,
+			PreferredPeersHolder:  &mock.PeersHolderStub{},
+			NodeOperationMode:     p2p.NormalOperation,
+			Marshalizer:           &mock.MarshallerMock{},
+			SyncTimer:             &mock.SyncTimerStub{},
+			PeersRatingHandler:    &mock.PeersRatingHandlerStub{},
+			ConnectionWatcherType: p2p.ConnectionWatcherTypePrint,
+		}
+		node, err := libp2p.NewMockMessenger(arg, netw)
+		require.Nil(t, err)
+		peers[i] = node
+	}
+
+	// cleanup function that closes all messengers
+	defer func() {
+		for i := 0; i < numOfPeers; i++ {
+			if peers[i] != nil {
+				_ = peers[i].Close()
+			}
+		}
+
+		for i := 0; i < len(seeders); i++ {
+			if seeders[i] != nil {
+				_ = seeders[i].Close()
+			}
+		}
+	}()
+
+	// link all peers so they can connect to each other
+	_ = netw.LinkAll()
+
+	// Step 3. Call bootstrap on all peers
+	for _, p := range peers {
+		_ = p.Bootstrap()
+	}
+	integrationTests.WaitForBootstrapAndShowConnected(append(seeders, peers...), integrationTests.P2pBootstrapDelay)
+
+	// Step 4. Disconnect the seeders
+	log.Info("--- Disconnecting seeders ---", "num seeders", len(seeders))
+	disconnectSeedersFromPeers(seeders, peers, netw)
+
+	for i := 0; i < 2; i++ {
+		integrationTests.WaitForBootstrapAndShowConnected(append(seeders, peers...), integrationTests.P2pBootstrapDelay)
+	}
+
+	// Step 4.1. Test that the peers are disconnected
+	for _, p := range peers {
+		assert.Equal(t, numOfPeers-1, len(p.ConnectedPeers()))
+	}
+
+	for _, s := range seeders {
+		assert.Equal(t, len(seeders)-1, len(s.ConnectedPeers()))
+	}
+
+	// Step 5. Re-link and test connections
+	log.Info("--- Re-linking ---")
+	_ = netw.LinkAll()
+	for i := 0; i < 2; i++ {
+		integrationTests.WaitForBootstrapAndShowConnected(append(seeders, peers...), integrationTests.P2pBootstrapDelay)
+	}
+
+	// Step 5.1. Test that the peers got reconnected
+	for _, p := range append(peers, seeders...) {
+		assert.Equal(t, numOfPeers+len(seeders)-1, len(p.ConnectedPeers()))
+	}
+}
+
+func createBootstrappedSeeders(baseP2PConfig config.P2PConfig, numSeeders int, netw mocknet.Mocknet) ([]p2p.Messenger, []string) {
+	seeders := make([]p2p.Messenger, numSeeders)
+	seedersAddresses := make([]string, numSeeders)
+
+	p2pConfigSeeder := baseP2PConfig
+	argSeeder := libp2p.ArgsNetworkMessenger{
+		ListenAddress:         libp2p.ListenLocalhostAddrWithIp4AndTcp,
+		P2pConfig:             p2pConfigSeeder,
+		PreferredPeersHolder:  &mock.PeersHolderStub{},
+		NodeOperationMode:     p2p.NormalOperation,
+		Marshalizer:           &mock.MarshallerMock{},
+		SyncTimer:             &mock.SyncTimerStub{},
+		PeersRatingHandler:    &mock.PeersRatingHandlerStub{},
+		ConnectionWatcherType: p2p.ConnectionWatcherTypePrint,
+	}
+	seeders[0], _ = libp2p.NewMockMessenger(argSeeder, netw)
+	_ = seeders[0].Bootstrap()
+	seedersAddresses[0] = integrationTests.GetConnectableAddress(seeders[0])
+
+	for i := 1; i < numSeeders; i++ {
+		p2pConfigSeeder = baseP2PConfig
+		p2pConfigSeeder.KadDhtPeerDiscovery.InitialPeerList = []string{integrationTests.GetConnectableAddress(seeders[0])}
+		argSeeder = libp2p.ArgsNetworkMessenger{
+			ListenAddress:         libp2p.ListenLocalhostAddrWithIp4AndTcp,
+			P2pConfig:             p2pConfigSeeder,
+			PreferredPeersHolder:  &mock.PeersHolderStub{},
+			NodeOperationMode:     p2p.NormalOperation,
+			Marshalizer:           &mock.MarshallerMock{},
+			SyncTimer:             &mock.SyncTimerStub{},
+			PeersRatingHandler:    &mock.PeersRatingHandlerStub{},
+			ConnectionWatcherType: p2p.ConnectionWatcherTypePrint,
+		}
+		seeders[i], _ = libp2p.NewMockMessenger(argSeeder, netw)
+		_ = netw.LinkAll()
+		_ = seeders[i].Bootstrap()
+		seedersAddresses[i] = integrationTests.GetConnectableAddress(seeders[i])
+	}
+
+	return seeders, seedersAddresses
+}
+
+func disconnectSeedersFromPeers(seeders []p2p.Messenger, peers []p2p.Messenger, netw mocknet.Mocknet) {
+	for _, p := range peers {
+		for _, s := range seeders {
+			disconnectPeers(p, s, netw)
+		}
+	}
+}
+
+func disconnectPeers(peer1 p2p.Messenger, peer2 p2p.Messenger, netw mocknet.Mocknet) {
+	_ = netw.UnlinkPeers(getPeerId(peer1), getPeerId(peer2))
+	_ = netw.DisconnectPeers(getPeerId(peer1), getPeerId(peer2))
+	_ = netw.DisconnectPeers(getPeerId(peer2), getPeerId(peer1))
+}
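`createBootstrappedSeeders` returns both the seeder messengers and their connectable addresses, and the test above wires the addresses into the DHT configuration by hand. A short sketch of that call pattern as a same-package helper, assuming the package's existing imports; `setUpSeeders` is a hypothetical name, not part of this diff:

```go
// setUpSeeders builds a base config, creates the seeders on the shared mocknet,
// then feeds their addresses into the DHT initial peer list that the regular
// test messengers will bootstrap against.
func setUpSeeders(netw mocknet.Mocknet, numSeeders int) ([]p2p.Messenger, config.P2PConfig) {
	p2pCfg := createDefaultConfig()
	p2pCfg.KadDhtPeerDiscovery.RefreshIntervalInSec = 1

	seeders, seederAddresses := createBootstrappedSeeders(p2pCfg, numSeeders, netw)

	// any peer created with the returned config discovers the network through these seeders
	p2pCfg.KadDhtPeerDiscovery.InitialPeerList = seederAddresses

	return seeders, p2pCfg
}
```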
diff --git a/integrationTests/peerDiscovery/kadDht/peerDiscovery_test.go b/integrationTests/peerDiscovery/kadDht/peerDiscovery_test.go
new file mode 100644
index 00000000..5195071e
--- /dev/null
+++ b/integrationTests/peerDiscovery/kadDht/peerDiscovery_test.go
@@ -0,0 +1,218 @@
+package kadDht
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	p2p "github.com/ElrondNetwork/elrond-go-p2p"
+	"github.com/ElrondNetwork/elrond-go-p2p/integrationTests"
+	"github.com/ElrondNetwork/elrond-go-p2p/integrationTests/peerDiscovery"
+	"github.com/stretchr/testify/assert"
+)
+
+var durationTopicAnnounceTime = 2 * time.Second
+
+func TestPeerDiscoveryAndMessageSendingWithOneAdvertiser(t *testing.T) {
+	if testing.Short() {
+		t.Skip("this is not a short test")
+	}
+
+	numOfPeers := 20
+
+	//Step 1. Create advertiser
+	advertiser := integrationTests.CreateMessengerWithKadDht("")
+	_ = advertiser.Bootstrap()
+
+	//Step 2. Create numOfPeers instances of messenger type and call bootstrap
+	peers := make([]p2p.Messenger, numOfPeers)
+
+	for i := 0; i < numOfPeers; i++ {
+		peers[i] = integrationTests.CreateMessengerWithKadDht(integrationTests.GetConnectableAddress(advertiser))
+
+		_ = peers[i].Bootstrap()
+	}
+
+	//cleanup function that closes all messengers
+	defer func() {
+		for i := 0; i < numOfPeers; i++ {
+			if peers[i] != nil {
+				_ = peers[i].Close()
+			}
+		}
+
+		if advertiser != nil {
+			_ = advertiser.Close()
+		}
+	}()
+
+	integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay)
+
+	//Step 3. Create a test topic, add receiving handlers
+	createTestTopicAndWaitForAnnouncements(t, peers)
+
+	//Step 4. Run the test a couple of times, as peer discovery and topic announcing
+	// are neither deterministic nor instant processes
+
+	numOfTests := 5
+	for i := 0; i < numOfTests; i++ {
+		testResult := peerDiscovery.RunTest(peers, i, "test topic")
+
+		if testResult {
+			return
+		}
+	}
+
+	assert.Fail(t, "test failed. Discovery/message passing was not validated")
+}
+
+func TestPeerDiscoveryAndMessageSendingWithThreeAdvertisers(t *testing.T) {
+	if testing.Short() {
+		t.Skip("this is not a short test")
+	}
+
+	numOfPeers := 20
+	numOfAdvertisers := 3
+
+	//Step 1. Create 3 advertisers and connect them together
+	advertisers := make([]p2p.Messenger, numOfAdvertisers)
+	advertisers[0] = integrationTests.CreateMessengerWithKadDht("")
+	_ = advertisers[0].Bootstrap()
+
+	for idx := 1; idx < numOfAdvertisers; idx++ {
+		advertisers[idx] = integrationTests.CreateMessengerWithKadDht(integrationTests.GetConnectableAddress(advertisers[0]))
+		_ = advertisers[idx].Bootstrap()
+	}
+
+	//Step 2. Create numOfPeers instances of messenger type and call bootstrap
+	peers := make([]p2p.Messenger, numOfPeers)
+
+	for i := 0; i < numOfPeers; i++ {
+		peers[i] = integrationTests.CreateMessengerWithKadDht(integrationTests.GetConnectableAddress(advertisers[i%numOfAdvertisers]))
+		_ = peers[i].Bootstrap()
+	}
+
+	//cleanup function that closes all messengers
+	defer func() {
+		for i := 0; i < numOfPeers; i++ {
+			if peers[i] != nil {
+				_ = peers[i].Close()
+			}
+		}
+
+		for i := 0; i < numOfAdvertisers; i++ {
+			if advertisers[i] != nil {
+				_ = advertisers[i].Close()
+			}
+		}
+	}()
+
+	integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay)
+
+	//Step 3. Create a test topic, add receiving handlers
+	createTestTopicAndWaitForAnnouncements(t, peers)
+
+	//Step 4. Run the test a couple of times, as peer discovery and topic announcing
+	// are neither deterministic nor instant processes
+
+	numOfTests := 5
+	for i := 0; i < numOfTests; i++ {
+		testResult := peerDiscovery.RunTest(peers, i, "test topic")
+
+		if testResult {
+			return
+		}
+	}
+
+	assert.Fail(t, "test failed. Discovery/message passing was not validated")
+}
+
+func TestPeerDiscoveryAndMessageSendingWithOneAdvertiserAndProtocolID(t *testing.T) {
+	if testing.Short() {
+		t.Skip("this is not a short test")
+	}
+
+	advertiser := integrationTests.CreateMessengerWithKadDht("")
+	_ = advertiser.Bootstrap()
+
+	protocolID1 := "/erd/kad/1.0.0"
+	protocolID2 := "/amony/kad/0.0.0"
+
+	peer1 := integrationTests.CreateMessengerWithKadDhtAndProtocolID(
+		integrationTests.GetConnectableAddress(advertiser),
+		protocolID1,
+	)
+	peer2 := integrationTests.CreateMessengerWithKadDhtAndProtocolID(
+		integrationTests.GetConnectableAddress(advertiser),
+		protocolID1,
+	)
+	peer3 := integrationTests.CreateMessengerWithKadDhtAndProtocolID(
+		integrationTests.GetConnectableAddress(advertiser),
+		protocolID2,
+	)
+
+	peers := []p2p.Messenger{peer1, peer2, peer3}
+
+	for _, peer := range peers {
+		_ = peer.Bootstrap()
+	}
+
+	//cleanup function that closes all messengers
+	defer func() {
+		for i := 0; i < len(peers); i++ {
+			if peers[i] != nil {
+				_ = peers[i].Close()
+			}
+		}
+
+		if advertiser != nil {
+			_ = advertiser.Close()
+		}
+	}()
+
+	integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay)
+
+	createTestTopicAndWaitForAnnouncements(t, peers)
+
+	topic := "test topic"
+	message := []byte("message")
+	messageProcessors := assignProcessors(peers, topic)
+
+	peer1.Broadcast(topic, message)
+	time.Sleep(time.Second * 2)
+
+	assert.Equal(t, message, messageProcessors[0].GetLastMessage())
+	assert.Equal(t, message, messageProcessors[1].GetLastMessage())
+	assert.Nil(t, messageProcessors[2].GetLastMessage())
+
+	assert.Equal(t, 2, len(peer1.ConnectedPeers()))
+	assert.Equal(t, 2, len(peer2.ConnectedPeers()))
+	assert.Equal(t, 1, len(peer3.ConnectedPeers()))
+}
+
+func assignProcessors(peers []p2p.Messenger, topic string) []*peerDiscovery.SimpleMessageProcessor {
+	processors := make([]*peerDiscovery.SimpleMessageProcessor, 0, len(peers))
+	for _, peer := range peers {
+		proc := &peerDiscovery.SimpleMessageProcessor{}
+		processors = append(processors, proc)
+
+		err := peer.RegisterMessageProcessor(topic, "test", proc)
+		if err != nil {
+			fmt.Println(err.Error())
+		}
+	}
+
+	return processors
+}
+
+func createTestTopicAndWaitForAnnouncements(t *testing.T, peers []p2p.Messenger) {
+	for _, peer := range peers {
+		err := peer.CreateTopic("test topic", true)
+		if err != nil {
+			assert.Fail(t, "test failed while creating topic")
+		}
+	}
+
+	fmt.Printf("Waiting %v for topic announcement...\n", durationTopicAnnounceTime)
+	time.Sleep(durationTopicAnnounceTime)
+}
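The protocol-ID test above asserts the connection counts peer by peer. A hypothetical same-package helper (not part of this diff, reusing the file's imports) that expresses the same expectation more compactly:

```go
// assertConnectedCount checks that every provided messenger is connected to
// exactly `expected` peers: peers sharing protocolID1 see each other plus the
// advertiser, while the peer on protocolID2 only sees the advertiser.
func assertConnectedCount(t *testing.T, expected int, messengers ...p2p.Messenger) {
	for _, m := range messengers {
		assert.Equal(t, expected, len(m.ConnectedPeers()))
	}
}
```

Usage would read `assertConnectedCount(t, 2, peer1, peer2)` followed by `assertConnectedCount(t, 1, peer3)`.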
diff --git a/integrationTests/peerDiscovery/messageProcessor.go b/integrationTests/peerDiscovery/messageProcessor.go
new file mode 100644
index 00000000..0af1a7b5
--- /dev/null
+++ b/integrationTests/peerDiscovery/messageProcessor.go
@@ -0,0 +1,51 @@
+package peerDiscovery
+
+import (
+	"bytes"
+	"sync"
+
+	"github.com/ElrondNetwork/elrond-go-core/core"
+	"github.com/ElrondNetwork/elrond-go-p2p"
+)
+
+// MessageProcessor signals on a channel when the required value is received
+type MessageProcessor struct {
+	RequiredValue   []byte
+	chanDone        chan struct{}
+	mutDataReceived sync.Mutex
+	wasDataReceived bool
+}
+
+// NewMessageProcessor creates a new MessageProcessor instance
+func NewMessageProcessor(chanDone chan struct{}, requiredVal []byte) *MessageProcessor {
+	return &MessageProcessor{
+		RequiredValue: requiredVal,
+		chanDone:      chanDone,
+	}
+}
+
+// ProcessReceivedMessage marks the data as received and signals on the done channel if it matches the required value
+func (mp *MessageProcessor) ProcessReceivedMessage(message p2p.MessageP2P, _ core.PeerID) error {
+	if bytes.Equal(mp.RequiredValue, message.Data()) {
+		mp.mutDataReceived.Lock()
+		mp.wasDataReceived = true
+		mp.mutDataReceived.Unlock()
+
+		mp.chanDone <- struct{}{}
+	}
+
+	return nil
+}
+
+// WasDataReceived returns true if the required value was received
+func (mp *MessageProcessor) WasDataReceived() bool {
+	mp.mutDataReceived.Lock()
+	defer mp.mutDataReceived.Unlock()
+
+	return mp.wasDataReceived
+}
+
+// IsInterfaceNil returns true if there is no value under the interface
+func (mp *MessageProcessor) IsInterfaceNil() bool {
+	return mp == nil
+}
diff --git a/integrationTests/peerDiscovery/simpleMessageProcessor.go b/integrationTests/peerDiscovery/simpleMessageProcessor.go
new file mode 100644
index 00000000..02678cc6
--- /dev/null
+++ b/integrationTests/peerDiscovery/simpleMessageProcessor.go
@@ -0,0 +1,36 @@
+package peerDiscovery
+
+import (
+	"sync"
+
+	"github.com/ElrondNetwork/elrond-go-core/core"
+	p2p "github.com/ElrondNetwork/elrond-go-p2p"
+)
+
+// SimpleMessageProcessor records the last received message
+type SimpleMessageProcessor struct {
+	mutMessage sync.RWMutex
+	message    []byte
+}
+
+// ProcessReceivedMessage records the message
+func (smp *SimpleMessageProcessor) ProcessReceivedMessage(message p2p.MessageP2P, _ core.PeerID) error {
+	smp.mutMessage.Lock()
+	smp.message = message.Data()
+	smp.mutMessage.Unlock()
+
+	return nil
+}
+
+// GetLastMessage returns the last message received
+func (smp *SimpleMessageProcessor) GetLastMessage() []byte {
+	smp.mutMessage.RLock()
+	defer smp.mutMessage.RUnlock()
+
+	return smp.message
+}
+
+// IsInterfaceNil returns true if there is no value under the interface
+func (smp *SimpleMessageProcessor) IsInterfaceNil() bool {
+	return smp == nil
+}
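For reference, a minimal usage sketch of `SimpleMessageProcessor` (not part of this diff), assuming `sender` and `receiver` are two already-connected messengers and that the package's existing imports (`time`, the `p2p` package) are available:

```go
// sketchSimpleProcessorUsage registers a SimpleMessageProcessor on the receiver,
// broadcasts a payload from the sender and returns whatever the receiver recorded.
func sketchSimpleProcessorUsage(sender p2p.Messenger, receiver p2p.Messenger) []byte {
	topic := "test topic"
	_ = sender.CreateTopic(topic, true)
	_ = receiver.CreateTopic(topic, true)

	proc := &SimpleMessageProcessor{}
	_ = receiver.RegisterMessageProcessor(topic, "test", proc)

	sender.Broadcast(topic, []byte("payload"))
	time.Sleep(2 * time.Second) // pubsub propagation is not instant

	return proc.GetLastMessage()
}
```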
diff --git a/integrationTests/peerDiscovery/testRunnner.go b/integrationTests/peerDiscovery/testRunnner.go
new file mode 100644
index 00000000..1115acd2
--- /dev/null
+++ b/integrationTests/peerDiscovery/testRunnner.go
@@ -0,0 +1,78 @@
+package peerDiscovery
+
+import (
+	"fmt"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/ElrondNetwork/elrond-go-p2p"
+)
+
+var durationMsgReceived = 2 * time.Second
+
+// RunTest will test if all the peers receive a message
+func RunTest(peers []p2p.Messenger, testIndex int, topic string) bool {
+	fmt.Printf("Running test %v\n", testIndex)
+
+	testMessage := "test " + strconv.Itoa(testIndex)
+	messageProcessors := make([]*MessageProcessor, len(peers))
+
+	chanDone := make(chan struct{})
+	chanMessageProcessor := make(chan struct{}, len(peers))
+
+	// add a new message processor for each messenger
+	for i, peer := range peers {
+		mp := NewMessageProcessor(chanMessageProcessor, []byte(testMessage))
+
+		messageProcessors[i] = mp
+		err := peer.RegisterMessageProcessor(topic, "test", mp)
+		if err != nil {
+			fmt.Println(err.Error())
+			return false
+		}
+	}
+
+	var msgReceived int32 = 0
+
+	go func() {
+		for {
+			<-chanMessageProcessor
+
+			completelyRecv := true
+
+			atomic.StoreInt32(&msgReceived, 0)
+
+			// to be 100% sure all peers received the message, iterate all message processors and check the received flag
+			for _, mp := range messageProcessors {
+				if !mp.WasDataReceived() {
+					completelyRecv = false
+					continue
+				}
+
+				atomic.AddInt32(&msgReceived, 1)
+			}
+
+			if !completelyRecv {
+				continue
+			}
+
+			// all messengers got the message
+			chanDone <- struct{}{}
+			return
+		}
+	}()
+
+	// write the message on topic
+	peers[0].Broadcast(topic, []byte(testMessage))
+
+	select {
+	case <-chanDone:
+		return true
+	case <-time.After(durationMsgReceived):
+		fmt.Printf("timeout fetching all messages. Got %d from %d\n",
+			atomic.LoadInt32(&msgReceived), len(peers))
+		return false
+	}
+}
diff --git a/integrationTests/pubsub/messageProcessor.go b/integrationTests/pubsub/messageProcessor.go
new file mode 100644
index 00000000..70560fc4
--- /dev/null
+++ b/integrationTests/pubsub/messageProcessor.go
@@ -0,0 +1,56 @@
+package peerDisconnecting
+
+import (
+	"sync"
+
+	"github.com/ElrondNetwork/elrond-go-core/core"
+	"github.com/ElrondNetwork/elrond-go-p2p"
+)
+
+type messageProcessor struct {
+	mutMessages sync.Mutex
+	messages    map[core.PeerID][]p2p.MessageP2P
+}
+
+func newMessageProcessor() *messageProcessor {
+	return &messageProcessor{
+		messages: make(map[core.PeerID][]p2p.MessageP2P),
+	}
+}
+
+// ProcessReceivedMessage records the message under the peer it came from
+func (mp *messageProcessor) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
+	mp.mutMessages.Lock()
+	defer mp.mutMessages.Unlock()
+
+	mp.messages[fromConnectedPeer] = append(mp.messages[fromConnectedPeer], message)
+
+	return nil
+}
+
+// Messages returns the messages received from the provided peer
+func (mp *messageProcessor) Messages(pid core.PeerID) []p2p.MessageP2P {
+	mp.mutMessages.Lock()
+	defer mp.mutMessages.Unlock()
+
+	return mp.messages[pid]
+}
+
+// AllMessages returns all received messages, regardless of the originating peer
+func (mp *messageProcessor) AllMessages() []p2p.MessageP2P {
+	result := make([]p2p.MessageP2P, 0)
+
+	mp.mutMessages.Lock()
+	defer mp.mutMessages.Unlock()
+
+	for _, messages := range mp.messages {
+		result = append(result, messages...)
+	}
+
+	return result
+}
+
+// IsInterfaceNil returns true if there is no value under the interface
+func (mp *messageProcessor) IsInterfaceNil() bool {
+	return mp == nil
+}
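The per-peer map kept by `messageProcessor` is what lets the broadcast test in the next file check message provenance. A hypothetical same-package helper (not part of this diff) showing how that bookkeeping is typically consumed:

```go
// countMessagesFrom sums the messages the processor recorded from a given set of
// origin peers; the broadcast test below performs the same bookkeeping inline.
func countMessagesFrom(mp *messageProcessor, origins []p2p.Messenger) int {
	total := 0
	for _, origin := range origins {
		total += len(mp.Messages(origin.ID()))
	}

	return total
}
```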
diff --git a/integrationTests/pubsub/peerReceivingMessages_test.go b/integrationTests/pubsub/peerReceivingMessages_test.go
new file mode 100644
index 00000000..e6f9f8b6
--- /dev/null
+++ b/integrationTests/pubsub/peerReceivingMessages_test.go
@@ -0,0 +1,187 @@
+package peerDisconnecting
+
+import (
+	"encoding/hex"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/ElrondNetwork/elrond-go-core/core"
+	p2p "github.com/ElrondNetwork/elrond-go-p2p"
+	"github.com/ElrondNetwork/elrond-go-p2p/integrationTests"
+	"github.com/stretchr/testify/assert"
+)
+
+var durationTest = 30 * time.Second
+
+type messageProcessorStub struct {
+	ProcessReceivedMessageCalled func(message p2p.MessageP2P) error
+}
+
+// ProcessReceivedMessage calls the stubbed function
+func (mps *messageProcessorStub) ProcessReceivedMessage(message p2p.MessageP2P, _ core.PeerID) error {
+	return mps.ProcessReceivedMessageCalled(message)
+}
+
+// IsInterfaceNil returns true if there is no value under the interface
+func (mps *messageProcessorStub) IsInterfaceNil() bool {
+	return mps == nil
+}
+
+func TestPeerReceivesTheSameMessageMultipleTimesShouldNotHappen(t *testing.T) {
+	if testing.Short() {
+		t.Skip("this is not a short test")
+	}
+
+	numOfPeers := 20
+
+	//Step 1. Create advertiser
+	advertiser := integrationTests.CreateMessengerWithKadDht("")
+
+	//Step 2. Create numOfPeers instances of messenger type
+	peers := make([]p2p.Messenger, numOfPeers)
+	for i := 0; i < numOfPeers; i++ {
+		node := integrationTests.CreateMessengerWithKadDht(integrationTests.GetConnectableAddress(advertiser))
+		peers[i] = node
+	}
+
+	//cleanup function that closes all messengers
+	defer func() {
+		for i := 0; i < numOfPeers; i++ {
+			if peers[i] != nil {
+				_ = peers[i].Close()
+			}
+		}
+
+		if advertiser != nil {
+			_ = advertiser.Close()
+		}
+	}()
+
+	chanStop := make(chan struct{})
+
+	//Step 3. Register pubsub validators
+	mutMapMessages := sync.Mutex{}
+	mapMessages := make(map[int]map[string]struct{})
+	testTopic := "test"
+
+	for i := 0; i < numOfPeers; i++ {
+		idx := i
+		mapMessages[idx] = make(map[string]struct{})
+		err := peers[idx].CreateTopic(testTopic, true)
+		if err != nil {
+			fmt.Println("CreateTopic failed:", err.Error())
+			continue
+		}
+
+		err = peers[idx].RegisterMessageProcessor(testTopic, "test", &messageProcessorStub{
+			ProcessReceivedMessageCalled: func(message p2p.MessageP2P) error {
+				time.Sleep(time.Second)
+
+				mutMapMessages.Lock()
+				defer mutMapMessages.Unlock()
+
+				msgId := "peer: " + message.Peer().Pretty() + " - seqNo: 0x" + hex.EncodeToString(message.SeqNo())
+				_, ok := mapMessages[idx][msgId]
+				if ok {
+					assert.Fail(t, fmt.Sprintf("message %s received twice", msgId))
+					chanStop <- struct{}{}
+				}
+
+				mapMessages[idx][msgId] = struct{}{}
+				return nil
+			},
+		})
+		if err != nil {
+			fmt.Println("RegisterMessageProcessor:", err.Error())
+		}
+	}
+
+	//Step 4. Call bootstrap on all peers
+	err := advertiser.Bootstrap()
+	if err != nil {
+		fmt.Println("Bootstrap failed:", err.Error())
+	}
+	for _, p := range peers {
+		err = p.Bootstrap()
+		if err != nil {
+			fmt.Printf("Bootstrap() for peer id %s failed:%s\n", p.ID(), err.Error())
+		}
+	}
+	integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay)
+
+	//Step 5. Continuously send messages from one peer
+	for timeStart := time.Now(); timeStart.Add(durationTest).Unix() > time.Now().Unix(); {
+		peers[0].Broadcast(testTopic, []byte("test buff"))
+		select {
+		case <-chanStop:
+			return
+		default:
+		}
+		time.Sleep(time.Millisecond)
+	}
+}
+
+// TestBroadcastMessageComesFromTheConnectedPeers tests what happens in a network when a message arrives through pubsub.
+// The receiving peer should get the message only from one of its connected peers.
+func TestBroadcastMessageComesFromTheConnectedPeers(t *testing.T) {
+	if testing.Short() {
+		t.Skip("this is not a short test")
+	}
+
+	topic := "test_topic"
+	broadcastMessageDuration := time.Second * 2
+	peers, err := integrationTests.CreateFixedNetworkOf8Peers()
+	assert.Nil(t, err)
+
+	defer func() {
+		integrationTests.ClosePeers(peers)
+	}()
+
+	//node 0 is connected only to 1 and 3 (check the integrationTests.CreateFixedNetworkOf8Peers function)
+	//a broadcast message from 6 should be received on node 0 only through peers 1 and 3
+
+	interceptors, err := createTopicsAndMockInterceptors(peers, topic)
+	assert.Nil(t, err)
+
+	fmt.Println("bootstrapping nodes")
+	time.Sleep(integrationTests.P2pBootstrapDelay)
+
+	broadcastIdx := 6
+	receiverIdx := 0
+	shouldReceiveFrom := []int{1, 3}
+
+	broadcastPeer := peers[broadcastIdx]
+	fmt.Printf("broadcasting message from pid %s\n", broadcastPeer.ID().Pretty())
+	broadcastPeer.Broadcast(topic, []byte("dummy"))
+	time.Sleep(broadcastMessageDuration)
+
+	countReceivedMessages := 0
+	receiverInterceptor := interceptors[receiverIdx]
+	for _, idx := range shouldReceiveFrom {
+		connectedPid := peers[idx].ID()
+		countReceivedMessages += len(receiverInterceptor.Messages(connectedPid))
+	}
+
+	assert.Equal(t, 1, countReceivedMessages)
+}
+
+func createTopicsAndMockInterceptors(peers []p2p.Messenger, topic string) ([]*messageProcessor, error) {
+	interceptors := make([]*messageProcessor, len(peers))
+
+	for idx, p := range peers {
+		err := p.CreateTopic(topic, true)
+		if err != nil {
+			return nil, fmt.Errorf("%w, pid: %s", err, p.ID())
+		}
+
+		interceptors[idx] = newMessageProcessor()
+		err = p.RegisterMessageProcessor(topic, "test", interceptors[idx])
+		if err != nil {
+			return nil, fmt.Errorf("%w, pid: %s", err, p.ID())
+		}
+	}
+
+	return interceptors, nil
+}
diff --git a/integrationTests/pubsub/unjoin_test.go b/integrationTests/pubsub/unjoin_test.go
new file mode 100644
index 00000000..2bfd4d18
--- /dev/null
+++ b/integrationTests/pubsub/unjoin_test.go
@@ -0,0 +1,71 @@
+package peerDisconnecting
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/ElrondNetwork/elrond-go-p2p/integrationTests"
+	"github.com/stretchr/testify/assert"
+)
+
+const durationBootstrapping = time.Second * 2
+const durationTraverseNetwork = time.Second * 2
+const durationUnjoin = time.Second * 2
+
+func TestPubsubUnjoinShouldWork(t *testing.T) {
+	if testing.Short() {
+		t.Skip("this is not a short test")
+	}
+
+	peers, _ := integrationTests.CreateFixedNetworkOf8Peers()
+	defer func() {
+		integrationTests.ClosePeers(peers)
+	}()
+
+	topic := "test_topic"
+	processors := make([]*messageProcessor, 0, len(peers))
+	for idx, p := range peers {
+		_ = p.CreateTopic(topic, true)
+		processors = append(processors, newMessageProcessor())
+		_ = p.RegisterMessageProcessor(topic, "test", processors[idx])
+	}
+
+	fmt.Println("bootstrapping nodes")
+	time.Sleep(durationBootstrapping)
+
+	//a message should traverse the network
+	fmt.Println("sending the message that should traverse the whole network")
+	sender := peers[4]
+	sender.Broadcast(topic, []byte("message 1"))
+
+	time.Sleep(durationTraverseNetwork)
+
+	for _, mp := range processors {
+		assert.Equal(t, 1, len(mp.AllMessages()))
+	}
+
+	blockedIdxs := []int{3, 6, 2, 5}
+	//node 3 unjoins the topic, which should prevent the propagation of the messages on peers 3, 6, 2 and 5
+	err := peers[3].UnregisterAllMessageProcessors()
+	assert.Nil(t, err)
+
+	err = peers[3].UnjoinAllTopics()
+	assert.Nil(t, err)
+
+	time.Sleep(durationUnjoin)
+
+	fmt.Println("sending the message that should traverse half the network")
+	sender.Broadcast(topic, []byte("message 2"))
+
+	time.Sleep(durationTraverseNetwork)
+
+	for idx, mp := range processors {
+		if integrationTests.IsIntInSlice(idx, blockedIdxs) {
+			assert.Equal(t, 1, len(mp.AllMessages()))
+			continue
+		}
+
+		assert.Equal(t, 2, len(mp.AllMessages()))
+	}
+}
diff --git a/integrationTests/testCommon.go b/integrationTests/testCommon.go
new file mode 100644
index 00000000..304a1b4c
--- /dev/null
+++ b/integrationTests/testCommon.go
@@ -0,0 +1,214 @@
+package integrationTests
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/ElrondNetwork/elrond-go-core/marshal"
+	logger "github.com/ElrondNetwork/elrond-go-logger"
+	p2p "github.com/ElrondNetwork/elrond-go-p2p"
+	"github.com/ElrondNetwork/elrond-go-p2p/config"
+	"github.com/ElrondNetwork/elrond-go-p2p/libp2p"
+	"github.com/ElrondNetwork/elrond-go-p2p/mock"
+)
+
+var log = logger.GetOrCreate("integrationtests")
+
+// TestMarshaller is the GogoProtoMarshalizer used for tests
+var TestMarshaller = &marshal.GogoProtoMarshalizer{}
+
+// P2pBootstrapDelay is used so that nodes have enough time to bootstrap
+var P2pBootstrapDelay = 5 * time.Second
+
+func createP2PConfig(initialPeerList []string) config.P2PConfig {
+	return config.P2PConfig{
+		Node: config.NodeConfig{
+			Port: "0",
+		},
+		KadDhtPeerDiscovery: config.KadDhtPeerDiscoveryConfig{
+			Enabled:                          true,
+			Type:                             "optimized",
+			RefreshIntervalInSec:             2,
+			ProtocolID:                       "/erd/kad/1.0.0",
+			InitialPeerList:                  initialPeerList,
+			BucketSize:                       100,
+			RoutingTableRefreshIntervalInSec: 100,
+		},
+		Sharding: config.ShardingConfig{
+			Type: p2p.NilListSharder,
+		},
+	}
+}
+
+// GetConnectableAddress returns a connectable address of the provided messenger,
+// skipping circuit relay and link-local (169.254.x.x) addresses
+func GetConnectableAddress(mes p2p.Messenger) string {
+	for _, addr := range mes.Addresses() {
+		if strings.Contains(addr, "circuit") || strings.Contains(addr, "169.254") {
+			continue
+		}
+		return addr
+	}
+	return ""
+}
+
+// WaitForBootstrapAndShowConnected will delay a given duration in order to wait for bootstrapping and print the
+// number of peers that each node is connected to
+func WaitForBootstrapAndShowConnected(peers []p2p.Messenger, durationBootstrappingTime time.Duration) {
+	log.Info("Waiting for peer discovery...", "time", durationBootstrappingTime)
+	time.Sleep(durationBootstrappingTime)
+
+	strs := []string{"Connected peers:"}
+	for _, peer := range peers {
+		strs = append(strs, fmt.Sprintf("Peer %s is connected to %d peers", peer.ID().Pretty(), len(peer.ConnectedPeers())))
+	}
+
+	log.Info(strings.Join(strs, "\n"))
+}
+
+// CreateFixedNetworkOf8Peers assembles a network as follows:
+//
+//                      0 ------------------ 1
+//                      |                    |
+// 2 ------------------ 3 ------------------ 4
+// |                    |                    |
+// 5                    6                    7
+func CreateFixedNetworkOf8Peers() ([]p2p.Messenger, error) {
+	peers := createMessengersWithNoDiscovery(8)
+
+	connections := map[int][]int{
+		0: {1, 3},
+		1: {4},
+		2: {5, 3},
+		3: {4, 6},
+		4: {7},
+	}
+
+	err := createConnections(peers, connections)
+	if err != nil {
+		return nil, err
+	}
+
+	return peers, nil
+}
+
+func createMessengersWithNoDiscovery(numPeers int) []p2p.Messenger {
+	peers := make([]p2p.Messenger, numPeers)
+
+	for i := 0; i < numPeers; i++ {
+		peers[i] = CreateMessengerWithNoDiscovery()
+	}
+
+	return peers
+}
+
+func createConnections(peers []p2p.Messenger, connections map[int][]int) error {
+	for pid, connectTo := range connections {
+		err := connectPeerToOthers(peers, pid, connectTo)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func connectPeerToOthers(peers []p2p.Messenger, idx int, connectToIdxes []int) error {
+	for _, connectToIdx := range connectToIdxes {
+		err := peers[idx].ConnectToPeer(peers[connectToIdx].Addresses()[0])
+		if err != nil {
+			return fmt.Errorf("%w connecting %s to %s", err, peers[idx].ID(), peers[connectToIdx].ID())
+		}
+	}
+
+	return nil
+}
+
+// CreateMessengerFromConfig creates a new libp2p messenger with the provided configuration
+func CreateMessengerFromConfig(p2pConfig config.P2PConfig) p2p.Messenger {
+	arg := libp2p.ArgsNetworkMessenger{
+		Marshalizer:           TestMarshaller,
+		ListenAddress:         libp2p.ListenLocalhostAddrWithIp4AndTcp,
+		P2pConfig:             p2pConfig,
+		SyncTimer:             &libp2p.LocalSyncTimer{},
+		PreferredPeersHolder:  &mock.PeersHolderStub{},
+		NodeOperationMode:     p2p.NormalOperation,
+		PeersRatingHandler:    &mock.PeersRatingHandlerStub{},
+		ConnectionWatcherType: p2p.ConnectionWatcherTypePrint,
+	}
+
+	if p2pConfig.Sharding.AdditionalConnections.MaxFullHistoryObservers > 0 {
+		// configuring additional connections deliberately switches the node to full archive mode
+		arg.NodeOperationMode = p2p.FullArchiveMode
+	}
+
+	libP2PMes, err := libp2p.NewNetworkMessenger(arg)
+	log.LogIfError(err)
+
+	return libP2PMes
+}
+
+// CreateMessengerWithNoDiscovery creates a new libp2p messenger with no peer discovery
+func CreateMessengerWithNoDiscovery() p2p.Messenger {
+	p2pCfg := createP2PConfigWithNoDiscovery()
+
+	return CreateMessengerFromConfig(p2pCfg)
+}
+
+// createP2PConfigWithNoDiscovery creates a P2P configuration with peer discovery disabled
+func createP2PConfigWithNoDiscovery() config.P2PConfig {
+	return config.P2PConfig{
+		Node: config.NodeConfig{
+			Port: "0",
+			Seed: "",
+		},
+		KadDhtPeerDiscovery: config.KadDhtPeerDiscoveryConfig{
+			Enabled: false,
+		},
+		Sharding: config.ShardingConfig{
+			Type: p2p.NilListSharder,
+		},
+	}
+}
+
+// CreateMessengerWithKadDht creates a new libp2p messenger with kad-dht peer discovery
+func CreateMessengerWithKadDht(initialAddr string) p2p.Messenger {
+	initialAddresses := make([]string, 0)
+	if len(initialAddr) > 0 {
+		initialAddresses = append(initialAddresses, initialAddr)
+	}
+
+	p2pCfg := createP2PConfig(initialAddresses)
+
+	return CreateMessengerFromConfig(p2pCfg)
+}
+
+// CreateMessengerWithKadDhtAndProtocolID creates a new libp2p messenger with kad-dht peer discovery and the given protocol ID
+func CreateMessengerWithKadDhtAndProtocolID(initialAddr string, protocolID string) p2p.Messenger {
+	initialAddresses := make([]string, 0)
+	if len(initialAddr) > 0 {
+		initialAddresses = append(initialAddresses, initialAddr)
+	}
+	p2pCfg := createP2PConfig(initialAddresses)
+	p2pCfg.KadDhtPeerDiscovery.ProtocolID = protocolID
+
+	return CreateMessengerFromConfig(p2pCfg)
+}
+
+// ClosePeers calls Messenger.Close on the provided peers
+func ClosePeers(peers []p2p.Messenger) {
+	for _, p := range peers {
+		_ = p.Close()
+	}
+}
+
+// IsIntInSlice returns true if idx is found on any position in the provided slice
+func IsIntInSlice(idx int, slice []int) bool {
+	for _, value := range slice {
+		if value == idx {
+			return true
+		}
+	}
+
+	return false
+}
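A minimal sketch (not part of this diff) of how the helpers from `testCommon.go` compose into a new integration test; the package name and test name are illustrative only:

```go
package example

import (
	"testing"

	"github.com/ElrondNetwork/elrond-go-p2p/integrationTests"
)

func TestFixedNetworkComesUp(t *testing.T) {
	if testing.Short() {
		t.Skip("this is not a short test")
	}

	// build the fixed 8-peer topology with no discovery, fail fast on wiring errors
	peers, err := integrationTests.CreateFixedNetworkOf8Peers()
	if err != nil {
		t.Fatal(err)
	}
	defer integrationTests.ClosePeers(peers)

	// give the topology time to settle, then print each peer's connection count
	integrationTests.WaitForBootstrapAndShowConnected(peers, integrationTests.P2pBootstrapDelay)
}
```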