From 12ead095c42819d22b1542b071b357b2c1de0c25 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 7 Aug 2023 16:38:44 +0530 Subject: [PATCH] swarm: move back off handling outside worker loop --- p2p/net/swarm/dial_worker.go | 21 +++-------------- p2p/net/swarm/swarm_dial.go | 34 ++++++++++++---------------- p2p/net/swarm/swarm_dial_test.go | 39 +++++++++++++++++++++++++++++++- 3 files changed, 55 insertions(+), 39 deletions(-) diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index d6e9ef77f3..0bbf9c4d36 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -288,16 +288,9 @@ loop: } ad.dialed = true ad.dialRankingDelay = now.Sub(ad.createdAt) - err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch) - if err != nil { - // Errored without attempting a dial. This happens in case of - // backoff or black hole. - w.dispatchError(ad, err) - } else { - // the dial was successful. update inflight dials - dialsInFlight++ - totalDials++ - } + w.s.limitedDial(ad.ctx, w.peer, ad.addr, w.resch) + dialsInFlight++ + totalDials++ } timerRunning = false // schedule more dials @@ -389,14 +382,6 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) { } } } - - // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests. - // this is necessary to support active listen scenarios, where a new dial comes in while - // another dial is in progress, and needs to do a direct connection without inhibitions from - // dial backoff. - if err == ErrDialBackoff { - delete(w.trackedDials, string(ad.addr.Bytes())) - } } // rankAddrs ranks addresses for dialing. 
if it's a simConnect request we diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index e17c27353b..5f4fc0a552 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -311,12 +311,10 @@ func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) (goodAddrs []ma.Mul return nil, nil, err } - goodAddrs = ma.Unique(resolved) - goodAddrs, addrErrs = s.filterKnownUndialables(p, goodAddrs) - if forceDirect, _ := network.GetForceDirectDial(ctx); forceDirect { - goodAddrs = ma.FilterAddrs(goodAddrs, s.nonProxyAddr) - } + forceDirect, _ := network.GetForceDirectDial(ctx) + goodAddrs = ma.Unique(resolved) + goodAddrs, addrErrs = s.filterKnownUndialables(p, goodAddrs, forceDirect) if len(goodAddrs) == 0 { return nil, addrErrs, ErrNoGoodAddresses } @@ -388,20 +386,6 @@ func (s *Swarm) resolveAddrs(ctx context.Context, pi peer.AddrInfo) ([]ma.Multia return resolved, nil } -func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error { - // check the dial backoff - if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { - if s.backf.Backoff(p, addr) { - return ErrDialBackoff - } - } - - // start the dial - s.limitedDial(ctx, p, addr, resch) - - return nil -} - func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool { t := s.TransportForDialing(addr) return !t.Proxy() @@ -413,7 +397,7 @@ func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool { // addresses that we know to be our own, and addresses with a better tranport // available. This is an optimization to avoid wasting time on dials that we // know are going to fail or for which we have a better alternative. 
-func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) (goodAddrs []ma.Multiaddr, addrErrs []TransportError) { +func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr, forceDirect bool) (goodAddrs []ma.Multiaddr, addrErrs []TransportError) { lisAddrs, _ := s.InterfaceListenAddresses() var ourAddrs []ma.Multiaddr for _, addr := range lisAddrs { @@ -468,6 +452,16 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) (goodAdd } return true }, + func(addr ma.Multiaddr) bool { + if !forceDirect && s.backf.Backoff(p, addr) { + addrErrs = append(addrErrs, TransportError{Address: addr, Cause: ErrDialBackoff}) + return false + } + return true + }, + func(addr ma.Multiaddr) bool { + return !forceDirect || s.nonProxyAddr(addr) + }, ), addrErrs } diff --git a/p2p/net/swarm/swarm_dial_test.go b/p2p/net/swarm/swarm_dial_test.go index f9d90b8c44..acfaa40c40 100644 --- a/p2p/net/swarm/swarm_dial_test.go +++ b/p2p/net/swarm/swarm_dial_test.go @@ -370,7 +370,44 @@ func TestBlackHoledAddrBlocked(t *testing.T) { if conn != nil { t.Fatalf("expected dial to be blocked") } - if !errors.Is(err, ErrNoGoodAddresses) { + var de *DialError + if !errors.As(err, &de) { t.Fatalf("expected to receive an error of type *DialError, got %s of type %T", err, err) } + require.Contains(t, de.DialErrors, TransportError{Address: addr, Cause: ErrDialRefusedBlackHole}) +} + +func TestBackoffAddrBlocked(t *testing.T) { + resolver, err := madns.NewResolver() + if err != nil { + t.Fatal(err) + } + s := newTestSwarmWithResolver(t, resolver) + defer s.Close() + + // all dials to the address will fail. 
RFC6666 Discard Prefix + addr := ma.StringCast("/ip6/0100::1/tcp/54321/") + p, err := test.RandPeerID() + if err != nil { + t.Error(err) + } + s.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL) + + // the first dial is expected to fail, which should leave the address in backoff so + // that the second dial below is refused with ErrDialBackoff + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + conn, err := s.DialPeer(ctx, p) + require.Nil(t, conn) + require.Error(t, err) + cancel() + + ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + conn, err = s.DialPeer(ctx, p) + require.Nil(t, conn) + var de *DialError + if !errors.As(err, &de) { + t.Fatalf("expected to receive an error of type *DialError, got %s of type %T", err, err) + } + require.Contains(t, de.DialErrors, TransportError{Address: addr, Cause: ErrDialBackoff}) }