Skip to content

Commit

Permalink
internal: fix a panic due to a racy Close routine
Browse files Browse the repository at this point in the history
  • Loading branch information
jrobsonchase committed Jan 4, 2024
1 parent 1505d8c commit b8838a3
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions internal/tunnel/client/raw_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ type SessionHandler interface {
// When RawSession.Accept() returns an error, that means the session is dead.
// Client sessions run over a muxado session.
type rawSession struct {
mux *muxado.Heartbeat // the muxado session we're multiplexing streams over
id string // session id for logging purposes
handler SessionHandler // callbacks to allow the application to handle requests from the server
latency chan time.Duration
closeLatencyOnce sync.Once
mux *muxado.Heartbeat // the muxado session we're multiplexing streams over
id string // session id for logging purposes
handler SessionHandler // callbacks to allow the application to handle requests from the server
latency chan time.Duration
closed bool
closedLock sync.RWMutex
log.Logger
}

Expand Down Expand Up @@ -230,10 +231,19 @@ func (s *rawSession) respFunc(raw net.Conn) func(v any) error {
}

func (s *rawSession) Close() error {
s.closeLatencyOnce.Do(func() {
// Close the muxado heartbeat session. After this, the goroutine calling the
// callback handler should exit.
err := s.mux.Close()

// Prevent sending on a closed channel in the callback handler by ensuring
// exclusive access to the channel and the closed boolean here.
s.closedLock.Lock()
defer s.closedLock.Unlock()
if !s.closed {
s.closed = true
close(s.latency)
})
return s.mux.Close()
}
return err
}

// This is essentially the RPC protocol. The request and response are just JSON
Expand Down Expand Up @@ -268,6 +278,15 @@ func (s *rawSession) rpc(reqtype proto.ReqType, req any, resp any) error {
}

func (s *rawSession) onHeartbeat(pingTime time.Duration, timeout bool) {
// make sure we don't send on a closed channel.
// Any number of `onHeartbeat` callbacks can be in flight at a given time,
// but only one Close.
s.closedLock.RLock()
defer s.closedLock.RUnlock()
if s.closed {
return
}

if timeout {
s.Error("heartbeat timeout, terminating session")
s.Close()
Expand Down

0 comments on commit b8838a3

Please sign in to comment.