Skip to content

Commit

Permalink
fix(executor-graphql-ws): respect the given lazy and `lazyCloseTime…
Browse files Browse the repository at this point in the history
…out` options (#591)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
ardatan and github-actions[bot] authored Feb 4, 2025
1 parent a2b6fe9 commit 7d42160
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 65 deletions.
7 changes: 7 additions & 0 deletions .changeset/@graphql-mesh_transport-ws-591-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-mesh/transport-ws': patch
---

dependencies updates:

- Removed dependency [`isomorphic-ws@^5.0.0` ↗︎](https://www.npmjs.com/package/isomorphic-ws/v/5.0.0) (from `dependencies`)
5 changes: 5 additions & 0 deletions .changeset/twelve-ghosts-smoke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-mesh/transport-ws': patch
---

Avoid having an extra Client instantiation in the transport, and use the one in the executor
5 changes: 5 additions & 0 deletions .changeset/twenty-brooms-rush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-tools/executor-graphql-ws': patch
---

Fix the regression preventing users from passing custom `lazy` and `lazyCloseTimeout` options
45 changes: 40 additions & 5 deletions packages/executors/graphql-ws/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,23 @@ import {
registerAbortSignalListener,
} from '@graphql-tools/utils';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
import { print } from 'graphql';
import { Client, createClient } from 'graphql-ws';
import type { DocumentNode } from 'graphql';
import {
createClient,
EventClosedListener,
EventConnectedListener,
EventConnectingListener,
EventErrorListener,
EventMessageListener,
EventOpenedListener,
EventPingListener,
EventPongListener,
type Client,
} from 'graphql-ws';
import WebSocket from 'isomorphic-ws';

export interface GraphQLWSExecutorOptions {
onClient?: (client: Client) => void;
print?: typeof print;
print?(doc: DocumentNode): string;
/** The URL of the WebSocket server to connect to. */
url: string;
/**
Expand Down Expand Up @@ -45,6 +55,29 @@ export interface GraphQLWSExecutorOptions {
* @default 0
*/
lazyCloseTimeout?: number;

/**
* Do not use this option unless you know what you are doing.
* @internal
*/
on?:
| Partial<{
error: EventErrorListener;
message: EventMessageListener;
connecting: EventConnectingListener;
opened: EventOpenedListener;
connected: EventConnectedListener;
ping: EventPingListener;
pong: EventPongListener;
closed: EventClosedListener;
}>
| undefined;

/**
* Do not use this option unless you know what you are doing.
* @internal
*/
onClient?: (client: Client) => void;
}

function isClient(client: Client | GraphQLWSExecutorOptions): client is Client {
Expand Down Expand Up @@ -76,14 +109,16 @@ export function buildGraphQLWSExecutor(
graphqlWSClient = createClient({
url: clientOptionsOrClient.url,
webSocketImpl,
lazy: true,
lazy: clientOptionsOrClient.lazy !== false,
lazyCloseTimeout: clientOptionsOrClient.lazyCloseTimeout || 0,
connectionParams: () => {
const optionsConnectionParams =
(typeof clientOptionsOrClient.connectionParams === 'function'
? clientOptionsOrClient.connectionParams()
: clientOptionsOrClient.connectionParams) || {};
return Object.assign(optionsConnectionParams, executorConnectionParams);
},
on: clientOptionsOrClient.on,
});
if (clientOptionsOrClient.onClient) {
clientOptionsOrClient.onClient(graphqlWSClient);
Expand Down
1 change: 0 additions & 1 deletion packages/transports/ws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
"@graphql-tools/executor-graphql-ws": "workspace:^",
"@graphql-tools/utils": "^10.7.0",
"graphql-ws": "^6.0.3",
"isomorphic-ws": "^5.0.0",
"tslib": "^2.8.1",
"ws": "^8.18.0"
},
Expand Down
91 changes: 42 additions & 49 deletions packages/transports/ws/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import {
makeAsyncDisposable,
} from '@graphql-mesh/utils';
import { buildGraphQLWSExecutor } from '@graphql-tools/executor-graphql-ws';
import { DisposableAsyncExecutor } from '@graphql-tools/utils';
import { Client, createClient } from 'graphql-ws';
import WebSocket from 'isomorphic-ws';
import type { Client } from 'graphql-ws';

function switchProtocols(url: string) {
if (url.startsWith('https://')) {
Expand All @@ -36,9 +34,11 @@ export interface WSTransportOptions {
export default {
getSubgraphExecutor(
{ transportEntry, logger },
buildExecutor: (
client: Client,
) => DisposableAsyncExecutor = buildGraphQLWSExecutor,
/**
* Do not use this option unless you know what you are doing.
* @internal
*/
onClient?: (client: Client) => void,
) {
const wsExecutorMap = new Map<string, DisposableExecutor>();
if (!transportEntry.location) {
Expand Down Expand Up @@ -77,50 +77,43 @@ export default {
let wsExecutor = wsExecutorMap.get(hash);
if (!wsExecutor) {
const executorLogger = logger?.child('GraphQL WS').child(hash);
wsExecutor = buildExecutor(
createClient({
webSocketImpl: headers
? class WebSocketWithHeaders extends WebSocket {
constructor(url: string, protocol: string) {
super(url, protocol, { headers });
}
}
: WebSocket,
url: wsUrl,
lazy: true,
lazyCloseTimeout: 3_000,
...transportEntry.options,
connectionParams,
on: {
connecting(isRetry) {
executorLogger?.debug('connecting', { isRetry });
},
opened(socket) {
executorLogger?.debug('opened', { socket });
},
connected(socket, payload) {
executorLogger?.debug('connected', { socket, payload });
},
ping(received, payload) {
executorLogger?.debug('ping', { received, payload });
},
pong(received, payload) {
executorLogger?.debug('pong', { received, payload });
},
message(message) {
executorLogger?.debug('message', { message });
},
closed(event) {
executorLogger?.debug('closed', { event });
// no subscriptions and the lazy close timeout has passed - remove the client
wsExecutorMap.delete(hash);
},
error(error) {
executorLogger?.debug('error', { error });
},
wsExecutor = buildGraphQLWSExecutor({
headers,
url: wsUrl,
lazy: true,
lazyCloseTimeout: 3_000,
...transportEntry.options,
connectionParams,
on: {
connecting(isRetry) {
executorLogger?.debug('connecting', { isRetry });
},
}),
);
opened(socket) {
executorLogger?.debug('opened', { socket });
},
connected(socket, payload) {
executorLogger?.debug('connected', { socket, payload });
},
ping(received, payload) {
executorLogger?.debug('ping', { received, payload });
},
pong(received, payload) {
executorLogger?.debug('pong', { received, payload });
},
message(message) {
executorLogger?.debug('message', { message });
},
closed(event) {
executorLogger?.debug('closed', { event });
// no subscriptions and the lazy close timeout has passed - remove the client
wsExecutorMap.delete(hash);
},
error(error) {
executorLogger?.debug('error', { error });
},
},
onClient,
});
wsExecutorMap.set(hash, wsExecutor);
}
return wsExecutor(execReq);
Expand Down
16 changes: 7 additions & 9 deletions packages/transports/ws/tests/ws.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ import type {
TransportGetSubgraphExecutorOptions,
} from '@graphql-mesh/transport-common';
import { DefaultLogger, dispose } from '@graphql-mesh/utils';
import { buildGraphQLWSExecutor } from '@graphql-tools/executor-graphql-ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { createDeferred, DisposableAsyncExecutor } from '@graphql-tools/utils';
import { createDeferred } from '@graphql-tools/utils';
import { createDisposableWebSocketServer } from '@internal/testing';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
import { parse } from 'graphql';
Expand All @@ -19,7 +18,7 @@ type TServerOptions = ServerOptions<{}, WSExtra | BunExtra>;

async function createTServer(
transportEntry?: Partial<TransportEntry<WSTransportOptions>>,
buildExecutor?: (client: Client) => DisposableAsyncExecutor,
onClient?: (client: Client) => void,
) {
const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
Expand Down Expand Up @@ -81,7 +80,7 @@ async function createTServer(
},
logger: new DefaultLogger(),
} as unknown as TransportGetSubgraphExecutorOptions<WSTransportOptions>,
buildExecutor,
onClient,
);

return {
Expand Down Expand Up @@ -222,28 +221,27 @@ describe('WS Transport', () => {
let close!: () => void;
const { promise: waitForClosed, resolve: closed } = createDeferred<void>();

const buildGraphQLWSExecutorFn = vi.fn((client: Client) => {
const onClient = vi.fn((client: Client) => {
client.on('opened', (socket) => {
close = () => (socket as WebSocket).close();
});
client.on('closed', () => {
closed();
});
return buildGraphQLWSExecutor(client);
});

await using serv = await createTServer({}, buildGraphQLWSExecutorFn);
await using serv = await createTServer({}, onClient);

await serv.executeHelloQuery();

expect(buildGraphQLWSExecutorFn).toBeCalledTimes(1);
expect(onClient).toBeCalledTimes(1);

close();
await waitForClosed;

await serv.executeHelloQuery();

expect(buildGraphQLWSExecutorFn).toBeCalledTimes(2);
expect(onClient).toBeCalledTimes(2);
});

it.skipIf(
Expand Down
1 change: 0 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3915,7 +3915,6 @@ __metadata:
"@types/ws": "npm:^8"
graphql: "npm:^16.9.0"
graphql-ws: "npm:^6.0.3"
isomorphic-ws: "npm:^5.0.0"
pkgroll: "npm:2.6.1"
tslib: "npm:^2.8.1"
ws: "npm:^8.18.0"
Expand Down

0 comments on commit 7d42160

Please sign in to comment.