Skip to content

Commit

Permalink
swarm: move back off handling outside worker loop
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Aug 7, 2023
1 parent ccd767f commit 12ead09
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 39 deletions.
21 changes: 3 additions & 18 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,9 @@ loop:
}
ad.dialed = true
ad.dialRankingDelay = now.Sub(ad.createdAt)
err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)
if err != nil {
// Errored without attempting a dial. This happens in case of
// backoff or black hole.
w.dispatchError(ad, err)
} else {
// the dial was successful. update inflight dials
dialsInFlight++
totalDials++
}
w.s.limitedDial(ad.ctx, w.peer, ad.addr, w.resch)
dialsInFlight++
totalDials++
}
timerRunning = false
// schedule more dials
Expand Down Expand Up @@ -389,14 +382,6 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) {
}
}
}

// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests.
// this is necessary to support active listen scenarios, where a new dial comes in while
// another dial is in progress, and needs to do a direct connection without inhibitions from
// dial backoff.
if err == ErrDialBackoff {
delete(w.trackedDials, string(ad.addr.Bytes()))
}
}

// rankAddrs ranks addresses for dialing. if it's a simConnect request we
Expand Down
34 changes: 14 additions & 20 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,10 @@ func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) (goodAddrs []ma.Mul
return nil, nil, err
}

goodAddrs = ma.Unique(resolved)
goodAddrs, addrErrs = s.filterKnownUndialables(p, goodAddrs)
if forceDirect, _ := network.GetForceDirectDial(ctx); forceDirect {
goodAddrs = ma.FilterAddrs(goodAddrs, s.nonProxyAddr)
}
forceDirect, _ := network.GetForceDirectDial(ctx)

goodAddrs = ma.Unique(resolved)
goodAddrs, addrErrs = s.filterKnownUndialables(p, goodAddrs, forceDirect)
if len(goodAddrs) == 0 {
return nil, addrErrs, ErrNoGoodAddresses
}
Expand Down Expand Up @@ -388,20 +386,6 @@ func (s *Swarm) resolveAddrs(ctx context.Context, pi peer.AddrInfo) ([]ma.Multia
return resolved, nil
}

func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error {
// check the dial backoff
if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect {
if s.backf.Backoff(p, addr) {
return ErrDialBackoff
}
}

// start the dial
s.limitedDial(ctx, p, addr, resch)

return nil
}

func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return !t.Proxy()
Expand All @@ -413,7 +397,7 @@ func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
// addresses that we know to be our own, and addresses with a better tranport
// available. This is an optimization to avoid wasting time on dials that we
// know are going to fail or for which we have a better alternative.
func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) (goodAddrs []ma.Multiaddr, addrErrs []TransportError) {
func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr, forceDirect bool) (goodAddrs []ma.Multiaddr, addrErrs []TransportError) {
lisAddrs, _ := s.InterfaceListenAddresses()
var ourAddrs []ma.Multiaddr
for _, addr := range lisAddrs {
Expand Down Expand Up @@ -468,6 +452,16 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) (goodAdd
}
return true
},
func(addr ma.Multiaddr) bool {
if !forceDirect && s.backf.Backoff(p, addr) {
addrErrs = append(addrErrs, TransportError{Address: addr, Cause: ErrDialBackoff})
return false
}
return true
},
func(addr ma.Multiaddr) bool {
return !forceDirect || s.nonProxyAddr(addr)
},
), addrErrs
}

Expand Down
39 changes: 38 additions & 1 deletion p2p/net/swarm/swarm_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,44 @@ func TestBlackHoledAddrBlocked(t *testing.T) {
if conn != nil {
t.Fatalf("expected dial to be blocked")
}
if !errors.Is(err, ErrNoGoodAddresses) {
var de *DialError
if !errors.As(err, &de) {
t.Fatalf("expected to receive an error of type *DialError, got %s of type %T", err, err)
}
require.Contains(t, de.DialErrors, TransportError{Address: addr, Cause: ErrDialRefusedBlackHole})
}

func TestBackoffAddrBlocked(t *testing.T) {
resolver, err := madns.NewResolver()
if err != nil {
t.Fatal(err)
}
s := newTestSwarmWithResolver(t, resolver)
defer s.Close()

// all dials to the address will fail. RFC6666 Discard Prefix
addr := ma.StringCast("/ip6/0100::1/tcp/54321/")
p, err := test.RandPeerID()
if err != nil {
t.Error(err)
}
s.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL)

// do 1 extra dial to ensure that the blackHoleDetector state is updated since it
// happens in a different goroutine
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
conn, err := s.DialPeer(ctx, p)
require.Nil(t, conn)
require.Error(t, err)
cancel()

ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
conn, err = s.DialPeer(ctx, p)
require.Nil(t, conn)
var de *DialError
if !errors.As(err, &de) {
t.Fatalf("expected to receive an error of type *DialError, got %s of type %T", err, err)
}
require.Contains(t, de.DialErrors, TransportError{Address: addr, Cause: ErrDialBackoff})
}

0 comments on commit 12ead09

Please sign in to comment.