From b7ce9d1d3829bf8fa969a1588c70a2e3b8fd0bd4 Mon Sep 17 00:00:00 2001 From: Jonathan McPherson Date: Tue, 20 Jun 2023 15:37:06 -0700 Subject: [PATCH] fully encapsulate zeromq socket --- .../jupyter-adapter/src/JupyterKernel.ts | 42 +++++++++---------- .../jupyter-adapter/src/JupyterSocket.ts | 39 ++++++++++++----- 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/extensions/jupyter-adapter/src/JupyterKernel.ts b/extensions/jupyter-adapter/src/JupyterKernel.ts index 86d5447d21e..facfd9a950a 100644 --- a/extensions/jupyter-adapter/src/JupyterKernel.ts +++ b/extensions/jupyter-adapter/src/JupyterKernel.ts @@ -334,8 +334,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { this.connect(session.state.connectionFile).then(() => { // Subscribe to all topics and connect the IOPub socket - this._iopub?.socket().subscribe(''); - this._iopub?.socket().on('message', (...args: any[]) => { + this._iopub?.onMessage((args: any[]) => { const msg = deserializeJupyterMessage(args, this._session!.key, this._channel); // If this is a status message, save the status. Note that @@ -358,7 +357,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { }); // Connect the Shell socket - this._shell?.socket().on('message', (...args: any[]) => { + this._shell?.onMessage((args: any[]) => { const msg = deserializeJupyterMessage(args, this._session!.key, this._channel); if (msg !== null) { this.emitMessage(JupyterSockets.shell, msg); @@ -366,7 +365,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { }); // Connect the Stdin socket - this._stdin?.socket().on('message', (...args: any[]) => { + this._stdin?.onMessage((...args: any[]) => { const msg = deserializeJupyterMessage(args, this._session!.key, this._channel); if (msg !== null) { // If this is an input request, save the header so we can @@ -386,7 +385,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { }, 10000); // Wait for the initial heartbeat - this._heartbeat?.socket().once('message', (msg: string) => { + const reg = this._heartbeat?.onMessage((msg: string) => { // We got the heartbeat, so cancel the timeout clearTimeout(timeout); @@ -400,11 +399,15 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { const seconds = vscode.workspace.getConfiguration('positron').get('heartbeat', 30) as number; this.log(`Starting heartbeat check at ${seconds} second intervals...`); this.heartbeat(); - this._heartbeat?.socket().on('message', (msg: string) => { + + // Dispose this listener now that we've received the initial heartbeat + reg?.dispose(); + + this._heartbeat?.onMessage((msg: string) => { this.onHeartbeat(msg); }); }); - this._heartbeat?.socket().send([HEARTBEAT_MESSAGE]); + this._heartbeat?.send([HEARTBEAT_MESSAGE]); }).catch((err) => { reject(err); @@ -936,7 +939,8 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { * @param parent The parent message header (if any, {} if no parent) * @param message The body of the message */ - private sendToSocket(id: string, type: string, dest: JupyterSocket, parent: JupyterMessageHeader, message: JupyterMessageSpec): Promise { + private async sendToSocket(id: string, type: string, dest: JupyterSocket, + parent: JupyterMessageHeader, message: JupyterMessageSpec) { const msg: JupyterMessage = { buffers: [], content: message, @@ -945,17 +949,13 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { parent_header: parent }; this.log(`SEND ${msg.header.msg_type} to ${dest.title()}: ${JSON.stringify(msg)}`); - return new Promise((resolve, reject) => { - dest.socket().send(serializeJupyterMessage(msg, this._session!.key), 0, (err) => { - if (err) { - this.log(`SEND ${msg.header.msg_type}: ERR: ${err}`); - reject(err); - } else { - this.log(`SEND ${msg.header.msg_type}: OK`); - resolve(); - } - }); - }); + + try { + await dest.send(serializeJupyterMessage(msg, this._session!.key)); + this.log(`SEND ${msg.header.msg_type}: OK`); + } catch (err) { + this.log(`SEND ${msg.header.msg_type}: ERR: ${err}`); + } } /** @@ -965,7 +965,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { const seconds = vscode.workspace.getConfiguration('positron.jupyterAdapter').get('heartbeat', 30) as number; this._lastHeartbeat = new Date().getUTCMilliseconds(); this.log(`SEND heartbeat with timeout of ${seconds} seconds`); - this._heartbeat?.socket().send([HEARTBEAT_MESSAGE]); + this._heartbeat?.send([HEARTBEAT_MESSAGE]); this._heartbeatTimer = setTimeout(() => { this.enterOfflineState(); }, seconds * 1000); @@ -1003,7 +1003,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable { // It'd be slightly more elegant to use `setInterval` here, but this // keeps the logic for handling the timer in `onHeartbeat` much // simpler. - this._heartbeat?.socket().send([RECONNECT_MESSAGE]); + this._heartbeat?.send([RECONNECT_MESSAGE]); this._heartbeatTimer = setTimeout(onlinePoller, 1000); }; this._heartbeatTimer = setTimeout(onlinePoller, 1000); diff --git a/extensions/jupyter-adapter/src/JupyterSocket.ts b/extensions/jupyter-adapter/src/JupyterSocket.ts index 2c39fe3b656..ec3a7c67c61 100644 --- a/extensions/jupyter-adapter/src/JupyterSocket.ts +++ b/extensions/jupyter-adapter/src/JupyterSocket.ts @@ -23,6 +23,7 @@ export class JupyterSocket implements vscode.Disposable { private _port: number; private _id: number; private _disconnectEmitter = new vscode.EventEmitter(); + private _messageEmitter = new vscode.EventEmitter(); private _state: JupyterSocketState = JupyterSocketState.Uninitialized; static _jupyterSocketCount = 0; @@ -39,6 +40,7 @@ export class JupyterSocket implements vscode.Disposable { this._socket = socket; this._title = title; this.onDisconnected = this._disconnectEmitter.event; + this.onMessage = this._messageEmitter.event; this._addr = ''; this._port = 0; @@ -71,11 +73,27 @@ export class JupyterSocket implements vscode.Disposable { this.onDisconnectedEvent = this.onDisconnectedEvent.bind(this); this._socket.on('disconnect', this.onDisconnectedEvent); + this.onMessageEvent = this.onMessageEvent.bind(this); + this._socket.on('message', this.onMessageEvent); + this.onConnectDelay = this.onConnectDelay.bind(this); this._socket.on('connect_delay', this.onConnectDelay); } + public send(message: any): Promise { + return new Promise((resolve, reject) => { + this._socket.send(message, 0, (err?: Error) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + onDisconnected: vscode.Event; + onMessage: vscode.Event; /** * Handles the `disconnect` event from the ZeroMQ socket @@ -94,6 +112,10 @@ export class JupyterSocket implements vscode.Disposable { this._disconnectEmitter.fire(); } + private onMessageEvent(...args: any[]) { + this._messageEmitter.fire(args); + } + /** * Handles the `connect` event from the ZeroMQ socket * @@ -105,11 +127,16 @@ export class JupyterSocket implements vscode.Disposable { if (!this._connectPromise) { return; } + // Log the connection this._logger(`${this._title} socket connected to ${addr}`); - this._state = JupyterSocketState.Connected; + // Subscribe to all messages if this is a sub socket + if (this._socket.type === 'sub') { + this._socket.subscribe(''); + } + // Resolve the promise this._connectPromise.resolve(); this._connectPromise = undefined; @@ -179,15 +206,6 @@ export class JupyterSocket implements vscode.Disposable { return this._connectPromise.promise; } - /** - * Gets the underlying ZeroMQ socket - * - * @returns A ZeroMQ socket - */ - public socket(): zmq.Socket { - return this._socket; - } - /** * Gets the address used by the socket * @@ -248,6 +266,7 @@ export class JupyterSocket implements vscode.Disposable { // Clean up event handlers this._socket.off('connect', this.onConnectedEvent); + this._socket.off('message', this.onMessageEvent); this._socket.off('disconnect', this.onDisconnectedEvent); this._socket.off('connect_delay', this.onConnectDelay);