From a92b1f3225ea30f7fad7fd3203fe43a28144ac71 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 17 Dec 2020 10:22:39 +0530 Subject: [PATCH 1/3] fix: read timeouts on Identify protocols --- p2p/protocol/identify/id.go | 16 +++++-- p2p/protocol/identify/id_delta.go | 3 ++ p2p/protocol/identify/id_push.go | 5 +- p2p/protocol/identify/id_test.go | 80 +++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 4 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 1b9cf93e9e..6ea58f4a41 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -46,6 +46,9 @@ const LibP2PVersion = "ipfs/0.1.0" // Deprecated: Set this with the UserAgent option. var ClientVersion = "github.com/libp2p/go-libp2p" +// StreamReadTimeout is the read timeout on all incoming Identify family streams. +var StreamReadTimeout = 60 * time.Second + var ( legacyIDSize = 2 * 1024 // 2k Bytes signedIDSize = 8 * 1024 // 8K @@ -369,10 +372,13 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) { s.Reset() return } - ids.handleIdentifyResponse(s) + + err = ids.handleIdentifyResponse(s) } func (ids *IDService) sendIdentifyResp(s network.Stream) { + s.SetDeadline(time.Now().Add(StreamReadTimeout)) + var ph *peerHandler defer func() { @@ -408,7 +414,9 @@ func (ids *IDService) sendIdentifyResp(s network.Stream) { log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr()) } -func (ids *IDService) handleIdentifyResponse(s network.Stream) { +func (ids *IDService) handleIdentifyResponse(s network.Stream) error { + s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) + c := s.Conn() r := protoio.NewDelimitedReader(s, signedIDSize) @@ -417,7 +425,7 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) { if err := readAllIDMessages(r, mes); err != nil { log.Warning("error reading identify message: ", err) s.Reset() - return + return err } defer s.Close() @@ -425,6 +433,8 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) { log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr()) ids.consumeMessage(mes, c) + + return nil } func readAllIDMessages(r protoio.Reader, finalMsg proto.Message) error { diff --git a/p2p/protocol/identify/id_delta.go b/p2p/protocol/identify/id_delta.go index 089f722cca..304d4be56a 100644 --- a/p2p/protocol/identify/id_delta.go +++ b/p2p/protocol/identify/id_delta.go @@ -5,6 +5,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + "time" pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" @@ -15,6 +16,8 @@ const IDDelta = "/p2p/id/delta/1.0.0" // deltaHandler handles incoming delta updates from peers. func (ids *IDService) deltaHandler(s network.Stream) { + s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) + c := s.Conn() r := protoio.NewDelimitedReader(s, 2048) diff --git a/p2p/protocol/identify/id_push.go b/p2p/protocol/identify/id_push.go index c2977e4bfe..7de7e4df58 100644 --- a/p2p/protocol/identify/id_push.go +++ b/p2p/protocol/identify/id_push.go @@ -2,6 +2,7 @@ package identify import ( "github.com/libp2p/go-libp2p-core/network" + "time" ) // IDPush is the protocol.ID of the Identify push protocol. It sends full identify messages containing @@ -13,5 +14,7 @@ const IDPush = "/ipfs/id/push/1.0.0" // pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol. func (ids *IDService) pushHandler(s network.Stream) { + s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) + ids.handleIdentifyResponse(s) -} +} \ No newline at end of file diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index 2c838bc217..f2ef6d893f 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -978,3 +978,83 @@ func TestLargePushMessage(t *testing.T) { }, 5*time.Second, 500*time.Millisecond) testHasCertifiedAddrs(t, h2, h1p, h1.Addrs()) } + +func TestIdentifyResponseReadTimeout(t *testing.T) { + ctx := context.Background() + timeout := identify.StreamReadTimeout + identify.StreamReadTimeout = 100 * time.Millisecond + defer func() { + identify.StreamReadTimeout = timeout + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + defer h1.Close() + defer h2.Close() + + h2p := h2.ID() + ids1 := identify.NewIDService(h1) + ids2 := identify.NewIDService(h2) + defer ids1.Close() + defer ids2.Close() + // remote stream handler will just hang and not send back an identify response + h2.SetStreamHandler(identify.ID, func(s network.Stream) { + time.Sleep(100 * time.Second) + }) + + sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationFailed), eventbus.BufSize(16)) + require.NoError(t, err) + + h2pi := h2.Peerstore().PeerInfo(h2p) + require.NoError(t, h1.Connect(ctx, h2pi)) + + select { + case ev := <- sub.Out(): + fev := ev.(event.EvtPeerIdentificationFailed) + require.EqualError(t, fev.Reason, "i/o deadline reached") + case <- time.After(5 * time.Second): + t.Fatal("did not recieve identify failure event") + } +} + +func TestIncomingIDStreamsTimeout(t *testing.T) { + ctx := context.Background() + timeout := identify.StreamReadTimeout + identify.StreamReadTimeout = 100 * time.Millisecond + defer func() { + identify.StreamReadTimeout = timeout + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + protocols := []protocol.ID{identify.IDPush, identify.IDDelta} + + for _, p := range protocols { + h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + defer h1.Close() + defer h2.Close() + + ids1 := identify.NewIDService(h1) + ids2 := identify.NewIDService(h2) + defer ids1.Close() + defer ids2.Close() + + h2p := h2.ID() + h2pi := h2.Peerstore().PeerInfo(h2p) + require.NoError(t, h1.Connect(ctx, h2pi)) + + s, err := h1.NewStream(ctx, h2p, p) + require.NoError(t, err) + // wait for stream read on remote peer to timeout + time.Sleep(500 * time.Millisecond) + + // stream must already be reset by other side as we were too late + _, err = s.Write([]byte("test")) + require.EqualError(t, err, "stream reset") + } +} \ No newline at end of file From 737098534cb2495286a521307f0b821671769e08 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 17 Dec 2020 13:14:00 +0530 Subject: [PATCH 2/3] fixed tests --- p2p/protocol/identify/id_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index f2ef6d893f..fd983fde98 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -1016,7 +1016,7 @@ func TestIdentifyResponseReadTimeout(t *testing.T) { fev := ev.(event.EvtPeerIdentificationFailed) require.EqualError(t, fev.Reason, "i/o deadline reached") case <- time.After(5 * time.Second): - t.Fatal("did not recieve identify failure event") + t.Fatal("did not receive identify failure event") } } @@ -1048,13 +1048,13 @@ func TestIncomingIDStreamsTimeout(t *testing.T) { h2pi := h2.Peerstore().PeerInfo(h2p) require.NoError(t, h1.Connect(ctx, h2pi)) - s, err := h1.NewStream(ctx, h2p, p) + _, err := h1.NewStream(ctx, h2p, p) require.NoError(t, err) - // wait for stream read on remote peer to timeout - time.Sleep(500 * time.Millisecond) - // stream must already be reset by other side as we were too late - _, err = s.Write([]byte("test")) - require.EqualError(t, err, "stream reset") + // remote peer should eventually reset stream + require.Eventually(t, func() bool { + c := h2.Network().ConnsToPeer(h1.ID())[0] + return len(c.GetStreams()) == 0 + }, 1 * time.Second, 200 * time.Millisecond) } } \ No newline at end of file From b2997a71ed73150cbcb933d3baf7307bf8319b60 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 18 Dec 2020 10:56:36 +0530 Subject: [PATCH 3/3] review and go fmt --- p2p/protocol/identify/id.go | 4 +--- p2p/protocol/identify/id_delta.go | 2 +- p2p/protocol/identify/id_push.go | 5 +---- p2p/protocol/identify/id_test.go | 8 ++++---- 4 files changed, 7 insertions(+), 12 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 6ea58f4a41..3557e8aa0a 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -377,8 +377,6 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) { } func (ids *IDService) sendIdentifyResp(s network.Stream) { - s.SetDeadline(time.Now().Add(StreamReadTimeout)) - var ph *peerHandler defer func() { @@ -415,7 +413,7 @@ func (ids *IDService) sendIdentifyResp(s network.Stream) { } func (ids *IDService) handleIdentifyResponse(s network.Stream) error { - s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) + _ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) c := s.Conn() diff --git a/p2p/protocol/identify/id_delta.go b/p2p/protocol/identify/id_delta.go index 304d4be56a..48bae8bf22 100644 --- a/p2p/protocol/identify/id_delta.go +++ b/p2p/protocol/identify/id_delta.go @@ -16,7 +16,7 @@ const IDDelta = "/p2p/id/delta/1.0.0" // deltaHandler handles incoming delta updates from peers. func (ids *IDService) deltaHandler(s network.Stream) { - s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) + _ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) c := s.Conn() diff --git a/p2p/protocol/identify/id_push.go b/p2p/protocol/identify/id_push.go index 7de7e4df58..c2977e4bfe 100644 --- a/p2p/protocol/identify/id_push.go +++ b/p2p/protocol/identify/id_push.go @@ -2,7 +2,6 @@ package identify import ( "github.com/libp2p/go-libp2p-core/network" - "time" ) // IDPush is the protocol.ID of the Identify push protocol. It sends full identify messages containing @@ -14,7 +13,5 @@ const IDPush = "/ipfs/id/push/1.0.0" // pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol. func (ids *IDService) pushHandler(s network.Stream) { - s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) - ids.handleIdentifyResponse(s) -} \ No newline at end of file +} diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index fd983fde98..326ccea799 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -1012,10 +1012,10 @@ func TestIdentifyResponseReadTimeout(t *testing.T) { require.NoError(t, h1.Connect(ctx, h2pi)) select { - case ev := <- sub.Out(): + case ev := <-sub.Out(): fev := ev.(event.EvtPeerIdentificationFailed) require.EqualError(t, fev.Reason, "i/o deadline reached") - case <- time.After(5 * time.Second): + case <-time.After(5 * time.Second): t.Fatal("did not receive identify failure event") } } @@ -1055,6 +1055,6 @@ func TestIncomingIDStreamsTimeout(t *testing.T) { require.Eventually(t, func() bool { c := h2.Network().ConnsToPeer(h1.ID())[0] return len(c.GetStreams()) == 0 - }, 1 * time.Second, 200 * time.Millisecond) + }, 1*time.Second, 200*time.Millisecond) } -} \ No newline at end of file +}