Skip to content

Commit

Permalink
Timeout all Identify stream reads (#1032)
Browse files Browse the repository at this point in the history
* fix: read timeouts on Identify protocols

* fixed tests

* review and go fmt
  • Loading branch information
aarshkshah1992 authored Jan 14, 2021
1 parent 577e752 commit 1c850e1
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 3 deletions.
14 changes: 11 additions & 3 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const LibP2PVersion = "ipfs/0.1.0"
// Deprecated: Set this with the UserAgent option.
var ClientVersion = "github.com/libp2p/go-libp2p"

// StreamReadTimeout is the read timeout on all incoming Identify family streams.
var StreamReadTimeout = 60 * time.Second

var (
legacyIDSize = 2 * 1024 // 2k Bytes
signedIDSize = 8 * 1024 // 8K
Expand Down Expand Up @@ -369,7 +372,8 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) {
s.Reset()
return
}
ids.handleIdentifyResponse(s)

err = ids.handleIdentifyResponse(s)
}

func (ids *IDService) sendIdentifyResp(s network.Stream) {
Expand Down Expand Up @@ -408,7 +412,9 @@ func (ids *IDService) sendIdentifyResp(s network.Stream) {
log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
}

func (ids *IDService) handleIdentifyResponse(s network.Stream) {
func (ids *IDService) handleIdentifyResponse(s network.Stream) error {
_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))

c := s.Conn()

r := protoio.NewDelimitedReader(s, signedIDSize)
Expand All @@ -417,14 +423,16 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) {
if err := readAllIDMessages(r, mes); err != nil {
log.Warning("error reading identify message: ", err)
s.Reset()
return
return err
}

defer s.Close()

log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())

ids.consumeMessage(mes, c)

return nil
}

func readAllIDMessages(r protoio.Reader, finalMsg proto.Message) error {
Expand Down
3 changes: 3 additions & 0 deletions p2p/protocol/identify/id_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"time"

pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"

Expand All @@ -15,6 +16,8 @@ const IDDelta = "/p2p/id/delta/1.0.0"

// deltaHandler handles incoming delta updates from peers.
func (ids *IDService) deltaHandler(s network.Stream) {
_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))

c := s.Conn()

r := protoio.NewDelimitedReader(s, 2048)
Expand Down
80 changes: 80 additions & 0 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,3 +978,83 @@ func TestLargePushMessage(t *testing.T) {
}, 5*time.Second, 500*time.Millisecond)
testHasCertifiedAddrs(t, h2, h1p, h1.Addrs())
}

func TestIdentifyResponseReadTimeout(t *testing.T) {
ctx := context.Background()
timeout := identify.StreamReadTimeout
identify.StreamReadTimeout = 100 * time.Millisecond
defer func() {
identify.StreamReadTimeout = timeout
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
defer h1.Close()
defer h2.Close()

h2p := h2.ID()
ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer ids1.Close()
defer ids2.Close()
// remote stream handler will just hang and not send back an identify response
h2.SetStreamHandler(identify.ID, func(s network.Stream) {
time.Sleep(100 * time.Second)
})

sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationFailed), eventbus.BufSize(16))
require.NoError(t, err)

h2pi := h2.Peerstore().PeerInfo(h2p)
require.NoError(t, h1.Connect(ctx, h2pi))

select {
case ev := <-sub.Out():
fev := ev.(event.EvtPeerIdentificationFailed)
require.EqualError(t, fev.Reason, "i/o deadline reached")
case <-time.After(5 * time.Second):
t.Fatal("did not receive identify failure event")
}
}

func TestIncomingIDStreamsTimeout(t *testing.T) {
ctx := context.Background()
timeout := identify.StreamReadTimeout
identify.StreamReadTimeout = 100 * time.Millisecond
defer func() {
identify.StreamReadTimeout = timeout
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

protocols := []protocol.ID{identify.IDPush, identify.IDDelta}

for _, p := range protocols {
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
defer h1.Close()
defer h2.Close()

ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer ids1.Close()
defer ids2.Close()

h2p := h2.ID()
h2pi := h2.Peerstore().PeerInfo(h2p)
require.NoError(t, h1.Connect(ctx, h2pi))

_, err := h1.NewStream(ctx, h2p, p)
require.NoError(t, err)

// remote peer should eventually reset stream
require.Eventually(t, func() bool {
c := h2.Network().ConnsToPeer(h1.ID())[0]
return len(c.GetStreams()) == 0
}, 1*time.Second, 200*time.Millisecond)
}
}

0 comments on commit 1c850e1

Please sign in to comment.