Skip to content

Commit

Permalink
fully encapsulate zeromq socket
Browse files Browse the repository at this point in the history
  • Loading branch information
jmcphers committed Jun 20, 2023
1 parent c1a3e97 commit b7ce9d1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 31 deletions.
42 changes: 21 additions & 21 deletions extensions/jupyter-adapter/src/JupyterKernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable {
this.connect(session.state.connectionFile).then(() => {

// Subscribe to all topics and connect the IOPub socket
this._iopub?.socket().subscribe('');
this._iopub?.socket().on('message', (...args: any[]) => {
this._iopub?.onMessage((args: any[]) => {
const msg = deserializeJupyterMessage(args, this._session!.key, this._channel);

// If this is a status message, save the status. Note that
Expand All @@ -358,15 +357,15 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable {
});

// Connect the Shell socket
this._shell?.socket().on('message', (...args: any[]) => {
this._shell?.onMessage((args: any[]) => {
const msg = deserializeJupyterMessage(args, this._session!.key, this._channel);
if (msg !== null) {
this.emitMessage(JupyterSockets.shell, msg);
}
});

// Connect the Stdin socket
this._stdin?.socket().on('message', (...args: any[]) => {
this._stdin?.onMessage((...args: any[]) => {
const msg = deserializeJupyterMessage(args, this._session!.key, this._channel);
if (msg !== null) {
// If this is an input request, save the header so we can
Expand All @@ -386,7 +385,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable {
}, 10000);

// Wait for the initial heartbeat
this._heartbeat?.socket().once('message', (msg: string) => {
const reg = this._heartbeat?.onMessage((msg: string) => {

// We got the heartbeat, so cancel the timeout
clearTimeout(timeout);
Expand All @@ -400,11 +399,15 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable {
const seconds = vscode.workspace.getConfiguration('positron').get('heartbeat', 30) as number;
this.log(`Starting heartbeat check at ${seconds} second intervals...`);
this.heartbeat();
this._heartbeat?.socket().on('message', (msg: string) => {

// Dispose this listener now that we've received the initial heartbeat
reg?.dispose();

this._heartbeat?.onMessage((msg: string) => {
this.onHeartbeat(msg);
});
});
this._heartbeat?.socket().send([HEARTBEAT_MESSAGE]);
this._heartbeat?.send([HEARTBEAT_MESSAGE]);

}).catch((err) => {
reject(err);
Expand Down Expand Up @@ -936,7 +939,8 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable {
* @param parent The parent message header (if any, {} if no parent)
* @param message The body of the message
*/
private sendToSocket(id: string, type: string, dest: JupyterSocket, parent: JupyterMessageHeader, message: JupyterMessageSpec): Promise<void> {
private async sendToSocket(id: string, type: string, dest: JupyterSocket,
parent: JupyterMessageHeader, message: JupyterMessageSpec) {
const msg: JupyterMessage = {
buffers: [],
content: message,
Expand All @@ -945,17 +949,13 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable {
parent_header: parent
};
this.log(`SEND ${msg.header.msg_type} to ${dest.title()}: ${JSON.stringify(msg)}`);
return new Promise<void>((resolve, reject) => {
dest.socket().send(serializeJupyterMessage(msg, this._session!.key), 0, (err) => {
if (err) {
this.log(`SEND ${msg.header.msg_type}: ERR: ${err}`);
reject(err);
} else {
this.log(`SEND ${msg.header.msg_type}: OK`);
resolve();
}
});
});

try {
await dest.send(serializeJupyterMessage(msg, this._session!.key));
this.log(`SEND ${msg.header.msg_type}: OK`);
} catch (err) {
this.log(`SEND ${msg.header.msg_type}: ERR: ${err}`);
}
}

/**
Expand All @@ -965,7 +965,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable {
const seconds = vscode.workspace.getConfiguration('positron.jupyterAdapter').get('heartbeat', 30) as number;
this._lastHeartbeat = new Date().getUTCMilliseconds();
this.log(`SEND heartbeat with timeout of ${seconds} seconds`);
this._heartbeat?.socket().send([HEARTBEAT_MESSAGE]);
this._heartbeat?.send([HEARTBEAT_MESSAGE]);
this._heartbeatTimer = setTimeout(() => {
this.enterOfflineState();
}, seconds * 1000);
Expand Down Expand Up @@ -1003,7 +1003,7 @@ export class JupyterKernel extends EventEmitter implements vscode.Disposable {
// It'd be slightly more elegant to use `setInterval` here, but this
// keeps the logic for handling the timer in `onHeartbeat` much
// simpler.
this._heartbeat?.socket().send([RECONNECT_MESSAGE]);
this._heartbeat?.send([RECONNECT_MESSAGE]);
this._heartbeatTimer = setTimeout(onlinePoller, 1000);
};
this._heartbeatTimer = setTimeout(onlinePoller, 1000);
Expand Down
39 changes: 29 additions & 10 deletions extensions/jupyter-adapter/src/JupyterSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class JupyterSocket implements vscode.Disposable {
private _port: number;
private _id: number;
private _disconnectEmitter = new vscode.EventEmitter<void>();
private _messageEmitter = new vscode.EventEmitter<any[]>();
private _state: JupyterSocketState = JupyterSocketState.Uninitialized;

static _jupyterSocketCount = 0;
Expand All @@ -39,6 +40,7 @@ export class JupyterSocket implements vscode.Disposable {
this._socket = socket;
this._title = title;
this.onDisconnected = this._disconnectEmitter.event;
this.onMessage = this._messageEmitter.event;

this._addr = '';
this._port = 0;
Expand Down Expand Up @@ -71,11 +73,27 @@ export class JupyterSocket implements vscode.Disposable {
this.onDisconnectedEvent = this.onDisconnectedEvent.bind(this);
this._socket.on('disconnect', this.onDisconnectedEvent);

this.onMessageEvent = this.onMessageEvent.bind(this);
this._socket.on('message', this.onMessageEvent);

this.onConnectDelay = this.onConnectDelay.bind(this);
this._socket.on('connect_delay', this.onConnectDelay);
}

public send(message: any): Promise<void> {
return new Promise<void>((resolve, reject) => {
this._socket.send(message, 0, (err?: Error) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

onDisconnected: vscode.Event<void>;
onMessage: vscode.Event<any>;

/**
* Handles the `disconnect` event from the ZeroMQ socket
Expand All @@ -94,6 +112,10 @@ export class JupyterSocket implements vscode.Disposable {
this._disconnectEmitter.fire();
}

private onMessageEvent(...args: any[]) {
this._messageEmitter.fire(args);
}

/**
* Handles the `connect` event from the ZeroMQ socket
*
Expand All @@ -105,11 +127,16 @@ export class JupyterSocket implements vscode.Disposable {
if (!this._connectPromise) {
return;
}

// Log the connection
this._logger(`${this._title} socket connected to ${addr}`);

this._state = JupyterSocketState.Connected;

// Subscribe to all messages if this is a sub socket
if (this._socket.type === 'sub') {
this._socket.subscribe('');
}

// Resolve the promise
this._connectPromise.resolve();
this._connectPromise = undefined;
Expand Down Expand Up @@ -179,15 +206,6 @@ export class JupyterSocket implements vscode.Disposable {
return this._connectPromise.promise;
}

/**
* Gets the underlying ZeroMQ socket
*
* @returns A ZeroMQ socket
*/
public socket(): zmq.Socket {
return this._socket;
}

/**
* Gets the address used by the socket
*
Expand Down Expand Up @@ -248,6 +266,7 @@ export class JupyterSocket implements vscode.Disposable {

// Clean up event handlers
this._socket.off('connect', this.onConnectedEvent);
this._socket.off('message', this.onMessageEvent);
this._socket.off('disconnect', this.onDisconnectedEvent);
this._socket.off('connect_delay', this.onConnectDelay);

Expand Down

0 comments on commit b7ce9d1

Please sign in to comment.