Skip to content

Commit

Permalink
fix: handle send errs correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
adiwajshing committed Jan 17, 2025
1 parent 366b1f2 commit e6626a7
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/server/create-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { logger as LOGGER } from 'src/utils'
import { createBgpListener } from 'src/utils/bgp-listener'
import { getEnvVariable } from 'src/utils/env'
import { SelectedServiceSignatureType } from 'src/utils/signatures'
import { promisifySend } from 'src/utils/ws'
import type { Duplex } from 'stream'
import { WebSocket, WebSocketServer } from 'ws'

Expand Down Expand Up @@ -90,6 +91,7 @@ async function handleNewClient(
req: IncomingMessage,
bgpListener: BGPListener | undefined
) {
promisifySend(ws)
const client = await AttestorServerSocket.acceptConnection(
ws,
{ req, bgpListener, logger: LOGGER }
Expand Down
4 changes: 4 additions & 0 deletions src/types/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ interface WebSocketWithServerSocket {
* Our RPC socket instance
*/
serverSocket?: IAttestorServerSocket
/**
* Just promisified send
*/
sendPromise?: (data: Uint8Array) => Promise<void>
}

declare module 'ws' {
Expand Down
8 changes: 5 additions & 3 deletions src/utils/socket-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ export class AttestorSocket implements IAttestorSocket {
const msg = packRpcMessages(...msgs)
const bytes = RPCMessages.encode(msg).finish()

await this.socket.send(bytes, err => {
this.logger.debug({ err }, 'error sending message')
})
if('sendPromise' in this.socket && this.socket.sendPromise) {
await this.socket.sendPromise(bytes)
} else {
this.socket.send(bytes)
}

return msg
}
Expand Down
30 changes: 29 additions & 1 deletion src/utils/ws.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { MAX_PAYLOAD_SIZE } from 'src/config'
import { detectEnvironment } from 'src/utils/env'
import type { WebSocket as WSWebSocket } from 'ws'

/**
* Default WebSocket implementation, uses `ws` package
Expand All @@ -9,8 +10,35 @@ import { detectEnvironment } from 'src/utils/env'
export function makeWebSocket(url: string) {
if(detectEnvironment() === 'node') {
const ws = require('ws') as typeof import('ws')
return new ws.WebSocket(url, { maxPayload: MAX_PAYLOAD_SIZE })
return promisifySend(
new ws.WebSocket(url, { maxPayload: MAX_PAYLOAD_SIZE })
)
}

return new WebSocket(url)
}

/**
* Adds the "sendPromise" fn to the given WebSocket instance,
* if not already present.
*/
export function promisifySend(ws: WSWebSocket) {
if(ws.sendPromise) {
return ws
}

ws.sendPromise = (data) => (
new Promise((resolve, reject) => {
ws.send(data, err => {
if(err) {
reject(err)
return
}

resolve()
})
})
)

return ws
}

0 comments on commit e6626a7

Please sign in to comment.