Skip to content

Commit

Permalink
enhance(runtime): prepare for the future supergraph executor (#590)
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan authored Feb 5, 2025
1 parent 1721a0a commit 203172c
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 206 deletions.
5 changes: 5 additions & 0 deletions .changeset/light-badgers-tan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-mesh/fusion-runtime': patch
---

Refactor to make it easier to replace the supergraph execution
37 changes: 27 additions & 10 deletions packages/fusion-runtime/src/executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createDefaultExecutor } from '@graphql-mesh/transport-common';
import {
Executor,
isAsyncIterable,
mapAsyncIterator,
mapMaybePromise,
Expand Down Expand Up @@ -30,17 +31,33 @@ export function getExecutorForUnifiedGraph<TContext>(
return mapMaybePromise(
unifiedGraphManager.getContext(execReq.context),
(context) => {
function handleExecutor(executor: Executor) {
opts?.transportContext?.logger?.debug(
'Executing request on unified graph',
print(execReq.document),
);
return executor({
...execReq,
context,
});
}
return mapMaybePromise(
unifiedGraphManager.getUnifiedGraph(),
(unifiedGraph) => {
opts?.transportContext?.logger?.debug(
'Executing request on unified graph',
print(execReq.document),
);
return createDefaultExecutor(unifiedGraph)({
...execReq,
context,
});
unifiedGraphManager.getExecutor(),
(executor) => {
if (!executor) {
return mapMaybePromise(
unifiedGraphManager.getUnifiedGraph(),
(unifiedGraph) => {
opts?.transportContext?.logger?.debug(
'Executing request on unified graph',
print(execReq.document),
);
executor = createDefaultExecutor(unifiedGraph);
return handleExecutor(executor);
},
);
}
return handleExecutor(executor);
},
);
},
Expand Down
34 changes: 14 additions & 20 deletions packages/fusion-runtime/src/federation/subgraph.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { TransportEntry } from '@graphql-mesh/transport-common';
import { YamlConfig } from '@graphql-mesh/types';
import type {
MergedTypeConfig,
Expand Down Expand Up @@ -40,13 +39,16 @@ import {
typeFromAST,
visit,
} from 'graphql';
import { compareSubgraphNames, type getOnSubgraphExecute } from '../utils';
import {
compareSubgraphNames,
TransportEntry,
type getOnSubgraphExecute,
} from '../utils';

export interface HandleFederationSubschemaOpts {
subschemaConfig: SubschemaConfig & { endpoint?: string };
unifiedGraphDirectives?: Record<string, any>;
realSubgraphNameMap?: Map<string, string>;
schemaDirectives?: Record<string, any>;
transportEntryMap: Record<string, TransportEntry>;
additionalTypeDefs: TypeSource[];
stitchingDirectivesTransformer: (
subschemaConfig: SubschemaConfig,
Expand All @@ -56,9 +58,8 @@ export interface HandleFederationSubschemaOpts {

export function handleFederationSubschema({
subschemaConfig,
unifiedGraphDirectives,
realSubgraphNameMap,
schemaDirectives,
transportEntryMap,
additionalTypeDefs,
stitchingDirectivesTransformer,
onSubgraphExecute,
Expand All @@ -72,13 +73,16 @@ export function handleFederationSubschema({
transport: TransportEntry;
[key: string]: any;
}>(subschemaConfig.schema);
const directivesToLook = schemaDirectives || subgraphDirectives;

// We need to add subgraph specific directives from supergraph to the subgraph schema
// So the executor can use it
const directivesToLook = unifiedGraphDirectives || subgraphDirectives;
for (const directiveName in directivesToLook) {
if (
!subgraphDirectives[directiveName]?.length &&
schemaDirectives?.[directiveName]?.length
unifiedGraphDirectives?.[directiveName]?.length
) {
const directives = schemaDirectives[directiveName];
const directives = unifiedGraphDirectives[directiveName];
for (const directive of directives) {
if (directive.subgraph && directive.subgraph !== subgraphName) {
continue;
Expand All @@ -91,17 +95,7 @@ export function handleFederationSubschema({
const subgraphExtensions: Record<string, unknown> =
(subschemaConfig.schema.extensions ||= {});
subgraphExtensions['directives'] = subgraphDirectives;
const transportDirectives = (subgraphDirectives.transport ||= []);
const transportDirective = transportDirectives[0];
if (transportDirective) {
transportEntryMap[subgraphName] = transportDirective;
} else {
transportEntryMap[subgraphName] = {
kind: 'http',
subgraph: subgraphName,
location: subschemaConfig.endpoint,
};
}

interface TypeDirectives {
source: SourceDirective;
[key: string]: any;
Expand Down
158 changes: 120 additions & 38 deletions packages/fusion-runtime/src/federation/supergraph.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import type { TransportEntry } from '@graphql-mesh/transport-common';
import type { YamlConfig } from '@graphql-mesh/types';
import { resolveAdditionalResolversWithoutImport } from '@graphql-mesh/utils';
import type { SubschemaConfig } from '@graphql-tools/delegate';
import {
getInContextSDK,
requestIdByRequest,
resolveAdditionalResolversWithoutImport,
} from '@graphql-mesh/utils';
import type {
DelegationPlanBuilder,
StitchingInfo,
SubschemaConfig,
} from '@graphql-tools/delegate';
import { getStitchedSchemaFromSupergraphSdl } from '@graphql-tools/federation';
import { mergeTypeDefs } from '@graphql-tools/merge';
import { createMergedTypeResolver } from '@graphql-tools/stitch';
Expand All @@ -14,7 +21,6 @@ import {
MapperKind,
mapSchema,
memoize1,
mergeDeep,
TypeSource,
} from '@graphql-tools/utils';
import {
Expand All @@ -24,8 +30,16 @@ import {
type GraphQLSchema,
type ObjectTypeDefinitionNode,
} from 'graphql';
import type { UnifiedGraphHandler } from '../unifiedGraphManager';
import { wrapMergedTypeResolver } from '../utils';
import type {
UnifiedGraphHandler,
UnifiedGraphHandlerOpts,
UnifiedGraphHandlerResult,
} from '../unifiedGraphManager';
import {
compareSubgraphNames,
OnDelegationPlanDoneHook,
wrapMergedTypeResolver,
} from '../utils';
import { handleFederationSubschema } from './subgraph';

// Memoize to avoid re-parsing the same schema AST
Expand Down Expand Up @@ -139,22 +153,19 @@ export function handleResolveToDirectives(
export const handleFederationSupergraph: UnifiedGraphHandler = function ({
unifiedGraph,
onSubgraphExecute,
onDelegationPlanHooks,
onDelegationStageExecuteHooks,
onDelegateHooks,
additionalTypeDefs: additionalTypeDefsFromConfig = [],
additionalResolvers: additionalResolversFromConfig = [],
transportEntryAdditions,
batch = true,
logger,
}) {
}: UnifiedGraphHandlerOpts): UnifiedGraphHandlerResult {
const additionalTypeDefs = [...asArray(additionalTypeDefsFromConfig)];
const additionalResolvers = [...asArray(additionalResolversFromConfig)];
const transportEntryMap: Record<string, TransportEntry> = {};
let subschemas: SubschemaConfig[] = [];
const stitchingDirectivesTransformer =
getStitchingDirectivesTransformerForSubschema();
unifiedGraph = restoreExtraDirectives(unifiedGraph);
// Get Transport Information from Schema Directives
const schemaDirectives = getDirectiveExtensions(unifiedGraph);
// Workaround to get the real name of the subschema
const realSubgraphNameMap = new Map<string, string>();
const joinGraphType = unifiedGraph.getType('join__Graph');
Expand All @@ -173,6 +184,8 @@ export const handleFederationSupergraph: UnifiedGraphHandler = function ({
}
}

const unifiedGraphDirectives = getDirectiveExtensions(unifiedGraph);

let executableUnifiedGraph = getStitchedSchemaFromSupergraphSdl({
supergraphSdl: getDocumentNodeFromSchema(unifiedGraph),
/**
Expand All @@ -185,9 +198,8 @@ export const handleFederationSupergraph: UnifiedGraphHandler = function ({
onSubschemaConfig: (subschemaConfig) =>
handleFederationSubschema({
subschemaConfig,
unifiedGraphDirectives,
realSubgraphNameMap,
schemaDirectives,
transportEntryMap,
additionalTypeDefs,
stitchingDirectivesTransformer,
onSubgraphExecute,
Expand Down Expand Up @@ -264,34 +276,104 @@ export const handleFederationSupergraph: UnifiedGraphHandler = function ({
});
},
});

if (transportEntryAdditions) {
const wildcardTransportOptions = transportEntryAdditions['*'];
for (const subgraphName in transportEntryMap) {
const toBeMerged: Partial<TransportEntry>[] = [];
const transportEntry = transportEntryMap[subgraphName];
if (transportEntry) {
toBeMerged.push(transportEntry);
}
const transportOptionBySubgraph = transportEntryAdditions[subgraphName];
if (transportOptionBySubgraph) {
toBeMerged.push(transportOptionBySubgraph);
}
const transportOptionByKind =
transportEntryAdditions['*.' + transportEntry?.kind];
if (transportOptionByKind) {
toBeMerged.push(transportOptionByKind);
}
if (wildcardTransportOptions) {
toBeMerged.push(wildcardTransportOptions);
const inContextSDK = getInContextSDK(
executableUnifiedGraph,
// @ts-expect-error Legacy Mesh RawSource is not compatible with new Mesh
subschemas,
logger,
onDelegateHooks || [],
);
const stitchingInfo = executableUnifiedGraph.extensions?.[
'stitchingInfo'
] as StitchingInfo;
if (stitchingInfo && onDelegationPlanHooks?.length) {
for (const typeName in stitchingInfo.mergedTypes) {
const mergedTypeInfo = stitchingInfo.mergedTypes[typeName];
if (mergedTypeInfo) {
const originalDelegationPlanBuilder =
mergedTypeInfo.nonMemoizedDelegationPlanBuilder;
mergedTypeInfo.nonMemoizedDelegationPlanBuilder = (
supergraph,
sourceSubschema,
variables,
fragments,
fieldNodes,
context,
info,
) => {
let delegationPlanBuilder = originalDelegationPlanBuilder;
function setDelegationPlanBuilder(
newDelegationPlanBuilder: DelegationPlanBuilder,
) {
delegationPlanBuilder = newDelegationPlanBuilder;
}
const onDelegationPlanDoneHooks: OnDelegationPlanDoneHook[] = [];
let currentLogger = logger;
let requestId: string | undefined;
if (context?.request) {
requestId = requestIdByRequest.get(context.request);
if (requestId) {
currentLogger = currentLogger?.child(requestId);
}
}
if (sourceSubschema.name) {
currentLogger = currentLogger?.child(sourceSubschema.name);
}
for (const onDelegationPlan of onDelegationPlanHooks) {
const onDelegationPlanDone = onDelegationPlan({
supergraph,
subgraph: sourceSubschema.name!,
sourceSubschema,
typeName: mergedTypeInfo.typeName,
variables,
fragments,
fieldNodes,
logger: currentLogger,
context,
info,
delegationPlanBuilder,
setDelegationPlanBuilder,
});
if (onDelegationPlanDone) {
onDelegationPlanDoneHooks.push(onDelegationPlanDone);
}
}
let delegationPlan = delegationPlanBuilder(
supergraph,
sourceSubschema,
variables,
fragments,
fieldNodes,
context,
info,
);
function setDelegationPlan(
newDelegationPlan: ReturnType<DelegationPlanBuilder>,
) {
delegationPlan = newDelegationPlan;
}
for (const onDelegationPlanDone of onDelegationPlanDoneHooks) {
onDelegationPlanDone({
delegationPlan,
setDelegationPlan,
});
}
return delegationPlan;
};
}
transportEntryMap[subgraphName] = mergeDeep(toBeMerged);
}
}
return {
unifiedGraph: executableUnifiedGraph,
subschemas,
transportEntryMap,
additionalResolvers,
inContextSDK,
getSubgraphSchema(subgraphName) {
const subgraph = subschemas.find(
(s) => s.name && compareSubgraphNames(s.name, subgraphName),
);
if (!subgraph) {
throw new Error(`Subgraph ${subgraphName} not found`);
}
return subgraph.schema;
},
};
};
Loading

0 comments on commit 203172c

Please sign in to comment.