Skip to content

Commit

Permalink
refactor creation of dialers and listeners in swarm
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
  • Loading branch information
whyrusleeping committed Oct 5, 2015
1 parent 128624a commit 430b9df
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 143 deletions.
199 changes: 113 additions & 86 deletions p2p/net/conn/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"fmt"
"math/rand"
"net"
"strings"
"syscall"

ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp"
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
Expand Down Expand Up @@ -95,98 +95,28 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
return connOut, nil
}

// rawConnDial dials the underlying net.Conn + manet.Conns
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) {

// before doing anything, check we're going to be able to dial.
// we may not support the given address.
if _, _, err := manet.DialArgs(raddr); err != nil {
return nil, err
}

if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
}

// get local addr to use.
laddr := pickLocalAddr(d.LocalAddrs, raddr)
logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr)
defer log.EventBegin(ctx, "connDialRawConn", logdial).Done()

// make a copy of the manet.Dialer, we may need to change its timeout.
madialer := d.Dialer

if laddr != nil && isTcpMultiaddr(raddr) && reuseportIsAvailable() {
// we're perhaps going to dial twice. half the timeout, so we can afford to.
// otherwise our context would expire right after the first dial.
madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2)

// dial using reuseport.Dialer, because we're probably reusing addrs.
// this is optimistic, as the reuseDial may fail to bind the port.
rpev := log.EventBegin(ctx, "connDialReusePort", logdial)
if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil {
// if it worked, wrap the raw net.Conn with our manet.Conn
logdial["reuseport"] = "success"
rpev.Done()
return manet.WrapNetConn(nconn)
} else if !retry {
// reuseDial is sure this is a legitimate dial failure, not a reuseport failure.
logdial["reuseport"] = "failure"
logdial["error"] = reuseErr
rpev.Done()
return nil, reuseErr
} else {
// this is a failure to reuse port. log it.
logdial["reuseport"] = "retry"
logdial["error"] = reuseErr
rpev.Done()
}
}
func (d *Dialer) AddDialer(pd ProtoDialer) {
d.Dialers = append(d.Dialers, pd)
}

useLocalAddr := true
for _, p := range raddr.Protocols() {
if p.Name == "utp" {
useLocalAddr = false
// returns dialer that can dial the given address
func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) ProtoDialer {
for _, pd := range d.Dialers {
if pd.Matches(raddr) {
return pd
}
}
defer log.EventBegin(ctx, "connDialManet", logdial).Done()
if !useLocalAddr {
madialer.LocalAddr = nil
}
return madialer.Dial(raddr)
}

func isTcpMultiaddr(a ma.Multiaddr) bool {
p := a.Protocols()
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
return nil
}

func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) {
if laddr == nil {
// if we're given no local address no sense in using reuseport to dial, dial out as usual.
return nil, true, reuseport.ErrReuseFailed
}

// give reuse.Dialer the manet.Dialer's Dialer.
// (wow, Dialer should've so been an interface...)
rd := reuseport.Dialer{dialer}

// get the local net.Addr manually
rd.D.LocalAddr, err = manet.ToNetAddr(laddr)
if err != nil {
return nil, true, err // something wrong with laddr. retry without.
}

// get the raddr dial args for rd.dial
network, netraddr, err := manet.DialArgs(raddr)
if err != nil {
return nil, true, err // something wrong with laddr. retry without.
// rawConnDial dials the underlying net.Conn + manet.Conns
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) {
sd := d.subDialerForAddr(raddr)
if sd == nil {
return nil, fmt.Errorf("no dialer for %s", raddr)
}

// rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set.
conn, err = rd.Dial(network, netraddr)
return conn, reuseErrShouldRetry(err), err // hey! it worked!
return sd.Dial(raddr)
}

// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
Expand Down Expand Up @@ -279,3 +209,100 @@ func MultiaddrNetMatch(tgt ma.Multiaddr, srcs []ma.Multiaddr) ma.Multiaddr {
}
return nil
}

type ProtoDialer interface {
Dial(raddr ma.Multiaddr) (manet.Conn, error)
Matches(ma.Multiaddr) bool
}

type TcpReuseDialer struct {
laddr ma.Multiaddr
rd reuseport.Dialer
madialer manet.Dialer
}

func NewTcpReuseDialer(base manet.Dialer, laddr ma.Multiaddr) (*TcpReuseDialer, error) {
rd := reuseport.Dialer{base.Dialer}

// get the local net.Addr manually
la, err := manet.ToNetAddr(laddr)
if err != nil {
return nil, err // something wrong with laddr.
}

rd.D.LocalAddr = la

return &TcpReuseDialer{
laddr: laddr,
rd: rd,
madialer: base,
}, nil
}

func (d *TcpReuseDialer) Dial(raddr ma.Multiaddr) (manet.Conn, error) {
network, netraddr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}

conn, err := d.rd.Dial(network, netraddr)
if err == nil {
return manet.WrapNetConn(conn)
}
if !reuseErrShouldRetry(err) {
return nil, err
}

return d.madialer.Dial(raddr)
}

func (d *TcpReuseDialer) Matches(a ma.Multiaddr) bool {
return IsTcpMultiaddr(a)
}

func IsTcpMultiaddr(a ma.Multiaddr) bool {
p := a.Protocols()
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
}

func IsUtpMultiaddr(a ma.Multiaddr) bool {
p := a.Protocols()
return len(p) == 3 && p[2].Name == "utp"
}

type UtpReuseDialer struct {
d *mautp.Dialer
}

func NewUtpReuseDialer(d *mautp.Dialer) *UtpReuseDialer {
return &UtpReuseDialer{d}
}

func (d *UtpReuseDialer) Matches(a ma.Multiaddr) bool {
p := a.Protocols()
return len(p) == 3 && p[2].Name == "utp"
}

func (d *UtpReuseDialer) Dial(raddr ma.Multiaddr) (manet.Conn, error) {
network, netraddr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}

c, err := d.d.Dial(network, netraddr)
if err != nil {
return nil, err
}

return manet.WrapNetConn(c)
}

type BasicMaDialer struct{}

func (d *BasicMaDialer) Dial(raddr ma.Multiaddr) (manet.Conn, error) {
return manet.Dial(raddr)
}

func (d *BasicMaDialer) Matches(a ma.Multiaddr) bool {
return true
}
5 changes: 4 additions & 1 deletion p2p/net/conn/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
LocalPeer: p2.ID,
PrivateKey: key2,
}
d2.AddDialer(new(BasicMaDialer))

var c2 Conn

Expand Down Expand Up @@ -152,6 +153,7 @@ func testDialer(t *testing.T, secure bool) {
LocalPeer: p2.ID,
PrivateKey: key2,
}
d2.AddDialer(new(BasicMaDialer))

go echoListen(ctx, l1)

Expand Down Expand Up @@ -227,6 +229,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
LocalPeer: p2.ID,
// PrivateKey: key2, -- dont give it key. we'll just close the conn.
}
d2.AddDialer(new(BasicMaDialer))

errs := make(chan error, 100)
done := make(chan struct{}, 1)
Expand All @@ -253,7 +256,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {

c, err := d2.Dial(ctx, p1.Addr, p1.ID)
if err != nil {
errs <- err
t.Fatal(err)
}
c.Close() // close it early.

Expand Down
8 changes: 4 additions & 4 deletions p2p/net/conn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ type Conn interface {
// Dial function as before, but it would have many arguments, as dialing is
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
type Dialer struct {

// Dialer is an optional manet.Dialer to use.
Dialer manet.Dialer

// LocalPeer is the identity of the local Peer.
LocalPeer peer.ID

// LocalAddrs is a set of local addresses to use.
LocalAddrs []ma.Multiaddr

// Dialers are the sub-dialers usable by this dialer
// selected in order based on the address being dialed
Dialers []ProtoDialer

// PrivateKey used to initialize a secure connection.
// Warning: if PrivateKey is nil, connection will not be secured.
PrivateKey ic.PrivKey
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/conn/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey
return nil, err
}

return WrapManetListener(ctx, ml, local, sk)
}

func WrapManetListener(ctx context.Context, ml manet.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) {
l := &listener{
Listener: ml,
local: local,
Expand Down
35 changes: 16 additions & 19 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package swarm

import (
"fmt"
"net"
"sync"
"time"

Expand Down Expand Up @@ -79,32 +78,18 @@ type Swarm struct {
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local peer.ID, peers peer.Peerstore, bwc metrics.Reporter) (*Swarm, error) {

log.Error("NEW SWARM: ", listenAddrs)
listenAddrs, err := filterAddrs(listenAddrs)
if err != nil {
return nil, err
}

// open connection to peer
d := &conn.Dialer{
Dialer: manet.Dialer{
Dialer: net.Dialer{
Timeout: DialTimeout,
},
},
LocalPeer: local,
LocalAddrs: listenAddrs,
PrivateKey: peers.PrivKey(local),
Wrapper: func(c manet.Conn) manet.Conn {
return mconn.WrapConn(bwc, c)
},
}
log.Error("listen addrs after: ", listenAddrs)

s := &Swarm{
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
ctx: ctx,
dialer: d,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc,
Expand All @@ -119,7 +104,19 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
prom.MustRegisterOrGet(peersTotal)
s.Notify((*metricsNotifiee)(s))

return s, s.listen(listenAddrs)
// open connection to peer
d := &conn.Dialer{
LocalPeer: local,
LocalAddrs: listenAddrs,
PrivateKey: peers.PrivKey(local),
Wrapper: func(c manet.Conn) manet.Conn {
return mconn.WrapConn(bwc, c)
},
}

s.dialer = d

return s, s.setupAddresses(listenAddrs)
}

func (s *Swarm) teardown() error {
Expand Down Expand Up @@ -152,7 +149,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
return err
}

return s.listen(addrs)
return s.setupAddresses(addrs)
}

// Process returns the Process of the swarm
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm_addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ func TestFilterAddrs(t *testing.T) {
}

bad := []ma.Multiaddr{
m("/ip4/1.2.3.4/udp/1234"), // unreliable
//m("/ip4/1.2.3.4/udp/1234"), // unreliable, fails due to utp working
m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
m("/ip4/1.2.3.4/udp/1234/utp"), // utp is broken
m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
m("/ip6/fe80::1/tcp/0"), // link local
m("/ip6/fe80::100/tcp/1234"), // link local
Expand All @@ -34,6 +33,7 @@ func TestFilterAddrs(t *testing.T) {
good := []ma.Multiaddr{
m("/ip4/127.0.0.1/tcp/0"),
m("/ip6/::1/tcp/0"),
m("/ip4/1.2.3.4/udp/1234/utp"),
}

goodAndBad := append(good, bad...)
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local))

log.Errorf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs)
log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs)
if len(remoteAddrs) == 0 {
err := errors.New("peer has no addresses")
logdial["error"] = err
Expand Down
Loading

0 comments on commit 430b9df

Please sign in to comment.