Skip to content

Commit

Permalink
rpcserver: Convert ws client lifecycle to context.
Browse files Browse the repository at this point in the history
This modifies the lifecycle of websocket clients to use the expected
pattern for running subsystems based on contexts.

In particular, this replaces the Start and WaitForShutdown methods with
a single method named Run and arranges for it to block until the
provided context is cancelled.  This is more flexible for the caller
since it can easily turn blocking code into async code while the reverse
is not true.

The new Run method waits for all goroutines that it starts to shutdown
before returning to help ensure an orderly shutdown.

Since there are exported methods that send messages to the output and
notification handler goroutines via channels and those goroutines are
stopped during the shutdown process, all sends select across both the
channels in question as well as a quit channel which is closed when the
context it cancelled.  This ensures callers can't end up blocking on
send to a goroutine that is no longer running without needing additional
mutexes.
  • Loading branch information
davecgh committed Nov 27, 2022
1 parent 82ac074 commit 32e2937
Showing 1 changed file with 68 additions and 48 deletions.
116 changes: 68 additions & 48 deletions internal/rpcserver/rpcwebsocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ func (s *Server) WebsocketHandler(conn *websocket.Conn, remoteAddr string, authe
return
}
s.ntfnMgr.AddClient(client)
client.Start()
client.WaitForShutdown()
client.Run(context.TODO())
s.ntfnMgr.RemoveClient(client)
log.Infof("Disconnected websocket client %s", remoteAddr)
}
Expand Down Expand Up @@ -584,37 +583,55 @@ func (m *wsNotificationManager) NumClients() int {
// RegisterBlockUpdates requests block update notifications to the passed
// websocket client.
func (m *wsNotificationManager) RegisterBlockUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterBlocks)(wsc)
select {
case m.queueNotification <- (*notificationRegisterBlocks)(wsc):
case <-m.quit:
}
}

// UnregisterBlockUpdates removes block update notifications for the passed
// websocket client.
func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterBlocks)(wsc):
case <-m.quit:
}
}

// RegisterWorkUpdates requests work update notifications to the passed
// websocket client.
func (m *wsNotificationManager) RegisterWorkUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterWork)(wsc)
select {
case m.queueNotification <- (*notificationRegisterWork)(wsc):
case <-m.quit:
}
}

// UnregisterWorkUpdates removes work update notifications for the passed
// websocket client.
func (m *wsNotificationManager) UnregisterWorkUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterWork)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterWork)(wsc):
case <-m.quit:
}
}

// RegisterTSpendUpdates requests tspend update notifications to the passed
// websocket client.
func (m *wsNotificationManager) RegisterTSpendUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterTSpend)(wsc)
select {
case m.queueNotification <- (*notificationRegisterTSpend)(wsc):
case <-m.quit:
}
}

// UnregisterTSpendUpdates removes tspend update notifications for the passed
// websocket client.
func (m *wsNotificationManager) UnregisterTSpendUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterTSpend)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterTSpend)(wsc):
case <-m.quit:
}
}

// subscribedClients returns the set of all websocket client quit channels that
Expand Down Expand Up @@ -944,13 +961,19 @@ func (m *wsNotificationManager) notifyReorganization(clients map[chan struct{}]*
// RegisterWinningTickets requests winning tickets update notifications
// to the passed websocket client.
func (m *wsNotificationManager) RegisterWinningTickets(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterWinningTickets)(wsc)
select {
case m.queueNotification <- (*notificationRegisterWinningTickets)(wsc):
case <-m.quit:
}
}

// UnregisterWinningTickets removes winning ticket notifications for
// the passed websocket client.
func (m *wsNotificationManager) UnregisterWinningTickets(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterWinningTickets)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterWinningTickets)(wsc):
case <-m.quit:
}
}

// notifyWinningTickets notifies websocket clients that have registered for
Expand Down Expand Up @@ -983,13 +1006,19 @@ func (*wsNotificationManager) notifyWinningTickets(
// RegisterNewTickets requests spent/missed tickets update notifications
// to the passed websocket client.
func (m *wsNotificationManager) RegisterNewTickets(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterNewTickets)(wsc)
select {
case m.queueNotification <- (*notificationRegisterNewTickets)(wsc):
case <-m.quit:
}
}

// UnregisterNewTickets removes spent/missed ticket notifications for
// the passed websocket client.
func (m *wsNotificationManager) UnregisterNewTickets(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterNewTickets)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterNewTickets)(wsc):
case <-m.quit:
}
}

// notifyNewTickets notifies websocket clients that have registered for
Expand Down Expand Up @@ -1019,13 +1048,19 @@ func (*wsNotificationManager) notifyNewTickets(clients map[chan struct{}]*wsClie
// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
// client when new transactions are added to the memory pool.
func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
select {
case m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc):
case <-m.quit:
}
}

// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
// client when new transaction are added to the memory pool.
func (m *wsNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc):
case <-m.quit:
}
}

// notifyForNewTx notifies websocket clients that have registered for updates
Expand Down Expand Up @@ -1165,7 +1200,10 @@ func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *dcrutil.Tx,

// AddClient adds the passed websocket client to the notification manager.
func (m *wsNotificationManager) AddClient(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterClient)(wsc)
select {
case m.queueNotification <- (*notificationRegisterClient)(wsc):
case <-m.quit:
}
}

// RemoveClient removes the passed websocket client and all notifications
Expand Down Expand Up @@ -1787,17 +1825,6 @@ out:
}
}

// Drain any wait channels before exiting so nothing is left waiting
// around to send.
cleanup:
for {
select {
case <-c.ntfnChan:
case <-ntfnSentChan:
default:
break cleanup
}
}
c.wg.Done()
log.Tracef("Websocket client notification queue handler done "+
"for %s", c.addr)
Expand All @@ -1810,8 +1837,7 @@ cleanup:
func (c *wsClient) outHandler() {
out:
for {
// Send any messages ready for send until the quit channel is
// closed.
// Send any messages ready for send until the context is done.
select {
case r := <-c.sendChan:
err := c.conn.WriteMessage(websocket.TextMessage, r.msg)
Expand All @@ -1828,19 +1854,6 @@ out:
}
}

// Drain any wait channels before exiting so nothing is left waiting
// around to send.
cleanup:
for {
select {
case r := <-c.sendChan:
if r.doneChan != nil {
r.doneChan <- false
}
default:
break cleanup
}
}
c.wg.Done()
log.Tracef("Websocket client output handler done for %s", c.addr)
}
Expand Down Expand Up @@ -1918,20 +1931,27 @@ func (c *wsClient) Disconnect() {
c.conn.Close()
}

// Start begins processing input and output messages.
func (c *wsClient) Start() {
// Run starts the websocket client and all other goroutines necessary for it to
// function properly and blocks until the provided context is cancelled.
func (c *wsClient) Run(ctx context.Context) {
log.Tracef("Starting websocket client %s", c.addr)

// Start processing input and output.
c.wg.Add(3)
go c.inHandler(context.TODO())
go c.inHandler(ctx)
go c.notificationQueueHandler()
go c.outHandler()
}

// WaitForShutdown blocks until the websocket client goroutines are stopped
// and the connection is closed.
func (c *wsClient) WaitForShutdown() {
// Forcibly disconnect the websocket client when the context is cancelled
// which also closes the quit channel and thus ensures all of the above
// goroutines are shutdown.
c.wg.Add(1)
go func(ctx context.Context) {
<-ctx.Done()
c.Disconnect()
c.wg.Done()
}(ctx)

c.wg.Wait()
}

Expand Down

0 comments on commit 32e2937

Please sign in to comment.