Skip to content

Commit

Permalink
fix: avoid "possible EventEmitter memory leak detected" warning (#6789)
Browse files Browse the repository at this point in the history
* fix: possible EventEmitter memory leak detected

* Single place for AbortSignal listeners -> #6789 (#6793)

* Single place for AbortSignal listeners

* Fix

* ..

* Lets go

* Small improvement

* Changeset for utils

* Update packages/executor/src/execution/__tests__/abort-signal.test.ts

---------

Co-authored-by: Arda TANRIKULU <ardatanrikulu@gmail.com>
  • Loading branch information
n1ru4l and ardatan authored Dec 24, 2024
1 parent d288b8b commit 2c70d27
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 63 deletions.
8 changes: 8 additions & 0 deletions .changeset/hip-bikes-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@graphql-tools/utils': minor
---

- New helper function `getAbortPromise` to get a promise rejected when `AbortSignal` is aborted
- New helper function `registerAbortSignalListener` to register a listener to abort a promise when `AbortSignal` is aborted

Instead of using `.addEventListener('abort', () => {/* ... */})`, we register a single listener to avoid warnings on Node.js like `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 abort listeners added. Use emitter.setMaxListeners() to increase limit`.
8 changes: 8 additions & 0 deletions .changeset/selfish-worms-decide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@graphql-tools/executor': patch
---

Surpress the "possible EventEmitter memory leak detected." warning occuring on Node.js when passing
a `AbortSignal` to `execute`.

Each execution will now only set up a single listener on the supplied `AbortSignal`. While the warning is harmless it can be misleading, which is the main motivation of this change.
22 changes: 11 additions & 11 deletions packages/executor/src/execution/__tests__/abort-signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@ import { assertAsyncIterable } from '../../../../loaders/url/tests/test-utils';
import { normalizedExecutor } from '../normalizedExecutor';

describe('Abort Signal', () => {
// Always make sure that listener is registered once or never
let controller: AbortController;
let spy: jest.SpyInstance;
beforeEach(() => {
controller = new AbortController();
spy = jest.spyOn(controller.signal, 'addEventListener');
});
afterEach(() => {
expect(spy.mock.calls.length).toBeLessThanOrEqual(1);
});
it('should stop the subscription', async () => {
expect.assertions(2);
const controller = new AbortController();
expect.assertions(3);
let stopped = false;
const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
Expand Down Expand Up @@ -64,7 +73,6 @@ describe('Abort Signal', () => {
expect(results).toEqual([0, 1, 2, 3, 4]);
});
it('pending subscription execution is canceled', async () => {
const controller = new AbortController();
const rootResolverGotInvokedD = createDeferred<void>();
const requestGotCancelledD = createDeferred<void>();
let aResolverGotInvoked = false;
Expand Down Expand Up @@ -123,8 +131,6 @@ describe('Abort Signal', () => {
expect(aResolverGotInvoked).toEqual(false);
});
it('should stop the serial mutation execution', async () => {
const controller = new AbortController();

let didInvokeFirstFn = false;
let didInvokeSecondFn = false;
let didInvokeThirdFn = false;
Expand Down Expand Up @@ -174,7 +180,6 @@ describe('Abort Signal', () => {
expect(didInvokeThirdFn).toBe(false);
});
it('should stop stream execution', async () => {
const controller = new AbortController();
let isAborted = false;

const schema = makeExecutableSchema({
Expand Down Expand Up @@ -223,7 +228,6 @@ describe('Abort Signal', () => {
expect(isAborted).toEqual(true);
});
it('stops pending stream execution for incremental delivery (@stream)', async () => {
const controller = new AbortController();
const d = createDeferred<void>();
let isReturnInvoked = false;

Expand Down Expand Up @@ -285,7 +289,6 @@ describe('Abort Signal', () => {
expect(isReturnInvoked).toEqual(true);
});
it('stops pending stream execution for parallel sources incremental delivery (@stream)', async () => {
const controller = new AbortController();
const d1 = createDeferred<void>();
const d2 = createDeferred<void>();

Expand Down Expand Up @@ -404,7 +407,6 @@ describe('Abort Signal', () => {
},
},
});
const controller = new AbortController();
const result = await normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
Expand Down Expand Up @@ -443,7 +445,6 @@ describe('Abort Signal', () => {
expect(bResolverGotInvoked).toBe(false);
});
it('stops promise execution', async () => {
const controller = new AbortController();
const d = createDeferred<void>();

const schema = makeExecutableSchema({
Expand Down Expand Up @@ -474,7 +475,6 @@ describe('Abort Signal', () => {
await expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`);
});
it('does not even try to execute if the signal is already aborted', async () => {
const controller = new AbortController();
let resolverGotInvoked = false;
const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
Expand Down
78 changes: 37 additions & 41 deletions packages/executor/src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
collectFields,
createGraphQLError,
fakePromise,
getAbortPromise,
getArgumentValues,
getDefinedRootType,
GraphQLResolveInfo,
Expand All @@ -52,6 +53,7 @@ import {
Path,
pathToArray,
promiseReduce,
registerAbortSignalListener,
} from '@graphql-tools/utils';
import { TypedDocumentNode } from '@graphql-typed-document-node/core';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
Expand Down Expand Up @@ -290,9 +292,7 @@ export function execute<TData = any, TVariables = any, TContext = any>(
function executeImpl<TData = any, TVariables = any, TContext = any>(
exeContext: ExecutionContext<TVariables, TContext>,
): MaybePromise<SingularExecutionResult<TData> | IncrementalExecutionResults<TData>> {
if (exeContext.signal?.aborted) {
throw exeContext.signal.reason;
}
exeContext.signal?.throwIfAborted();

// Return a Promise that will eventually resolve to the data described by
// The "Response" section of the GraphQL specification.
Expand Down Expand Up @@ -322,9 +322,7 @@ function executeImpl<TData = any, TVariables = any, TContext = any>(
return initialResult;
},
(error: any) => {
if (exeContext.signal?.aborted) {
throw exeContext.signal.reason;
}
exeContext.signal?.throwIfAborted();

if (error.errors) {
exeContext.errors.push(...error.errors);
Expand Down Expand Up @@ -558,9 +556,7 @@ function executeFieldsSerially<TData>(
fields,
(results, [responseName, fieldNodes]) => {
const fieldPath = addPath(path, responseName, parentType.name);
if (exeContext.signal?.aborted) {
throw exeContext.signal.reason;
}
exeContext.signal?.throwIfAborted();

return new ValueOrPromise(() =>
executeField(exeContext, parentType, sourceValue, fieldNodes, fieldPath),
Expand Down Expand Up @@ -595,9 +591,7 @@ function executeFields(

try {
for (const [responseName, fieldNodes] of fields) {
if (exeContext.signal?.aborted) {
throw exeContext.signal.reason;
}
exeContext.signal?.throwIfAborted();

const fieldPath = addPath(path, responseName, parentType.name);
const result = executeField(
Expand Down Expand Up @@ -958,13 +952,12 @@ async function completeAsyncIteratorValue(
iterator: AsyncIterator<unknown>,
asyncPayloadRecord?: AsyncPayloadRecord,
): Promise<ReadonlyArray<unknown>> {
exeContext.signal?.addEventListener(
'abort',
() => {
if (exeContext.signal && iterator.return) {
registerAbortSignalListener(exeContext.signal, () => {
iterator.return?.();
},
{ once: true },
);
});
}

const errors = asyncPayloadRecord?.errors ?? exeContext.errors;
const stream = getStreamValues(exeContext, fieldNodes, path);
let containsPromise = false;
Expand Down Expand Up @@ -1758,15 +1751,22 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable
'Subscription field must return Async Iterable. ' + `Received: ${inspect(result)}.`,
);
}
return {
[Symbol.asyncIterator]() {
const asyncIterator = result[Symbol.asyncIterator]();
signal?.addEventListener('abort', () => {
asyncIterator.return?.();
});
return asyncIterator;
},
};
if (signal) {
return {
[Symbol.asyncIterator]() {
const asyncIterator = result[Symbol.asyncIterator]();

if (asyncIterator.return) {
registerAbortSignalListener(signal, () => {
asyncIterator.return?.();
});
}

return asyncIterator;
},
};
}
return result;
}

function executeDeferredFragment(
Expand Down Expand Up @@ -2084,26 +2084,22 @@ function yieldSubsequentPayloads(
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
let isDone = false;

const abortPromise = new Promise<void>((_, reject) => {
exeContext.signal?.addEventListener(
'abort',
() => {
isDone = true;
reject(exeContext.signal?.reason);
},
{ once: true },
);
});
const abortPromise = exeContext.signal ? getAbortPromise(exeContext.signal) : undefined;

async function next(): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
if (isDone) {
return { value: undefined, done: true };
}

await Promise.race([
abortPromise,
...Array.from(exeContext.subsequentPayloads).map(p => p.promise),
]);
const subSequentPayloadPromises = Array.from(exeContext.subsequentPayloads).map(
record => record.promise,
);

if (abortPromise) {
await Promise.race([abortPromise, ...subSequentPayloadPromises]);
} else {
await Promise.race(subSequentPayloadPromises);
}

if (isDone) {
// a different call to next has exhausted all payloads
Expand Down
23 changes: 12 additions & 11 deletions packages/executor/src/execution/promiseForObject.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { getAbortPromise } from '@graphql-tools/utils';

type ResolvedObject<TData> = {
[TKey in keyof TData]: TData[TKey] extends Promise<infer TValue> ? TValue : TData[TKey];
};
Expand All @@ -14,15 +16,14 @@ export async function promiseForObject<TData>(
signal?: AbortSignal,
): Promise<ResolvedObject<TData>> {
const resolvedObject = Object.create(null);
await new Promise<void>((resolve, reject) => {
signal?.addEventListener('abort', () => {
reject(signal.reason);
});
Promise.all(
Object.entries(object as any).map(async ([key, value]) => {
resolvedObject[key] = await value;
}),
).then(() => resolve(), reject);
});
return resolvedObject;
const promises = Promise.all(
Object.entries(object as any).map(async ([key, value]) => {
resolvedObject[key] = await value;
}),
);
if (signal) {
const abortPromise = getAbortPromise(signal);
return Promise.race([abortPromise, promises]).then(() => resolvedObject);
}
return promises.then(() => resolvedObject);
}
1 change: 1 addition & 0 deletions packages/utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ export * from './getDirectiveExtensions.js';
export * from './map-maybe-promise.js';
export * from './fakePromise.js';
export * from './createDeferred.js';
export * from './registerAbortSignalListener.js';
45 changes: 45 additions & 0 deletions packages/utils/src/registerAbortSignalListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { memoize1 } from './memoize.js';

// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected"
// on Node.js
const getListenersOfAbortSignal = memoize1(function getListenersOfAbortSignal(signal: AbortSignal) {
const listeners = new Set<EventListener>();
signal.addEventListener(
'abort',
e => {
for (const listener of listeners) {
listener(e);
}
},
{ once: true },
);
return listeners;
});

/**
* Register an AbortSignal handler for a signal.
* This helper function mainly exists to work around the
* "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit."
* warning occuring on Node.js
*/
export function registerAbortSignalListener(signal: AbortSignal, listener: VoidFunction) {
// If the signal is already aborted, call the listener immediately
if (signal.aborted) {
listener();
return;
}
getListenersOfAbortSignal(signal).add(listener);
}

export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) {
return new Promise<void>((_resolve, reject) => {
// If the signal is already aborted, return a rejected promise
if (signal.aborted) {
reject(signal.reason);
return;
}
registerAbortSignalListener(signal, () => {
reject(signal.reason);
});
});
});

0 comments on commit 2c70d27

Please sign in to comment.