Skip to content

Commit

Permalink
Merge pull request #1001 from libp2p/feat/rw-stream
Browse files Browse the repository at this point in the history
feat: update to go-libp2p-core 0.7.0 interface changes
  • Loading branch information
Stebalien authored Nov 11, 2020
2 parents e046c95 + 7a98f28 commit fcf6964
Show file tree
Hide file tree
Showing 17 changed files with 105 additions and 53 deletions.
17 changes: 8 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ require (
github.com/libp2p/go-addr-util v0.0.2
github.com/libp2p/go-conn-security-multistream v0.2.0
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p-autonat v0.3.2
github.com/libp2p/go-libp2p-autonat v0.4.0
github.com/libp2p/go-libp2p-blankhost v0.2.0
github.com/libp2p/go-libp2p-circuit v0.3.1
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-circuit v0.4.0
github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.2.4
github.com/libp2p/go-libp2p-mplex v0.3.0
github.com/libp2p/go-libp2p-nat v0.0.6
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-noise v0.1.1
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-swarm v0.2.8
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/libp2p/go-libp2p-swarm v0.3.1
github.com/libp2p/go-libp2p-testing v0.3.0
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/libp2p/go-libp2p-transport-upgrader v0.3.0
github.com/libp2p/go-libp2p-yamux v0.2.8
github.com/libp2p/go-libp2p-yamux v0.4.0
github.com/libp2p/go-msgio v0.0.6
github.com/libp2p/go-netroute v0.1.3
github.com/libp2p/go-stream-muxer-multistream v0.3.0
Expand All @@ -38,8 +38,7 @@ require (
github.com/miekg/dns v1.1.31 // indirect
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multiaddr-net v0.2.0
github.com/multiformats/go-multistream v0.1.2
github.com/multiformats/go-multistream v0.2.0
github.com/onsi/ginkgo v1.12.1 // indirect
github.com/stretchr/testify v1.6.1
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9
Expand Down
50 changes: 40 additions & 10 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion p2p/discovery/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/whyrusleeping/mdns"
)

Expand Down
20 changes: 18 additions & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
msmux "github.com/multiformats/go-multistream"
)

Expand Down Expand Up @@ -1001,7 +1001,7 @@ func (h *BasicHost) Close() error {

type streamWrapper struct {
network.Stream
rw io.ReadWriter
rw io.ReadWriteCloser
}

func (s *streamWrapper) Read(b []byte) (int, error) {
Expand All @@ -1011,3 +1011,19 @@ func (s *streamWrapper) Read(b []byte) (int, error) {
func (s *streamWrapper) Write(b []byte) (int, error) {
return s.rw.Write(b)
}

func (s *streamWrapper) Close() error {
return s.rw.Close()
}

func (s *streamWrapper) CloseWrite() error {
// Flush the handshake before closing, but ignore the error. The other
// end may have closed their side for reading.
//
// If something is wrong with the stream, the user will get on error on
// read instead.
if flusher, ok := s.rw.(interface{ Flush() error }); ok {
_ = flusher.Flush()
}
return s.Stream.CloseWrite()
}
2 changes: 1 addition & 1 deletion p2p/host/relay/addrsplosion.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

circuit "github.com/libp2p/go-libp2p-circuit"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
)

// This function cleans up a relay's address set to remove private addresses and curtail
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/relay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
basic "github.com/libp2p/go-libp2p/p2p/host/basic"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/relay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
)

// test specific parameters
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
)

var connCounter int64
Expand Down
4 changes: 3 additions & 1 deletion p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host
return nil, err
}

mn.proc.AddChild(n.proc)
// Ensure we close the hoset when we close the mock network.
// Otherwise, tests leak memory.
mn.proc.AddChild(goprocess.WithTeardown(h.Close))

mn.Lock()
mn.nets[n.peer] = n
Expand Down
7 changes: 3 additions & 4 deletions p2p/net/mock/mock_notif_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -85,13 +84,13 @@ func TestNotifications(t *testing.T) {

for _, s := range nets {
s.SetStreamHandler(func(s network.Stream) {
helpers.FullClose(s)
s.Close()
})
}

for _, s := range nets {
s.SetStreamHandler(func(s network.Stream) {
helpers.FullClose(s)
s.Close()
})
}

Expand All @@ -105,7 +104,7 @@ func TestNotifications(t *testing.T) {
continue
}
t.Logf("%s %s <--%p--> %s %s", c.LocalPeer(), c.LocalMultiaddr(), st1, c.RemotePeer(), c.RemoteMultiaddr())
helpers.FullClose(st1)
st1.Close()
}
}

Expand Down
20 changes: 14 additions & 6 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *stream) SetProtocol(proto protocol.ID) {
s.protocol.Store(proto)
}

func (s *stream) Close() error {
func (s *stream) CloseWrite() error {
select {
case s.close <- struct{}{}:
default:
Expand All @@ -121,6 +121,15 @@ func (s *stream) Close() error {
return nil
}

func (s *stream) CloseRead() error {
return s.read.CloseWithError(ErrClosed)
}

func (s *stream) Close() error {
_ = s.CloseRead()
return s.CloseWrite()
}

func (s *stream) Reset() error {
// Cancel any pending reads/writes with an error.
s.write.CloseWithError(mux.ErrReset)
Expand Down Expand Up @@ -258,7 +267,7 @@ func (s *stream) transport() {
return
case <-s.close:
if err := drainBuf(); err != nil {
s.resetWith(err)
s.cancelWrite(err)
return
}
s.writeErr = s.write.Close()
Expand All @@ -268,20 +277,19 @@ func (s *stream) transport() {
return
case o := <-s.toDeliver:
if err := deliverOrWait(o); err != nil {
s.resetWith(err)
s.cancelWrite(err)
return
}
case <-timer.C: // ok, due to write it out.
if err := drainBuf(); err != nil {
s.resetWith(err)
s.cancelWrite(err)
return
}
}
}
}

func (s *stream) resetWith(err error) {
func (s *stream) cancelWrite(err error) {
s.write.CloseWithError(err)
s.read.CloseWithError(err)
s.writeErr = err
}
5 changes: 2 additions & 3 deletions p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

detectrace "github.com/ipfs/go-detect-race"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -322,7 +321,7 @@ func TestStreams(t *testing.T) {
func performPing(t *testing.T, st string, n int, s network.Stream) error {
t.Helper()

defer helpers.FullClose(s)
defer s.Close()

for i := 0; i < n; i++ {
b := make([]byte, 4+len(st))
Expand All @@ -344,7 +343,7 @@ func makePonger(t *testing.T, st string, errs chan<- error) func(network.Stream)

return func(s network.Stream) {
go func() {
defer helpers.FullClose(s)
defer s.Close()

for {
b := make([]byte, 4+len(st))
Expand Down
7 changes: 3 additions & 4 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -23,7 +22,7 @@ import (
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
msmux "github.com/multiformats/go-multistream"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -377,7 +376,7 @@ func (ids *IDService) sendIdentifyResp(s network.Stream) {
var ph *peerHandler

defer func() {
helpers.FullClose(s)
_ = s.Close()
if ph != nil {
ph.snapshotMu.RUnlock()
}
Expand Down Expand Up @@ -421,7 +420,7 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) {
return
}

defer func() { go helpers.FullClose(s) }()
defer s.Close()

log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())

Expand Down
6 changes: 3 additions & 3 deletions p2p/protocol/identify/id_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package identify

import (
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand All @@ -22,11 +21,11 @@ func (ids *IDService) deltaHandler(s network.Stream) {
mes := pb.Identify{}
if err := r.ReadMsg(&mes); err != nil {
log.Warning("error reading identify message: ", err)
s.Reset()
_ = s.Reset()
return
}

defer helpers.FullClose(s)
defer s.Close()

log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())

Expand All @@ -37,6 +36,7 @@ func (ids *IDService) deltaHandler(s network.Stream) {

p := s.Conn().RemotePeer()
if err := ids.consumeDelta(p, delta); err != nil {
_ = s.Reset()
log.Warningf("delta update from peer %s failed: %s", p, err)
}
}
Expand Down
3 changes: 1 addition & 2 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -518,7 +517,7 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
<-block
w := protoio.NewDelimitedWriter(s)
w.WriteMsg(&pb.Identify{Protocols: h1.Mux().Protocols()})
helpers.FullClose(s)
s.Close()
}
h1.RemoveStreamHandler(identify.ID)
h1.SetStreamHandler(identify.ID, handler)
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
)

// ActivationThresh sets how many times an address must be seen as "activated"
Expand Down
7 changes: 4 additions & 3 deletions p2p/protocol/identify/peer_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -129,10 +128,11 @@ func (ph *peerHandler) sendDelta(ctx context.Context) error {
return fmt.Errorf("failed to open delta stream: %w", err)
}

defer helpers.FullClose(ds)
defer ds.Close()

c := ds.Conn()
if err := protoio.NewDelimitedWriter(ds).WriteMsg(&pb.Identify{Delta: mes}); err != nil {
_ = ds.Reset()
return fmt.Errorf("failed to send delta message, %w", err)
}
log.Debugw("sent identify update", "protocol", ds.Protocol(), "peer", c.RemotePeer(),
Expand All @@ -150,13 +150,14 @@ func (ph *peerHandler) sendPush(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to open push stream: %w", err)
}
defer helpers.FullClose(dp)
defer dp.Close()

snapshot := ph.ids.getSnapshot()
ph.snapshotMu.Lock()
ph.snapshot = snapshot
ph.snapshotMu.Unlock()
if err := ph.ids.writeChunkedIdentifyMsg(dp.Conn(), snapshot, dp); err != nil {
_ = dp.Reset()
return fmt.Errorf("failed to send push message: %w", err)
}

Expand Down

0 comments on commit fcf6964

Please sign in to comment.