From 7690ba1bfd480f4652af2db09e49703441c46522 Mon Sep 17 00:00:00 2001 From: Peter Luitjens <43619525+busma13@users.noreply.github.com> Date: Fri, 22 Nov 2024 15:32:16 -0700 Subject: [PATCH] [teraslice-messaging, teraslice] Add abortController for use with sendSliceComplete() (#3838) This PR makes the following changes: - The execution-controller `Client` now creates an `abortController`. Every call to `Client.sendSliceComplete()` now includes a reference to `abortController.signal` allowing for the `pEvent` in the messenger `Core.onceWithTimeout()` to be aborted. - A new listener is added for the `server:shutdown` event that calls `abortController.abort()`. This prevents a worker from still waiting for a response to `sendSliceComplete()` after the server is shutdown. - bump teraslice-messaging from 1.7.0 to 1.7.1 ref: #2106 --- packages/teraslice-messaging/package.json | 2 +- .../src/execution-controller/client.ts | 10 ++++++++ .../src/messenger/client.ts | 2 +- .../teraslice-messaging/src/messenger/core.ts | 25 ++++++++++++++++--- .../src/messenger/interfaces.ts | 1 + packages/teraslice/package.json | 2 +- .../teraslice/src/lib/workers/worker/index.ts | 3 +++ 7 files changed, 38 insertions(+), 7 deletions(-) diff --git a/packages/teraslice-messaging/package.json b/packages/teraslice-messaging/package.json index d72415d5d12..328dca0dfaf 100644 --- a/packages/teraslice-messaging/package.json +++ b/packages/teraslice-messaging/package.json @@ -1,7 +1,7 @@ { "name": "@terascope/teraslice-messaging", "displayName": "Teraslice Messaging", - "version": "1.7.0", + "version": "1.7.1", "description": "An internal teraslice messaging library using socket.io", "homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-messaging#readme", "bugs": { diff --git a/packages/teraslice-messaging/src/execution-controller/client.ts b/packages/teraslice-messaging/src/execution-controller/client.ts index 1e3db2c97a5..a3080a47bdb 100644 --- a/packages/teraslice-messaging/src/execution-controller/client.ts +++ b/packages/teraslice-messaging/src/execution-controller/client.ts @@ -7,6 +7,7 @@ const ONE_MIN = 60 * 1000; export class Client extends core.Client { public workerId: string; + private abortController: AbortController; constructor(opts: i.ClientOptions) { const { @@ -47,6 +48,7 @@ export class Client extends core.Client { this.workerId = workerId; this.available = false; + this.abortController = new AbortController(); } async start() { @@ -75,6 +77,13 @@ export class Client extends core.Client { payload: msg.payload, }); }); + + this.on('server:shutdown', () => { + // this will send an abort signal to the Core.onceWithTimeout pEvent + // allowing the worker to shutdown without receiving a response to + // a sendSliceComplete() message + this.abortController.abort(); + }); } onExecutionFinished(fn: () => void) { @@ -85,6 +94,7 @@ export class Client extends core.Client { return this.send('worker:slice:complete', withoutNil(payload), { response: true, volatile: false, + signal: this.abortController.signal }); } diff --git a/packages/teraslice-messaging/src/messenger/client.ts b/packages/teraslice-messaging/src/messenger/client.ts index 986bd8c1c46..38ad490a9fa 100644 --- a/packages/teraslice-messaging/src/messenger/client.ts +++ b/packages/teraslice-messaging/src/messenger/client.ts @@ -278,7 +278,7 @@ export class Client extends Core { respondBy, }; - const responseMsg = this.handleSendResponse(message); + const responseMsg = this.handleSendResponse(message, options.signal); this.socket.emit(eventName, message); return responseMsg; } diff --git a/packages/teraslice-messaging/src/messenger/core.ts b/packages/teraslice-messaging/src/messenger/core.ts index 609237fdd5e..8e952efbd52 100644 --- a/packages/teraslice-messaging/src/messenger/core.ts +++ b/packages/teraslice-messaging/src/messenger/core.ts @@ -2,7 +2,8 @@ import ms from 'ms'; import { pEvent } from 'p-event'; import { EventEmitter } from 'node:events'; import { - toString, isInteger, debugLogger, Logger + toString, isInteger, debugLogger, + Logger, TSError } from '@terascope/utils'; import * as i from './interfaces.js'; @@ -40,11 +41,22 @@ export class Core extends EventEmitter { this.removeAllListeners(); } - protected async handleSendResponse(sent: i.Message): Promise { + protected async handleSendResponse( + sent: i.Message, + signal?: AbortSignal + ): Promise { if (!sent.response) return null; const remaining = sent.respondBy - Date.now(); - const response = await this.onceWithTimeout(sent.id, remaining); + const response = await this.onceWithTimeout(sent.id, remaining, signal); + + // server shutdown + if (signal?.aborted) { + const msg = sent.eventName === 'worker:slice:complete' + ? `Execution controller shutdown before receiving worker slice analytics. Event: "${sent.eventName}"` + : `Execution controller shutdown before receiving "${sent.eventName}" event`; + throw new TSError(msg, { retryable: false }); + } // it is a timeout if (response == null) { @@ -136,12 +148,17 @@ export class Core extends EventEmitter { } } - async onceWithTimeout(eventName: string, timeout?: number): Promise { + async onceWithTimeout( + eventName: string, + timeout?: number, + abortSignal?: AbortSignal + ): Promise { const timeoutMs: number = this.getTimeout(timeout); try { const { payload } = (await pEvent(this, eventName, { rejectionEvents: [], timeout: timeoutMs, + signal: abortSignal })) as i.EventMessage; return payload; } catch (err) { diff --git a/packages/teraslice-messaging/src/messenger/interfaces.ts b/packages/teraslice-messaging/src/messenger/interfaces.ts index af6c2ab393d..a08878fb6a9 100644 --- a/packages/teraslice-messaging/src/messenger/interfaces.ts +++ b/packages/teraslice-messaging/src/messenger/interfaces.ts @@ -57,6 +57,7 @@ export interface SendOptions { volatile?: boolean; response?: boolean; timeout?: number; + signal?: AbortSignal; } export interface ConnectedClient { diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index 24f9b7ac3ee..059b249829b 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -41,7 +41,7 @@ "@kubernetes/client-node": "~0.22.0", "@terascope/elasticsearch-api": "~4.4.0", "@terascope/job-components": "~1.6.0", - "@terascope/teraslice-messaging": "~1.7.0", + "@terascope/teraslice-messaging": "~1.7.1", "@terascope/types": "~1.3.0", "@terascope/utils": "~1.4.0", "async-mutex": "~0.5.0", diff --git a/packages/teraslice/src/lib/workers/worker/index.ts b/packages/teraslice/src/lib/workers/worker/index.ts index 18d4ddb4bd2..7a0807144c1 100644 --- a/packages/teraslice/src/lib/workers/worker/index.ts +++ b/packages/teraslice/src/lib/workers/worker/index.ts @@ -349,6 +349,9 @@ export class Worker { } catch (err) { if (this.isShuttingDown) { throw err; + } else if (err.retryable === false) { + this.logger.warn(`${err}, will not retry.`); + return true; } else { this.logger.warn(err); }