Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: WebRTC reuse QUIC conn #2889

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"errors"
"fmt"
"net"
"time"

"github.com/libp2p/go-libp2p/core/connmgr"
Expand Down Expand Up @@ -35,10 +36,12 @@ import (
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"github.com/prometheus/client_golang/prometheus"

ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/quic-go/quic-go"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
Expand Down Expand Up @@ -284,6 +287,29 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func() *madns.Resolver { return cfg.MultiaddrResolver }),
fx.Provide(func(cm *quicreuse.ConnManager, sw *swarm.Swarm) libp2pwebrtc.ListenUDPFn {
hasQuicAddrPortFor := func(network string, laddr *net.UDPAddr) bool {
quicAddrPorts := map[string]struct{}{}
for _, addr := range sw.ListenAddresses() {
if _, err := addr.ValueForProtocol(ma.P_QUIC_V1); err == nil {
netw, addr, err := manet.DialArgs(addr)
if err != nil {
return false
}
quicAddrPorts[netw+"_"+addr] = struct{}{}
}
}
_, ok := quicAddrPorts[network+"_"+laddr.String()]
return ok
}

return func(network string, laddr *net.UDPAddr) (net.PacketConn, error) {
if hasQuicAddrPortFor(network, laddr) {
return cm.SharedNonQUICPacketConn(network, laddr)
}
return net.ListenUDP(network, laddr)
}
}),
}
fxopts = append(fxopts, cfg.Transports...)
if cfg.Insecure {
Expand Down
86 changes: 86 additions & 0 deletions libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -24,7 +26,9 @@ import (
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"go.uber.org/goleak"

Expand Down Expand Up @@ -416,6 +420,7 @@ func TestMain(m *testing.M) {
m,
// This will return eventually (5s timeout) but doesn't take a context.
goleak.IgnoreAnyFunction("github.com/koron/go-ssdp.Search"),
goleak.IgnoreAnyFunction("github.com/pion/sctp.(*Stream).SetReadDeadline.func1"),
// Logging & Stats
goleak.IgnoreTopFunction("github.com/ipfs/go-log/v2/writer.(*MirrorWriter).logRoutine"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
Expand Down Expand Up @@ -490,3 +495,84 @@ func TestHostAddrsFactoryAddsCerthashes(t *testing.T) {
}, 5*time.Second, 50*time.Millisecond)
h.Close()
}

func TestWebRTCReuseAddrWithQUIC(t *testing.T) {
order := [][]string{
{"/ip4/127.0.0.1/udp/54322/quic-v1", "/ip4/127.0.0.1/udp/54322/webrtc-direct"},
{"/ip4/127.0.0.1/udp/54322/webrtc-direct", "/ip4/127.0.0.1/udp/54322/quic-v1"},
// We do not support WebRTC automatically reusing QUIC addresses if port is not specified, yet.
// {"/ip4/127.0.0.1/udp/0/webrtc-direct", "/ip4/127.0.0.1/udp/0/quic-v1"},
}
for i, addrs := range order {
t.Run("Order "+strconv.Itoa(i), func(t *testing.T) {
h1, err := New(ListenAddrStrings(addrs...), Transport(quic.NewTransport), Transport(libp2pwebrtc.New))
require.NoError(t, err)
defer h1.Close()

seenPorts := make(map[string]struct{})
for _, addr := range h1.Addrs() {
s, err := addr.ValueForProtocol(ma.P_UDP)
require.NoError(t, err)
seenPorts[s] = struct{}{}
}
require.Len(t, seenPorts, 1)

quicClient, err := New(NoListenAddrs, Transport(quic.NewTransport))
require.NoError(t, err)
defer quicClient.Close()

webrtcClient, err := New(NoListenAddrs, Transport(libp2pwebrtc.New))
require.NoError(t, err)
defer webrtcClient.Close()

for _, client := range []host.Host{quicClient, webrtcClient} {
err := client.Connect(context.Background(), peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()})
require.NoError(t, err)
}

t.Run("quic client can connect", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := ping.NewPingService(quicClient)
resCh := p.Ping(ctx, h1.ID())
res := <-resCh
require.NoError(t, res.Error)
})

t.Run("webrtc client can connect", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := ping.NewPingService(webrtcClient)
resCh := p.Ping(ctx, h1.ID())
res := <-resCh
require.NoError(t, res.Error)
})
})
}

swapPort := func(addrStrs []string, newPort string) []string {
out := make([]string, 0, len(addrStrs))
for _, addrStr := range addrStrs {
out = append(out, strings.Replace(addrStr, "54322", newPort, 1))
}
return out
}

t.Run("setup with no reuseport. Should fail", func(t *testing.T) {
h1, err := New(ListenAddrStrings(swapPort(order[0], "54323")...), Transport(quic.NewTransport), Transport(libp2pwebrtc.New), QUICReuse(quicreuse.NewConnManager, quicreuse.DisableReuseport()))
require.NoError(t, err) // It's a bug/feature that swarm.Listen does not error if at least one transport succeeds in listening.
defer h1.Close()
// Check that webrtc did fail to listen
require.Equal(t, 1, len(h1.Addrs()))
require.Contains(t, h1.Addrs()[0].String(), "quic-v1")
})

t.Run("setup with autonat", func(t *testing.T) {
h1, err := New(EnableAutoNATv2(), ListenAddrStrings(swapPort(order[0], "54324")...), Transport(quic.NewTransport), Transport(libp2pwebrtc.New), QUICReuse(quicreuse.NewConnManager, quicreuse.DisableReuseport()))
require.NoError(t, err) // It's a bug/feature that swarm.Listen does not error if at least one transport succeeds in listening.
defer h1.Close()
// Check that webrtc did fail to listen
require.Equal(t, 1, len(h1.Addrs()))
require.Contains(t, h1.Addrs()[0].String(), "quic-v1")
})
}
40 changes: 36 additions & 4 deletions p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package swarm
import (
"errors"
"fmt"
"slices"
"time"

"github.com/libp2p/go-libp2p/core/canonicallog"
Expand All @@ -12,13 +13,44 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

type OrderedListener interface {
// Transports optionally implement this interface to indicate the relative
// ordering that listeners should be setup. Some transports may optionally
// make use of other listeners if they are setup. e.g. WebRTC may reuse the
// same UDP port as QUIC, but only when QUIC is setup first.
// lower values are setup first.
ListenOrder() int
}

MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
// Listen sets up listeners for all of the given addresses.
// It returns as long as we successfully listen on at least *one* address.
func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
errs := make([]error, len(addrs))
var succeeded int
for i, a := range addrs {
if err := s.AddListenAddr(a); err != nil {

type addrAndListener struct {
addr ma.Multiaddr
lTpt transport.Transport
}
sortedAddrsAndTpts := make([]addrAndListener, 0, len(addrs))
for _, a := range addrs {
t := s.TransportForListening(a)
sortedAddrsAndTpts = append(sortedAddrsAndTpts, addrAndListener{addr: a, lTpt: t})
}
slices.SortFunc(sortedAddrsAndTpts, func(a, b addrAndListener) int {
aOrder := 0
bOrder := 0
if l, ok := a.lTpt.(OrderedListener); ok {
aOrder = l.ListenOrder()
}
if l, ok := b.lTpt.(OrderedListener); ok {
bOrder = l.ListenOrder()
}
return aOrder - bOrder
})

for i, a := range sortedAddrsAndTpts {
if err := s.AddListenAddr(a.addr); err != nil {
errs[i] = err
} else {
succeeded++
Expand All @@ -27,11 +59,11 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {

for i, e := range errs {
if e != nil {
log.Warnw("listening failed", "on", addrs[i], "error", errs[i])
log.Warnw("listening failed", "on", sortedAddrsAndTpts[i].addr, "error", errs[i])
}
}

if succeeded == 0 && len(addrs) > 0 {
if succeeded == 0 && len(sortedAddrsAndTpts) > 0 {
return fmt.Errorf("failed to listen on any addresses: %s", errs)
}

Expand Down
6 changes: 6 additions & 0 deletions p2p/transport/quic/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/quic-go/quic-go"
)

const ListenOrder = 1

var log = logging.Logger("quic-transport")

var ErrHolePunching = errors.New("hole punching attempted; no active dial")
Expand Down Expand Up @@ -103,6 +105,10 @@ func NewTransport(key ic.PrivKey, connManager *quicreuse.ConnManager, psk pnet.P
}, nil
}

func (t *transport) ListenOrder() int {
return ListenOrder
}

// Dial dials a new QUIC connection
func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (_c tpt.CapableConn, _err error) {
if ok, isClient, _ := network.GetSimultaneousConnect(ctx); ok && !isClient {
Expand Down
22 changes: 22 additions & 0 deletions p2p/transport/quicreuse/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,28 @@ func (c *ConnManager) onListenerClosed(key string) {
}
}

func (c *ConnManager) SharedNonQUICPacketConn(network string, laddr *net.UDPAddr) (net.PacketConn, error) {
c.quicListenersMu.Lock()
defer c.quicListenersMu.Unlock()
key := laddr.String()
entry, ok := c.quicListeners[key]
if !ok {
return nil, errors.New("expected to be able to share with a QUIC listener, but no QUIC listener found. The QUIC listener should start first")
}
t := entry.ln.transport
if t, ok := t.(*refcountedTransport); ok {
t.IncreaseCount()
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
return &nonQUICPacketConn{
ctx: ctx,
ctxCancel: cancel,
owningTransport: t,
tr: &t.Transport,
}, nil
}
return nil, errors.New("expected to be able to share with a QUIC listener, but the QUIC listener is not using a refcountedTransport. `DisableReuseport` should not be set")
}

func (c *ConnManager) transportForListen(network string, laddr *net.UDPAddr) (refCountedQuicTransport, error) {
if c.enableReuseport {
reuse, err := c.getReuse(network)
Expand Down
74 changes: 74 additions & 0 deletions p2p/transport/quicreuse/nonquic_packetconn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package quicreuse

import (
"context"
"net"
"time"

"github.com/quic-go/quic-go"
)

// nonQUICPacketConn is a net.PacketConn that can be used to read and write
// non-QUIC packets on a quic.Transport. This lets us reuse this UDP port for
// other transports like WebRTC.
type nonQUICPacketConn struct {
owningTransport refCountedQuicTransport
tr *quic.Transport
ctx context.Context
ctxCancel context.CancelFunc
readCtx context.Context
readCancel context.CancelFunc
}

// Close implements net.PacketConn.
func (n *nonQUICPacketConn) Close() error {
n.ctxCancel()

// Don't actually close the underlying transport since someone else might be using it.
// reuse has it's own gc to close unused transports.
n.owningTransport.DecreaseCount()
return nil
}

// LocalAddr implements net.PacketConn.
func (n *nonQUICPacketConn) LocalAddr() net.Addr {
return n.tr.Conn.LocalAddr()
}

// ReadFrom implements net.PacketConn.
func (n *nonQUICPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
ctx := n.readCtx
if ctx == nil {
ctx = n.ctx
}
return n.tr.ReadNonQUICPacket(ctx, p)
}

// SetDeadline implements net.PacketConn.
func (n *nonQUICPacketConn) SetDeadline(t time.Time) error {
// Only used for reads.
return n.SetReadDeadline(t)
}

// SetReadDeadline implements net.PacketConn.
func (n *nonQUICPacketConn) SetReadDeadline(t time.Time) error {
if t.IsZero() && n.readCtx != nil {
n.readCancel()
n.readCtx = nil
}
n.readCtx, n.readCancel = context.WithDeadline(n.ctx, t)
return nil
}
sukunrt marked this conversation as resolved.
Show resolved Hide resolved

// SetWriteDeadline implements net.PacketConn.
func (n *nonQUICPacketConn) SetWriteDeadline(t time.Time) error {
// Unused. quic-go doesn't support deadlines for writes.
return nil
}

// WriteTo implements net.PacketConn.
func (n *nonQUICPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
return n.tr.WriteTo(p, addr)
}

var _ net.PacketConn = &nonQUICPacketConn{}
Loading
Loading