From cfab60caa8f85f373e84a3aaa788c633be066072 Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Fri, 20 Dec 2024 14:05:00 +0100 Subject: [PATCH 1/3] fix: possible EventEmitter memory leak detected --- .changeset/selfish-worms-decide.md | 8 +++ packages/executor/src/execution/execute.ts | 59 ++++++++++++++++------ 2 files changed, 52 insertions(+), 15 deletions(-) create mode 100644 .changeset/selfish-worms-decide.md diff --git a/.changeset/selfish-worms-decide.md b/.changeset/selfish-worms-decide.md new file mode 100644 index 00000000000..920e3ebd73f --- /dev/null +++ b/.changeset/selfish-worms-decide.md @@ -0,0 +1,8 @@ +--- +'@graphql-tools/executor': patch +--- + +Surpress the "possible EventEmitter memory leak detected." warning occuring on Node.js when passing +a `AbortSignal` to `execute`. + +Each execution will now only set up a single listener on the supplied `AbortSignal`. While the warning is harmless it can be misleading, which is the main motivation of this change. diff --git a/packages/executor/src/execution/execute.ts b/packages/executor/src/execution/execute.ts index 6915f0b22ab..da750a507bf 100644 --- a/packages/executor/src/execution/execute.ts +++ b/packages/executor/src/execution/execute.ts @@ -287,6 +287,35 @@ export function execute( return executeImpl(exeContext); } +// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected" +// on Node.js +const abortSignalHandlers = new WeakMap>(); + +/** + * Register an AbortSignal handler for a signal. + * This helper function mainly exists to work around the + * "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit." + * warning occuring on Node.js + */ +function registerAbortSignalHandler(signal: AbortSignal, handler: VoidFunction): void { + let handlers = abortSignalHandlers.get(signal); + + if (!Array.isArray(handlers)) { + handlers = new Set(); + abortSignalHandlers.set(signal, handlers); + + function abortSignalHandler() { + for (const handler of handlers!) { + handler(); + } + } + + signal.addEventListener('abort', abortSignalHandler, { once: true }); + } + + handlers.add(handler); +} + function executeImpl( exeContext: ExecutionContext, ): MaybePromise | IncrementalExecutionResults> { @@ -958,13 +987,12 @@ async function completeAsyncIteratorValue( iterator: AsyncIterator, asyncPayloadRecord?: AsyncPayloadRecord, ): Promise> { - exeContext.signal?.addEventListener( - 'abort', - () => { + if (exeContext.signal) { + registerAbortSignalHandler(exeContext.signal, () => { iterator.return?.(); - }, - { once: true }, - ); + }); + } + const errors = asyncPayloadRecord?.errors ?? exeContext.errors; const stream = getStreamValues(exeContext, fieldNodes, path); let containsPromise = false; @@ -1761,9 +1789,12 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable return { [Symbol.asyncIterator]() { const asyncIterator = result[Symbol.asyncIterator](); - signal?.addEventListener('abort', () => { - asyncIterator.return?.(); - }); + if (signal) { + registerAbortSignalHandler(signal, () => { + asyncIterator.return?.(); + }); + } + return asyncIterator; }, }; @@ -2085,14 +2116,12 @@ function yieldSubsequentPayloads( let isDone = false; const abortPromise = new Promise((_, reject) => { - exeContext.signal?.addEventListener( - 'abort', - () => { + if (exeContext.signal) { + registerAbortSignalHandler(exeContext.signal, () => { isDone = true; reject(exeContext.signal?.reason); - }, - { once: true }, - ); + }); + } }); async function next(): Promise> { From 2da2630e26ab0bac97d182d0843cc254baae3f30 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 24 Dec 2024 02:56:32 -0500 Subject: [PATCH 2/3] Single place for AbortSignal listeners -> #6789 (#6793) * Single place for AbortSignal listeners * Fix * .. * Lets go * Small improvement * Changeset for utils --- .changeset/hip-bikes-hunt.md | 8 ++ .../execution/__tests__/abort-signal.test.ts | 23 ++--- packages/executor/src/execution/execute.ts | 99 +++++++------------ .../src/execution/promiseForObject.ts | 23 ++--- packages/utils/src/index.ts | 1 + .../utils/src/registerAbortSignalListener.ts | 45 +++++++++ website/theme.config.tsx | 1 + 7 files changed, 112 insertions(+), 88 deletions(-) create mode 100644 .changeset/hip-bikes-hunt.md create mode 100644 packages/utils/src/registerAbortSignalListener.ts diff --git a/.changeset/hip-bikes-hunt.md b/.changeset/hip-bikes-hunt.md new file mode 100644 index 00000000000..b655f74e4a0 --- /dev/null +++ b/.changeset/hip-bikes-hunt.md @@ -0,0 +1,8 @@ +--- +'@graphql-tools/utils': minor +--- + +- New helper function `getAbortPromise` to get a promise rejected when `AbortSignal` is aborted +- New helper function `registerAbortSignalListener` to register a listener to abort a promise when `AbortSignal` is aborted + +Instead of using `.addEventListener('abort', () => {/* ... */})`, we register a single listener to avoid warnings on Node.js like `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 abort listeners added. Use emitter.setMaxListeners() to increase limit`. diff --git a/packages/executor/src/execution/__tests__/abort-signal.test.ts b/packages/executor/src/execution/__tests__/abort-signal.test.ts index f3ea8ef66a2..ae88226c67c 100644 --- a/packages/executor/src/execution/__tests__/abort-signal.test.ts +++ b/packages/executor/src/execution/__tests__/abort-signal.test.ts @@ -7,9 +7,18 @@ import { assertAsyncIterable } from '../../../../loaders/url/tests/test-utils'; import { normalizedExecutor } from '../normalizedExecutor'; describe('Abort Signal', () => { + // Always make sure that listener is registered once or never + let controller: AbortController; + let spy: jest.SpyInstance; + beforeEach(() => { + controller = new AbortController(); + spy = jest.spyOn(controller.signal, 'addEventListener'); + }); + afterEach(() => { + expect(spy.mock.calls.length).toBeLessThanOrEqual(1); + }); it('should stop the subscription', async () => { - expect.assertions(2); - const controller = new AbortController(); + expect.assertions(3); let stopped = false; const schema = makeExecutableSchema({ typeDefs: /* GraphQL */ ` @@ -64,7 +73,6 @@ describe('Abort Signal', () => { expect(results).toEqual([0, 1, 2, 3, 4]); }); it('pending subscription execution is canceled', async () => { - const controller = new AbortController(); const rootResolverGotInvokedD = createDeferred(); const requestGotCancelledD = createDeferred(); let aResolverGotInvoked = false; @@ -121,10 +129,9 @@ describe('Abort Signal', () => { controller.abort(); await expect($next).rejects.toMatchInlineSnapshot(`DOMException {}`); expect(aResolverGotInvoked).toEqual(false); + expect(controller.signal.addEventListener).toHaveBeenCalledTimes(1); }); it('should stop the serial mutation execution', async () => { - const controller = new AbortController(); - let didInvokeFirstFn = false; let didInvokeSecondFn = false; let didInvokeThirdFn = false; @@ -174,7 +181,6 @@ describe('Abort Signal', () => { expect(didInvokeThirdFn).toBe(false); }); it('should stop stream execution', async () => { - const controller = new AbortController(); let isAborted = false; const schema = makeExecutableSchema({ @@ -223,7 +229,6 @@ describe('Abort Signal', () => { expect(isAborted).toEqual(true); }); it('stops pending stream execution for incremental delivery (@stream)', async () => { - const controller = new AbortController(); const d = createDeferred(); let isReturnInvoked = false; @@ -285,7 +290,6 @@ describe('Abort Signal', () => { expect(isReturnInvoked).toEqual(true); }); it('stops pending stream execution for parallel sources incremental delivery (@stream)', async () => { - const controller = new AbortController(); const d1 = createDeferred(); const d2 = createDeferred(); @@ -404,7 +408,6 @@ describe('Abort Signal', () => { }, }, }); - const controller = new AbortController(); const result = await normalizedExecutor({ schema, document: parse(/* GraphQL */ ` @@ -443,7 +446,6 @@ describe('Abort Signal', () => { expect(bResolverGotInvoked).toBe(false); }); it('stops promise execution', async () => { - const controller = new AbortController(); const d = createDeferred(); const schema = makeExecutableSchema({ @@ -474,7 +476,6 @@ describe('Abort Signal', () => { await expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`); }); it('does not even try to execute if the signal is already aborted', async () => { - const controller = new AbortController(); let resolverGotInvoked = false; const schema = makeExecutableSchema({ typeDefs: /* GraphQL */ ` diff --git a/packages/executor/src/execution/execute.ts b/packages/executor/src/execution/execute.ts index da750a507bf..3053d3b7079 100644 --- a/packages/executor/src/execution/execute.ts +++ b/packages/executor/src/execution/execute.ts @@ -35,6 +35,7 @@ import { collectFields, createGraphQLError, fakePromise, + getAbortPromise, getArgumentValues, getDefinedRootType, GraphQLResolveInfo, @@ -52,6 +53,7 @@ import { Path, pathToArray, promiseReduce, + registerAbortSignalListener, } from '@graphql-tools/utils'; import { TypedDocumentNode } from '@graphql-typed-document-node/core'; import { DisposableSymbols } from '@whatwg-node/disposablestack'; @@ -287,41 +289,10 @@ export function execute( return executeImpl(exeContext); } -// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected" -// on Node.js -const abortSignalHandlers = new WeakMap>(); - -/** - * Register an AbortSignal handler for a signal. - * This helper function mainly exists to work around the - * "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit." - * warning occuring on Node.js - */ -function registerAbortSignalHandler(signal: AbortSignal, handler: VoidFunction): void { - let handlers = abortSignalHandlers.get(signal); - - if (!Array.isArray(handlers)) { - handlers = new Set(); - abortSignalHandlers.set(signal, handlers); - - function abortSignalHandler() { - for (const handler of handlers!) { - handler(); - } - } - - signal.addEventListener('abort', abortSignalHandler, { once: true }); - } - - handlers.add(handler); -} - function executeImpl( exeContext: ExecutionContext, ): MaybePromise | IncrementalExecutionResults> { - if (exeContext.signal?.aborted) { - throw exeContext.signal.reason; - } + exeContext.signal?.throwIfAborted(); // Return a Promise that will eventually resolve to the data described by // The "Response" section of the GraphQL specification. @@ -351,9 +322,7 @@ function executeImpl( return initialResult; }, (error: any) => { - if (exeContext.signal?.aborted) { - throw exeContext.signal.reason; - } + exeContext.signal?.throwIfAborted(); if (error.errors) { exeContext.errors.push(...error.errors); @@ -587,9 +556,7 @@ function executeFieldsSerially( fields, (results, [responseName, fieldNodes]) => { const fieldPath = addPath(path, responseName, parentType.name); - if (exeContext.signal?.aborted) { - throw exeContext.signal.reason; - } + exeContext.signal?.throwIfAborted(); return new ValueOrPromise(() => executeField(exeContext, parentType, sourceValue, fieldNodes, fieldPath), @@ -624,9 +591,7 @@ function executeFields( try { for (const [responseName, fieldNodes] of fields) { - if (exeContext.signal?.aborted) { - throw exeContext.signal.reason; - } + exeContext.signal?.throwIfAborted(); const fieldPath = addPath(path, responseName, parentType.name); const result = executeField( @@ -987,8 +952,8 @@ async function completeAsyncIteratorValue( iterator: AsyncIterator, asyncPayloadRecord?: AsyncPayloadRecord, ): Promise> { - if (exeContext.signal) { - registerAbortSignalHandler(exeContext.signal, () => { + if (exeContext.signal && iterator.return) { + registerAbortSignalListener(exeContext.signal, () => { iterator.return?.(); }); } @@ -1786,18 +1751,22 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable 'Subscription field must return Async Iterable. ' + `Received: ${inspect(result)}.`, ); } - return { - [Symbol.asyncIterator]() { - const asyncIterator = result[Symbol.asyncIterator](); - if (signal) { - registerAbortSignalHandler(signal, () => { - asyncIterator.return?.(); - }); - } + if (signal) { + return { + [Symbol.asyncIterator]() { + const asyncIterator = result[Symbol.asyncIterator](); - return asyncIterator; - }, - }; + if (asyncIterator.return) { + registerAbortSignalListener(signal, () => { + asyncIterator.return?.(); + }); + } + + return asyncIterator; + }, + }; + } + return result; } function executeDeferredFragment( @@ -2115,24 +2084,22 @@ function yieldSubsequentPayloads( ): AsyncGenerator { let isDone = false; - const abortPromise = new Promise((_, reject) => { - if (exeContext.signal) { - registerAbortSignalHandler(exeContext.signal, () => { - isDone = true; - reject(exeContext.signal?.reason); - }); - } - }); + const abortPromise = exeContext.signal ? getAbortPromise(exeContext.signal) : undefined; async function next(): Promise> { if (isDone) { return { value: undefined, done: true }; } - await Promise.race([ - abortPromise, - ...Array.from(exeContext.subsequentPayloads).map(p => p.promise), - ]); + const subSequentPayloadPromises = Array.from(exeContext.subsequentPayloads).map( + record => record.promise, + ); + + if (abortPromise) { + await Promise.race([abortPromise, ...subSequentPayloadPromises]); + } else { + await Promise.race(subSequentPayloadPromises); + } if (isDone) { // a different call to next has exhausted all payloads diff --git a/packages/executor/src/execution/promiseForObject.ts b/packages/executor/src/execution/promiseForObject.ts index 4b9b512593c..fbfc6c8b9eb 100644 --- a/packages/executor/src/execution/promiseForObject.ts +++ b/packages/executor/src/execution/promiseForObject.ts @@ -1,3 +1,5 @@ +import { getAbortPromise } from '@graphql-tools/utils'; + type ResolvedObject = { [TKey in keyof TData]: TData[TKey] extends Promise ? TValue : TData[TKey]; }; @@ -14,15 +16,14 @@ export async function promiseForObject( signal?: AbortSignal, ): Promise> { const resolvedObject = Object.create(null); - await new Promise((resolve, reject) => { - signal?.addEventListener('abort', () => { - reject(signal.reason); - }); - Promise.all( - Object.entries(object as any).map(async ([key, value]) => { - resolvedObject[key] = await value; - }), - ).then(() => resolve(), reject); - }); - return resolvedObject; + const promises = Promise.all( + Object.entries(object as any).map(async ([key, value]) => { + resolvedObject[key] = await value; + }), + ); + if (signal) { + const abortPromise = getAbortPromise(signal); + return Promise.race([abortPromise, promises]).then(() => resolvedObject); + } + return promises.then(() => resolvedObject); } diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index f82d39bb56e..0db9594dded 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -58,3 +58,4 @@ export * from './getDirectiveExtensions.js'; export * from './map-maybe-promise.js'; export * from './fakePromise.js'; export * from './createDeferred.js'; +export * from './registerAbortSignalListener.js'; diff --git a/packages/utils/src/registerAbortSignalListener.ts b/packages/utils/src/registerAbortSignalListener.ts new file mode 100644 index 00000000000..d7a31121910 --- /dev/null +++ b/packages/utils/src/registerAbortSignalListener.ts @@ -0,0 +1,45 @@ +import { memoize1 } from './memoize.js'; + +// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected" +// on Node.js +const getListenersOfAbortSignal = memoize1(function getListenersOfAbortSignal(signal: AbortSignal) { + const listeners = new Set(); + signal.addEventListener( + 'abort', + e => { + for (const listener of listeners) { + listener(e); + } + }, + { once: true }, + ); + return listeners; +}); + +/** + * Register an AbortSignal handler for a signal. + * This helper function mainly exists to work around the + * "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit." + * warning occuring on Node.js + */ +export function registerAbortSignalListener(signal: AbortSignal, listener: VoidFunction) { + // If the signal is already aborted, call the listener immediately + if (signal.aborted) { + listener(); + return; + } + getListenersOfAbortSignal(signal).add(listener); +} + +export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) { + return new Promise((_resolve, reject) => { + // If the signal is already aborted, return a rejected promise + if (signal.aborted) { + reject(signal.reason); + return; + } + registerAbortSignalListener(signal, () => { + reject(signal.reason); + }); + }); +}); diff --git a/website/theme.config.tsx b/website/theme.config.tsx index bbfcbf7cab4..4a3e883ef68 100644 --- a/website/theme.config.tsx +++ b/website/theme.config.tsx @@ -29,5 +29,6 @@ export default defineConfig({ }, websiteName: 'GraphQL-Tools', description: PRODUCTS.TOOLS.title, + // @ts-expect-error - Typings are not updated logo: PRODUCTS.TOOLS.logo({ className: 'w-9' }), }); From 7cec9a36c8869567ca685def6970f80d29f8b7f5 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 24 Dec 2024 11:01:23 +0300 Subject: [PATCH 3/3] Update packages/executor/src/execution/__tests__/abort-signal.test.ts --- packages/executor/src/execution/__tests__/abort-signal.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/executor/src/execution/__tests__/abort-signal.test.ts b/packages/executor/src/execution/__tests__/abort-signal.test.ts index ae88226c67c..4d7c242fa0b 100644 --- a/packages/executor/src/execution/__tests__/abort-signal.test.ts +++ b/packages/executor/src/execution/__tests__/abort-signal.test.ts @@ -129,7 +129,6 @@ describe('Abort Signal', () => { controller.abort(); await expect($next).rejects.toMatchInlineSnapshot(`DOMException {}`); expect(aResolverGotInvoked).toEqual(false); - expect(controller.signal.addEventListener).toHaveBeenCalledTimes(1); }); it('should stop the serial mutation execution', async () => { let didInvokeFirstFn = false;