Skip to content

Commit

Permalink
webrtc: add a test for establishing many connections
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 9, 2024
1 parent 172e1d6 commit 628bf36
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 13 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,5 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

replace github.com/pion/ice/v2 => github.com/pion/ice/v2 v2.3.25-0.20240609174246-45043bdb58c3
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks=
github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI=
github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/ice/v2 v2.3.25-0.20240609174246-45043bdb58c3 h1:0GtcVIH0YUX41lBVsuTTLny5S1FTdcVdD7iXYh2Trb8=
github.com/pion/ice/v2 v2.3.25-0.20240609174246-45043bdb58c3/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
Expand Down
18 changes: 11 additions & 7 deletions p2p/transport/webrtc/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *connMultiaddrs) LocalMultiaddr() ma.Multiaddr { return c.local }
func (c *connMultiaddrs) RemoteMultiaddr() ma.Multiaddr { return c.remote }

const (
candidateSetupTimeout = 20 * time.Second
candidateSetupTimeout = 60 * time.Second
DefaultMaxInFlightConnections = 10
)

Expand Down Expand Up @@ -128,11 +128,12 @@ func (l *listener) listen() {

ctx, cancel := context.WithTimeout(l.ctx, candidateSetupTimeout)
defer cancel()

fmt.Println("received UFrag", candidate.Ufrag, candidate.Addr)
conn, err := l.handleCandidate(ctx, candidate)
if err != nil {
l.mux.RemoveConnByUfrag(candidate.Ufrag)
log.Debugf("could not accept connection: %s: %v", candidate.Ufrag, err)
log.Errorf("could not accept connection: %s: %v", candidate.Ufrag, err)
fmt.Printf("could not accept connection: %s: %v\n", candidate.Ufrag, err)
return
}

Expand Down Expand Up @@ -219,7 +220,7 @@ func (l *listener) setupConnection(
return nil, fmt.Errorf("instantiating peer connection failed: %w", err)
}

errC := addOnConnectionStateChangeCallback(w.PeerConnection)
errC := addOnConnectionStateChangeCallback(w.PeerConnection, "listener")
// Infer the client SDP from the incoming STUN message by setting the ice-ufrag.
if err := w.PeerConnection.SetRemoteDescription(webrtc.SessionDescription{
SDP: createClientSDP(candidate.Addr, candidate.Ufrag),
Expand Down Expand Up @@ -326,24 +327,27 @@ func (l *listener) Multiaddr() ma.Multiaddr {
// * is closed when the state changes to Connection
// * receives an error when the state changes to Failed
// * doesn't receive anything (nor is closed) when the state changes to Disconnected
func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error {
func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection, side string) <-chan error {
errC := make(chan error, 1)
var once sync.Once
st := time.Now()
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
switch state {
fmt.Println("state received: ", side, state, pc.ConnectionState(), time.Since(st))
switch pc.ConnectionState() {
case webrtc.PeerConnectionStateConnected:
once.Do(func() { close(errC) })
case webrtc.PeerConnectionStateFailed:
once.Do(func() {
errC <- errors.New("peerconnection failed")
close(errC)
})
log.Error("peer connection failed")
case webrtc.PeerConnectionStateDisconnected:
// the connection can move to a disconnected state and back to a connected state without ICE renegotiation.
// This could happen when underlying UDP packets are lost, and therefore the connection moves to the disconnected state.
// If the connection then receives packets on the connection, it can move back to the connected state.
// If no packets are received until the failed timeout is triggered, the connection moves to the failed state.
log.Warn("peerconnection disconnected")
log.Error("peerconnection disconnected")
}
})
return errC
Expand Down
8 changes: 4 additions & 4 deletions p2p/transport/webrtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ const (
// timeout values for the peerconnection
// https://github.com/pion/webrtc/blob/v3.1.50/settingengine.go#L102-L109
const (
DefaultDisconnectedTimeout = 20 * time.Second
DefaultFailedTimeout = 30 * time.Second
DefaultKeepaliveTimeout = 15 * time.Second
DefaultDisconnectedTimeout = 60 * time.Second
DefaultFailedTimeout = 60 * time.Second
DefaultKeepaliveTimeout = 60 * time.Second

sctpReceiveBufferSize = 100_000
)
Expand Down Expand Up @@ -327,7 +327,7 @@ func (t *WebRTCTransport) dial(ctx context.Context, scope network.ConnManagement
return nil, fmt.Errorf("instantiating peer connection failed: %w", err)
}

errC := addOnConnectionStateChangeCallback(w.PeerConnection)
errC := addOnConnectionStateChangeCallback(w.PeerConnection, "dialer")

// do offer-answer exchange
offer, err := w.PeerConnection.CreateOffer(nil)
Expand Down
93 changes: 93 additions & 0 deletions p2p/transport/webrtc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand All @@ -17,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
tpt "github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multibase"
Expand Down Expand Up @@ -860,3 +863,93 @@ func TestMaxInFlightRequests(t *testing.T) {
require.Equal(t, count, int(success.Load()), "expected exactly 3 dial successes")
require.Equal(t, 1, int(fails.Load()), "expected exactly 1 dial failure")
}

func TestManyConnections(t *testing.T) {
const N = 200
errCh := make(chan error, 200)
successCh := make(chan struct{}, 1)

tr, lp := getTransport(t)
ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct"))
require.NoError(t, err)
defer ln.Close()

runListenConn := func(conn tpt.CapableConn) {
defer conn.Close()

s, err := conn.AcceptStream()
if err != nil {
t.Errorf("accept stream failed for listener: %s", err)
errCh <- err
return
}
var b [4]byte
if _, err := s.Read(b[:]); err != nil {
t.Errorf("read stream failed for listener: %s", err)
errCh <- err
return
}
s.Write(b[:])
_, err = s.Read(b[:]) // peer will close the connection after read
if !assert.Error(t, err) {
errCh <- errors.New("expected peer to close connection")
return
}
}

runDialConn := func(conn tpt.CapableConn) error {
defer conn.Close()

s, err := conn.OpenStream(context.Background())
if err != nil {
t.Errorf("accept stream failed for listener: %s", err)
return err
}
var b [4]byte
if _, err := s.Write(b[:]); err != nil {
t.Errorf("write stream failed for dialer: %s", err)
return err
}
if _, err := s.Read(b[:]); err != nil {
t.Errorf("read stream failed for dialer: %s", err)
return err
}
return nil
}

go func() {
for i := 0; i < N; i++ {
conn, err := ln.Accept()
if err != nil {
t.Errorf("listener failed to accept conneciton: %s %d", err, runtime.NumGoroutine())
return
}
runListenConn(conn)
successCh <- struct{}{}
}
}()

tp, _ := getTransport(t)
for i := 0; i < N; i++ {
// This test aims to check for deadlocks. So keep a high timeout
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
conn, err := tp.Dial(ctx, ln.Multiaddr(), lp)
if err != nil {
t.Errorf("dial failed: %s %d", err, runtime.NumGoroutine())
cancel()
return
}
err = runDialConn(conn)
require.NoError(t, err)
cancel()
select {
case <-time.After(120 * time.Second):
t.Fatalf("timed out %d", runtime.NumGoroutine())
case <-errCh:
t.Fatal("listener error:", err, runtime.NumGoroutine())
case <-successCh:
}
t.Log("completed conn:", i, runtime.NumGoroutine())
}

}

0 comments on commit 628bf36

Please sign in to comment.