diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 9ebd529c4a3ed..1dc000fc1978f 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -45,6 +45,31 @@ export interface RpcProtocolOptions { mode?: 'default' | 'clientOnly' | 'serverOnly' } +/** + * Wrapper for a {@link Disposable} that is not available immediately. + */ +export class MaybeDisposable { + + private disposed = false; + private disposable: Disposable | undefined = undefined; + + setDisposable(disposable: Disposable): void { + if (this.disposed) { + disposable.dispose(); + } else { + this.disposable = disposable; + } + } + + dispose(): void { + this.disposed = true; + if (this.disposable) { + this.disposable.dispose(); + this.disposable = undefined; + } + } +} + /** * Establish a RPC protocol on top of a given channel. By default the rpc protocol is bi-directional, meaning it is possible to send * requests and notifications to the remote side (i.e. acts as client) as well as receiving requests and notifications from the remote side (i.e. acts as a server). @@ -57,6 +82,7 @@ export class RpcProtocol { static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; protected readonly pendingRequests: Map> = new Map(); + protected readonly pendingRequestCancellationEventListeners: Map = new Map(); protected nextMessageId: number = 0; @@ -80,6 +106,8 @@ export class RpcProtocol { channel.onClose(event => { this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason))); this.pendingRequests.clear(); + this.pendingRequestCancellationEventListeners.forEach(disposable => disposable.dispose()); + this.pendingRequestCancellationEventListeners.clear(); this.toDispose.dispose(); }); this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); @@ -131,6 +159,7 @@ export class RpcProtocol { } else { throw new Error(`No reply handler for reply with id: ${id}`); } + this.disposeCancellationEventListener(id); } protected handleReplyErr(id: number, error: any): void { @@ -141,6 +170,15 @@ export class RpcProtocol { } else { throw new Error(`No reply handler for error reply with id: ${id}`); } + this.disposeCancellationEventListener(id); + } + + protected disposeCancellationEventListener(id: number): void { + const toDispose = this.pendingRequestCancellationEventListeners.get(id); + if (toDispose) { + this.pendingRequests.delete(id); + toDispose.dispose(); + } } sendRequest(method: string, args: any[]): Promise { @@ -157,6 +195,10 @@ export class RpcProtocol { this.pendingRequests.set(id, reply); + // register disposable before output.commit() + const maybeDisposable = new MaybeDisposable(); + this.pendingRequestCancellationEventListeners.set(id, maybeDisposable); + const output = this.channel.getWriteBuffer(); this.encoder.request(output, id, method, args); output.commit(); @@ -164,7 +206,10 @@ export class RpcProtocol { if (cancellationToken?.isCancellationRequested) { this.sendCancel(id); } else { - cancellationToken?.onCancellationRequested(() => this.sendCancel(id)); + const disposable = cancellationToken?.onCancellationRequested(() => this.sendCancel(id)); + if (disposable) { + maybeDisposable.setDisposable(disposable); + } } return reply.promise; @@ -233,3 +278,4 @@ export class RpcProtocol { this.onNotificationEmitter.fire({ method, args }); } } +