diff --git a/.changeset/hip-bikes-hunt.md b/.changeset/hip-bikes-hunt.md new file mode 100644 index 00000000000..b655f74e4a0 --- /dev/null +++ b/.changeset/hip-bikes-hunt.md @@ -0,0 +1,8 @@ +--- +'@graphql-tools/utils': minor +--- + +- New helper function `getAbortPromise` to get a promise rejected when `AbortSignal` is aborted +- New helper function `registerAbortSignalListener` to register a listener to abort a promise when `AbortSignal` is aborted + +Instead of using `.addEventListener('abort', () => {/* ... */})`, we register a single listener to avoid warnings on Node.js like `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 abort listeners added. Use emitter.setMaxListeners() to increase limit`. diff --git a/.changeset/selfish-worms-decide.md b/.changeset/selfish-worms-decide.md new file mode 100644 index 00000000000..920e3ebd73f --- /dev/null +++ b/.changeset/selfish-worms-decide.md @@ -0,0 +1,8 @@ +--- +'@graphql-tools/executor': patch +--- + +Surpress the "possible EventEmitter memory leak detected." warning occuring on Node.js when passing +a `AbortSignal` to `execute`. + +Each execution will now only set up a single listener on the supplied `AbortSignal`. While the warning is harmless it can be misleading, which is the main motivation of this change. diff --git a/packages/executor/src/execution/__tests__/abort-signal.test.ts b/packages/executor/src/execution/__tests__/abort-signal.test.ts index f3ea8ef66a2..4d7c242fa0b 100644 --- a/packages/executor/src/execution/__tests__/abort-signal.test.ts +++ b/packages/executor/src/execution/__tests__/abort-signal.test.ts @@ -7,9 +7,18 @@ import { assertAsyncIterable } from '../../../../loaders/url/tests/test-utils'; import { normalizedExecutor } from '../normalizedExecutor'; describe('Abort Signal', () => { + // Always make sure that listener is registered once or never + let controller: AbortController; + let spy: jest.SpyInstance; + beforeEach(() => { + controller = new AbortController(); + spy = jest.spyOn(controller.signal, 'addEventListener'); + }); + afterEach(() => { + expect(spy.mock.calls.length).toBeLessThanOrEqual(1); + }); it('should stop the subscription', async () => { - expect.assertions(2); - const controller = new AbortController(); + expect.assertions(3); let stopped = false; const schema = makeExecutableSchema({ typeDefs: /* GraphQL */ ` @@ -64,7 +73,6 @@ describe('Abort Signal', () => { expect(results).toEqual([0, 1, 2, 3, 4]); }); it('pending subscription execution is canceled', async () => { - const controller = new AbortController(); const rootResolverGotInvokedD = createDeferred(); const requestGotCancelledD = createDeferred(); let aResolverGotInvoked = false; @@ -123,8 +131,6 @@ describe('Abort Signal', () => { expect(aResolverGotInvoked).toEqual(false); }); it('should stop the serial mutation execution', async () => { - const controller = new AbortController(); - let didInvokeFirstFn = false; let didInvokeSecondFn = false; let didInvokeThirdFn = false; @@ -174,7 +180,6 @@ describe('Abort Signal', () => { expect(didInvokeThirdFn).toBe(false); }); it('should stop stream execution', async () => { - const controller = new AbortController(); let isAborted = false; const schema = makeExecutableSchema({ @@ -223,7 +228,6 @@ describe('Abort Signal', () => { expect(isAborted).toEqual(true); }); it('stops pending stream execution for incremental delivery (@stream)', async () => { - const controller = new AbortController(); const d = createDeferred(); let isReturnInvoked = false; @@ -285,7 +289,6 @@ describe('Abort Signal', () => { expect(isReturnInvoked).toEqual(true); }); it('stops pending stream execution for parallel sources incremental delivery (@stream)', async () => { - const controller = new AbortController(); const d1 = createDeferred(); const d2 = createDeferred(); @@ -404,7 +407,6 @@ describe('Abort Signal', () => { }, }, }); - const controller = new AbortController(); const result = await normalizedExecutor({ schema, document: parse(/* GraphQL */ ` @@ -443,7 +445,6 @@ describe('Abort Signal', () => { expect(bResolverGotInvoked).toBe(false); }); it('stops promise execution', async () => { - const controller = new AbortController(); const d = createDeferred(); const schema = makeExecutableSchema({ @@ -474,7 +475,6 @@ describe('Abort Signal', () => { await expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`); }); it('does not even try to execute if the signal is already aborted', async () => { - const controller = new AbortController(); let resolverGotInvoked = false; const schema = makeExecutableSchema({ typeDefs: /* GraphQL */ ` diff --git a/packages/executor/src/execution/execute.ts b/packages/executor/src/execution/execute.ts index 6915f0b22ab..3053d3b7079 100644 --- a/packages/executor/src/execution/execute.ts +++ b/packages/executor/src/execution/execute.ts @@ -35,6 +35,7 @@ import { collectFields, createGraphQLError, fakePromise, + getAbortPromise, getArgumentValues, getDefinedRootType, GraphQLResolveInfo, @@ -52,6 +53,7 @@ import { Path, pathToArray, promiseReduce, + registerAbortSignalListener, } from '@graphql-tools/utils'; import { TypedDocumentNode } from '@graphql-typed-document-node/core'; import { DisposableSymbols } from '@whatwg-node/disposablestack'; @@ -290,9 +292,7 @@ export function execute( function executeImpl( exeContext: ExecutionContext, ): MaybePromise | IncrementalExecutionResults> { - if (exeContext.signal?.aborted) { - throw exeContext.signal.reason; - } + exeContext.signal?.throwIfAborted(); // Return a Promise that will eventually resolve to the data described by // The "Response" section of the GraphQL specification. @@ -322,9 +322,7 @@ function executeImpl( return initialResult; }, (error: any) => { - if (exeContext.signal?.aborted) { - throw exeContext.signal.reason; - } + exeContext.signal?.throwIfAborted(); if (error.errors) { exeContext.errors.push(...error.errors); @@ -558,9 +556,7 @@ function executeFieldsSerially( fields, (results, [responseName, fieldNodes]) => { const fieldPath = addPath(path, responseName, parentType.name); - if (exeContext.signal?.aborted) { - throw exeContext.signal.reason; - } + exeContext.signal?.throwIfAborted(); return new ValueOrPromise(() => executeField(exeContext, parentType, sourceValue, fieldNodes, fieldPath), @@ -595,9 +591,7 @@ function executeFields( try { for (const [responseName, fieldNodes] of fields) { - if (exeContext.signal?.aborted) { - throw exeContext.signal.reason; - } + exeContext.signal?.throwIfAborted(); const fieldPath = addPath(path, responseName, parentType.name); const result = executeField( @@ -958,13 +952,12 @@ async function completeAsyncIteratorValue( iterator: AsyncIterator, asyncPayloadRecord?: AsyncPayloadRecord, ): Promise> { - exeContext.signal?.addEventListener( - 'abort', - () => { + if (exeContext.signal && iterator.return) { + registerAbortSignalListener(exeContext.signal, () => { iterator.return?.(); - }, - { once: true }, - ); + }); + } + const errors = asyncPayloadRecord?.errors ?? exeContext.errors; const stream = getStreamValues(exeContext, fieldNodes, path); let containsPromise = false; @@ -1758,15 +1751,22 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable 'Subscription field must return Async Iterable. ' + `Received: ${inspect(result)}.`, ); } - return { - [Symbol.asyncIterator]() { - const asyncIterator = result[Symbol.asyncIterator](); - signal?.addEventListener('abort', () => { - asyncIterator.return?.(); - }); - return asyncIterator; - }, - }; + if (signal) { + return { + [Symbol.asyncIterator]() { + const asyncIterator = result[Symbol.asyncIterator](); + + if (asyncIterator.return) { + registerAbortSignalListener(signal, () => { + asyncIterator.return?.(); + }); + } + + return asyncIterator; + }, + }; + } + return result; } function executeDeferredFragment( @@ -2084,26 +2084,22 @@ function yieldSubsequentPayloads( ): AsyncGenerator { let isDone = false; - const abortPromise = new Promise((_, reject) => { - exeContext.signal?.addEventListener( - 'abort', - () => { - isDone = true; - reject(exeContext.signal?.reason); - }, - { once: true }, - ); - }); + const abortPromise = exeContext.signal ? getAbortPromise(exeContext.signal) : undefined; async function next(): Promise> { if (isDone) { return { value: undefined, done: true }; } - await Promise.race([ - abortPromise, - ...Array.from(exeContext.subsequentPayloads).map(p => p.promise), - ]); + const subSequentPayloadPromises = Array.from(exeContext.subsequentPayloads).map( + record => record.promise, + ); + + if (abortPromise) { + await Promise.race([abortPromise, ...subSequentPayloadPromises]); + } else { + await Promise.race(subSequentPayloadPromises); + } if (isDone) { // a different call to next has exhausted all payloads diff --git a/packages/executor/src/execution/promiseForObject.ts b/packages/executor/src/execution/promiseForObject.ts index 4b9b512593c..fbfc6c8b9eb 100644 --- a/packages/executor/src/execution/promiseForObject.ts +++ b/packages/executor/src/execution/promiseForObject.ts @@ -1,3 +1,5 @@ +import { getAbortPromise } from '@graphql-tools/utils'; + type ResolvedObject = { [TKey in keyof TData]: TData[TKey] extends Promise ? TValue : TData[TKey]; }; @@ -14,15 +16,14 @@ export async function promiseForObject( signal?: AbortSignal, ): Promise> { const resolvedObject = Object.create(null); - await new Promise((resolve, reject) => { - signal?.addEventListener('abort', () => { - reject(signal.reason); - }); - Promise.all( - Object.entries(object as any).map(async ([key, value]) => { - resolvedObject[key] = await value; - }), - ).then(() => resolve(), reject); - }); - return resolvedObject; + const promises = Promise.all( + Object.entries(object as any).map(async ([key, value]) => { + resolvedObject[key] = await value; + }), + ); + if (signal) { + const abortPromise = getAbortPromise(signal); + return Promise.race([abortPromise, promises]).then(() => resolvedObject); + } + return promises.then(() => resolvedObject); } diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index f82d39bb56e..0db9594dded 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -58,3 +58,4 @@ export * from './getDirectiveExtensions.js'; export * from './map-maybe-promise.js'; export * from './fakePromise.js'; export * from './createDeferred.js'; +export * from './registerAbortSignalListener.js'; diff --git a/packages/utils/src/registerAbortSignalListener.ts b/packages/utils/src/registerAbortSignalListener.ts new file mode 100644 index 00000000000..d7a31121910 --- /dev/null +++ b/packages/utils/src/registerAbortSignalListener.ts @@ -0,0 +1,45 @@ +import { memoize1 } from './memoize.js'; + +// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected" +// on Node.js +const getListenersOfAbortSignal = memoize1(function getListenersOfAbortSignal(signal: AbortSignal) { + const listeners = new Set(); + signal.addEventListener( + 'abort', + e => { + for (const listener of listeners) { + listener(e); + } + }, + { once: true }, + ); + return listeners; +}); + +/** + * Register an AbortSignal handler for a signal. + * This helper function mainly exists to work around the + * "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit." + * warning occuring on Node.js + */ +export function registerAbortSignalListener(signal: AbortSignal, listener: VoidFunction) { + // If the signal is already aborted, call the listener immediately + if (signal.aborted) { + listener(); + return; + } + getListenersOfAbortSignal(signal).add(listener); +} + +export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) { + return new Promise((_resolve, reject) => { + // If the signal is already aborted, return a rejected promise + if (signal.aborted) { + reject(signal.reason); + return; + } + registerAbortSignalListener(signal, () => { + reject(signal.reason); + }); + }); +});