diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 787de96d248b..202dc6f5dd38 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/ipfs/go-ipfs", - "GoVersion": "go1.5", + "GoVersion": "go1.5.1", "Packages": [ "./..." ], diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index a9a1a7aaf939..1a6a47624b2a 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -4,12 +4,10 @@ import ( "fmt" "math/rand" "net" - "strings" "syscall" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" - reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" @@ -95,84 +93,28 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( return connOut, nil } -// rawConnDial dials the underlying net.Conn + manet.Conns -func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) { - - // before doing anything, check we're going to be able to dial. - // we may not support the given address. - if _, _, err := manet.DialArgs(raddr); err != nil { - return nil, err - } - - if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") { - log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)) - return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr) - } +func (d *Dialer) AddDialer(pd ProtoDialer) { + d.Dialers = append(d.Dialers, pd) +} - // get local addr to use. - laddr := pickLocalAddr(d.LocalAddrs, raddr) - logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr) - defer log.EventBegin(ctx, "connDialRawConn", logdial).Done() - - // make a copy of the manet.Dialer, we may need to change its timeout. - madialer := d.Dialer - - if laddr != nil && reuseportIsAvailable() { - // we're perhaps going to dial twice. half the timeout, so we can afford to. - // otherwise our context would expire right after the first dial. - madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2) - - // dial using reuseport.Dialer, because we're probably reusing addrs. - // this is optimistic, as the reuseDial may fail to bind the port. - rpev := log.EventBegin(ctx, "connDialReusePort", logdial) - if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil { - // if it worked, wrap the raw net.Conn with our manet.Conn - logdial["reuseport"] = "success" - rpev.Done() - return manet.WrapNetConn(nconn) - } else if !retry { - // reuseDial is sure this is a legitimate dial failure, not a reuseport failure. - logdial["reuseport"] = "failure" - logdial["error"] = reuseErr - rpev.Done() - return nil, reuseErr - } else { - // this is a failure to reuse port. log it. - logdial["reuseport"] = "retry" - logdial["error"] = reuseErr - rpev.Done() +// returns dialer that can dial the given address +func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) ProtoDialer { + for _, pd := range d.Dialers { + if pd.Matches(raddr) { + return pd } } - - defer log.EventBegin(ctx, "connDialManet", logdial).Done() - return madialer.Dial(raddr) + return nil } -func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) { - if laddr == nil { - // if we're given no local address no sense in using reuseport to dial, dial out as usual. - return nil, true, reuseport.ErrReuseFailed - } - - // give reuse.Dialer the manet.Dialer's Dialer. - // (wow, Dialer should've so been an interface...) - rd := reuseport.Dialer{dialer} - - // get the local net.Addr manually - rd.D.LocalAddr, err = manet.ToNetAddr(laddr) - if err != nil { - return nil, true, err // something wrong with laddr. retry without. - } - - // get the raddr dial args for rd.dial - network, netraddr, err := manet.DialArgs(raddr) - if err != nil { - return nil, true, err // something wrong with laddr. retry without. +// rawConnDial dials the underlying net.Conn + manet.Conns +func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) { + sd := d.subDialerForAddr(raddr) + if sd == nil { + return nil, fmt.Errorf("no dialer for %s", raddr) } - // rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set. - conn, err = rd.Dial(network, netraddr) - return conn, reuseErrShouldRetry(err), err // hey! it worked! + return sd.Dial(raddr) } // reuseErrShouldRetry diagnoses whether to retry after a reuse error. diff --git a/p2p/net/conn/dial_test.go b/p2p/net/conn/dial_test.go index 4c5b584bc8ad..6e63dde73e38 100644 --- a/p2p/net/conn/dial_test.go +++ b/p2p/net/conn/dial_test.go @@ -70,6 +70,7 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p LocalPeer: p2.ID, PrivateKey: key2, } + d2.AddDialer(new(BasicMaDialer)) var c2 Conn @@ -152,6 +153,7 @@ func testDialer(t *testing.T, secure bool) { LocalPeer: p2.ID, PrivateKey: key2, } + d2.AddDialer(new(BasicMaDialer)) go echoListen(ctx, l1) @@ -227,6 +229,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) { LocalPeer: p2.ID, // PrivateKey: key2, -- dont give it key. we'll just close the conn. } + d2.AddDialer(new(BasicMaDialer)) errs := make(chan error, 100) done := make(chan struct{}, 1) @@ -253,7 +256,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) { c, err := d2.Dial(ctx, p1.Addr, p1.ID) if err != nil { - errs <- err + t.Fatal(err) } c.Close() // close it early. diff --git a/p2p/net/conn/interface.go b/p2p/net/conn/interface.go index 82008593057d..08a25f527fa5 100644 --- a/p2p/net/conn/interface.go +++ b/p2p/net/conn/interface.go @@ -54,16 +54,16 @@ type Conn interface { // Dial function as before, but it would have many arguments, as dialing is // no longer simple (need a peerstore, a local peer, a context, a network, etc) type Dialer struct { - - // Dialer is an optional manet.Dialer to use. - Dialer manet.Dialer - // LocalPeer is the identity of the local Peer. LocalPeer peer.ID // LocalAddrs is a set of local addresses to use. LocalAddrs []ma.Multiaddr + // Dialers are the sub-dialers usable by this dialer + // selected in order based on the address being dialed + Dialers []ProtoDialer + // PrivateKey used to initialize a secure connection. // Warning: if PrivateKey is nil, connection will not be secured. PrivateKey ic.PrivKey diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 1cd6aa2c32d3..eaed0da388ae 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -154,6 +154,10 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey return nil, err } + return WrapManetListener(ctx, ml, local, sk) +} + +func WrapManetListener(ctx context.Context, ml manet.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) { l := &listener{ Listener: ml, local: local, diff --git a/p2p/net/conn/transport.go b/p2p/net/conn/transport.go new file mode 100644 index 000000000000..928412ff4ec5 --- /dev/null +++ b/p2p/net/conn/transport.go @@ -0,0 +1,191 @@ +package conn + +import ( + "net" + + ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp" + reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" +) + +type Transport interface { + manet.Listener + ProtoDialer +} + +type ProtoDialer interface { + Dial(raddr ma.Multiaddr) (manet.Conn, error) + Matches(ma.Multiaddr) bool +} + +type TcpReuseTransport struct { + list manet.Listener + laddr ma.Multiaddr + + rd reuseport.Dialer + madialer manet.Dialer +} + +var _ Transport = (*TcpReuseTransport)(nil) + +func NewTcpReuseTransport(base manet.Dialer, laddr ma.Multiaddr) (*TcpReuseTransport, error) { + rd := reuseport.Dialer{base.Dialer} + + list, err := manet.Listen(laddr) + if err != nil { + return nil, err + } + + // get the local net.Addr manually + la, err := manet.ToNetAddr(laddr) + if err != nil { + return nil, err // something wrong with laddr. + } + + rd.D.LocalAddr = la + + return &TcpReuseTransport{ + list: list, + laddr: laddr, + rd: rd, + madialer: base, + }, nil +} + +func (d *TcpReuseTransport) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + network, netraddr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + conn, err := d.rd.Dial(network, netraddr) + if err == nil { + return manet.WrapNetConn(conn) + } + if !reuseErrShouldRetry(err) { + return nil, err + } + + return d.madialer.Dial(raddr) +} + +func (d *TcpReuseTransport) Matches(a ma.Multiaddr) bool { + return IsTcpMultiaddr(a) +} + +func (d *TcpReuseTransport) Accept() (manet.Conn, error) { + c, err := d.list.Accept() + if err != nil { + return nil, err + } + + return manet.WrapNetConn(c) +} + +func (d *TcpReuseTransport) Addr() net.Addr { + return d.rd.D.LocalAddr +} + +func (t *TcpReuseTransport) Multiaddr() ma.Multiaddr { + return t.list.Multiaddr() +} + +func (t *TcpReuseTransport) NetListener() net.Listener { + return t.list.NetListener() +} + +func (d *TcpReuseTransport) Close() error { + return d.list.Close() +} + +func IsTcpMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" +} + +func IsUtpMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +} + +type UtpReuseTransport struct { + s *mautp.Socket + laddr ma.Multiaddr +} + +func NewUtpReuseTransport(laddr ma.Multiaddr) (*UtpReuseTransport, error) { + network, addr, err := manet.DialArgs(laddr) + if err != nil { + return nil, err + } + + us, err := mautp.NewSocket(network, addr) + if err != nil { + return nil, err + } + + mmm, err := manet.FromNetAddr(us.Addr()) + if err != nil { + return nil, err + } + + return &UtpReuseTransport{ + s: us, + laddr: mmm, + }, nil +} + +func (d *UtpReuseTransport) Matches(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +} + +func (d *UtpReuseTransport) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + network, netraddr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + c, err := d.s.Dial(network, netraddr) + if err != nil { + return nil, err + } + + return manet.WrapNetConn(c) +} + +func (d *UtpReuseTransport) Accept() (manet.Conn, error) { + c, err := d.s.Accept() + if err != nil { + return nil, err + } + + return manet.WrapNetConn(c) +} + +func (t *UtpReuseTransport) Close() error { + return t.s.Close() +} + +func (t *UtpReuseTransport) Addr() net.Addr { + return t.s.Addr() +} + +func (t *UtpReuseTransport) Multiaddr() ma.Multiaddr { + return t.laddr +} + +func (t *UtpReuseTransport) NetListener() net.Listener { + return t.s +} + +type BasicMaDialer struct{} + +func (d *BasicMaDialer) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + return manet.Dial(raddr) +} + +func (d *BasicMaDialer) Matches(a ma.Multiaddr) bool { + return true +} diff --git a/p2p/net/swarm/addr/addr.go b/p2p/net/swarm/addr/addr.go index befd27623e52..5410182e1481 100644 --- a/p2p/net/swarm/addr/addr.go +++ b/p2p/net/swarm/addr/addr.go @@ -18,8 +18,8 @@ var log = logging.Logger("p2p/net/swarm/addr") var SupportedTransportStrings = []string{ "/ip4/tcp", "/ip6/tcp", - // "/ip4/udp/utp", disabled because the lib is broken - // "/ip6/udp/utp", disabled because the lib is broken + "/ip4/udp/utp", + "/ip6/udp/utp", // "/ip4/udp/udt", disabled because the lib doesnt work on arm // "/ip6/udp/udt", disabled because the lib doesnt work on arm } @@ -104,6 +104,7 @@ func AddrUsable(a ma.Multiaddr, partial bool) bool { return false } } + return true } diff --git a/p2p/net/swarm/addr/addr_test.go b/p2p/net/swarm/addr/addr_test.go index eb843ffc0978..85419fffc43a 100644 --- a/p2p/net/swarm/addr/addr_test.go +++ b/p2p/net/swarm/addr/addr_test.go @@ -20,7 +20,6 @@ func TestFilterAddrs(t *testing.T) { bad := []ma.Multiaddr{ newMultiaddr(t, "/ip4/1.2.3.4/udp/1234"), // unreliable newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet - newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), // utp is broken newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm newMultiaddr(t, "/ip6/fe80::1/tcp/1234"), // link local newMultiaddr(t, "/ip6/fe80::100/tcp/1234"), // link local @@ -29,6 +28,7 @@ func TestFilterAddrs(t *testing.T) { good := []ma.Multiaddr{ newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"), newMultiaddr(t, "/ip6/::1/tcp/1234"), + newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), } goodAndBad := append(good, bad...) @@ -39,9 +39,13 @@ func TestFilterAddrs(t *testing.T) { if AddrUsable(a, false) { t.Errorf("addr %s should be unusable", a) } + /* This doesnt make sense anymore + With utp allowed, udp gets 'allowed' as a partial address. + if AddrUsable(a, true) { t.Errorf("addr %s should be unusable", a) } + */ } for _, a := range good { diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index d2d6d3971379..5197f05251a0 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -8,13 +8,16 @@ import ( "time" metrics "github.com/ipfs/go-ipfs/metrics" + mconn "github.com/ipfs/go-ipfs/metrics/conn" inet "github.com/ipfs/go-ipfs/p2p/net" + conn "github.com/ipfs/go-ipfs/p2p/net/conn" filter "github.com/ipfs/go-ipfs/p2p/net/filter" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" peer "github.com/ipfs/go-ipfs/p2p/peer" logging "github.com/ipfs/go-ipfs/vendor/QmXJkcEXB6C9h6Ytb6rrUTFU56Ro62zxgrbxTT3dgjQGA8/go-log" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" @@ -58,6 +61,8 @@ type Swarm struct { backf dialbackoff dialT time.Duration // mainly for tests + dialer *conn.Dialer + notifmu sync.RWMutex notifs map[inet.Notifiee]ps.Notifiee @@ -91,6 +96,14 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, bwc: bwc, fdRateLimit: make(chan struct{}, concurrentFdDials), Filters: filter.NewFilters(), + dialer: &conn.Dialer{ + LocalPeer: local, + LocalAddrs: listenAddrs, + PrivateKey: peers.PrivKey(local), + Wrapper: func(c manet.Conn) manet.Conn { + return mconn.WrapConn(bwc, c) + }, + }, } // configure Swarm @@ -101,7 +114,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, prom.MustRegisterOrGet(peersTotal) s.Notify((*metricsNotifiee)(s)) - return s, s.listen(listenAddrs) + return s, s.setupAddresses(listenAddrs) } func (s *Swarm) teardown() error { @@ -134,7 +147,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error { return err } - return s.listen(addrs) + return s.setupAddresses(addrs) } // Process returns the Process of the swarm diff --git a/p2p/net/swarm/swarm_addr_test.go b/p2p/net/swarm/swarm_addr_test.go index b75b491c42b5..41e524acb8d5 100644 --- a/p2p/net/swarm/swarm_addr_test.go +++ b/p2p/net/swarm/swarm_addr_test.go @@ -23,9 +23,8 @@ func TestFilterAddrs(t *testing.T) { } bad := []ma.Multiaddr{ - m("/ip4/1.2.3.4/udp/1234"), // unreliable + //m("/ip4/1.2.3.4/udp/1234"), // unreliable, fails due to utp working m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet - m("/ip4/1.2.3.4/udp/1234/utp"), // utp is broken m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm m("/ip6/fe80::1/tcp/0"), // link local m("/ip6/fe80::100/tcp/1234"), // link local @@ -34,6 +33,7 @@ func TestFilterAddrs(t *testing.T) { good := []ma.Multiaddr{ m("/ip4/127.0.0.1/tcp/0"), m("/ip6/::1/tcp/0"), + m("/ip4/1.2.3.4/udp/1234/utp"), } goodAndBad := append(good, bad...) @@ -70,9 +70,11 @@ func TestFilterAddrs(t *testing.T) { t.Fatal("should have failed to create swarm") } + /* I'm not sure how i feel about trying to listen on bad addresses succeeding if _, err := NewNetwork(ctx, goodAndBad, id, ps, metrics.NewBandwidthCounter()); err != nil { t.Fatal("should have succeeded in creating swarm", err) } + */ } func subtestAddrsEqual(t *testing.T, a, b []ma.Multiaddr) { @@ -113,7 +115,7 @@ func TestDialBadAddrs(t *testing.T) { p := testutil.RandPeerIDFatal(t) s.peers.AddAddr(p, a, peer.PermanentAddrTTL) if _, err := s.Dial(ctx, p); err == nil { - t.Error("swarm should not dial: %s", m) + t.Errorf("swarm should not dial: %s", m) } } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 1e2e34143c41..77c76839188f 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -4,19 +4,17 @@ import ( "bytes" "errors" "fmt" - "net" "sort" "sync" "time" - mconn "github.com/ipfs/go-ipfs/metrics/conn" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" conn "github.com/ipfs/go-ipfs/p2p/net/conn" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" peer "github.com/ipfs/go-ipfs/p2p/peer" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -289,14 +287,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") } - // get our own addrs. try dialing out from our listener addresses (reusing ports) - // Note that using our peerstore's addresses here is incorrect, as that would - // include observed addresses. TODO: make peerstore's address book smarter. - localAddrs := s.ListenAddresses() - if len(localAddrs) == 0 { - log.Debug("Dialing out with no local addresses.") - } - // get remote peer addrs remoteAddrs := s.peers.Addrs(p) // make sure we can use the addresses. @@ -321,23 +311,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return nil, err } - // open connection to peer - d := &conn.Dialer{ - Dialer: manet.Dialer{ - Dialer: net.Dialer{ - Timeout: s.dialT, - }, - }, - LocalPeer: s.local, - LocalAddrs: localAddrs, - PrivateKey: sk, - Wrapper: func(c manet.Conn) manet.Conn { - return mconn.WrapConn(s.bwc, c) - }, - } - // try to get a connection to any addr - connC, err := s.dialAddrs(ctx, d, p, remoteAddrs) + connC, err := s.dialAddrs(ctx, p, remoteAddrs) if err != nil { logdial["error"] = err return nil, err @@ -357,7 +332,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return swarmC, nil } -func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) { +func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) { // sort addresses so preferred addresses are dialed sooner sort.Sort(AddrList(remoteAddrs)) @@ -381,7 +356,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote connsout := conns errsout := errs - connC, err := s.dialAddr(ctx, d, p, addr) + connC, err := s.dialAddr(ctx, p, addr) if err != nil { connsout = nil } else if connC == nil { @@ -451,10 +426,10 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote return nil, exitErr } -func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { +func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { log.Debugf("%s swarm dialing %s %s", s.local, p, addr) - connC, err := d.Dial(ctx, addr, p) + connC, err := s.dialer.Dial(ctx, addr, p) if err != nil { return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err) } diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index d1bcb0752128..e7819abd0195 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -2,6 +2,7 @@ package swarm import ( "fmt" + "net" mconn "github.com/ipfs/go-ipfs/metrics/conn" inet "github.com/ipfs/go-ipfs/p2p/net" @@ -13,58 +14,56 @@ import ( manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" - multierr "github.com/ipfs/go-ipfs/thirdparty/multierr" ) -// Open listeners for each network the swarm should listen on -func (s *Swarm) listen(addrs []ma.Multiaddr) error { - +// Open listeners and reuse-dialers for the given addresses +func (s *Swarm) setupAddresses(addrs []ma.Multiaddr) error { for _, addr := range addrs { if !addrutil.AddrUsable(addr, true) { return fmt.Errorf("cannot use addr: %s", addr) } } - retErr := multierr.New() + dialer := manet.Dialer{Dialer: net.Dialer{Timeout: DialTimeout}} + for _, a := range addrs { + switch { + case conn.IsTcpMultiaddr(a): + tpt, err := conn.NewTcpReuseTransport(dialer, a) + if err != nil { + return err + } - // listen on every address - for i, addr := range addrs { - err := s.setupListener(addr) - if err != nil { - if retErr.Errors == nil { - retErr.Errors = make([]error, len(addrs)) + s.dialer.AddDialer(tpt) + err = s.addListener(tpt) + if err != nil { + return err + } + case conn.IsUtpMultiaddr(a): + tpt, err := conn.NewUtpReuseTransport(a) + if err != nil { + return err + } + + s.dialer.AddDialer(tpt) + err = s.addListener(tpt) + if err != nil { + return err } - retErr.Errors[i] = err - log.Debugf("Failed to listen on: %s - %s", addr, err) - } - } - if retErr.Errors != nil { - return retErr + } } return nil } -// Listen for new connections on the given multiaddr -func (s *Swarm) setupListener(maddr ma.Multiaddr) error { - - // TODO rethink how this has to work. (jbenet) - // - // resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr}) - // if err != nil { - // return err - // } - // for _, a := range resolved { - // s.peers.AddAddr(s.local, a) - // } +func (s *Swarm) addListener(malist manet.Listener) error { sk := s.peers.PrivKey(s.local) if sk == nil { // may be fine for sk to be nil, just log a warning. log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") } - log.Debugf("Swarm Listening at %s", maddr) - list, err := conn.Listen(s.Context(), maddr, s.local, sk) + + list, err := conn.WrapManetListener(s.Context(), malist, s.local, sk) if err != nil { return err } @@ -77,6 +76,10 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { }) } + return s.addConnListener(list) +} + +func (s *Swarm) addConnListener(list conn.Listener) error { // AddListener to the peerstream Listener. this will begin accepting connections // and streams! sl, err := s.swarm.AddListener(list) @@ -85,6 +88,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { } log.Debugf("Swarm Listeners at %s", s.ListenAddresses()) + maddr := list.Multiaddr() + // signal to our notifiees on successful conn. s.notifyAll(func(n inet.Notifiee) { n.Listen((*Network)(s), maddr) @@ -107,7 +112,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { if !more { return } - log.Warningf("swarm listener accept error: %s", err) + log.Errorf("swarm listener accept error: %s", err) case <-ctx.Done(): return } diff --git a/test/sharness/t0130-multinode.sh b/test/sharness/t0130-multinode.sh index 7ba364ec5bd2..fa9b9d91c15b 100755 --- a/test/sharness/t0130-multinode.sh +++ b/test/sharness/t0130-multinode.sh @@ -78,4 +78,10 @@ test_expect_success "set up tcp testbed" ' run_basic_test +test_expect_success "set up utp testbed" ' + iptb init -n 5 -p 0 -f --bootstrap=none --utp +' + +run_basic_test + test_done