Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid "possible EventEmitter memory leak detected" warning #6789

Merged
merged 3 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/hip-bikes-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@graphql-tools/utils': minor
---

- New helper function `getAbortPromise` to get a promise rejected when `AbortSignal` is aborted
- New helper function `registerAbortSignalListener` to register a listener to abort a promise when `AbortSignal` is aborted

Instead of using `.addEventListener('abort', () => {/* ... */})`, we register a single listener to avoid warnings on Node.js like `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 abort listeners added. Use emitter.setMaxListeners() to increase limit`.
8 changes: 8 additions & 0 deletions .changeset/selfish-worms-decide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@graphql-tools/executor': patch
---

Surpress the "possible EventEmitter memory leak detected." warning occuring on Node.js when passing
a `AbortSignal` to `execute`.

Each execution will now only set up a single listener on the supplied `AbortSignal`. While the warning is harmless it can be misleading, which is the main motivation of this change.
22 changes: 11 additions & 11 deletions packages/executor/src/execution/__tests__/abort-signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@ import { assertAsyncIterable } from '../../../../loaders/url/tests/test-utils';
import { normalizedExecutor } from '../normalizedExecutor';

describe('Abort Signal', () => {
// Always make sure that listener is registered once or never
let controller: AbortController;
let spy: jest.SpyInstance;
beforeEach(() => {
controller = new AbortController();
spy = jest.spyOn(controller.signal, 'addEventListener');
});
afterEach(() => {
expect(spy.mock.calls.length).toBeLessThanOrEqual(1);
});
it('should stop the subscription', async () => {
expect.assertions(2);
const controller = new AbortController();
expect.assertions(3);
let stopped = false;
const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
Expand Down Expand Up @@ -64,7 +73,6 @@ describe('Abort Signal', () => {
expect(results).toEqual([0, 1, 2, 3, 4]);
});
it('pending subscription execution is canceled', async () => {
const controller = new AbortController();
const rootResolverGotInvokedD = createDeferred<void>();
const requestGotCancelledD = createDeferred<void>();
let aResolverGotInvoked = false;
Expand Down Expand Up @@ -123,8 +131,6 @@ describe('Abort Signal', () => {
expect(aResolverGotInvoked).toEqual(false);
});
it('should stop the serial mutation execution', async () => {
const controller = new AbortController();

let didInvokeFirstFn = false;
let didInvokeSecondFn = false;
let didInvokeThirdFn = false;
Expand Down Expand Up @@ -174,7 +180,6 @@ describe('Abort Signal', () => {
expect(didInvokeThirdFn).toBe(false);
});
it('should stop stream execution', async () => {
const controller = new AbortController();
let isAborted = false;

const schema = makeExecutableSchema({
Expand Down Expand Up @@ -223,7 +228,6 @@ describe('Abort Signal', () => {
expect(isAborted).toEqual(true);
});
it('stops pending stream execution for incremental delivery (@stream)', async () => {
const controller = new AbortController();
const d = createDeferred<void>();
let isReturnInvoked = false;

Expand Down Expand Up @@ -285,7 +289,6 @@ describe('Abort Signal', () => {
expect(isReturnInvoked).toEqual(true);
});
it('stops pending stream execution for parallel sources incremental delivery (@stream)', async () => {
const controller = new AbortController();
const d1 = createDeferred<void>();
const d2 = createDeferred<void>();

Expand Down Expand Up @@ -404,7 +407,6 @@ describe('Abort Signal', () => {
},
},
});
const controller = new AbortController();
const result = await normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
Expand Down Expand Up @@ -443,7 +445,6 @@ describe('Abort Signal', () => {
expect(bResolverGotInvoked).toBe(false);
});
it('stops promise execution', async () => {
const controller = new AbortController();
const d = createDeferred<void>();

const schema = makeExecutableSchema({
Expand Down Expand Up @@ -474,7 +475,6 @@ describe('Abort Signal', () => {
await expect(result$).rejects.toMatchInlineSnapshot(`DOMException {}`);
});
it('does not even try to execute if the signal is already aborted', async () => {
const controller = new AbortController();
let resolverGotInvoked = false;
const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
Expand Down
78 changes: 37 additions & 41 deletions packages/executor/src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
collectFields,
createGraphQLError,
fakePromise,
getAbortPromise,
getArgumentValues,
getDefinedRootType,
GraphQLResolveInfo,
Expand All @@ -52,6 +53,7 @@ import {
Path,
pathToArray,
promiseReduce,
registerAbortSignalListener,
} from '@graphql-tools/utils';
import { TypedDocumentNode } from '@graphql-typed-document-node/core';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
Expand Down Expand Up @@ -290,9 +292,7 @@ export function execute<TData = any, TVariables = any, TContext = any>(
function executeImpl<TData = any, TVariables = any, TContext = any>(
exeContext: ExecutionContext<TVariables, TContext>,
): MaybePromise<SingularExecutionResult<TData> | IncrementalExecutionResults<TData>> {
if (exeContext.signal?.aborted) {
throw exeContext.signal.reason;
}
exeContext.signal?.throwIfAborted();

// Return a Promise that will eventually resolve to the data described by
// The "Response" section of the GraphQL specification.
Expand Down Expand Up @@ -322,9 +322,7 @@ function executeImpl<TData = any, TVariables = any, TContext = any>(
return initialResult;
},
(error: any) => {
if (exeContext.signal?.aborted) {
throw exeContext.signal.reason;
}
exeContext.signal?.throwIfAborted();

if (error.errors) {
exeContext.errors.push(...error.errors);
Expand Down Expand Up @@ -558,9 +556,7 @@ function executeFieldsSerially<TData>(
fields,
(results, [responseName, fieldNodes]) => {
const fieldPath = addPath(path, responseName, parentType.name);
if (exeContext.signal?.aborted) {
throw exeContext.signal.reason;
}
exeContext.signal?.throwIfAborted();

return new ValueOrPromise(() =>
executeField(exeContext, parentType, sourceValue, fieldNodes, fieldPath),
Expand Down Expand Up @@ -595,9 +591,7 @@ function executeFields(

try {
for (const [responseName, fieldNodes] of fields) {
if (exeContext.signal?.aborted) {
throw exeContext.signal.reason;
}
exeContext.signal?.throwIfAborted();

const fieldPath = addPath(path, responseName, parentType.name);
const result = executeField(
Expand Down Expand Up @@ -958,13 +952,12 @@ async function completeAsyncIteratorValue(
iterator: AsyncIterator<unknown>,
asyncPayloadRecord?: AsyncPayloadRecord,
): Promise<ReadonlyArray<unknown>> {
exeContext.signal?.addEventListener(
'abort',
() => {
if (exeContext.signal && iterator.return) {
registerAbortSignalListener(exeContext.signal, () => {
iterator.return?.();
},
{ once: true },
);
});
}

const errors = asyncPayloadRecord?.errors ?? exeContext.errors;
const stream = getStreamValues(exeContext, fieldNodes, path);
let containsPromise = false;
Expand Down Expand Up @@ -1758,15 +1751,22 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable
'Subscription field must return Async Iterable. ' + `Received: ${inspect(result)}.`,
);
}
return {
[Symbol.asyncIterator]() {
const asyncIterator = result[Symbol.asyncIterator]();
signal?.addEventListener('abort', () => {
asyncIterator.return?.();
});
return asyncIterator;
},
};
if (signal) {
return {
[Symbol.asyncIterator]() {
const asyncIterator = result[Symbol.asyncIterator]();

if (asyncIterator.return) {
registerAbortSignalListener(signal, () => {
asyncIterator.return?.();
});
}

return asyncIterator;
},
};
}
return result;
}

function executeDeferredFragment(
Expand Down Expand Up @@ -2084,26 +2084,22 @@ function yieldSubsequentPayloads(
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
let isDone = false;

const abortPromise = new Promise<void>((_, reject) => {
exeContext.signal?.addEventListener(
'abort',
() => {
isDone = true;
reject(exeContext.signal?.reason);
},
{ once: true },
);
});
const abortPromise = exeContext.signal ? getAbortPromise(exeContext.signal) : undefined;

async function next(): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
if (isDone) {
return { value: undefined, done: true };
}

await Promise.race([
abortPromise,
...Array.from(exeContext.subsequentPayloads).map(p => p.promise),
]);
const subSequentPayloadPromises = Array.from(exeContext.subsequentPayloads).map(
record => record.promise,
);

if (abortPromise) {
await Promise.race([abortPromise, ...subSequentPayloadPromises]);
} else {
await Promise.race(subSequentPayloadPromises);
}

if (isDone) {
// a different call to next has exhausted all payloads
Expand Down
23 changes: 12 additions & 11 deletions packages/executor/src/execution/promiseForObject.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { getAbortPromise } from '@graphql-tools/utils';

type ResolvedObject<TData> = {
[TKey in keyof TData]: TData[TKey] extends Promise<infer TValue> ? TValue : TData[TKey];
};
Expand All @@ -14,15 +16,14 @@ export async function promiseForObject<TData>(
signal?: AbortSignal,
): Promise<ResolvedObject<TData>> {
const resolvedObject = Object.create(null);
await new Promise<void>((resolve, reject) => {
signal?.addEventListener('abort', () => {
reject(signal.reason);
});
Promise.all(
Object.entries(object as any).map(async ([key, value]) => {
resolvedObject[key] = await value;
}),
).then(() => resolve(), reject);
});
return resolvedObject;
const promises = Promise.all(
Object.entries(object as any).map(async ([key, value]) => {
resolvedObject[key] = await value;
}),
);
if (signal) {
const abortPromise = getAbortPromise(signal);
return Promise.race([abortPromise, promises]).then(() => resolvedObject);
}
return promises.then(() => resolvedObject);
}
1 change: 1 addition & 0 deletions packages/utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ export * from './getDirectiveExtensions.js';
export * from './map-maybe-promise.js';
export * from './fakePromise.js';
export * from './createDeferred.js';
export * from './registerAbortSignalListener.js';
45 changes: 45 additions & 0 deletions packages/utils/src/registerAbortSignalListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { memoize1 } from './memoize.js';

// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected"
// on Node.js
const getListenersOfAbortSignal = memoize1(function getListenersOfAbortSignal(signal: AbortSignal) {
const listeners = new Set<EventListener>();
signal.addEventListener(
'abort',
e => {
for (const listener of listeners) {
listener(e);
}
},
{ once: true },
);
return listeners;
});

/**
* Register an AbortSignal handler for a signal.
* This helper function mainly exists to work around the
* "possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit."
* warning occuring on Node.js
*/
export function registerAbortSignalListener(signal: AbortSignal, listener: VoidFunction) {
// If the signal is already aborted, call the listener immediately
if (signal.aborted) {
listener();
return;
}
getListenersOfAbortSignal(signal).add(listener);
}

export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) {
return new Promise<void>((_resolve, reject) => {
// If the signal is already aborted, return a rejected promise
if (signal.aborted) {
reject(signal.reason);
return;
}
registerAbortSignalListener(signal, () => {
reject(signal.reason);
});
});
});
1 change: 1 addition & 0 deletions website/theme.config.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ export default defineConfig({
},
websiteName: 'GraphQL-Tools',
description: PRODUCTS.TOOLS.title,
// @ts-expect-error - Typings are not updated
logo: PRODUCTS.TOOLS.logo({ className: 'w-9' }),
});
Loading