Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpcserver: Convert ws client lifecycle to context. #3025

Merged
merged 3 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5178,13 +5178,14 @@ func (s *Server) standardCmdResult(ctx context.Context, cmd *parsedRPCCmd) (inte
// is suitable for use in replies if the command is invalid in some way such as
// an unregistered command or invalid parameters.
func parseCmd(request *dcrjson.Request) *parsedRPCCmd {
method := types.Method(request.Method)
parsedCmd := parsedRPCCmd{
jsonrpc: request.Jsonrpc,
id: request.ID,
method: types.Method(request.Method),
method: method,
}

params, err := dcrjson.ParseParams(types.Method(request.Method), request.Params)
params, err := dcrjson.ParseParams(method, request.Params)
if err != nil {
// Produce a relevant error when the requested method is not registered
// depending on whether or not it is recognized as being a wallet
Expand Down
132 changes: 75 additions & 57 deletions internal/rpcserver/rpcwebsocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ func (s *Server) WebsocketHandler(conn *websocket.Conn, remoteAddr string, authe
return
}
s.ntfnMgr.AddClient(client)
client.Start()
client.WaitForShutdown()
client.Run(context.TODO())
Copy link
Member

@dnldd dnldd Nov 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A todo here to pass down the dcrd parent context to synchronize lifecycles would be good.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disregard, just noticed this is being done in #3026.

s.ntfnMgr.RemoveClient(client)
log.Infof("Disconnected websocket client %s", remoteAddr)
}
Expand Down Expand Up @@ -463,15 +462,7 @@ out:
}
switch n := n.(type) {
case *notificationBlockConnected:
block := (*dcrutil.Block)(n)

// Skip iterating through all txs if no tx
// notification requests exist.
if len(blockNotifications) == 0 {
continue
}

m.notifyBlockConnected(blockNotifications, block)
m.notifyBlockConnected(blockNotifications, (*dcrutil.Block)(n))

case *notificationBlockDisconnected:
m.notifyBlockDisconnected(blockNotifications,
Expand Down Expand Up @@ -592,37 +583,55 @@ func (m *wsNotificationManager) NumClients() int {
// RegisterBlockUpdates requests block update notifications to the passed
// websocket client.
func (m *wsNotificationManager) RegisterBlockUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterBlocks)(wsc)
select {
case m.queueNotification <- (*notificationRegisterBlocks)(wsc):
case <-m.quit:
}
}

// UnregisterBlockUpdates removes block update notifications for the passed
// websocket client.
func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterBlocks)(wsc):
case <-m.quit:
}
}

// RegisterWorkUpdates requests work update notifications to the passed
// websocket client.
func (m *wsNotificationManager) RegisterWorkUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterWork)(wsc)
select {
case m.queueNotification <- (*notificationRegisterWork)(wsc):
case <-m.quit:
}
}

// UnregisterWorkUpdates removes work update notifications for the passed
// websocket client.
func (m *wsNotificationManager) UnregisterWorkUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterWork)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterWork)(wsc):
case <-m.quit:
}
}

// RegisterTSpendUpdates requests tspend update notifications to the passed
// websocket client.
func (m *wsNotificationManager) RegisterTSpendUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterTSpend)(wsc)
select {
case m.queueNotification <- (*notificationRegisterTSpend)(wsc):
case <-m.quit:
}
}

// UnregisterTSpendUpdates removes tspend update notifications for the passed
// websocket client.
func (m *wsNotificationManager) UnregisterTSpendUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterTSpend)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterTSpend)(wsc):
case <-m.quit:
}
}

// subscribedClients returns the set of all websocket client quit channels that
Expand Down Expand Up @@ -710,6 +719,12 @@ func (m *wsNotificationManager) subscribedClients(tx *dcrutil.Tx, clients map[ch
// notifyBlockConnected notifies websocket clients that have registered for
// block updates when a block is connected to the main chain.
func (m *wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient, block *dcrutil.Block) {
// Skip notification creation if no clients have requested block connected
// notifications.
if len(clients) == 0 {
return
}

// Create the common portion of the notification that is the same for
// every client.
headerBytes, err := block.MsgBlock().Header.Bytes()
Expand Down Expand Up @@ -946,13 +961,19 @@ func (m *wsNotificationManager) notifyReorganization(clients map[chan struct{}]*
// RegisterWinningTickets requests winning tickets update notifications
// to the passed websocket client.
func (m *wsNotificationManager) RegisterWinningTickets(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterWinningTickets)(wsc)
select {
case m.queueNotification <- (*notificationRegisterWinningTickets)(wsc):
case <-m.quit:
}
}

// UnregisterWinningTickets removes winning ticket notifications for
// the passed websocket client.
func (m *wsNotificationManager) UnregisterWinningTickets(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterWinningTickets)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterWinningTickets)(wsc):
case <-m.quit:
}
}

// notifyWinningTickets notifies websocket clients that have registered for
Expand Down Expand Up @@ -985,13 +1006,19 @@ func (*wsNotificationManager) notifyWinningTickets(
// RegisterNewTickets requests spent/missed tickets update notifications
// to the passed websocket client.
func (m *wsNotificationManager) RegisterNewTickets(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterNewTickets)(wsc)
select {
case m.queueNotification <- (*notificationRegisterNewTickets)(wsc):
case <-m.quit:
}
}

// UnregisterNewTickets removes spent/missed ticket notifications for
// the passed websocket client.
func (m *wsNotificationManager) UnregisterNewTickets(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterNewTickets)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterNewTickets)(wsc):
case <-m.quit:
}
}

// notifyNewTickets notifies websocket clients that have registered for
Expand Down Expand Up @@ -1021,13 +1048,19 @@ func (*wsNotificationManager) notifyNewTickets(clients map[chan struct{}]*wsClie
// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
// client when new transactions are added to the memory pool.
func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
select {
case m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc):
case <-m.quit:
}
}

// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
// client when new transaction are added to the memory pool.
func (m *wsNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
select {
case m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc):
case <-m.quit:
}
}

// notifyForNewTx notifies websocket clients that have registered for updates
Expand Down Expand Up @@ -1167,7 +1200,10 @@ func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *dcrutil.Tx,

// AddClient adds the passed websocket client to the notification manager.
func (m *wsNotificationManager) AddClient(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterClient)(wsc)
select {
case m.queueNotification <- (*notificationRegisterClient)(wsc):
case <-m.quit:
}
}

// RemoveClient removes the passed websocket client and all notifications
Expand Down Expand Up @@ -1789,17 +1825,6 @@ out:
}
}

// Drain any wait channels before exiting so nothing is left waiting
// around to send.
cleanup:
for {
select {
case <-c.ntfnChan:
case <-ntfnSentChan:
default:
break cleanup
}
}
c.wg.Done()
log.Tracef("Websocket client notification queue handler done "+
"for %s", c.addr)
Expand All @@ -1812,8 +1837,7 @@ cleanup:
func (c *wsClient) outHandler() {
out:
for {
// Send any messages ready for send until the quit channel is
// closed.
// Send any messages ready for send until the context is done.
select {
case r := <-c.sendChan:
err := c.conn.WriteMessage(websocket.TextMessage, r.msg)
Expand All @@ -1830,19 +1854,6 @@ out:
}
}

// Drain any wait channels before exiting so nothing is left waiting
// around to send.
cleanup:
for {
select {
case r := <-c.sendChan:
if r.doneChan != nil {
r.doneChan <- false
}
default:
break cleanup
}
}
c.wg.Done()
log.Tracef("Websocket client output handler done for %s", c.addr)
}
Expand Down Expand Up @@ -1920,20 +1931,27 @@ func (c *wsClient) Disconnect() {
c.conn.Close()
}

// Start begins processing input and output messages.
func (c *wsClient) Start() {
// Run starts the websocket client and all other goroutines necessary for it to
// function properly and blocks until the provided context is cancelled.
func (c *wsClient) Run(ctx context.Context) {
log.Tracef("Starting websocket client %s", c.addr)

// Start processing input and output.
c.wg.Add(3)
go c.inHandler(context.TODO())
go c.inHandler(ctx)
go c.notificationQueueHandler()
go c.outHandler()
}

// WaitForShutdown blocks until the websocket client goroutines are stopped
// and the connection is closed.
func (c *wsClient) WaitForShutdown() {
// Forcibly disconnect the websocket client when the context is cancelled
// which also closes the quit channel and thus ensures all of the above
// goroutines are shutdown.
c.wg.Add(1)
go func(ctx context.Context) {
<-ctx.Done()
c.Disconnect()
c.wg.Done()
}(ctx)

c.wg.Wait()
}

Expand Down