Skip to content

Commit cb1dd0e

Browse files
committedFeb 23, 2024
webrtc: close mux when closing listener
There is currently a leak in the webrtc listener. When the listener is closed the udp mux readloop just keeps running.
1 parent c197ef4 commit cb1dd0e

File tree

4 files changed

+94
-32
lines changed

4 files changed

+94
-32
lines changed
 

‎p2p/transport/webrtc/listener.go

+28-19
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type listener struct {
5757
// used to control the lifecycle of the listener
5858
ctx context.Context
5959
cancel context.CancelFunc
60+
wg sync.WaitGroup
6061
}
6162

6263
var _ tpt.Listener = &listener{}
@@ -91,30 +92,27 @@ func newListener(transport *WebRTCTransport, laddr ma.Multiaddr, socket net.Pack
9192
}
9293

9394
l.ctx, l.cancel = context.WithCancel(context.Background())
94-
mux := udpmux.NewUDPMux(socket)
95-
l.mux = mux
96-
mux.Start()
95+
l.mux = udpmux.NewUDPMux(socket)
96+
l.mux.Start()
9797

98-
go l.listen()
98+
l.wg.Add(1)
99+
go func() {
100+
l.wg.Done()
101+
l.listen()
102+
}()
99103

100104
return l, err
101105
}
102106

103107
func (l *listener) listen() {
104-
// Accepting a connection requires instantiating a peerconnection
105-
// and a noise connection which is expensive. We therefore limit
106-
// the number of in-flight connection requests. A connection
107-
// is considered to be in flight from the instant it is handled
108-
// until it is dequeued by a call to Accept, or errors out in some
109-
// way.
110-
inFlightQueueCh := make(chan struct{}, l.transport.maxInFlightConnections)
111-
for i := uint32(0); i < l.transport.maxInFlightConnections; i++ {
112-
inFlightQueueCh <- struct{}{}
113-
}
114-
108+
// Accepting a connection requires instantiating a peerconnection and a noise connection
109+
// which is expensive. We therefore limit the number of in-flight connection requests. A
110+
// connection is considered to be in flight from the instant it is handled until it is
111+
// dequeued by a call to Accept, or errors out in some way.
112+
inFlightSemaphore := make(chan struct{}, l.transport.maxInFlightConnections)
115113
for {
116114
select {
117-
case <-inFlightQueueCh:
115+
case inFlightSemaphore <- struct{}{}:
118116
case <-l.ctx.Done():
119117
return
120118
}
@@ -128,7 +126,7 @@ func (l *listener) listen() {
128126
}
129127

130128
go func() {
131-
defer func() { inFlightQueueCh <- struct{}{} }() // free this spot once again
129+
defer func() { <-inFlightSemaphore }()
132130

133131
ctx, cancel := context.WithTimeout(l.ctx, candidateSetupTimeout)
134132
defer cancel()
@@ -145,7 +143,7 @@ func (l *listener) listen() {
145143
log.Warn("could not push connection: ctx done")
146144
conn.Close()
147145
case l.acceptQueue <- conn:
148-
// acceptQueue is an unbuffered channel, so this block until the connection is accepted.
146+
// acceptQueue is an unbuffered channel, so this blocks until the connection is accepted.
149147
}
150148
}()
151149
}
@@ -307,7 +305,18 @@ func (l *listener) Close() error {
307305
select {
308306
case <-l.ctx.Done():
309307
default:
310-
l.cancel()
308+
}
309+
l.cancel()
310+
l.mux.Close()
311+
l.wg.Wait()
312+
loop:
313+
for {
314+
select {
315+
case conn := <-l.acceptQueue:
316+
conn.Close()
317+
default:
318+
break loop
319+
}
311320
}
312321
return nil
313322
}

‎p2p/transport/webrtc/stream_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ func TestStreamCloseAfterFINACK(t *testing.T) {
366366
select {
367367
case <-done:
368368
t.Fatalf("Close should not have completed without processing FIN_ACK")
369-
case <-time.After(2 * time.Second):
369+
case <-time.After(200 * time.Millisecond):
370370
}
371371

372372
b := make([]byte, 1)

0 commit comments

Comments
 (0)