From 1b975c0c693c36a445316e0d21d964f852471f5e Mon Sep 17 00:00:00 2001 From: Steven Allen <steven@stebalien.com> Date: Tue, 31 Mar 2020 12:08:27 -0700 Subject: [PATCH 1/2] feat: remove relay discovery and unspecified relay dialing This was a misfeature that looks great in demos but isn't useful in practice. First, random relays we discover are unlikely to support _active_ relaying. This means that picking a random relay is usually going to fail because the random relay won't already be connected to the target. Second, there are two use-cases for relaying: 1. Connecting to undialable nodes. 2. Relaying all outbound traffic. For the first case, we only want to use the relays specified by the target peers. For the second case, we'd need to modify either libp2p's dialer logic (in the swarm) to prefix all addresses with the specified relay. The logic _here_ covers neither use-case. --- dial.go | 46 ++++++-------------------------------- go.sum | 2 ++ notify.go | 54 -------------------------------------------- relay.go | 30 ++++++++++++------------- relay_test.go | 57 +++++++++++++---------------------------------- transport_test.go | 18 +++++---------- 6 files changed, 44 insertions(+), 163 deletions(-) delete mode 100644 notify.go diff --git a/dial.go b/dial.go index 1b6e9db..a81715a 100644 --- a/dial.go +++ b/dial.go @@ -3,7 +3,6 @@ package relay import ( "context" "fmt" - "math/rand" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/transport" @@ -30,6 +29,13 @@ func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, err return nil, fmt.Errorf("%s is not a relay address", a) } + if relayaddr == nil { + return nil, fmt.Errorf( + "can't dial a p2p-circuit without specifying a relay: %s", + a, + ) + } + // Strip the /p2p-circuit prefix from the destaddr. _, destaddr = ma.SplitFirst(destaddr) @@ -38,11 +44,6 @@ func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, err dinfo.Addrs = append(dinfo.Addrs, destaddr) } - if relayaddr == nil { - // unspecific relay address, try dialing using known hop relays - return r.tryDialRelays(ctx, *dinfo) - } - var rinfo *peer.AddrInfo rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr) if err != nil { @@ -51,36 +52,3 @@ func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, err return r.DialPeer(ctx, *rinfo, *dinfo) } - -func (r *Relay) tryDialRelays(ctx context.Context, dinfo peer.AddrInfo) (*Conn, error) { - var relays []peer.ID - r.mx.Lock() - for p := range r.relays { - relays = append(relays, p) - } - r.mx.Unlock() - - // shuffle list of relays, avoid overloading a specific relay - for i := range relays { - j := rand.Intn(i + 1) - relays[i], relays[j] = relays[j], relays[i] - } - - for _, relay := range relays { - if len(r.host.Network().ConnsToPeer(relay)) == 0 { - continue - } - - rctx, cancel := context.WithTimeout(ctx, HopConnectTimeout) - c, err := r.DialPeer(rctx, peer.AddrInfo{ID: relay}, dinfo) - cancel() - - if err == nil { - return c, nil - } - - log.Debugf("error opening relay connection through %s: %s", dinfo.ID, err.Error()) - } - - return nil, fmt.Errorf("Failed to dial through %d known relay hosts", len(relays)) -} diff --git a/go.sum b/go.sum index 5799b92..53d7a07 100644 --- a/go.sum +++ b/go.sum @@ -107,6 +107,7 @@ github.com/libp2p/go-eventbus v0.1.0/go.mod h1:vROgu5cs5T7cv7POWlWxBaVLxfSegC5UG github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s= github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= +github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-libp2p-blankhost v0.1.4 h1:I96SWjR4rK9irDHcHq3XHN6hawCRTPUADzkJacgZLvk= github.com/libp2p/go-libp2p-blankhost v0.1.4/go.mod h1:oJF0saYsAXQCSfDq254GMNmLNz6ZTHTOvtF4ZydUvwU= @@ -287,6 +288,7 @@ go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.1 h1:8dP3SGL7MPB94crU3bEPplMPe83FI4EouesJUeFHv50= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= +go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/notify.go b/notify.go deleted file mode 100644 index 813daed..0000000 --- a/notify.go +++ /dev/null @@ -1,54 +0,0 @@ -package relay - -import ( - "context" - "time" - - inet "github.com/libp2p/go-libp2p-core/network" - peer "github.com/libp2p/go-libp2p-core/peer" - ma "github.com/multiformats/go-multiaddr" -) - -var _ inet.Notifiee = (*RelayNotifiee)(nil) - -type RelayNotifiee Relay - -func (r *Relay) notifiee() inet.Notifiee { - return (*RelayNotifiee)(r) -} - -func (n *RelayNotifiee) Relay() *Relay { - return (*Relay)(n) -} - -func (n *RelayNotifiee) Listen(net inet.Network, a ma.Multiaddr) {} -func (n *RelayNotifiee) ListenClose(net inet.Network, a ma.Multiaddr) {} -func (n *RelayNotifiee) OpenedStream(net inet.Network, s inet.Stream) {} -func (n *RelayNotifiee) ClosedStream(net inet.Network, s inet.Stream) {} - -func (n *RelayNotifiee) Connected(s inet.Network, c inet.Conn) { - if n.Relay().Matches(c.RemoteMultiaddr()) { - return - } - - go func(id peer.ID) { - ctx, cancel := context.WithTimeout(n.ctx, time.Second) - defer cancel() - - canhop, err := n.Relay().CanHop(ctx, id) - if err != nil { - log.Debugf("Error testing relay hop: %s", err.Error()) - return - } - - if canhop { - log.Debugf("Discovered hop relay %s", id.Pretty()) - n.mx.Lock() - n.relays[id] = struct{}{} - n.mx.Unlock() - n.host.ConnManager().TagPeer(id, "relay-hop", 2) - } - }(c.RemotePeer()) -} - -func (n *RelayNotifiee) Disconnected(s inet.Network, c inet.Conn) {} diff --git a/relay.go b/relay.go index fcb1b5e..1081623 100644 --- a/relay.go +++ b/relay.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "sync" "sync/atomic" "time" @@ -45,15 +44,11 @@ type Relay struct { ctx context.Context self peer.ID - active bool - hop bool - discovery bool + active bool + hop bool incoming chan *Conn - relays map[peer.ID]struct{} - mx sync.Mutex - // atomic counters streamCount int32 liveHopCount int32 @@ -72,9 +67,14 @@ var ( // this will only relay traffic between peers already connected to this // node. OptHop = RelayOpt(1) - // OptDiscovery configures this relay transport to discover new relays - // by probing every new peer. You almost _certainly_ don't want to - // enable this. + // OptDiscovery is a no-op. It was introduced as a way to probe new + // peers to see if they were willing to act as a relays. However, in + // practice, it's useless. While it does test to see if these peers are + // relays, it doesn't (and can't), check to see if these peers are + // _active_ relays (i.e., will actively dial the target peer). + // + // This option may be re-enabled in the future but for now you shouldn't + // use it. OptDiscovery = RelayOpt(2) ) @@ -94,7 +94,6 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts .. ctx: ctx, self: h.ID(), incoming: make(chan *Conn), - relays: make(map[peer.ID]struct{}), } for _, opt := range opts { @@ -104,7 +103,10 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts .. case OptHop: r.hop = true case OptDiscovery: - r.discovery = true + log.Errorf( + "circuit.OptDiscovery is now a no-op: %s", + "dialing peers with a random relay is no longer supported", + ) default: return nil, fmt.Errorf("unrecognized option: %d", opt) } @@ -112,10 +114,6 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts .. h.SetStreamHandler(ProtoID, r.handleNewStream) - if r.discovery { - h.Network().Notify(r.notifiee()) - } - return r, nil } diff --git a/relay_test.go b/relay_test.go index b23c2e0..220249d 100644 --- a/relay_test.go +++ b/relay_test.go @@ -69,7 +69,7 @@ func TestBasicRelay(t *testing.T) { time.Sleep(10 * time.Millisecond) - r1 := newTestRelay(t, ctx, hosts[0], OptDiscovery) + r1 := newTestRelay(t, ctx, hosts[0]) newTestRelay(t, ctx, hosts[1], OptHop) @@ -143,7 +143,7 @@ func TestRelayReset(t *testing.T) { time.Sleep(10 * time.Millisecond) - r1 := newTestRelay(t, ctx, hosts[0], OptDiscovery) + r1 := newTestRelay(t, ctx, hosts[0]) newTestRelay(t, ctx, hosts[1], OptHop) @@ -201,7 +201,7 @@ func TestBasicRelayDial(t *testing.T) { time.Sleep(10 * time.Millisecond) - r1 := newTestRelay(t, ctx, hosts[0], OptDiscovery) + r1 := newTestRelay(t, ctx, hosts[0]) _ = newTestRelay(t, ctx, hosts[1], OptHop) r3 := newTestRelay(t, ctx, hosts[2]) @@ -263,13 +263,12 @@ func TestBasicRelayDial(t *testing.T) { } } -func TestUnspecificRelayDial(t *testing.T) { +func TestUnspecificRelayDialFails(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - defer cancel() hosts := getNetHosts(t, ctx, 3) - r1 := newTestRelay(t, ctx, hosts[0], OptDiscovery) + r1 := newTestRelay(t, ctx, hosts[0]) newTestRelay(t, ctx, hosts[1], OptHop) @@ -281,36 +280,22 @@ func TestUnspecificRelayDial(t *testing.T) { time.Sleep(100 * time.Millisecond) var ( - conn1, conn2 net.Conn - done = make(chan struct{}) + done = make(chan struct{}) ) defer func() { + cancel() <-done - if conn1 != nil { - conn1.Close() - } - if conn2 != nil { - conn2.Close() - } }() - msg := []byte("relay works!") go func() { defer close(done) list := r3.Listener() var err error - conn1, err = list.Accept() - if err != nil { - t.Error(err) - return - } - - _, err = conn1.Write(msg) - if err != nil { - t.Error(err) - return + _, err = list.Accept() + if err == nil { + t.Error("should not have received relay connection") } }() @@ -320,19 +305,9 @@ func TestUnspecificRelayDial(t *testing.T) { defer rcancel() var err error - conn2, err = r1.Dial(rctx, addr, hosts[2].ID()) - if err != nil { - t.Fatal(err) - } - - data := make([]byte, len(msg)) - _, err = io.ReadFull(conn2, data) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, msg) { - t.Fatal("message was incorrect:", string(data)) + _, err = r1.Dial(rctx, addr, hosts[2].ID()) + if err == nil { + t.Fatal("expected dial with unspecified relay address to fail, even if we're connected to a relay") } } @@ -347,7 +322,7 @@ func TestRelayThroughNonHop(t *testing.T) { time.Sleep(10 * time.Millisecond) - r1 := newTestRelay(t, ctx, hosts[0], OptDiscovery) + r1 := newTestRelay(t, ctx, hosts[0]) newTestRelay(t, ctx, hosts[1]) @@ -384,7 +359,7 @@ func TestRelayNoDestConnection(t *testing.T) { time.Sleep(10 * time.Millisecond) - r1 := newTestRelay(t, ctx, hosts[0], OptDiscovery) + r1 := newTestRelay(t, ctx, hosts[0]) newTestRelay(t, ctx, hosts[1], OptHop) @@ -419,7 +394,7 @@ func TestActiveRelay(t *testing.T) { time.Sleep(10 * time.Millisecond) - r1 := newTestRelay(t, ctx, hosts[0], OptDiscovery) + r1 := newTestRelay(t, ctx, hosts[0]) newTestRelay(t, ctx, hosts[1], OptHop, OptActive) diff --git a/transport_test.go b/transport_test.go index 960ffaf..2d81c6b 100644 --- a/transport_test.go +++ b/transport_test.go @@ -26,7 +26,7 @@ var msg = []byte("relay works!") func testSetupRelay(t *testing.T, ctx context.Context) []host.Host { hosts := getNetHosts(t, ctx, 3) - err := AddRelayTransport(ctx, hosts[0], swarmt.GenUpgrader(hosts[0].Network().(*swarm.Swarm)), OptDiscovery) + err := AddRelayTransport(ctx, hosts[0], swarmt.GenUpgrader(hosts[0].Network().(*swarm.Swarm))) if err != nil { t.Fatal(err) } @@ -121,7 +121,7 @@ func TestSpecificRelayTransportDial(t *testing.T) { } } -func TestUnspecificRelayTransportDial(t *testing.T) { +func TestUnspecificRelayTransportDialFails(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -137,17 +137,9 @@ func TestUnspecificRelayTransportDial(t *testing.T) { hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL) - s, err := hosts[0].NewStream(rctx, hosts[2].ID(), TestProto) - if err != nil { - t.Fatal(err) + _, err = hosts[0].NewStream(rctx, hosts[2].ID(), TestProto) + if err == nil { + t.Fatal("dial to unspecified address should have failed") } - data, err := ioutil.ReadAll(s) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, msg) { - t.Fatal("message was incorrect:", string(data)) - } } From 7392efe6520ee3ec27bc659d796797f2af4822db Mon Sep 17 00:00:00 2001 From: Steven Allen <steven@stebalien.com> Date: Tue, 31 Mar 2020 12:18:52 -0700 Subject: [PATCH 2/2] feat: functional options Introduce functional options. This should make it easier for us to add additional options in the future (e.g., known relays, relay policies, etc.). --- relay.go | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/relay.go b/relay.go index 1081623..320533e 100644 --- a/relay.go +++ b/relay.go @@ -55,18 +55,24 @@ type Relay struct { } // RelayOpts are options for configuring the relay transport. -type RelayOpt int +type RelayOpt func(*Relay) error var ( // OptActive configures the relay transport to actively establish // outbound connections on behalf of clients. You probably don't want to // enable this unless you know what you're doing. - OptActive = RelayOpt(0) + OptActive RelayOpt = func(r *Relay) error { + r.active = true + return nil + } // OptHop configures the relay transport to accept requests to relay // traffic on behalf of third-parties. Unless OptActive is specified, // this will only relay traffic between peers already connected to this // node. - OptHop = RelayOpt(1) + OptHop = func(r *Relay) error { + r.hop = true + return nil + } // OptDiscovery is a no-op. It was introduced as a way to probe new // peers to see if they were willing to act as a relays. However, in // practice, it's useless. While it does test to see if these peers are @@ -75,7 +81,13 @@ var ( // // This option may be re-enabled in the future but for now you shouldn't // use it. - OptDiscovery = RelayOpt(2) + OptDiscovery = func(r *Relay) error { + log.Errorf( + "circuit.OptDiscovery is now a no-op: %s", + "dialing peers with a random relay is no longer supported", + ) + return nil + } ) type RelayError struct { @@ -97,18 +109,8 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts .. } for _, opt := range opts { - switch opt { - case OptActive: - r.active = true - case OptHop: - r.hop = true - case OptDiscovery: - log.Errorf( - "circuit.OptDiscovery is now a no-op: %s", - "dialing peers with a random relay is no longer supported", - ) - default: - return nil, fmt.Errorf("unrecognized option: %d", opt) + if err := opt(r); err != nil { + return nil, err } }