From e75c69a248ba8de9bd4dcd9af855e46979ccb305 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 4 Oct 2015 21:19:44 -0700 Subject: [PATCH] refactor creation of dialers and listeners in swarm License: MIT Signed-off-by: Jeromy --- p2p/net/conn/dial.go | 199 ++++++++++++++++++------------- p2p/net/conn/dial_test.go | 5 +- p2p/net/conn/interface.go | 8 +- p2p/net/conn/listen.go | 4 + p2p/net/swarm/swarm.go | 33 +++-- p2p/net/swarm/swarm_addr_test.go | 6 +- p2p/net/swarm/swarm_dial.go | 2 +- p2p/net/swarm/swarm_listen.go | 83 ++++++++----- 8 files changed, 197 insertions(+), 143 deletions(-) diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index 29d5d4d2c447..b775ae2d43fa 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -4,11 +4,11 @@ import ( "fmt" "math/rand" "net" - "strings" "syscall" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp" reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" @@ -95,98 +95,28 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( return connOut, nil } -// rawConnDial dials the underlying net.Conn + manet.Conns -func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) { - - // before doing anything, check we're going to be able to dial. - // we may not support the given address. - if _, _, err := manet.DialArgs(raddr); err != nil { - return nil, err - } - - if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") { - log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)) - return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr) - } - - // get local addr to use. - laddr := pickLocalAddr(d.LocalAddrs, raddr) - logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr) - defer log.EventBegin(ctx, "connDialRawConn", logdial).Done() - - // make a copy of the manet.Dialer, we may need to change its timeout. - madialer := d.Dialer - - if laddr != nil && isTcpMultiaddr(raddr) && reuseportIsAvailable() { - // we're perhaps going to dial twice. half the timeout, so we can afford to. - // otherwise our context would expire right after the first dial. - madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2) - - // dial using reuseport.Dialer, because we're probably reusing addrs. - // this is optimistic, as the reuseDial may fail to bind the port. - rpev := log.EventBegin(ctx, "connDialReusePort", logdial) - if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil { - // if it worked, wrap the raw net.Conn with our manet.Conn - logdial["reuseport"] = "success" - rpev.Done() - return manet.WrapNetConn(nconn) - } else if !retry { - // reuseDial is sure this is a legitimate dial failure, not a reuseport failure. - logdial["reuseport"] = "failure" - logdial["error"] = reuseErr - rpev.Done() - return nil, reuseErr - } else { - // this is a failure to reuse port. log it. - logdial["reuseport"] = "retry" - logdial["error"] = reuseErr - rpev.Done() - } - } +func (d *Dialer) AddDialer(pd ProtoDialer) { + d.Dialers = append(d.Dialers, pd) +} - useLocalAddr := true - for _, p := range raddr.Protocols() { - if p.Name == "utp" { - useLocalAddr = false +// returns dialer that can dial the given address +func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) ProtoDialer { + for _, pd := range d.Dialers { + if pd.Matches(raddr) { + return pd } } - defer log.EventBegin(ctx, "connDialManet", logdial).Done() - if !useLocalAddr { - madialer.LocalAddr = nil - } - return madialer.Dial(raddr) -} - -func isTcpMultiaddr(a ma.Multiaddr) bool { - p := a.Protocols() - return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" + return nil } -func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) { - if laddr == nil { - // if we're given no local address no sense in using reuseport to dial, dial out as usual. - return nil, true, reuseport.ErrReuseFailed - } - - // give reuse.Dialer the manet.Dialer's Dialer. - // (wow, Dialer should've so been an interface...) - rd := reuseport.Dialer{dialer} - - // get the local net.Addr manually - rd.D.LocalAddr, err = manet.ToNetAddr(laddr) - if err != nil { - return nil, true, err // something wrong with laddr. retry without. - } - - // get the raddr dial args for rd.dial - network, netraddr, err := manet.DialArgs(raddr) - if err != nil { - return nil, true, err // something wrong with laddr. retry without. +// rawConnDial dials the underlying net.Conn + manet.Conns +func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) { + sd := d.subDialerForAddr(raddr) + if sd == nil { + return nil, fmt.Errorf("no dialer for %s", raddr) } - // rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set. - conn, err = rd.Dial(network, netraddr) - return conn, reuseErrShouldRetry(err), err // hey! it worked! + return sd.Dial(raddr) } // reuseErrShouldRetry diagnoses whether to retry after a reuse error. @@ -279,3 +209,100 @@ func MultiaddrNetMatch(tgt ma.Multiaddr, srcs []ma.Multiaddr) ma.Multiaddr { } return nil } + +type ProtoDialer interface { + Dial(raddr ma.Multiaddr) (manet.Conn, error) + Matches(ma.Multiaddr) bool +} + +type TcpReuseDialer struct { + laddr ma.Multiaddr + rd reuseport.Dialer + madialer manet.Dialer +} + +func NewTcpReuseDialer(base manet.Dialer, laddr ma.Multiaddr) (*TcpReuseDialer, error) { + rd := reuseport.Dialer{base.Dialer} + + // get the local net.Addr manually + la, err := manet.ToNetAddr(laddr) + if err != nil { + return nil, err // something wrong with laddr. + } + + rd.D.LocalAddr = la + + return &TcpReuseDialer{ + laddr: laddr, + rd: rd, + madialer: base, + }, nil +} + +func (d *TcpReuseDialer) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + network, netraddr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + conn, err := d.rd.Dial(network, netraddr) + if err == nil { + return manet.WrapNetConn(conn) + } + if !reuseErrShouldRetry(err) { + return nil, err + } + + return d.madialer.Dial(raddr) +} + +func (d *TcpReuseDialer) Matches(a ma.Multiaddr) bool { + return IsTcpMultiaddr(a) +} + +func IsTcpMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" +} + +func IsUtpMultiaddr(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +} + +type UtpReuseDialer struct { + d *mautp.Dialer +} + +func NewUtpReuseDialer(d *mautp.Dialer) *UtpReuseDialer { + return &UtpReuseDialer{d} +} + +func (d *UtpReuseDialer) Matches(a ma.Multiaddr) bool { + p := a.Protocols() + return len(p) == 3 && p[2].Name == "utp" +} + +func (d *UtpReuseDialer) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + network, netraddr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + c, err := d.d.Dial(network, netraddr) + if err != nil { + return nil, err + } + + return manet.WrapNetConn(c) +} + +type BasicMaDialer struct{} + +func (d *BasicMaDialer) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + return manet.Dial(raddr) +} + +func (d *BasicMaDialer) Matches(a ma.Multiaddr) bool { + return true +} diff --git a/p2p/net/conn/dial_test.go b/p2p/net/conn/dial_test.go index 4c5b584bc8ad..6e63dde73e38 100644 --- a/p2p/net/conn/dial_test.go +++ b/p2p/net/conn/dial_test.go @@ -70,6 +70,7 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p LocalPeer: p2.ID, PrivateKey: key2, } + d2.AddDialer(new(BasicMaDialer)) var c2 Conn @@ -152,6 +153,7 @@ func testDialer(t *testing.T, secure bool) { LocalPeer: p2.ID, PrivateKey: key2, } + d2.AddDialer(new(BasicMaDialer)) go echoListen(ctx, l1) @@ -227,6 +229,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) { LocalPeer: p2.ID, // PrivateKey: key2, -- dont give it key. we'll just close the conn. } + d2.AddDialer(new(BasicMaDialer)) errs := make(chan error, 100) done := make(chan struct{}, 1) @@ -253,7 +256,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) { c, err := d2.Dial(ctx, p1.Addr, p1.ID) if err != nil { - errs <- err + t.Fatal(err) } c.Close() // close it early. diff --git a/p2p/net/conn/interface.go b/p2p/net/conn/interface.go index 82008593057d..08a25f527fa5 100644 --- a/p2p/net/conn/interface.go +++ b/p2p/net/conn/interface.go @@ -54,16 +54,16 @@ type Conn interface { // Dial function as before, but it would have many arguments, as dialing is // no longer simple (need a peerstore, a local peer, a context, a network, etc) type Dialer struct { - - // Dialer is an optional manet.Dialer to use. - Dialer manet.Dialer - // LocalPeer is the identity of the local Peer. LocalPeer peer.ID // LocalAddrs is a set of local addresses to use. LocalAddrs []ma.Multiaddr + // Dialers are the sub-dialers usable by this dialer + // selected in order based on the address being dialed + Dialers []ProtoDialer + // PrivateKey used to initialize a secure connection. // Warning: if PrivateKey is nil, connection will not be secured. PrivateKey ic.PrivKey diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 1cd6aa2c32d3..eaed0da388ae 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -154,6 +154,10 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey return nil, err } + return WrapManetListener(ctx, ml, local, sk) +} + +func WrapManetListener(ctx context.Context, ml manet.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) { l := &listener{ Listener: ml, local: local, diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index bcd1999b834b..5cdd7aa55b53 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -4,7 +4,6 @@ package swarm import ( "fmt" - "net" "sync" "time" @@ -84,27 +83,11 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, return nil, err } - // open connection to peer - d := &conn.Dialer{ - Dialer: manet.Dialer{ - Dialer: net.Dialer{ - Timeout: DialTimeout, - }, - }, - LocalPeer: local, - LocalAddrs: listenAddrs, - PrivateKey: peers.PrivKey(local), - Wrapper: func(c manet.Conn) manet.Conn { - return mconn.WrapConn(bwc, c) - }, - } - s := &Swarm{ swarm: ps.NewSwarm(PSTransport), local: local, peers: peers, ctx: ctx, - dialer: d, dialT: DialTimeout, notifs: make(map[inet.Notifiee]ps.Notifiee), bwc: bwc, @@ -119,7 +102,19 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, prom.MustRegisterOrGet(peersTotal) s.Notify((*metricsNotifiee)(s)) - return s, s.listen(listenAddrs) + // open connection to peer + d := &conn.Dialer{ + LocalPeer: local, + LocalAddrs: listenAddrs, + PrivateKey: peers.PrivKey(local), + Wrapper: func(c manet.Conn) manet.Conn { + return mconn.WrapConn(bwc, c) + }, + } + + s.dialer = d + + return s, s.setupAddresses(listenAddrs) } func (s *Swarm) teardown() error { @@ -152,7 +147,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error { return err } - return s.listen(addrs) + return s.setupAddresses(addrs) } // Process returns the Process of the swarm diff --git a/p2p/net/swarm/swarm_addr_test.go b/p2p/net/swarm/swarm_addr_test.go index b75b491c42b5..bbc5100ecd86 100644 --- a/p2p/net/swarm/swarm_addr_test.go +++ b/p2p/net/swarm/swarm_addr_test.go @@ -23,9 +23,8 @@ func TestFilterAddrs(t *testing.T) { } bad := []ma.Multiaddr{ - m("/ip4/1.2.3.4/udp/1234"), // unreliable + //m("/ip4/1.2.3.4/udp/1234"), // unreliable, fails due to utp working m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet - m("/ip4/1.2.3.4/udp/1234/utp"), // utp is broken m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm m("/ip6/fe80::1/tcp/0"), // link local m("/ip6/fe80::100/tcp/1234"), // link local @@ -34,6 +33,7 @@ func TestFilterAddrs(t *testing.T) { good := []ma.Multiaddr{ m("/ip4/127.0.0.1/tcp/0"), m("/ip6/::1/tcp/0"), + m("/ip4/1.2.3.4/udp/1234/utp"), } goodAndBad := append(good, bad...) @@ -70,9 +70,11 @@ func TestFilterAddrs(t *testing.T) { t.Fatal("should have failed to create swarm") } + /* I'm not sure how i feel about trying to listen on bad addresses succeeding if _, err := NewNetwork(ctx, goodAndBad, id, ps, metrics.NewBandwidthCounter()); err != nil { t.Fatal("should have succeeded in creating swarm", err) } + */ } func subtestAddrsEqual(t *testing.T, a, b []ma.Multiaddr) { diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index ddff97abf03b..7537b226d0a0 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -294,7 +294,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { remoteAddrs = addrutil.Subtract(remoteAddrs, ila) remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local)) - log.Errorf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs) + log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs) if len(remoteAddrs) == 0 { err := errors.New("peer has no addresses") logdial["error"] = err diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index c1128997c678..aff2be9120cb 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -2,6 +2,7 @@ package swarm import ( "fmt" + "net" mconn "github.com/ipfs/go-ipfs/metrics/conn" inet "github.com/ipfs/go-ipfs/p2p/net" @@ -11,60 +12,76 @@ import ( ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" - multierr "github.com/ipfs/go-ipfs/thirdparty/multierr" ) // Open listeners for each network the swarm should listen on -func (s *Swarm) listen(addrs []ma.Multiaddr) error { - +func (s *Swarm) setupAddresses(addrs []ma.Multiaddr) error { for _, addr := range addrs { if !addrutil.AddrUsable(addr, true) { return fmt.Errorf("cannot use addr: %s", addr) } } - retErr := multierr.New() + dialer := manet.Dialer{Dialer: net.Dialer{Timeout: DialTimeout}} + for _, a := range addrs { + switch { + case conn.IsTcpMultiaddr(a): + rud, err := conn.NewTcpReuseDialer(dialer, a) + if err != nil { + return err + } + + s.dialer.AddDialer(rud) - // listen on every address - for i, addr := range addrs { - err := s.setupListener(addr) - if err != nil { - if retErr.Errors == nil { - retErr.Errors = make([]error, len(addrs)) + l, err := manet.Listen(a) + if err != nil { + return err + } + + err = s.addListener(l) + if err != nil { + return err + } + case conn.IsUtpMultiaddr(a): + network, addr, err := manet.DialArgs(a) + if err != nil { + return err + } + + list, err := mautp.Listen(network, addr) + if err != nil { + return err } - retErr.Errors[i] = err - log.Debugf("Failed to listen on: %s - %s", addr, err) - } - } - if retErr.Errors != nil { - return retErr + malist, err := manet.WrapNetListener(list) + if err != nil { + return err + } + + err = s.addListener(malist) + if err != nil { + return err + } + + rud := conn.NewUtpReuseDialer(list.(*mautp.Listener).Dialer()) + s.dialer.AddDialer(rud) + } } return nil } -// Listen for new connections on the given multiaddr -func (s *Swarm) setupListener(maddr ma.Multiaddr) error { - - // TODO rethink how this has to work. (jbenet) - // - // resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr}) - // if err != nil { - // return err - // } - // for _, a := range resolved { - // s.peers.AddAddr(s.local, a) - // } +func (s *Swarm) addListener(malist manet.Listener) error { sk := s.peers.PrivKey(s.local) if sk == nil { // may be fine for sk to be nil, just log a warning. log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") } - log.Debugf("Swarm Listening at %s", maddr) - list, err := conn.Listen(s.Context(), maddr, s.local, sk) + + list, err := conn.WrapManetListener(s.Context(), malist, s.local, sk) if err != nil { return err } @@ -77,6 +94,10 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { }) } + return s.addConnListener(list) +} + +func (s *Swarm) addConnListener(list conn.Listener) error { // AddListener to the peerstream Listener. this will begin accepting connections // and streams! sl, err := s.swarm.AddListener(list) @@ -85,6 +106,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { } log.Debugf("Swarm Listeners at %s", s.ListenAddresses()) + maddr := list.Multiaddr() + // signal to our notifiees on successful conn. s.notifyAll(func(n inet.Notifiee) { n.Listen((*Network)(s), maddr)