Skip to content

Commit

Permalink
enhance: use native AbortSignal and AbortController APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Feb 5, 2025
1 parent 1b4dbed commit 9ad895a
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 75 deletions.
7 changes: 7 additions & 0 deletions .changeset/fair-queens-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-mesh/transport-http-callback': patch
'@graphql-tools/executor-http': patch
'@graphql-hive/gateway-runtime': patch
---

Use native AbortSignal, AbortController APIs instead of custom ones
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
"@opentelemetry/otlp-exporter-base": "patch:@opentelemetry/otlp-exporter-base@npm%3A0.56.0#~/.yarn/patches/@opentelemetry-otlp-exporter-base-npm-0.56.0-ba3dc5f5c5.patch",
"@opentelemetry/resources": "patch:@opentelemetry/resources@npm%3A1.29.0#~/.yarn/patches/@opentelemetry-resources-npm-1.29.0-112f89f0c5.patch",
"@vitest/snapshot@npm:3.0.5": "patch:@vitest/snapshot@npm%3A3.0.5#~/.yarn/patches/@vitest-snapshot-npm-3.0.0-d5b381862b.patch",
"@whatwg-node/node-fetch": "0.7.8-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8",
"@whatwg-node/server": "0.10.0-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8",
"cross-spawn": "7.0.6",
"esbuild": "0.24.2",
"graphql": "16.10.0",
Expand Down
32 changes: 6 additions & 26 deletions packages/executors/http/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { abortSignalAny } from '@graphql-hive/gateway-abort-signal-any';
import {
defaultPrintFn,
SerializedExecutionRequest,
Expand Down Expand Up @@ -132,24 +131,6 @@ export interface HTTPExecutorOptions {

export type HeadersConfig = Record<string, string>;

// To prevent event listener warnings
function createSignalWrapper(signal: AbortSignal): AbortSignal {
const listeners = new Set<EventListener>();
signal.onabort = (event) => {
for (const listener of listeners) {
listener(event);
}
};
return Object.assign(signal, {
addEventListener(_type: 'abort', listener: EventListener) {
listeners.add(listener);
},
removeEventListener(_type: 'abort', listener: EventListener) {
listeners.delete(listener);
},
});
}

export function buildHTTPExecutor(
options?: Omit<HTTPExecutorOptions, 'fetch'> & {
fetch: SyncFetchFn;
Expand Down Expand Up @@ -177,13 +158,12 @@ export function buildHTTPExecutor(
): DisposableExecutor<any, HTTPExecutorOptions> {
const printFn = options?.print ?? defaultPrintFn;
const disposeCtrl = new AbortController();
const sharedSignal = createSignalWrapper(disposeCtrl.signal);
const baseExecutor = (
request: ExecutionRequest<any, any, any, HTTPExecutorOptions>,
excludeQuery?: boolean,
) => {
if (sharedSignal.aborted) {
return createResultForAbort(sharedSignal.reason);
if (disposeCtrl.signal.aborted) {
return createResultForAbort(disposeCtrl.signal.reason);
}
const fetchFn = request.extensions?.fetch ?? options?.fetch ?? defaultFetch;
let method = request.extensions?.method || options?.method;
Expand Down Expand Up @@ -230,7 +210,7 @@ export function buildHTTPExecutor(
request.extensions = restExtensions;
}

const signals = [sharedSignal];
const signals = [disposeCtrl.signal];
const signalFromRequest = request.signal || request.info?.signal;
if (signalFromRequest) {
if (signalFromRequest.aborted) {
Expand All @@ -242,7 +222,7 @@ export function buildHTTPExecutor(
signals.push(AbortSignal.timeout(options.timeout));
}

const signal = abortSignalAny(signals);
const signal = AbortSignal.any(signals);

const upstreamErrorExtensions: UpstreamErrorExtensions = {
request: {
Expand Down Expand Up @@ -489,8 +469,8 @@ export function buildHTTPExecutor(
function retryAttempt():
| PromiseLike<ExecutionResult<any>>
| ExecutionResult<any> {
if (sharedSignal.aborted) {
return createResultForAbort(sharedSignal.reason);
if (disposeCtrl.signal.aborted) {
return createResultForAbort(disposeCtrl.signal.reason);
}
attempt++;
if (attempt > options!.retry!) {
Expand Down
24 changes: 6 additions & 18 deletions packages/runtime/src/plugins/useUpstreamCancel.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
import {
abortSignalAny,
isAbortSignalFromAny,
} from '@graphql-hive/gateway-abort-signal-any';
import { GraphQLResolveInfo } from '@graphql-tools/utils';
import type { GatewayPlugin } from '../types';

Expand All @@ -21,14 +17,10 @@ export function useUpstreamCancel(): GatewayPlugin {
if (signalInInfo) {
signals.push(signalInInfo);
}
if (isAbortSignalFromAny(options.signal)) {
options.signal.addSignals(signals);
} else {
if (options.signal) {
signals.push(options.signal);
}
options.signal = abortSignalAny(signals);
if (options.signal) {
signals.push(options.signal);
}
options.signal = AbortSignal.any(signals);
},
onSubgraphExecute({ executionRequest }) {
const signals: AbortSignal[] = [];
Expand All @@ -38,14 +30,10 @@ export function useUpstreamCancel(): GatewayPlugin {
if (executionRequest.context?.request?.signal) {
signals.push(executionRequest.context.request.signal);
}
if (isAbortSignalFromAny(executionRequest.signal)) {
executionRequest.signal.addSignals(signals);
} else {
if (executionRequest.signal) {
signals.push(executionRequest.signal);
}
executionRequest.signal = abortSignalAny(signals);
if (executionRequest.signal) {
signals.push(executionRequest.signal);
}
executionRequest.signal = AbortSignal.any(signals);
},
};
}
44 changes: 26 additions & 18 deletions packages/runtime/src/plugins/useUpstreamTimeout.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { abortSignalAny } from '@graphql-hive/gateway-abort-signal-any';
import { subgraphNameByExecutionRequest } from '@graphql-mesh/fusion-runtime';
import { UpstreamErrorExtensions } from '@graphql-mesh/transport-common';
import { getHeadersObj } from '@graphql-mesh/utils';
import {
createDeferred,
createGraphQLError,
ExecutionRequest,
ExecutionResult,
getAbortPromise,
isAsyncIterable,
MaybeAsyncIterable,
MaybePromise,
Expand Down Expand Up @@ -54,21 +53,28 @@ export function useUpstreamTimeout<TContext extends Record<string, any>>(
// Comment the line above
// And uncomment the line below to see that statement leaks specificaly
// timeoutSignal = { addEventListener() {} } as AbortSignal;
timeoutSignalsByExecutionRequest.set(
executionRequest,
timeoutSignal,
);
}
timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal);
const timeout$ = getAbortPromise(timeoutSignal);
let finalSignal: AbortSignal | undefined = timeoutSignal;
const signals = new Set<AbortSignal>();
signals.add(timeoutSignal);
const timeoutDeferred = createDeferred<void>();
function rejectDeferred() {
timeoutDeferred.reject(timeoutSignal?.reason);
}
timeoutSignal.addEventListener('abort', rejectDeferred, {
once: true,
});
const signals: AbortSignal[] = [];
signals.push(timeoutSignal);
if (executionRequest.signal) {
signals.add(executionRequest.signal);
finalSignal = abortSignalAny(signals);
signals.push(executionRequest.signal);
}
return Promise.race([
timeout$,
timeoutDeferred.promise,
executor({
...executionRequest,
signal: finalSignal,
signal: AbortSignal.any(signals),
}),
])
.then((result) => {
Expand Down Expand Up @@ -99,6 +105,8 @@ export function useUpstreamTimeout<TContext extends Record<string, any>>(
throw e;
})
.finally(() => {
timeoutDeferred.resolve();
timeoutSignal.removeEventListener('abort', rejectDeferred);
// Remove from the map after used so we don't see it again
errorExtensionsByExecRequest.delete(executionRequest);
timeoutSignalsByExecutionRequest.delete(executionRequest);
Expand Down Expand Up @@ -131,15 +139,15 @@ export function useUpstreamTimeout<TContext extends Record<string, any>>(
} else {
timeoutSignal = AbortSignal.timeout(timeout);
}
const signals = new Set<AbortSignal>();
signals.add(timeoutSignal);
const signals: AbortSignal[] = [];
signals.push(timeoutSignal);
if (options.signal) {
signals.add(options.signal);
setOptions({
...options,
signal: abortSignalAny(signals),
});
signals.push(options.signal);
}
setOptions({
...options,
signal: AbortSignal.any(signals),
});
}
}
if (executionRequest) {
Expand Down
4 changes: 1 addition & 3 deletions packages/runtime/tests/upstream-timeout.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ describe('Upstream Timeout', () => {
throw new Error('Unexpected subgraph');
},
});
setTimeout(() => {
greetingsDeferred.resolve('Hello, World!');
}, 1500);
const res = await gateway.fetch('http://localhost:4000/graphql', {
method: 'POST',
headers: {
Expand Down Expand Up @@ -75,6 +72,7 @@ describe('Upstream Timeout', () => {
}),
],
});
greetingsDeferred.resolve('Hello, World!');
});
it('issue #303 - does not leak when it does not time out', async () => {
const upstreamSchema = createSchema({
Expand Down
3 changes: 1 addition & 2 deletions packages/transports/http-callback/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { process } from '@graphql-mesh/cross-helpers';
import { getInterpolatedHeadersFactory } from '@graphql-mesh/string-interpolation';
import {
abortSignalAny,
type DisposableExecutor,
type Transport,
} from '@graphql-mesh/transport-common';
Expand Down Expand Up @@ -158,7 +157,7 @@ export default {
}
let signal = executionRequest.signal || executionRequest.info?.signal;
if (signal) {
signal = abortSignalAny([reqAbortCtrl.signal, signal]);
signal = AbortSignal.any([reqAbortCtrl.signal, signal]);
}
const subFetchCall$ = mapMaybePromise(
fetch(
Expand Down
16 changes: 8 additions & 8 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7013,14 +7013,14 @@ __metadata:
languageName: node
linkType: hard

"@whatwg-node/node-fetch@npm:^0.7.1, @whatwg-node/node-fetch@npm:^0.7.7":
version: 0.7.7
resolution: "@whatwg-node/node-fetch@npm:0.7.7"
"@whatwg-node/node-fetch@npm:0.7.8-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8":
version: 0.7.8-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8
resolution: "@whatwg-node/node-fetch@npm:0.7.8-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8"
dependencies:
"@whatwg-node/disposablestack": "npm:^0.0.5"
busboy: "npm:^1.6.0"
tslib: "npm:^2.6.3"
checksum: 10c0/f61c45f256363f1c98ddcbcfc9265c8a98a64d09461a19ce32bcf76ab38619c7d7ee52ee7abfe80e49ddc7d6336e85ee481552294146ae934fca453feb970d23
checksum: 10c0/ceac8adad72cfbc98fe6b16265defaaa7b31bea8853663b230076b81bd473ee69ae5deb94844dc9ac57842a862c38634f802d3f77231b4f5a1582f0536941122
languageName: node
linkType: hard

Expand All @@ -7036,14 +7036,14 @@ __metadata:
languageName: node
linkType: hard

"@whatwg-node/server@npm:^0.9.55, @whatwg-node/server@npm:^0.9.60, @whatwg-node/server@npm:^0.9.64":
version: 0.9.65
resolution: "@whatwg-node/server@npm:0.9.65"
"@whatwg-node/server@npm:0.10.0-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8":
version: 0.10.0-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8
resolution: "@whatwg-node/server@npm:0.10.0-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8"
dependencies:
"@whatwg-node/disposablestack": "npm:^0.0.5"
"@whatwg-node/fetch": "npm:^0.10.0"
tslib: "npm:^2.6.3"
checksum: 10c0/f6fde2995c28223278484432b6107908d3bb917e76efb401b132df44ec45f140d3ef97db6ad03d0d197133036f85fbdb9274f4ed75363594b0469391c178bbfb
checksum: 10c0/97992d4070541692a184f8722ae1fc908bc9737cfefffeb2f14133f162c0ee95d373bcf0bef6469a8a6a68f26ae1725b3b141a2fd3a3e40c47de381c00a768e2
languageName: node
linkType: hard

Expand Down

0 comments on commit 9ad895a

Please sign in to comment.