From e6379f54a89ee5c6acdeca29d4221b6917adf6aa Mon Sep 17 00:00:00 2001
From: Marco Munizaga
Date: Thu, 2 Jun 2022 13:30:51 -0700
Subject: [PATCH] identify: Fix flaky tests (#1555)

* Fix flaky timing dependent tests

* Update go-libp2p-peerstore dependency

* Register notifiee synchronously

* Only a single connection

* Remove WaitForDisconnectNotification hack since notifs are now synchronous

* Add debug logging to identify tests

* Close chan once
---
 go.mod                           |  2 +-
 go.sum                           |  3 +-
 p2p/net/swarm/testing/testing.go | 21 +++++++-
 p2p/protocol/identify/id.go      |  1 +
 p2p/protocol/identify/id_test.go | 84 ++++++++++++++++++++++----------
 5 files changed, 81 insertions(+), 30 deletions(-)

diff --git a/go.mod b/go.mod
index 3a36894e32..de7dc8c856 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ require (
 	github.com/libp2p/go-libp2p-asn-util v0.2.0
 	github.com/libp2p/go-libp2p-circuit v0.6.0
 	github.com/libp2p/go-libp2p-core v0.16.1
-	github.com/libp2p/go-libp2p-peerstore v0.6.0
+	github.com/libp2p/go-libp2p-peerstore v0.7.0
 	github.com/libp2p/go-libp2p-resource-manager v0.3.0
 	github.com/libp2p/go-libp2p-testing v0.9.2
 	github.com/libp2p/go-mplex v0.7.0
diff --git a/go.sum b/go.sum
index 8b0090ceee..c398d27b91 100644
--- a/go.sum
+++ b/go.sum
@@ -414,8 +414,9 @@ github.com/libp2p/go-libp2p-core v0.14.0/go.mod h1:tLasfcVdTXnixsLB0QYaT1syJOhsb
 github.com/libp2p/go-libp2p-core v0.16.1 h1:bWoiEBqVkpJ13hbv/f69tHODp86t6mvc4fBN4DkK73M=
 github.com/libp2p/go-libp2p-core v0.16.1/go.mod h1:O3i/7y+LqUb0N+qhzXjBjjpchgptWAVMG1Voegk7b4c=
 github.com/libp2p/go-libp2p-mplex v0.5.0/go.mod h1:eLImPJLkj3iG5t5lq68w3Vm5NAQ5BcKwrrb2VmOYb3M=
-github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A=
 github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
+github.com/libp2p/go-libp2p-peerstore v0.7.0 h1:2iIUwok3vtmnWJTZeTeLgnBO6GbkXcwSRwgZHEKrQZs=
+github.com/libp2p/go-libp2p-peerstore v0.7.0/go.mod h1:cdUWTHro83vpg6unCpGUr8qJoX3e93Vy8o97u5ppIM0=
 github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
 github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
 github.com/libp2p/go-libp2p-quic-transport v0.16.0 h1:aVg9/jr+R2esov5sH7wkXrmYmqJiUjtLMLYX3L9KYdY=
diff --git a/p2p/net/swarm/testing/testing.go b/p2p/net/swarm/testing/testing.go
index 8640f1be9c..bb470f11ae 100644
--- a/p2p/net/swarm/testing/testing.go
+++ b/p2p/net/swarm/testing/testing.go
@@ -37,11 +37,29 @@ type config struct {
 	connectionGater connmgr.ConnectionGater
 	rcmgr           network.ResourceManager
 	sk              crypto.PrivKey
+	clock
+}
+
+type clock interface {
+	Now() time.Time
+}
+
+type realclock struct{}
+
+func (rc realclock) Now() time.Time {
+	return time.Now()
 }

 // Option is an option that can be passed when constructing a test swarm.
 type Option func(*testing.T, *config)

+// WithClock sets the clock to use for this swarm
+func WithClock(clock clock) Option {
+	return func(_ *testing.T, c *config) {
+		c.clock = clock
+	}
+}
+
 // OptDisableReuseport disables reuseport in this test swarm.
 var OptDisableReuseport Option = func(_ *testing.T, c *config) {
 	c.disableReuseport = true
@@ -105,6 +123,7 @@ func GenUpgrader(t *testing.T, n *swarm.Swarm, opts ...tptu.Option) transport.Up
 // GenSwarm generates a new test swarm.
 func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
 	var cfg config
+	cfg.clock = realclock{}
 	for _, o := range opts {
 		o(t, &cfg)
 	}
@@ -124,7 +143,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
 		p.Addr = tnet.ZeroLocalTCPAddress
 	}

-	ps, err := pstoremem.NewPeerstore()
+	ps, err := pstoremem.NewPeerstore(pstoremem.WithClock(cfg.clock))
 	require.NoError(t, err)
 	ps.AddPubKey(p.ID, p.PubKey)
 	ps.AddPrivKey(p.ID, p.PrivKey)
diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go
index 072cc81aa2..e4f07f5bff 100644
--- a/p2p/protocol/identify/id.go
+++ b/p2p/protocol/identify/id.go
@@ -327,6 +327,7 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
 	go func() {
 		defer close(wait)
 		if err := ids.identifyConn(c); err != nil {
+			log.Warnf("failed to identify %s: %s", c.RemotePeer(), err)
 			ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: c.RemotePeer(), Reason: err})
 			return
 		}
diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go
index cc809e1e90..1290f5250f 100644
--- a/p2p/protocol/identify/id_test.go
+++ b/p2p/protocol/identify/id_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/libp2p/go-libp2p"
 	blhost "github.com/libp2p/go-libp2p/p2p/host/blank"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
+	"github.com/libp2p/go-libp2p/p2p/net/swarm"
 	swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
 	"github.com/libp2p/go-libp2p/p2p/protocol/identify"
 	pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
@@ -28,17 +29,22 @@ import (
 	"github.com/libp2p/go-eventbus"
 	"github.com/libp2p/go-libp2p-peerstore/pstoremem"
-	"github.com/libp2p/go-libp2p-testing/race"
+	mockClock "github.com/benbjohnson/clock"
+	logging "github.com/ipfs/go-log/v2"
 	"github.com/libp2p/go-msgio/protoio"
 	ma "github.com/multiformats/go-multiaddr"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )

+func init() {
+	logging.SetLogLevel("net/identify", "debug")
+}
+
 func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
 	t.Helper()
-	assert.ElementsMatchf(t, expected, h.Peerstore().Addrs(p), fmt.Sprintf("%s did not have addr for %s", h.ID(), p))
+	require.True(t, assert.ElementsMatchf(t, expected, h.Peerstore().Addrs(p), fmt.Sprintf("%s did not have addr for %s", h.ID(), p)))
 }

 func testHasCertifiedAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
@@ -62,7 +68,7 @@ func testHasCertifiedAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.M
 	if !ok {
 		t.Error("unexpected record type")
 	}
-	assert.ElementsMatchf(t, expected, rec.Addrs, fmt.Sprintf("%s did not have certified addr for %s", h.ID(), p))
+	require.True(t, assert.ElementsMatchf(t, expected, rec.Addrs, fmt.Sprintf("%s did not have certified addr for %s", h.ID(), p)))
 }

 func testHasProtocolVersions(t *testing.T, h host.Host, p peer.ID) {
@@ -147,15 +153,15 @@ func emitAddrChangeEvt(t *testing.T, h host.Host) {
 // id service is done.
 func TestIDService(t *testing.T) {
 	// This test is highly timing dependent, waiting on timeouts/expiration.
-	if race.WithRace() {
-		t.Skip("skipping highly timing dependent test when race detector is enabled")
-	}
 	oldTTL := peerstore.RecentlyConnectedAddrTTL
 	peerstore.RecentlyConnectedAddrTTL = 500 * time.Millisecond
 	t.Cleanup(func() { peerstore.RecentlyConnectedAddrTTL = oldTTL })

-	h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
-	h2 := blhost.NewBlankHost(swarmt.GenSwarm(t))
+	clk := mockClock.NewMock()
+	swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
+	swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
+	h1 := blhost.NewBlankHost(swarm1)
+	h2 := blhost.NewBlankHost(swarm2)

 	h1p := h1.ID()
 	h2p := h2.ID()
@@ -212,6 +218,8 @@ func TestIDService(t *testing.T) {
 	testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key

 	// Need both sides to actually notice that the connection has been closed.
+	sentDisconnect1 := waitForDisconnectNotification(swarm1)
+	sentDisconnect2 := waitForDisconnectNotification(swarm2)
 	h1.Network().ClosePeer(h2p)
 	h2.Network().ClosePeer(h1p)
 	if len(h2.Network().ConnsToPeer(h1.ID())) != 0 || len(h1.Network().ConnsToPeer(h2.ID())) != 0 {
@@ -225,10 +233,13 @@ func TestIDService(t *testing.T) {
 	testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
 	testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))

+	<-sentDisconnect1
+	<-sentDisconnect2
+
 	// the addrs had their TTLs reduced on disconnect, and
 	// will be forgotten soon after
 	t.Log("testing addrs after TTL expiration")
-	time.Sleep(time.Second)
+	clk.Add(time.Second)
 	testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
 	testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
 	testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
@@ -794,13 +805,11 @@ func TestLargeIdentifyMessage(t *testing.T) {
 	peerstore.RecentlyConnectedAddrTTL = 500 * time.Millisecond
 	t.Cleanup(func() { peerstore.RecentlyConnectedAddrTTL = oldTTL })

-	sk1, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
-	require.NoError(t, err)
-	sk2, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
-	require.NoError(t, err)
-
-	h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk1)))
-	h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk2)))
+	clk := mockClock.NewMock()
+	swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
+	swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
+	h1 := blhost.NewBlankHost(swarm1)
+	h2 := blhost.NewBlankHost(swarm2)

 	// add protocol strings to make the message larger
 	// about 2K of protocol strings
@@ -833,10 +842,12 @@ func TestLargeIdentifyMessage(t *testing.T) {
 	forgetMe, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")
 	h2.Peerstore().AddAddr(h1p, forgetMe, peerstore.RecentlyConnectedAddrTTL)

-	require.NoError(t, h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2p)))
+	h2pi := h2.Peerstore().PeerInfo(h2p)
+	h2pi.Addrs = h2pi.Addrs[:1]
+	require.NoError(t, h1.Connect(context.Background(), h2pi))

 	h1t2c := h1.Network().ConnsToPeer(h2p)
-	require.NotEmpty(t, h1t2c, "should have a conn here")
+	require.Equal(t, 1, len(h1t2c), "should have a conn here")

 	ids1.IdentifyConn(h1t2c[0])

@@ -851,7 +862,7 @@ func TestLargeIdentifyMessage(t *testing.T) {
 	// now, this wait we do have to do. it's the wait for the Listening side
 	// to be done identifying the connection.
 	c := h2.Network().ConnsToPeer(h1.ID())
-	if len(c) < 1 {
+	if len(c) != 1 {
 		t.Fatal("should have connection by now at least.")
 	}
 	ids2.IdentifyConn(c[0])
@@ -864,6 +875,8 @@ func TestLargeIdentifyMessage(t *testing.T) {
 	testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key

 	// Need both sides to actually notice that the connection has been closed.
+	sentDisconnect1 := waitForDisconnectNotification(swarm1)
+	sentDisconnect2 := waitForDisconnectNotification(swarm2)
 	h1.Network().ClosePeer(h2p)
 	h2.Network().ClosePeer(h1p)
 	if len(h2.Network().ConnsToPeer(h1.ID())) != 0 || len(h1.Network().ConnsToPeer(h2.ID())) != 0 {
@@ -877,10 +890,13 @@ func TestLargeIdentifyMessage(t *testing.T) {
 	testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
 	testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))

+	<-sentDisconnect1
+	<-sentDisconnect2
+
 	// the addrs had their TTLs reduced on disconnect, and
 	// will be forgotten soon after
 	t.Log("testing addrs after TTL expiration")
-	time.Sleep(time.Second)
+	clk.Add(time.Second)
 	testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
 	testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
 	testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
@@ -898,13 +914,8 @@ func TestLargePushMessage(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	sk1, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
-	require.NoError(t, err)
-	sk2, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
-	require.NoError(t, err)
-
-	h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk1)))
-	h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk2)))
+	h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
+	h2 := blhost.NewBlankHost(swarmt.GenSwarm(t))

 	// add protocol strings to make the message larger
 	// about 2K of protocol strings
@@ -1109,3 +1120,22 @@ func waitForAddrInStream(t *testing.T, s <-chan ma.Multiaddr, expected ma.Multia
 		}
 	}
 }
+
+func waitForDisconnectNotification(swarm *swarm.Swarm) <-chan struct{} {
+	done := make(chan struct{})
+	var once sync.Once
+	var nb *network.NotifyBundle
+	nb = &network.NotifyBundle{
+		DisconnectedF: func(n network.Network, c network.Conn) {
+			once.Do(func() {
+				go func() {
+					swarm.StopNotify(nb)
+					close(done)
+				}()
+			})
+		},
+	}
+	swarm.Notify(nb)
+
+	return done
+}
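
Usage note: the patch combines two patterns, a shared mock clock injected via
swarmt.WithClock and a notifiee registered before the disconnect it waits on.
Below is a minimal, self-contained sketch of both together. It assumes the
waitForDisconnectNotification helper added above and the mock from
github.com/benbjohnson/clock; the test name and the one-second step are
illustrative only, not part of the patch.

package identify_test

import (
	"context"
	"testing"
	"time"

	mockClock "github.com/benbjohnson/clock"

	"github.com/libp2p/go-libp2p-core/peer"

	blhost "github.com/libp2p/go-libp2p/p2p/host/blank"
	swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
)

// TestDeterministicExpirySketch (hypothetical) exercises the two patterns
// this patch introduces: a shared mock clock instead of time.Sleep, and a
// notifiee registered *before* the disconnect so the test can block on the
// notification instead of polling.
func TestDeterministicExpirySketch(t *testing.T) {
	// Both swarms share one mock clock, so address TTLs expire when the
	// test advances time explicitly, not when the wall clock ticks.
	clk := mockClock.NewMock()
	swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
	swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
	h1 := blhost.NewBlankHost(swarm1)
	h2 := blhost.NewBlankHost(swarm2)
	defer h1.Close()
	defer h2.Close()

	if err := h1.Connect(context.Background(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
		t.Fatal(err)
	}

	// Register for the disconnect notification before closing the
	// connection; since notifiee registration is synchronous, the
	// disconnect cannot be missed.
	sentDisconnect := waitForDisconnectNotification(swarm1)
	h1.Network().ClosePeer(h2.ID())
	<-sentDisconnect

	// Advance the shared clock past the reduced TTL; both peerstores see
	// the same instant, so the addrs expire deterministically.
	clk.Add(time.Second)
}

This is also why both GenSwarm calls in TestIDService receive the same clk:
advancing it once moves time for both peerstores at the same instant, so
neither side's addresses can expire early or late relative to the other.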