From 9b76adc1fe862b6385a5691031fd09d39d3f7cff Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 3 Oct 2015 19:45:24 -0700 Subject: [PATCH] implement basic utp dialing and listening support License: MIT Signed-off-by: Jeromy --- Godeps/Godeps.json | 2 +- p2p/net/conn/dial.go | 105 +++++-------- p2p/net/conn/dial_test.go | 5 +- p2p/net/conn/interface.go | 13 +- p2p/net/conn/listen.go | 4 + p2p/net/conn/transport.go | 255 +++++++++++++++++++++++++++++++ p2p/net/swarm/addr/addr.go | 5 +- p2p/net/swarm/addr/addr_test.go | 6 +- p2p/net/swarm/swarm.go | 14 +- p2p/net/swarm/swarm_addr_test.go | 14 +- p2p/net/swarm/swarm_dial.go | 37 +---- p2p/net/swarm/swarm_listen.go | 69 ++++----- test/sharness/t0130-multinode.sh | 6 + 13 files changed, 379 insertions(+), 156 deletions(-) create mode 100644 p2p/net/conn/transport.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 787de96d248..202dc6f5dd3 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/ipfs/go-ipfs", - "GoVersion": "go1.5", + "GoVersion": "go1.5.1", "Packages": [ "./..." ], diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index a9a1a7aaf93..bb31901e05a 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -6,20 +6,35 @@ import ( "net" "strings" "syscall" + "time" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" - reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" + ci "github.com/ipfs/go-ipfs/p2p/crypto" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" peer "github.com/ipfs/go-ipfs/p2p/peer" ) +type WrapFunc func(manet.Conn) manet.Conn + +func NewDialer(p peer.ID, pk ci.PrivKey, tout time.Duration, wrap WrapFunc) *Dialer { + var manetd manet.Dialer + manetd.Timeout = tout // timeout is a nested field of net.Dialer + + return &Dialer{ + LocalPeer: p, + PrivateKey: pk, + Wrapper: wrap, + fallbackDialer: &BasicMaDialer{Dialer: manetd}, + } +} + // String returns the string rep of d. func (d *Dialer) String() string { - return fmt.Sprintf("", d.LocalPeer, d.LocalAddrs[0]) + return fmt.Sprintf("", d.LocalPeer) } // Dial connects to a peer over a particular address @@ -95,84 +110,34 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( return connOut, nil } -// rawConnDial dials the underlying net.Conn + manet.Conns -func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) { - - // before doing anything, check we're going to be able to dial. - // we may not support the given address. - if _, _, err := manet.DialArgs(raddr); err != nil { - return nil, err - } - - if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") { - log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)) - return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr) - } +func (d *Dialer) AddDialer(pd ProtoDialer) { + d.Dialers = append(d.Dialers, pd) +} - // get local addr to use. - laddr := pickLocalAddr(d.LocalAddrs, raddr) - logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr) - defer log.EventBegin(ctx, "connDialRawConn", logdial).Done() - - // make a copy of the manet.Dialer, we may need to change its timeout. - madialer := d.Dialer - - if laddr != nil && reuseportIsAvailable() { - // we're perhaps going to dial twice. half the timeout, so we can afford to. - // otherwise our context would expire right after the first dial. - madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2) - - // dial using reuseport.Dialer, because we're probably reusing addrs. - // this is optimistic, as the reuseDial may fail to bind the port. - rpev := log.EventBegin(ctx, "connDialReusePort", logdial) - if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil { - // if it worked, wrap the raw net.Conn with our manet.Conn - logdial["reuseport"] = "success" - rpev.Done() - return manet.WrapNetConn(nconn) - } else if !retry { - // reuseDial is sure this is a legitimate dial failure, not a reuseport failure. - logdial["reuseport"] = "failure" - logdial["error"] = reuseErr - rpev.Done() - return nil, reuseErr - } else { - // this is a failure to reuse port. log it. - logdial["reuseport"] = "retry" - logdial["error"] = reuseErr - rpev.Done() +// returns dialer that can dial the given address +func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) ProtoDialer { + for _, pd := range d.Dialers { + if pd.Matches(raddr) { + return pd } } - defer log.EventBegin(ctx, "connDialManet", logdial).Done() - return madialer.Dial(raddr) + return d.fallbackDialer } -func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) { - if laddr == nil { - // if we're given no local address no sense in using reuseport to dial, dial out as usual. - return nil, true, reuseport.ErrReuseFailed - } - - // give reuse.Dialer the manet.Dialer's Dialer. - // (wow, Dialer should've so been an interface...) - rd := reuseport.Dialer{dialer} - - // get the local net.Addr manually - rd.D.LocalAddr, err = manet.ToNetAddr(laddr) - if err != nil { - return nil, true, err // something wrong with laddr. retry without. +// rawConnDial dials the underlying net.Conn + manet.Conns +func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) { + if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") { + log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)) + return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr) } - // get the raddr dial args for rd.dial - network, netraddr, err := manet.DialArgs(raddr) - if err != nil { - return nil, true, err // something wrong with laddr. retry without. + sd := d.subDialerForAddr(raddr) + if sd == nil { + return nil, fmt.Errorf("no dialer for %s", raddr) } - // rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set. - conn, err = rd.Dial(network, netraddr) - return conn, reuseErrShouldRetry(err), err // hey! it worked! + return sd.Dial(raddr) } // reuseErrShouldRetry diagnoses whether to retry after a reuse error. diff --git a/p2p/net/conn/dial_test.go b/p2p/net/conn/dial_test.go index 4c5b584bc8a..6e63dde73e3 100644 --- a/p2p/net/conn/dial_test.go +++ b/p2p/net/conn/dial_test.go @@ -70,6 +70,7 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p LocalPeer: p2.ID, PrivateKey: key2, } + d2.AddDialer(new(BasicMaDialer)) var c2 Conn @@ -152,6 +153,7 @@ func testDialer(t *testing.T, secure bool) { LocalPeer: p2.ID, PrivateKey: key2, } + d2.AddDialer(new(BasicMaDialer)) go echoListen(ctx, l1) @@ -227,6 +229,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) { LocalPeer: p2.ID, // PrivateKey: key2, -- dont give it key. we'll just close the conn. } + d2.AddDialer(new(BasicMaDialer)) errs := make(chan error, 100) done := make(chan struct{}, 1) @@ -253,7 +256,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) { c, err := d2.Dial(ctx, p1.Addr, p1.ID) if err != nil { - errs <- err + t.Fatal(err) } c.Close() // close it early. diff --git a/p2p/net/conn/interface.go b/p2p/net/conn/interface.go index 82008593057..2999dfa8874 100644 --- a/p2p/net/conn/interface.go +++ b/p2p/net/conn/interface.go @@ -54,15 +54,18 @@ type Conn interface { // Dial function as before, but it would have many arguments, as dialing is // no longer simple (need a peerstore, a local peer, a context, a network, etc) type Dialer struct { - - // Dialer is an optional manet.Dialer to use. - Dialer manet.Dialer - // LocalPeer is the identity of the local Peer. LocalPeer peer.ID // LocalAddrs is a set of local addresses to use. - LocalAddrs []ma.Multiaddr + //LocalAddrs []ma.Multiaddr + + // Dialers are the sub-dialers usable by this dialer + // selected in order based on the address being dialed + Dialers []ProtoDialer + + // fallback will be tried if no other transport can dial a given address + fallbackDialer ProtoDialer // PrivateKey used to initialize a secure connection. // Warning: if PrivateKey is nil, connection will not be secured. diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 1cd6aa2c32d..eaed0da388a 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -154,6 +154,10 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey return nil, err } + return WrapManetListener(ctx, ml, local, sk) +} + +func WrapManetListener(ctx context.Context, ml manet.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) { l := &listener{ Listener: ml, local: local, diff --git a/p2p/net/conn/transport.go b/p2p/net/conn/transport.go new file mode 100644 index 00000000000..bfea8feed1c --- /dev/null +++ b/p2p/net/conn/transport.go @@ -0,0 +1,255 @@ +package conn + +import ( + "errors" + "net" + "time" + + ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp" + reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" + lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" + + "golang.org/x/net/context" +) + +type Transport interface { + manet.Listener + ProtoDialer +} + +type ProtoDialer interface { + Dial(raddr ma.Multiaddr) (manet.Conn, error) + Matches(ma.Multiaddr) bool +} + +type TcpTransport struct { + list manet.Listener + laddr ma.Multiaddr + + doReuse bool + + rd reuseport.Dialer + madialer manet.Dialer +} + +var _ Transport = (*TcpTransport)(nil) + +func NewTcpReuseTransport(base manet.Dialer, laddr ma.Multiaddr) (*TcpTransport, error) { + rd := reuseport.Dialer{base.Dialer} + + // get the local net.Addr manually + la, err := manet.ToNetAddr(laddr) + if err != nil { + return nil, err // something wrong with laddr. + } + rd.D.LocalAddr = la + + list, err := manet.Listen(laddr) + if err != nil { + return nil, err + } + + return &TcpTransport{ + doReuse: true, + list: list, + laddr: laddr, + rd: rd, + madialer: base, + }, nil +} + +// NewTcpTransport creates a TcpTransport that does not use SO_REUSEPORT +func NewTcpTransport(base manet.Dialer, laddr ma.Multiaddr) (*TcpTransport, error) { + list, err := manet.Listen(laddr) + if err != nil { + return nil, err + } + + return &TcpTransport{ + list: list, + laddr: laddr, + madialer: base, + }, nil + +} + +func (d *TcpTransport) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + if d.doReuse { + return d.reuseDial(raddr) + } + + return d.madialer.Dial(raddr) +} + +func (d *TcpTransport) reuseDial(raddr ma.Multiaddr) (manet.Conn, error) { + logdial := lgbl.Dial("conn", "", "", d.laddr, raddr) + rpev := log.EventBegin(context.TODO(), "tptDialReusePort", logdial) + + network, netraddr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + conn, err := d.rd.Dial(network, netraddr) + if err == nil { + logdial["reuseport"] = "success" + rpev.Done() + return manet.WrapNetConn(conn) + } + + if !reuseErrShouldRetry(err) { + logdial["reuseport"] = "failure" + logdial["error"] = err + rpev.Done() + return nil, err + } + + logdial["reuseport"] = "retry" + logdial["error"] = err + rpev.Done() + + return d.madialer.Dial(raddr) +} + +func (d *TcpTransport) Matches(a ma.Multiaddr) bool { + return IsTcpMultiaddr(a) +} + +func (d *TcpTransport) Accept() (manet.Conn, error) { + c, err := d.list.Accept() + if err != nil { + return nil, err + } + + return manet.WrapNetConn(c) +} + +func (d *TcpTransport) Addr() net.Addr { + return d.list.Addr() +} + +func (t *TcpTransport) Multiaddr() ma.Multiaddr { + return t.list.Multiaddr() +} + +func (t *TcpTransport) NetListener() net.Listener { + return t.list.NetListener() +} + +func (d *TcpTransport) Close() error { + return d.list.Close() +} + +func IsTcpMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" +} + +func IsUtpMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +} + +type UtpTransport struct { + s *mautp.Socket + laddr ma.Multiaddr +} + +func NewUtpTransport(laddr ma.Multiaddr) (*UtpTransport, error) { + network, addr, err := manet.DialArgs(laddr) + if err != nil { + return nil, err + } + + us, err := mautp.NewSocket(network, addr) + if err != nil { + return nil, err + } + + mmm, err := manet.FromNetAddr(us.Addr()) + if err != nil { + return nil, err + } + + return &UtpTransport{ + s: us, + laddr: mmm, + }, nil +} + +func (d *UtpTransport) Matches(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +} + +func (d *UtpTransport) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + network, netraddr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + c, err := d.s.Dial(network, netraddr) + if err != nil { + return nil, err + } + + return manet.WrapNetConn(c) +} + +func (d *UtpTransport) Accept() (manet.Conn, error) { + c, err := d.s.Accept() + if err != nil { + return nil, err + } + + return manet.WrapNetConn(c) +} + +func (t *UtpTransport) Close() error { + return t.s.Close() +} + +func (t *UtpTransport) Addr() net.Addr { + return t.s.Addr() +} + +func (t *UtpTransport) Multiaddr() ma.Multiaddr { + return t.laddr +} + +func (t *UtpTransport) NetListener() net.Listener { + return t.s +} + +type BasicMaDialer struct { + Dialer manet.Dialer +} + +func (d *BasicMaDialer) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + return d.Dialer.Dial(raddr) +} + +func (d *BasicMaDialer) Matches(a ma.Multiaddr) bool { + return true +} + +var ErrNoSpecialTransport = errors.New("given multiaddr has no supported special transport") + +func MakeTransport(laddr ma.Multiaddr, tout time.Duration) (Transport, error) { + switch { + case IsTcpMultiaddr(laddr): + if !reuseportIsAvailable() { + return nil, ErrNoSpecialTransport + } + dialer := manet.Dialer{Dialer: net.Dialer{Timeout: tout}} + return NewTcpReuseTransport(dialer, laddr) + + case IsUtpMultiaddr(laddr): + return NewUtpTransport(laddr) + + default: + return nil, ErrNoSpecialTransport + } +} diff --git a/p2p/net/swarm/addr/addr.go b/p2p/net/swarm/addr/addr.go index befd27623e5..5410182e148 100644 --- a/p2p/net/swarm/addr/addr.go +++ b/p2p/net/swarm/addr/addr.go @@ -18,8 +18,8 @@ var log = logging.Logger("p2p/net/swarm/addr") var SupportedTransportStrings = []string{ "/ip4/tcp", "/ip6/tcp", - // "/ip4/udp/utp", disabled because the lib is broken - // "/ip6/udp/utp", disabled because the lib is broken + "/ip4/udp/utp", + "/ip6/udp/utp", // "/ip4/udp/udt", disabled because the lib doesnt work on arm // "/ip6/udp/udt", disabled because the lib doesnt work on arm } @@ -104,6 +104,7 @@ func AddrUsable(a ma.Multiaddr, partial bool) bool { return false } } + return true } diff --git a/p2p/net/swarm/addr/addr_test.go b/p2p/net/swarm/addr/addr_test.go index eb843ffc097..85419fffc43 100644 --- a/p2p/net/swarm/addr/addr_test.go +++ b/p2p/net/swarm/addr/addr_test.go @@ -20,7 +20,6 @@ func TestFilterAddrs(t *testing.T) { bad := []ma.Multiaddr{ newMultiaddr(t, "/ip4/1.2.3.4/udp/1234"), // unreliable newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet - newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), // utp is broken newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm newMultiaddr(t, "/ip6/fe80::1/tcp/1234"), // link local newMultiaddr(t, "/ip6/fe80::100/tcp/1234"), // link local @@ -29,6 +28,7 @@ func TestFilterAddrs(t *testing.T) { good := []ma.Multiaddr{ newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"), newMultiaddr(t, "/ip6/::1/tcp/1234"), + newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), } goodAndBad := append(good, bad...) @@ -39,9 +39,13 @@ func TestFilterAddrs(t *testing.T) { if AddrUsable(a, false) { t.Errorf("addr %s should be unusable", a) } + /* This doesnt make sense anymore + With utp allowed, udp gets 'allowed' as a partial address. + if AddrUsable(a, true) { t.Errorf("addr %s should be unusable", a) } + */ } for _, a := range good { diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index d2d6d397137..6b60a762bf3 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -8,13 +8,16 @@ import ( "time" metrics "github.com/ipfs/go-ipfs/metrics" + mconn "github.com/ipfs/go-ipfs/metrics/conn" inet "github.com/ipfs/go-ipfs/p2p/net" + conn "github.com/ipfs/go-ipfs/p2p/net/conn" filter "github.com/ipfs/go-ipfs/p2p/net/filter" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" peer "github.com/ipfs/go-ipfs/p2p/peer" logging "github.com/ipfs/go-ipfs/vendor/QmXJkcEXB6C9h6Ytb6rrUTFU56Ro62zxgrbxTT3dgjQGA8/go-log" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" @@ -58,6 +61,8 @@ type Swarm struct { backf dialbackoff dialT time.Duration // mainly for tests + dialer *conn.Dialer + notifmu sync.RWMutex notifs map[inet.Notifiee]ps.Notifiee @@ -81,6 +86,10 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, return nil, err } + wrap := func(c manet.Conn) manet.Conn { + return mconn.WrapConn(bwc, c) + } + s := &Swarm{ swarm: ps.NewSwarm(PSTransport), local: local, @@ -91,6 +100,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, bwc: bwc, fdRateLimit: make(chan struct{}, concurrentFdDials), Filters: filter.NewFilters(), + dialer: conn.NewDialer(local, peers.PrivKey(local), DialTimeout, wrap), } // configure Swarm @@ -101,7 +111,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, prom.MustRegisterOrGet(peersTotal) s.Notify((*metricsNotifiee)(s)) - return s, s.listen(listenAddrs) + return s, s.setupAddresses(listenAddrs) } func (s *Swarm) teardown() error { @@ -134,7 +144,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error { return err } - return s.listen(addrs) + return s.setupAddresses(addrs) } // Process returns the Process of the swarm diff --git a/p2p/net/swarm/swarm_addr_test.go b/p2p/net/swarm/swarm_addr_test.go index b75b491c42b..4b2a4121ff1 100644 --- a/p2p/net/swarm/swarm_addr_test.go +++ b/p2p/net/swarm/swarm_addr_test.go @@ -23,9 +23,8 @@ func TestFilterAddrs(t *testing.T) { } bad := []ma.Multiaddr{ - m("/ip4/1.2.3.4/udp/1234"), // unreliable + //m("/ip4/1.2.3.4/udp/1234"), // unreliable, fails due to utp working m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet - m("/ip4/1.2.3.4/udp/1234/utp"), // utp is broken m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm m("/ip6/fe80::1/tcp/0"), // link local m("/ip6/fe80::100/tcp/1234"), // link local @@ -34,6 +33,7 @@ func TestFilterAddrs(t *testing.T) { good := []ma.Multiaddr{ m("/ip4/127.0.0.1/tcp/0"), m("/ip6/::1/tcp/0"), + m("/ip4/1.2.3.4/udp/1234/utp"), } goodAndBad := append(good, bad...) @@ -70,9 +70,11 @@ func TestFilterAddrs(t *testing.T) { t.Fatal("should have failed to create swarm") } - if _, err := NewNetwork(ctx, goodAndBad, id, ps, metrics.NewBandwidthCounter()); err != nil { - t.Fatal("should have succeeded in creating swarm", err) - } + /* + if _, err := NewNetwork(ctx, goodAndBad, id, ps, metrics.NewBandwidthCounter()); err != nil { + t.Fatal("should have succeeded in creating swarm", err) + } + */ } func subtestAddrsEqual(t *testing.T, a, b []ma.Multiaddr) { @@ -113,7 +115,7 @@ func TestDialBadAddrs(t *testing.T) { p := testutil.RandPeerIDFatal(t) s.peers.AddAddr(p, a, peer.PermanentAddrTTL) if _, err := s.Dial(ctx, p); err == nil { - t.Error("swarm should not dial: %s", m) + t.Errorf("swarm should not dial: %s", m) } } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 1e2e34143c4..77c76839188 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -4,19 +4,17 @@ import ( "bytes" "errors" "fmt" - "net" "sort" "sync" "time" - mconn "github.com/ipfs/go-ipfs/metrics/conn" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" conn "github.com/ipfs/go-ipfs/p2p/net/conn" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" peer "github.com/ipfs/go-ipfs/p2p/peer" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -289,14 +287,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") } - // get our own addrs. try dialing out from our listener addresses (reusing ports) - // Note that using our peerstore's addresses here is incorrect, as that would - // include observed addresses. TODO: make peerstore's address book smarter. - localAddrs := s.ListenAddresses() - if len(localAddrs) == 0 { - log.Debug("Dialing out with no local addresses.") - } - // get remote peer addrs remoteAddrs := s.peers.Addrs(p) // make sure we can use the addresses. @@ -321,23 +311,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return nil, err } - // open connection to peer - d := &conn.Dialer{ - Dialer: manet.Dialer{ - Dialer: net.Dialer{ - Timeout: s.dialT, - }, - }, - LocalPeer: s.local, - LocalAddrs: localAddrs, - PrivateKey: sk, - Wrapper: func(c manet.Conn) manet.Conn { - return mconn.WrapConn(s.bwc, c) - }, - } - // try to get a connection to any addr - connC, err := s.dialAddrs(ctx, d, p, remoteAddrs) + connC, err := s.dialAddrs(ctx, p, remoteAddrs) if err != nil { logdial["error"] = err return nil, err @@ -357,7 +332,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return swarmC, nil } -func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) { +func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) { // sort addresses so preferred addresses are dialed sooner sort.Sort(AddrList(remoteAddrs)) @@ -381,7 +356,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote connsout := conns errsout := errs - connC, err := s.dialAddr(ctx, d, p, addr) + connC, err := s.dialAddr(ctx, p, addr) if err != nil { connsout = nil } else if connC == nil { @@ -451,10 +426,10 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote return nil, exitErr } -func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { +func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { log.Debugf("%s swarm dialing %s %s", s.local, p, addr) - connC, err := d.Dial(ctx, addr, p) + connC, err := s.dialer.Dial(ctx, addr, p) if err != nil { return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err) } diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index d1bcb075212..5c521ca79c6 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -1,70 +1,59 @@ package swarm import ( - "fmt" - mconn "github.com/ipfs/go-ipfs/metrics/conn" inet "github.com/ipfs/go-ipfs/p2p/net" conn "github.com/ipfs/go-ipfs/p2p/net/conn" - addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + multierr "github.com/ipfs/go-ipfs/thirdparty/multierr" ) -// Open listeners for each network the swarm should listen on -func (s *Swarm) listen(addrs []ma.Multiaddr) error { - - for _, addr := range addrs { - if !addrutil.AddrUsable(addr, true) { - return fmt.Errorf("cannot use addr: %s", addr) +// Open listeners and reuse-dialers for the given addresses +func (s *Swarm) setupAddresses(addrs []ma.Multiaddr) error { + merr := multierr.New() + for i, a := range addrs { + t, err := conn.MakeTransport(a, DialTimeout) + if err != nil { + if err == conn.ErrNoSpecialTransport { + log.Error("no special transport for ", a) + continue + } + if merr.Errors == nil { + merr.Errors = make([]error, len(addrs)) + } + merr.Errors[i] = err + continue } - } - - retErr := multierr.New() + s.dialer.AddDialer(t) - // listen on every address - for i, addr := range addrs { - err := s.setupListener(addr) + err = s.addListener(t) if err != nil { - if retErr.Errors == nil { - retErr.Errors = make([]error, len(addrs)) - } - retErr.Errors[i] = err - log.Debugf("Failed to listen on: %s - %s", addr, err) + merr.Errors[i] = err } } - if retErr.Errors != nil { - return retErr + if merr.Errors != nil { + return merr } + return nil } -// Listen for new connections on the given multiaddr -func (s *Swarm) setupListener(maddr ma.Multiaddr) error { - - // TODO rethink how this has to work. (jbenet) - // - // resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr}) - // if err != nil { - // return err - // } - // for _, a := range resolved { - // s.peers.AddAddr(s.local, a) - // } +func (s *Swarm) addListener(malist manet.Listener) error { sk := s.peers.PrivKey(s.local) if sk == nil { // may be fine for sk to be nil, just log a warning. log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") } - log.Debugf("Swarm Listening at %s", maddr) - list, err := conn.Listen(s.Context(), maddr, s.local, sk) + + list, err := conn.WrapManetListener(s.Context(), malist, s.local, sk) if err != nil { return err } @@ -77,6 +66,10 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { }) } + return s.addConnListener(list) +} + +func (s *Swarm) addConnListener(list conn.Listener) error { // AddListener to the peerstream Listener. this will begin accepting connections // and streams! sl, err := s.swarm.AddListener(list) @@ -85,6 +78,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { } log.Debugf("Swarm Listeners at %s", s.ListenAddresses()) + maddr := list.Multiaddr() + // signal to our notifiees on successful conn. s.notifyAll(func(n inet.Notifiee) { n.Listen((*Network)(s), maddr) @@ -107,7 +102,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { if !more { return } - log.Warningf("swarm listener accept error: %s", err) + log.Errorf("swarm listener accept error: %s", err) case <-ctx.Done(): return } diff --git a/test/sharness/t0130-multinode.sh b/test/sharness/t0130-multinode.sh index 7ba364ec5bd..fa9b9d91c15 100755 --- a/test/sharness/t0130-multinode.sh +++ b/test/sharness/t0130-multinode.sh @@ -78,4 +78,10 @@ test_expect_success "set up tcp testbed" ' run_basic_test +test_expect_success "set up utp testbed" ' + iptb init -n 5 -p 0 -f --bootstrap=none --utp +' + +run_basic_test + test_done