From 7e3be481d8c9dcd81386079fe595b4fee0541e1b Mon Sep 17 00:00:00 2001 From: Darren Date: Fri, 13 Dec 2024 10:00:36 +0000 Subject: [PATCH 1/4] wip --- .../plugins/opentelemetry/src/attributes.ts | 26 - packages/plugins/opentelemetry/src/index.ts | 1 - packages/plugins/opentelemetry/src/plugin.ts | 589 +++++++----------- .../plugins/opentelemetry/src/processors.ts | 122 ---- packages/plugins/opentelemetry/src/spans.ts | 288 --------- packages/plugins/opentelemetry/src/utils.ts | 32 + 6 files changed, 256 insertions(+), 802 deletions(-) delete mode 100644 packages/plugins/opentelemetry/src/attributes.ts delete mode 100644 packages/plugins/opentelemetry/src/spans.ts create mode 100644 packages/plugins/opentelemetry/src/utils.ts diff --git a/packages/plugins/opentelemetry/src/attributes.ts b/packages/plugins/opentelemetry/src/attributes.ts deleted file mode 100644 index 09f23fa32..000000000 --- a/packages/plugins/opentelemetry/src/attributes.ts +++ /dev/null @@ -1,26 +0,0 @@ -// HTTP/network attributes -export { - SEMATTRS_HTTP_CLIENT_IP, - SEMATTRS_HTTP_HOST, - SEMATTRS_HTTP_METHOD, - SEMATTRS_HTTP_ROUTE, - SEMATTRS_HTTP_SCHEME, - SEMATTRS_HTTP_SERVER_NAME, - SEMATTRS_HTTP_STATUS_CODE, - SEMATTRS_HTTP_URL, - SEMATTRS_HTTP_USER_AGENT, - SEMATTRS_NET_HOST_NAME, - ATTR_SERVICE_NAME as SEMRESATTRS_SERVICE_NAME, - ATTR_SERVICE_VERSION, -} from '@opentelemetry/semantic-conventions'; - -// GraphQL-specific attributes -// Based on https://opentelemetry.io/docs/specs/semconv/attributes-registry/graphql/ -export const SEMATTRS_GRAPHQL_DOCUMENT = 'graphql.document'; -export const SEMATTRS_GRAPHQL_OPERATION_TYPE = 'graphql.operation.type'; -export const SEMATTRS_GRAPHQL_OPERATION_NAME = 'graphql.operation.name'; -export const SEMATTRS_GRAPHQL_ERROR_COUNT = 'graphql.error.count'; - -// Gateway-specific attributes -export const SEMATTRS_GATEWAY_UPSTREAM_SUBGRAPH_NAME = - 'gateway.upstream.subgraph.name'; diff --git a/packages/plugins/opentelemetry/src/index.ts b/packages/plugins/opentelemetry/src/index.ts index 04669571b..b7e90b355 100644 --- a/packages/plugins/opentelemetry/src/index.ts +++ b/packages/plugins/opentelemetry/src/index.ts @@ -1,4 +1,3 @@ -export * from './processors'; export { useOpenTelemetry, type OpenTelemetryGatewayPluginOptions as OpenTelemetryMeshPluginOptions, diff --git a/packages/plugins/opentelemetry/src/plugin.ts b/packages/plugins/opentelemetry/src/plugin.ts index cce3fe667..d3b62524e 100644 --- a/packages/plugins/opentelemetry/src/plugin.ts +++ b/packages/plugins/opentelemetry/src/plugin.ts @@ -1,416 +1,275 @@ -import { - type OnExecuteEventPayload, - type OnParseEventPayload, - type OnValidateEventPayload, -} from '@envelop/types'; -import { type GatewayPlugin } from '@graphql-hive/gateway-runtime'; -import type { OnSubgraphExecutePayload } from '@graphql-mesh/fusion-runtime'; -import type { Logger, OnFetchHookPayload } from '@graphql-mesh/types'; -import { getHeadersObj } from '@graphql-mesh/utils'; -import { - fakePromise, - isAsyncIterable, - MaybePromise, -} from '@graphql-tools/utils'; -import { - context, - diag, - DiagLogLevel, - propagation, - trace, - type Context, - type TextMapGetter, - type Tracer, -} from '@opentelemetry/api'; -import { Resource } from '@opentelemetry/resources'; -import { type SpanProcessor } from '@opentelemetry/sdk-trace-base'; -import { WebTracerProvider } from '@opentelemetry/sdk-trace-web'; -import { DisposableSymbols } from '@whatwg-node/disposablestack'; -import { type OnRequestEventPayload } from '@whatwg-node/server'; -import { ATTR_SERVICE_VERSION, SEMRESATTRS_SERVICE_NAME } from './attributes'; -import { - completeHttpSpan, - createGraphQLExecuteSpan, - createGraphQLParseSpan, - createGraphQLValidateSpan, - createHttpSpan, - createSubgraphExecuteFetchSpan, - createUpstreamHttpFetchSpan, -} from './spans'; - -type PrimitiveOrEvaluated = - | TExpectedResult - | ((input: TInput) => TExpectedResult); - -interface OpenTelemetryGatewayPluginOptionsWithoutInit { - /** - * Whether to initialize the OpenTelemetry SDK (default: true). - */ - initializeNodeSDK: false; -} +import { type GatewayPlugin } from '@graphql-hive/gateway'; +import type { Logger } from '@graphql-mesh/types'; +import * as api from '@opentelemetry/api'; +import { SpanStatusCode} from "@opentelemetry/api"; -interface OpenTelemetryGatewayPluginOptionsWithInit { - /** - * Whether to initialize the OpenTelemetry SDK (default: true). - */ - initializeNodeSDK?: true; +import {isAsyncIterable, YogaInitialContext} from "graphql-yoga"; +import {sanitiseDocument} from "./utils"; +import { print } from "graphql"; +import {ExecutionResult} from "@graphql-tools/utils"; + +export type OpenTelemetryGatewayPluginOptions = { /** - * A list of OpenTelemetry exporters to use for exporting the spans. - * You can use exporters from `@opentelemetry/exporter-*` packages, or use the built-in utility functions. - * - * Does not apply when `initializeNodeSDK` is `false`. + * Tracer instance to use for creating spans (default: a tracer with name 'gateway'). */ - exporters: MaybePromise[]; + tracer?: api.Tracer; + /** - * Service name to use for OpenTelemetry NodeSDK resource option (default: 'Gateway'). + * Options to control which spans to create. + * By default, all spans are enabled. * - * Does not apply when `initializeNodeSDK` is `false`. + * You may specify a boolean value to enable/disable all spans, or a function to dynamically enable/disable spans based on the input. */ - serviceName?: string; -} - -type OpenTelemetryGatewayPluginOptionsInit = - | OpenTelemetryGatewayPluginOptionsWithInit - | OpenTelemetryGatewayPluginOptionsWithoutInit; - -export type OpenTelemetryGatewayPluginOptions = - OpenTelemetryGatewayPluginOptionsInit & { + spans?: { /** - * Tracer instance to use for creating spans (default: a tracer with name 'gateway'). + * Enable/disable GraphQL parse spans (default: true). */ - tracer?: Tracer; + parse: boolean | undefined; /** - * Whether to inherit the context from the calling service (default: true). - * - * This process is done by extracting the context from the incoming request headers. If disabled, a new context and a trace-id will be created. - * - * See https://opentelemetry.io/docs/languages/js/propagation/ + * Enable/disable GraphQL validate spans (default: true). */ - inheritContext?: boolean; + validate?: boolean | undefined; /** - * Whether to propagate the context to the outgoing requests (default: true). - * - * This process is done by injecting the context into the outgoing request headers. If disabled, the context will not be propagated. - * - * See https://opentelemetry.io/docs/languages/js/propagation/ + * Enable/disable GraphQL execute spans (default: true). */ - propagateContext?: boolean; + execute?: boolean | undefined; /** - * Options to control which spans to create. - * By default, all spans are enabled. - * - * You may specify a boolean value to enable/disable all spans, or a function to dynamically enable/disable spans based on the input. + * Enable/disable GraphQL subscribe spans (default: true). */ - spans?: { - /** - * Enable/disable HTTP request spans (default: true). - * - * Disabling the HTTP span will also disable all other child spans. - */ - http?: PrimitiveOrEvaluated>; - /** - * Enable/disable GraphQL parse spans (default: true). - */ - graphqlParse?: PrimitiveOrEvaluated>; - /** - * Enable/disable GraphQL validate spans (default: true). - */ - graphqlValidate?: PrimitiveOrEvaluated< - boolean, - OnValidateEventPayload - >; - /** - * Enable/disable GraphQL execute spans (default: true). - */ - graphqlExecute?: PrimitiveOrEvaluated< - boolean, - OnExecuteEventPayload - >; - /** - * Enable/disable subgraph execute spans (default: true). - */ - subgraphExecute?: PrimitiveOrEvaluated< - boolean, - OnSubgraphExecutePayload - >; - /** - * Enable/disable upstream HTTP fetch calls spans (default: true). - */ - upstreamFetch?: PrimitiveOrEvaluated>; - }; + subscribe: boolean | undefined; + /** + * Enable/disable subgraph execute spans (default: true). + */ + subgraphExecute?: boolean | undefined; }; - -const HeadersTextMapGetter: TextMapGetter = { - keys(carrier) { - return [...carrier.keys()]; - }, - get(carrier, key) { - return carrier.get(key) || undefined; - }, }; - -export function useOpenTelemetry( - options: OpenTelemetryGatewayPluginOptions & { logger: Logger }, -): GatewayPlugin<{ - opentelemetry: { - tracer: Tracer; - activeContext: () => Context; +// +export interface OtelContext{ + otel: { + context: { + active: api.Context + }; }; -}> { - const inheritContext = options.inheritContext ?? true; - const propagateContext = options.propagateContext ?? true; - - const requestContextMapping = new WeakMap(); - let tracer: Tracer; +} - let spanProcessors: SpanProcessor[]; - let serviceName: string = 'Gateway'; - let provider: WebTracerProvider; +export function useOpenTelemetry( + options: OpenTelemetryGatewayPluginOptions & { logger: Logger }, +): GatewayPlugin { + const tracer = options.tracer || api.trace.getTracer('graphql-gateway'); - let preparation$: Promise | undefined; + options.spans ||= { validate: true, parse: true, execute: true, subscribe: true, subgraphExecute: true} return { - onYogaInit({ yoga }) { - preparation$ = fakePromise(undefined).then(async () => { - if ( - !( - 'initializeNodeSDK' in options && - options.initializeNodeSDK === false - ) - ) { - if (options.serviceName) { - serviceName = options.serviceName; - } - if (options.exporters) { - spanProcessors = await Promise.all(options.exporters); + // TODO: on request / on response graphql.request + onParse: ({ context, extendContext, parseFn, setParseFn }) => { + const span = tracer.startSpan("graphql.parse", { + attributes: { + [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", + } + }, api.context.active()) + + extendContext({ + otel: { + context: { + active: api.trace.setSpan(api.context.active(), span), } - const webProvider = new WebTracerProvider({ - resource: new Resource({ - [SEMRESATTRS_SERVICE_NAME]: serviceName, - [ATTR_SERVICE_VERSION]: yoga.version, - }), - spanProcessors, - }); - webProvider.register(); - provider = webProvider; } - const pluginLogger = options.logger.child('OpenTelemetry'); - diag.setLogger( - { - error: (message, ...args) => pluginLogger.error(message, ...args), - warn: (message, ...args) => pluginLogger.warn(message, ...args), - info: (message, ...args) => pluginLogger.info(message, ...args), - debug: (message, ...args) => pluginLogger.debug(message, ...args), - verbose: (message, ...args) => pluginLogger.debug(message, ...args), - }, - DiagLogLevel.VERBOSE, - ); - tracer = options.tracer || trace.getTracer('gateway'); - preparation$ = undefined; }); + + setParseFn((args) =>{ + return api.context.with(api.trace.setSpan(getActiveContext(context), span), () => parseFn(args)) + }) + + return ({result}) => { + span.setAttribute(ATTR_GRAPHQL_DOCUMENT, print(sanitiseDocument(result))) + + if (result instanceof Error) { + span.setAttribute(ATTR_GRAPHQL_ERROR_COUNT, 1); + span.recordException(result); + span.setStatus({ code: SpanStatusCode.ERROR }); // TODO: should we have message ? Leaking? + } + + span.end(); + } }, - onContextBuilding({ extendContext, context }) { + onValidate({extendContext, context, setValidationFn, validateFn, params}) { + const span = tracer.startSpan("graphql.validate", { + attributes: { + [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", + [ATTR_GRAPHQL_DOCUMENT]: print(sanitiseDocument(params.documentAST)) + } + }, api.context.active()) + + + extendContext({ - opentelemetry: { - tracer, - activeContext: () => - requestContextMapping.get(context.request) ?? context['active'](), - }, + otel: { + context: { + active: api.trace.setSpan(api.context.active(), span), + } + } }); - }, - onRequest(onRequestPayload) { - const shouldTraceHttp = - typeof options.spans?.http === 'function' - ? options.spans.http(onRequestPayload) - : (options.spans?.http ?? true); - - if (!shouldTraceHttp) { - return preparation$; - } - const { request, url } = onRequestPayload; - const otelContext = inheritContext - ? propagation.extract( - context.active(), - request.headers, - HeadersTextMapGetter, - ) - : context.active(); - - const httpSpan = createHttpSpan({ - request, - url, - tracer, - otelContext, - }); + setValidationFn((schema, documentAST, rules, options, typeInfo) =>{ + return api.context.with(api.trace.setSpan(getActiveContext(context), span), () => validateFn(schema, documentAST, rules, options, typeInfo)); + }) - requestContextMapping.set(request, trace.setSpan(otelContext, httpSpan)); + return (result) => { + if (result instanceof Error) { + span.setStatus({ code: SpanStatusCode.ERROR }); + } - return preparation$; - }, - onValidate(onValidatePayload) { - const shouldTraceValidate = - typeof options.spans?.graphqlValidate === 'function' - ? options.spans.graphqlValidate(onValidatePayload) - : (options.spans?.graphqlValidate ?? true); - - const { context } = onValidatePayload; - const otelContext = requestContextMapping.get(context.request); - - if (shouldTraceValidate && otelContext) { - const { done } = createGraphQLValidateSpan({ - otelContext, - tracer, - query: context.params.query, - operationName: context.params.operationName, - }); - - return ({ result }) => done(result); + if (Array.isArray(result) && result.length > 0) { + span.setAttribute(ATTR_GRAPHQL_ERROR_COUNT, result.length); + span.setStatus({code: SpanStatusCode.ERROR}); + for (const error in result) { + span.recordException(error); + } + } + span.end(); } - return void 0; }, - onParse(onParsePayload) { - const shouldTracePrase = - typeof options.spans?.graphqlParse === 'function' - ? options.spans.graphqlParse(onParsePayload) - : (options.spans?.graphqlParse ?? true); - - const { context } = onParsePayload; - const otelContext = requestContextMapping.get(context.request); - - if (shouldTracePrase && otelContext) { - const { done } = createGraphQLParseSpan({ - otelContext, - tracer, - query: context.params.query, - operationName: context.params.operationName, - }); - - return ({ result }) => done(result); + onContextBuilding({context, extendContext}) { + const span = tracer.startSpan("graphql.context-building", { + attributes: { + [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", + // [ATTR_GRAPHQL_DOCUMENT]: print(sanatiseDocument(context.params.documentAST)) TODO + } + }, api.context.active()) + extendContext({ + otel: { + context: { + active: api.trace.setSpan(api.context.active(), span), + } + } + }); + + return () => { + span.end() } - return void 0; }, - onExecute(onExecuteArgs) { - const shouldTraceExecute = - typeof options.spans?.graphqlExecute === 'function' - ? options.spans.graphqlExecute(onExecuteArgs) - : (options.spans?.graphqlExecute ?? true); - - const { args } = onExecuteArgs; - const otelContext = requestContextMapping.get(args.contextValue.request); - - if (shouldTraceExecute && otelContext) { - const { done } = createGraphQLExecuteSpan({ - args, - otelContext, - tracer, - }); + onExecute: ({ args: {contextValue: context}, extendContext, setExecuteFn, executeFn }) => { + const span = tracer.startSpan("graphql.execute", { + attributes: { + [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", + // [ATTR_GRAPHQL_DOCUMENT]: print(sanatiseDocument(context.params.documentAST)) TODO + } + }, api.context.active()) - return { - onExecuteDone: ({ result }) => { - if (!isAsyncIterable(result)) { - done(result); + setExecuteFn((args) =>{ + return api.context.with(api.trace.setSpan(getActiveContext(context), span), () => executeFn(args)) + }) + + extendContext({ + otel: { + context: { + active: api.trace.setSpan(api.context.active(), span), + } + } + }); + + return { + onExecuteDone: ({result, setResult, args}) => { + + if (!isAsyncIterable(result)){ + setResult(addTraceId(args?.contextValue?.otel?.context.active || api.context.active(), result)) + + span.end(); + if ( result?.errors && result.errors.length > 0){ + span.setStatus({code: SpanStatusCode.ERROR}); + span.setAttribute(ATTR_GRAPHQL_ERROR_COUNT, result.errors.length); } - }, - }; - } - return void 0; - }, - onSubgraphExecute(onSubgraphPayload) { - const shouldTraceSubgraphExecute = - typeof options.spans?.subgraphExecute === 'function' - ? options.spans.subgraphExecute(onSubgraphPayload) - : (options.spans?.subgraphExecute ?? true); - - const otelContext = onSubgraphPayload.executionRequest.context?.request - ? requestContextMapping.get( - onSubgraphPayload.executionRequest.context.request, - ) - : undefined; - - if (shouldTraceSubgraphExecute && otelContext) { - const { subgraphName, executionRequest } = onSubgraphPayload; - const { done } = createSubgraphExecuteFetchSpan({ - otelContext, - tracer, - executionRequest, - subgraphName, - }); - - return done; + return; + } + + return { + onNext: ({ result }) => { + if (result?.errors && result.errors.length > 0) { + span.setAttribute(ATTR_GRAPHQL_ERROR_COUNT, result.errors.length); + span.setStatus({code: SpanStatusCode.ERROR}); + } + }, + onEnd: () => { + span.end(); + }, + }; + + }, } - return void 0; }, - onFetch(onFetchPayload) { - const shouldTraceFetch = - typeof options.spans?.upstreamFetch === 'function' - ? options.spans.upstreamFetch(onFetchPayload) - : (options.spans?.upstreamFetch ?? true); - - const { - context, - options: fetchOptions, - url, - setOptions, - executionRequest, - } = onFetchPayload; - - const otelContext = requestContextMapping.get(context.request); - if (shouldTraceFetch && otelContext) { - if (propagateContext) { - const reqHeaders = getHeadersObj(fetchOptions.headers || {}); - propagation.inject(otelContext, reqHeaders); - - setOptions({ - ...fetchOptions, - headers: reqHeaders, - }); + onSubscribe: ({ args: {contextValue: context}, extendContext, subscribeFn, setSubscribeFn }) => { + const span = tracer.startSpan("graphql.subscribe", { + attributes: { + [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", + // [ATTR_GRAPHQL_DOCUMENT]: print(sanitiseDocument(context.params.query)) TODO } + }, api.context.active()) - const { done } = createUpstreamHttpFetchSpan({ - otelContext, - tracer, - url, - fetchOptions, - executionRequest, - }); + setSubscribeFn((args) =>{ + return api.context.with(api.trace.setSpan(getActiveContext(args.contextValue), span), () => subscribeFn(args)) + }) - return (fetchDonePayload) => done(fetchDonePayload.response); + extendContext({ + otel: { + context: { + active: api.trace.setSpan(api.context.active(), span), + } + } + }); + + return { + onSubscribeError: ({error}) => { + if (error) span.setStatus({code: SpanStatusCode.ERROR}); + }, + onSubscribeResult: () => { + return { + onNext({result}) { + if (result?.errors && result.errors.length > 0) span.setStatus({code: SpanStatusCode.ERROR}); + }, + onEnd() { + span.end(); + }, + }; + }, } - return void 0; }, - onResponse({ request, response }) { - const otelContext = requestContextMapping.get(request); - if (!otelContext) { - return; - } - - const rootSpan = trace.getSpan(otelContext); + onSubgraphExecute: ({executionRequest}) => { + const span = tracer.startSpan("graphql.subgraph.execute", { + attributes: { + [ATTR_OPERATION_NAME]: executionRequest.operationName, + [ATTR_GRAPHQL_DOCUMENT]: print(sanitiseDocument(executionRequest.document)) + } + }, api.context.active()); - if (rootSpan) { - completeHttpSpan(rootSpan, response); - } - requestContextMapping.delete(request); - }, - async [DisposableSymbols.asyncDispose]() { - if (spanProcessors) { - await Promise.all( - spanProcessors.map((processor) => processor.forceFlush()), - ); + return ({ result }) => { + if (isAsyncIterable(result)) { + return { + onEnd: () => { + span.end(); + } + } + } + span.end(); + return { + } } - await provider?.forceFlush?.(); + } + } +} - if (spanProcessors) { - spanProcessors.forEach((processor) => processor.shutdown()); - } +const ATTR_GRAPHQL_ERROR_COUNT = "graphql.errors.count" +const ATTR_GRAPHQL_DOCUMENT = "graphql.document" +const ATTR_OPERATION_NAME = "graphql.operation" - await provider?.shutdown?.(); +const getActiveContext = (context: Partial & YogaInitialContext | undefined) => context?.otel?.context.active || api.context.active(); - diag.disable(); - trace.disable(); - context.disable(); - propagation.disable(); +const addTraceId = (context: api.Context, result: ExecutionResult): ExecutionResult => { + return { + ...result, + extensions: { + ...result.extensions, + trace_id: api.trace.getSpan(context)?.spanContext().traceId, }, }; -} +}; diff --git a/packages/plugins/opentelemetry/src/processors.ts b/packages/plugins/opentelemetry/src/processors.ts index 3a87629f1..e69de29bb 100644 --- a/packages/plugins/opentelemetry/src/processors.ts +++ b/packages/plugins/opentelemetry/src/processors.ts @@ -1,122 +0,0 @@ -import type { AzureMonitorExporterOptions } from '@azure/monitor-opentelemetry-exporter'; -import { mapMaybePromise, MaybePromise } from '@graphql-tools/utils'; -import { OTLPTraceExporter as OtlpHttpExporter } from '@opentelemetry/exporter-trace-otlp-http'; -import { - ZipkinExporter, - type ExporterConfig as ZipkinExporterConfig, -} from '@opentelemetry/exporter-zipkin'; -import { type OTLPExporterNodeConfigBase } from '@opentelemetry/otlp-exporter-base'; -import { type OTLPGRPCExporterConfigNode } from '@opentelemetry/otlp-grpc-exporter-base'; -import { - BatchSpanProcessor, - ConsoleSpanExporter, - SimpleSpanProcessor, - type BufferConfig, - type SpanExporter, - type SpanProcessor, -} from '@opentelemetry/sdk-trace-base'; - -export type BatchingConfig = boolean | BufferConfig; - -function resolveBatchingConfig( - exporter: SpanExporter, - batchingConfig?: BatchingConfig, -): SpanProcessor { - const value = batchingConfig ?? true; - - if (value === true) { - return new BatchSpanProcessor(exporter); - } else if (value === false) { - return new SimpleSpanProcessor(exporter); - } else { - return new BatchSpanProcessor(exporter, value); - } -} - -export function createStdoutExporter( - batchingConfig?: BatchingConfig, -): SpanProcessor { - return resolveBatchingConfig(new ConsoleSpanExporter(), batchingConfig); -} - -export function createZipkinExporter( - config: ZipkinExporterConfig, - batchingConfig?: BatchingConfig, -): SpanProcessor { - return resolveBatchingConfig(new ZipkinExporter(config), batchingConfig); -} - -export function createOtlpHttpExporter( - config: OTLPExporterNodeConfigBase, - batchingConfig?: BatchingConfig, -): SpanProcessor { - return resolveBatchingConfig(new OtlpHttpExporter(config), batchingConfig); -} - -interface SpanExporterCtor { - new (config: TConfig): SpanExporter; -} - -function loadExporterLazily< - TConfig, - TSpanExporterCtor extends SpanExporterCtor, ->( - exporterName: string, - exporterModuleName: string, - exportNameInModule: string, -): MaybePromise { - try { - return mapMaybePromise(import(exporterModuleName), (mod) => { - const ExportCtor = - mod?.default?.[exportNameInModule] || mod?.[exportNameInModule]; - if (!ExportCtor) { - throw new Error( - `${exporterName} exporter is not available in the current environment`, - ); - } - return ExportCtor; - }); - } catch (err) { - throw new Error( - `${exporterName} exporter is not available in the current environment`, - ); - } -} - -export function createOtlpGrpcExporter( - config: OTLPGRPCExporterConfigNode, - batchingConfig?: BatchingConfig, -): MaybePromise { - return mapMaybePromise( - loadExporterLazily( - 'OTLP gRPC', - '@opentelemetry/exporter-trace-otlp-grpc', - 'OTLPTraceExporter', - ), - (OTLPTraceExporter) => { - return resolveBatchingConfig( - new OTLPTraceExporter(config), - batchingConfig, - ); - }, - ); -} - -export function createAzureMonitorExporter( - config: AzureMonitorExporterOptions, - batchingConfig?: BatchingConfig, -): MaybePromise { - return mapMaybePromise( - loadExporterLazily( - 'Azure Monitor', - '@azure/monitor-opentelemetry-exporter', - 'AzureMonitorTraceExporter', - ), - (AzureMonitorTraceExporter) => { - return resolveBatchingConfig( - new AzureMonitorTraceExporter(config), - batchingConfig, - ); - }, - ); -} diff --git a/packages/plugins/opentelemetry/src/spans.ts b/packages/plugins/opentelemetry/src/spans.ts deleted file mode 100644 index 11fc2afad..000000000 --- a/packages/plugins/opentelemetry/src/spans.ts +++ /dev/null @@ -1,288 +0,0 @@ -import { defaultPrintFn } from '@graphql-mesh/transport-common'; -import { - getOperationASTFromDocument, - type ExecutionRequest, - type ExecutionResult, -} from '@graphql-tools/utils'; -import { - SpanKind, - SpanStatusCode, - type Context, - type Span, - type Tracer, -} from '@opentelemetry/api'; -import type { ExecutionArgs } from 'graphql'; -import { - SEMATTRS_GATEWAY_UPSTREAM_SUBGRAPH_NAME, - SEMATTRS_GRAPHQL_DOCUMENT, - SEMATTRS_GRAPHQL_ERROR_COUNT, - SEMATTRS_GRAPHQL_OPERATION_NAME, - SEMATTRS_GRAPHQL_OPERATION_TYPE, - SEMATTRS_HTTP_CLIENT_IP, - SEMATTRS_HTTP_HOST, - SEMATTRS_HTTP_METHOD, - SEMATTRS_HTTP_ROUTE, - SEMATTRS_HTTP_SCHEME, - SEMATTRS_HTTP_STATUS_CODE, - SEMATTRS_HTTP_URL, - SEMATTRS_HTTP_USER_AGENT, - SEMATTRS_NET_HOST_NAME, -} from './attributes'; - -export function createHttpSpan(input: { - tracer: Tracer; - request: Request; - url: URL; - otelContext: Context; -}): Span { - const { url, request, tracer, otelContext } = input; - const path = url.pathname; - const userAgent = request.headers.get('user-agent'); - const ips = request.headers.get('x-forwarded-for'); - const method = request.method || 'GET'; - const host = url.host || request.headers.get('host'); - const hostname = url.hostname || host || 'localhost'; - const rootSpanName = `${method} ${path}`; - - return tracer.startSpan( - rootSpanName, - { - attributes: { - [SEMATTRS_HTTP_METHOD]: method, - [SEMATTRS_HTTP_URL]: request.url, - [SEMATTRS_HTTP_ROUTE]: path, - [SEMATTRS_HTTP_SCHEME]: url.protocol, - [SEMATTRS_NET_HOST_NAME]: hostname, - [SEMATTRS_HTTP_HOST]: host || undefined, - [SEMATTRS_HTTP_CLIENT_IP]: ips?.split(',')[0], - [SEMATTRS_HTTP_USER_AGENT]: userAgent || undefined, - }, - kind: SpanKind.SERVER, - }, - otelContext, - ); -} - -export function completeHttpSpan(span: Span, response: Response) { - span.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status); - span.setStatus({ - code: response.ok ? SpanStatusCode.OK : SpanStatusCode.ERROR, - message: response.ok ? undefined : response.statusText, - }); - span.end(); -} - -export function createGraphQLParseSpan(input: { - otelContext: Context; - tracer: Tracer; - query?: string; - operationName?: string; -}) { - const parseSpan = input.tracer.startSpan( - 'graphql.parse', - { - attributes: { - [SEMATTRS_GRAPHQL_DOCUMENT]: input.query, - [SEMATTRS_GRAPHQL_OPERATION_NAME]: input.operationName, - }, - kind: SpanKind.INTERNAL, - }, - input.otelContext, - ); - - return { - parseSpan, - done: (result: any | Error | null) => { - if (result instanceof Error) { - parseSpan.setAttribute(SEMATTRS_GRAPHQL_ERROR_COUNT, 1); - parseSpan.recordException(result); - parseSpan.setStatus({ - code: SpanStatusCode.ERROR, - message: result.message, - }); - } - - parseSpan.end(); - }, - }; -} - -export function createGraphQLValidateSpan(input: { - otelContext: Context; - tracer: Tracer; - query?: string; - operationName?: string; -}) { - const validateSpan = input.tracer.startSpan( - 'graphql.validate', - { - attributes: { - [SEMATTRS_GRAPHQL_DOCUMENT]: input.query, - [SEMATTRS_GRAPHQL_OPERATION_NAME]: input.operationName, - }, - kind: SpanKind.INTERNAL, - }, - input.otelContext, - ); - - return { - validateSpan, - done: (result: any[] | readonly Error[]) => { - if (result instanceof Error) { - validateSpan.setStatus({ - code: SpanStatusCode.ERROR, - message: result.message, - }); - } else if (Array.isArray(result) && result.length > 0) { - validateSpan.setAttribute(SEMATTRS_GRAPHQL_ERROR_COUNT, result.length); - validateSpan.setStatus({ - code: SpanStatusCode.ERROR, - message: result.map((e) => e.message).join(', '), - }); - - for (const error in result) { - validateSpan.recordException(error); - } - } - - validateSpan.end(); - }, - }; -} - -export function createGraphQLExecuteSpan(input: { - args: ExecutionArgs; - otelContext: Context; - tracer: Tracer; -}) { - const operation = getOperationASTFromDocument( - input.args.document, - input.args.operationName || undefined, - ); - const executeSpan = input.tracer.startSpan( - 'graphql.execute', - { - attributes: { - [SEMATTRS_GRAPHQL_OPERATION_TYPE]: operation.operation, - [SEMATTRS_GRAPHQL_OPERATION_NAME]: - input.args.operationName || undefined, - [SEMATTRS_GRAPHQL_DOCUMENT]: defaultPrintFn(input.args.document), - }, - kind: SpanKind.INTERNAL, - }, - input.otelContext, - ); - - return { - executeSpan, - done: (result: ExecutionResult) => { - if (result.errors && result.errors.length > 0) { - executeSpan.setAttribute( - SEMATTRS_GRAPHQL_ERROR_COUNT, - result.errors.length, - ); - executeSpan.setStatus({ - code: SpanStatusCode.ERROR, - message: result.errors.map((e) => e.message).join(', '), - }); - - for (const error in result.errors) { - executeSpan.recordException(error); - } - } - - executeSpan.end(); - }, - }; -} - -export const subgraphExecReqSpanMap = new WeakMap(); - -export function createSubgraphExecuteFetchSpan(input: { - otelContext: Context; - tracer: Tracer; - executionRequest: ExecutionRequest; - subgraphName: string; -}) { - const subgraphExecuteSpan = input.tracer.startSpan( - `subgraph.execute (${input.subgraphName})`, - { - attributes: { - [SEMATTRS_GRAPHQL_OPERATION_NAME]: input.executionRequest.operationName, - [SEMATTRS_GRAPHQL_DOCUMENT]: defaultPrintFn( - input.executionRequest.document, - ), - [SEMATTRS_GRAPHQL_OPERATION_TYPE]: getOperationASTFromDocument( - input.executionRequest.document, - input.executionRequest.operationName, - )?.operation, - [SEMATTRS_GATEWAY_UPSTREAM_SUBGRAPH_NAME]: input.subgraphName, - }, - kind: SpanKind.CLIENT, - }, - input.otelContext, - ); - - subgraphExecReqSpanMap.set(input.executionRequest, subgraphExecuteSpan); - - return { - done() { - subgraphExecuteSpan.end(); - }, - }; -} - -export function createUpstreamHttpFetchSpan(input: { - otelContext: Context; - tracer: Tracer; - url: string; - fetchOptions: RequestInit; - executionRequest?: ExecutionRequest; -}) { - const urlObj = new URL(input.url); - - const attributes = { - [SEMATTRS_HTTP_METHOD]: input.fetchOptions.method, - [SEMATTRS_HTTP_URL]: input.url, - [SEMATTRS_NET_HOST_NAME]: urlObj.hostname, - [SEMATTRS_HTTP_HOST]: urlObj.host, - [SEMATTRS_HTTP_ROUTE]: urlObj.pathname, - [SEMATTRS_HTTP_SCHEME]: urlObj.protocol, - }; - - let fetchSpan: Span | undefined; - let isOrigSpan: boolean; - - if (input.executionRequest) { - fetchSpan = subgraphExecReqSpanMap.get(input.executionRequest); - if (fetchSpan) { - isOrigSpan = false; - fetchSpan.setAttributes(attributes); - } - } - - if (!fetchSpan) { - fetchSpan = input.tracer.startSpan( - 'http.fetch', - { - attributes, - kind: SpanKind.CLIENT, - }, - input.otelContext, - ); - isOrigSpan = true; - } - - return { - done: (response: Response) => { - fetchSpan.setAttribute(SEMATTRS_HTTP_STATUS_CODE, response.status); - fetchSpan.setStatus({ - code: response.ok ? SpanStatusCode.OK : SpanStatusCode.ERROR, - message: response.ok ? undefined : response.statusText, - }); - if (isOrigSpan) { - fetchSpan.end(); - } - }, - }; -} diff --git a/packages/plugins/opentelemetry/src/utils.ts b/packages/plugins/opentelemetry/src/utils.ts new file mode 100644 index 000000000..a67e39489 --- /dev/null +++ b/packages/plugins/opentelemetry/src/utils.ts @@ -0,0 +1,32 @@ +import {ASTNode, DocumentNode, Kind, visit} from "graphql/index"; + + +export const sanitiseDocument = (doc: DocumentNode): DocumentNode => { + const leave = (node: ASTNode): ASTNode => { + return { + ...node, + kind: Kind.VARIABLE, + name: { + kind: Kind.NAME, + value: "redacted" + }, + } + }; + return visit(doc, { + StringValue: { + leave, + }, + BooleanValue: { + leave, + }, + FloatValue: { + leave, + }, + EnumValue: { + leave, + }, + IntValue: { + leave, + }, + }); +}; From b2191839434921a01ddd3323fb4921356c9cf469 Mon Sep 17 00:00:00 2001 From: Darren Date: Fri, 13 Dec 2024 12:04:15 +0000 Subject: [PATCH 2/4] basic attributes --- packages/plugins/opentelemetry/package.json | 1 + .../plugins/opentelemetry/src/attributes.ts | 4 + packages/plugins/opentelemetry/src/plugin.ts | 96 +++----- packages/plugins/opentelemetry/src/utils.ts | 13 + .../tests/useOpenTelemetry.spec.ts | 223 +++++++++--------- yarn.lock | 10 + 6 files changed, 175 insertions(+), 172 deletions(-) create mode 100644 packages/plugins/opentelemetry/src/attributes.ts diff --git a/packages/plugins/opentelemetry/package.json b/packages/plugins/opentelemetry/package.json index 92d42e4f7..42fda0291 100644 --- a/packages/plugins/opentelemetry/package.json +++ b/packages/plugins/opentelemetry/package.json @@ -50,6 +50,7 @@ "@graphql-mesh/utils": "^0.103.6", "@graphql-tools/utils": "^10.6.2", "@opentelemetry/api": "^1.9.0", + "@opentelemetry/context-async-hooks": "^1.29.0", "@opentelemetry/exporter-trace-otlp-grpc": "^0.56.0", "@opentelemetry/exporter-trace-otlp-http": "^0.56.0", "@opentelemetry/exporter-zipkin": "^1.29.0", diff --git a/packages/plugins/opentelemetry/src/attributes.ts b/packages/plugins/opentelemetry/src/attributes.ts new file mode 100644 index 000000000..de6aac4aa --- /dev/null +++ b/packages/plugins/opentelemetry/src/attributes.ts @@ -0,0 +1,4 @@ +export const ATTR_GRAPHQL_ERROR_COUNT = "graphql.errors.count" +export const ATTR_GRAPHQL_DOCUMENT = "graphql.document" +export const ATTR_GRAPHQL_OPERATION_NAME = "graphql.operation.name" +export const ATTR_GRAPHQL_OPERATION_TYPE = "graphql.operation.type" \ No newline at end of file diff --git a/packages/plugins/opentelemetry/src/plugin.ts b/packages/plugins/opentelemetry/src/plugin.ts index d3b62524e..777d8099b 100644 --- a/packages/plugins/opentelemetry/src/plugin.ts +++ b/packages/plugins/opentelemetry/src/plugin.ts @@ -1,12 +1,11 @@ import { type GatewayPlugin } from '@graphql-hive/gateway'; -import type { Logger } from '@graphql-mesh/types'; import * as api from '@opentelemetry/api'; -import { SpanStatusCode} from "@opentelemetry/api"; +import {Attributes, SpanStatusCode} from "@opentelemetry/api"; import {isAsyncIterable, YogaInitialContext} from "graphql-yoga"; -import {sanitiseDocument} from "./utils"; -import { print } from "graphql"; -import {ExecutionResult} from "@graphql-tools/utils"; +import {addTraceId, sanitiseDocument} from "./utils"; +import {print} from "graphql"; +import {ATTR_GRAPHQL_DOCUMENT, ATTR_GRAPHQL_ERROR_COUNT, ATTR_GRAPHQL_OPERATION_NAME, ATTR_GRAPHQL_OPERATION_TYPE} from "./attributes"; export type OpenTelemetryGatewayPluginOptions = { /** @@ -42,31 +41,39 @@ export type OpenTelemetryGatewayPluginOptions = { */ subgraphExecute?: boolean | undefined; }; + attributes?: { + document: boolean | undefined; + operationName: boolean | undefined; + operationType: boolean | undefined; + } }; // + +const commonAttributes = Symbol(); + export interface OtelContext{ otel: { context: { active: api.Context }; }; + [commonAttributes]: Attributes; } + + export function useOpenTelemetry( - options: OpenTelemetryGatewayPluginOptions & { logger: Logger }, + options: OpenTelemetryGatewayPluginOptions, ): GatewayPlugin { const tracer = options.tracer || api.trace.getTracer('graphql-gateway'); options.spans ||= { validate: true, parse: true, execute: true, subscribe: true, subgraphExecute: true} + options.attributes ||= { document: true, operationName: true, operationType: true }; return { // TODO: on request / on response graphql.request onParse: ({ context, extendContext, parseFn, setParseFn }) => { - const span = tracer.startSpan("graphql.parse", { - attributes: { - [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", - } - }, api.context.active()) + const span = tracer.startSpan("graphql.parse", {}, api.context.active()) extendContext({ otel: { @@ -80,8 +87,20 @@ export function useOpenTelemetry( return api.context.with(api.trace.setSpan(getActiveContext(context), span), () => parseFn(args)) }) - return ({result}) => { - span.setAttribute(ATTR_GRAPHQL_DOCUMENT, print(sanitiseDocument(result))) + return ({result, extendContext, context}) => { + const sanitisedDocument = print(sanitiseDocument(result)); + extendContext({ + ...context, + [commonAttributes]: { + ...(options.attributes?.document && {[ATTR_GRAPHQL_DOCUMENT]: sanitisedDocument}), + ...(options.attributes?.operationName && {[ATTR_GRAPHQL_OPERATION_NAME]: result.definitions?.[0].name.value || "anonymous"}), + ...(options.attributes?.operationType && {[ATTR_GRAPHQL_OPERATION_TYPE]: result.definitions?.[0].operation || "unknown"}), + } + }); + + + + span.setAttributes(context[commonAttributes] || {}); if (result instanceof Error) { span.setAttribute(ATTR_GRAPHQL_ERROR_COUNT, 1); @@ -92,22 +111,15 @@ export function useOpenTelemetry( span.end(); } }, - onValidate({extendContext, context, setValidationFn, validateFn, params}) { - const span = tracer.startSpan("graphql.validate", { - attributes: { - [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", - [ATTR_GRAPHQL_DOCUMENT]: print(sanitiseDocument(params.documentAST)) - } - }, api.context.active()) - - + onValidate({extendContext, context, setValidationFn, validateFn}) { + const span = tracer.startSpan("graphql.validate", {attributes: context[commonAttributes]}, api.context.active()) extendContext({ otel: { context: { active: api.trace.setSpan(api.context.active(), span), } - } + }, }); setValidationFn((schema, documentAST, rules, options, typeInfo) =>{ @@ -130,12 +142,7 @@ export function useOpenTelemetry( } }, onContextBuilding({context, extendContext}) { - const span = tracer.startSpan("graphql.context-building", { - attributes: { - [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", - // [ATTR_GRAPHQL_DOCUMENT]: print(sanatiseDocument(context.params.documentAST)) TODO - } - }, api.context.active()) + const span = tracer.startSpan("graphql.context-building", {attributes: context[commonAttributes]}, api.context.active()) extendContext({ otel: { context: { @@ -149,12 +156,7 @@ export function useOpenTelemetry( } }, onExecute: ({ args: {contextValue: context}, extendContext, setExecuteFn, executeFn }) => { - const span = tracer.startSpan("graphql.execute", { - attributes: { - [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", - // [ATTR_GRAPHQL_DOCUMENT]: print(sanatiseDocument(context.params.documentAST)) TODO - } - }, api.context.active()) + const span = tracer.startSpan("graphql.execute", {attributes: context[commonAttributes]}, api.context.active()) setExecuteFn((args) =>{ return api.context.with(api.trace.setSpan(getActiveContext(context), span), () => executeFn(args)) @@ -198,12 +200,7 @@ export function useOpenTelemetry( } }, onSubscribe: ({ args: {contextValue: context}, extendContext, subscribeFn, setSubscribeFn }) => { - const span = tracer.startSpan("graphql.subscribe", { - attributes: { - [ATTR_OPERATION_NAME]: context.params.operationName || "anonymous", - // [ATTR_GRAPHQL_DOCUMENT]: print(sanitiseDocument(context.params.query)) TODO - } - }, api.context.active()) + const span = tracer.startSpan("graphql.subscribe", {attributes: context[commonAttributes]}, api.context.active()) setSubscribeFn((args) =>{ return api.context.with(api.trace.setSpan(getActiveContext(args.contextValue), span), () => subscribeFn(args)) @@ -236,12 +233,11 @@ export function useOpenTelemetry( onSubgraphExecute: ({executionRequest}) => { const span = tracer.startSpan("graphql.subgraph.execute", { attributes: { - [ATTR_OPERATION_NAME]: executionRequest.operationName, + [ATTR_GRAPHQL_OPERATION_NAME]: executionRequest.operationName, [ATTR_GRAPHQL_DOCUMENT]: print(sanitiseDocument(executionRequest.document)) } }, api.context.active()); - return ({ result }) => { if (isAsyncIterable(result)) { return { @@ -258,18 +254,4 @@ export function useOpenTelemetry( } } -const ATTR_GRAPHQL_ERROR_COUNT = "graphql.errors.count" -const ATTR_GRAPHQL_DOCUMENT = "graphql.document" -const ATTR_OPERATION_NAME = "graphql.operation" - const getActiveContext = (context: Partial & YogaInitialContext | undefined) => context?.otel?.context.active || api.context.active(); - -const addTraceId = (context: api.Context, result: ExecutionResult): ExecutionResult => { - return { - ...result, - extensions: { - ...result.extensions, - trace_id: api.trace.getSpan(context)?.spanContext().traceId, - }, - }; -}; diff --git a/packages/plugins/opentelemetry/src/utils.ts b/packages/plugins/opentelemetry/src/utils.ts index a67e39489..2bfa0a305 100644 --- a/packages/plugins/opentelemetry/src/utils.ts +++ b/packages/plugins/opentelemetry/src/utils.ts @@ -1,4 +1,6 @@ import {ASTNode, DocumentNode, Kind, visit} from "graphql/index"; +import * as api from "@opentelemetry/api"; +import {ExecutionResult} from "@graphql-tools/utils"; export const sanitiseDocument = (doc: DocumentNode): DocumentNode => { @@ -30,3 +32,14 @@ export const sanitiseDocument = (doc: DocumentNode): DocumentNode => { }, }); }; + + +export const addTraceId = (context: api.Context, result: ExecutionResult): ExecutionResult => { + return { + ...result, + extensions: { + ...result.extensions, + trace_id: api.trace.getSpan(context)?.spanContext().traceId, + }, + }; +}; diff --git a/packages/plugins/opentelemetry/tests/useOpenTelemetry.spec.ts b/packages/plugins/opentelemetry/tests/useOpenTelemetry.spec.ts index afe32f5a5..f1084a81e 100644 --- a/packages/plugins/opentelemetry/tests/useOpenTelemetry.spec.ts +++ b/packages/plugins/opentelemetry/tests/useOpenTelemetry.spec.ts @@ -1,131 +1,124 @@ -import { createSchema, createYoga } from 'graphql-yoga'; -import { beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { describe, expect, it } from 'vitest'; +import {createSchema, createYoga, Repeater} from "graphql-yoga"; +import { + BasicTracerProvider, + InMemorySpanExporter, + SimpleSpanProcessor, + SpanExporter +} from "@opentelemetry/sdk-trace-base"; +import {buildHTTPExecutor} from "@graphql-tools/executor-http"; +import {useOpenTelemetry} from "@graphql-mesh/plugin-opentelemetry"; +import {GraphQLError, parse} from "graphql"; +import * as api from '@opentelemetry/api'; +import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks'; +import {ATTR_GRAPHQL_DOCUMENT, ATTR_GRAPHQL_OPERATION_NAME} from "../src/attributes"; +import {ATTR_GRAPHQL_OPERATION_TYPE} from "@opentelemetry/semantic-conventions/incubating"; -let mockModule = vi.mock; -if (globalThis.Bun) { - mockModule = require('bun:test').mock.module; -} -const mockRegisterProvider = vi.fn(); -describe('useOpenTelemetry', () => { - mockModule('@opentelemetry/sdk-trace-web', () => ({ - WebTracerProvider: vi.fn(() => ({ register: mockRegisterProvider })), - })); +const contextManager = new AsyncLocalStorageContextManager().enable(); +api.context.setGlobalContextManager(contextManager); - let gw: typeof import('../../../runtime/src'); - beforeAll(async () => { - gw = await import('../../../runtime/src'); - }); - beforeEach(() => { - vi.clearAllMocks(); - }); - describe('when not passing a custom provider', () => { - it('initializes and starts a new provider', async () => { - const { useOpenTelemetry } = await import('../src'); - await using upstream = createYoga({ - schema: createSchema({ - typeDefs: /* GraphQL */ ` +describe('useOpenTelemetry', () => { + const schema = createSchema({ + typeDefs: /* GraphQL */ ` type Query { - hello: String + ping: String + echo(message: String): String + error: String + context: String + } + + type Subscription { + counter(count: Int!): Int! } - `, - resolvers: { + `, + resolvers: { Query: { - hello: () => 'World', + ping: () => { + expect(api.context.active()).not.toEqual(api.ROOT_CONTEXT); // proves that the context is propagated + return 'pong'; + }, + echo: (_, { message }) => { + expect(api.context.active()).not.toEqual(api.ROOT_CONTEXT); + return `echo: ${message}`; + }, + error: () => { + throw new GraphQLError('boom'); + }, + }, + Subscription: { + counter: { + subscribe: (_, args) => { + expect(api.context.active()).not.toEqual(api.ROOT_CONTEXT); + return new Repeater((push, end) => { + for (let i = args.count; i >= 0; i--) { + push({ counter: i }); + } + end(); + }); + }, + }, }, - }, - }), - logging: false, - }); - - await using gateway = gw.createGatewayRuntime({ - proxy: { - endpoint: 'https://example.com/graphql', }, - plugins: (ctx) => [ - gw.useCustomFetch( - // @ts-expect-error TODO: MeshFetch is not compatible with @whatwg-node/server fetch - upstream.fetch, - ), - useOpenTelemetry({ - exporters: [], - ...ctx, - }), - ], - logging: false, - }); + }); - const response = await gateway.fetch('http://localhost:4000/graphql', { - method: 'POST', - headers: { - 'content-type': 'application/json', - }, - body: JSON.stringify({ - query: /* GraphQL */ ` - query { - hello + const useTestOpenTracing = ( + exporter: SpanExporter, + ) => { + const provider = new BasicTracerProvider({ + spanProcessors: [new SimpleSpanProcessor(exporter)] + }); + + provider.register(); + return useOpenTelemetry({ + tracer: provider.getTracer("graphql"), + spans: { + parse: true, + validate: true, + execute: true, + subscribe: true, + subgraphExecute: true, } - `, - }), - }); + }); + }; - expect(response.status).toBe(200); - const body = await response.json(); - expect(body.data?.hello).toBe('World'); - expect(mockRegisterProvider).toHaveBeenCalledTimes(1); - }); - }); + const createTestInstance = (exporter: SpanExporter) => { + const yoga = createYoga({ + schema, + plugins: [useTestOpenTracing(exporter)], + }); - describe('when passing a custom provider', () => { - it('does not initialize a new provider and does not start the provided provider instance', async () => { - const { useOpenTelemetry } = await import('../src'); - await using upstream = createYoga({ - schema: createSchema({ - typeDefs: /* GraphQL */ ` - type Query { - hello: String - } - `, - resolvers: { - Query: { - hello: () => 'World', - }, - }, - }), - logging: false, - }); + return buildHTTPExecutor({ + fetch: yoga.fetch, + }); + }; - await using gateway = gw.createGatewayRuntime({ - proxy: { - endpoint: 'https://example.com/graphql', - }, - plugins: (ctx) => [ - gw.useCustomFetch( - // @ts-expect-error TODO: MeshFetch is not compatible with @whatwg-node/server fetch - upstream.fetch, - ), - useOpenTelemetry({ initializeNodeSDK: false, ...ctx }), - ], - logging: false, - }); + it('query should add spans', async () => { + const exporter = new InMemorySpanExporter(); + const executor = createTestInstance(exporter); - const response = await gateway.fetch('http://localhost:4000/graphql', { - method: 'POST', - headers: { - 'content-type': 'application/json', - }, - body: JSON.stringify({ - query: /* GraphQL */ ` - query { - hello - } - `, - }), - }); + await executor({ document: parse(`query ping { ping }`) }); - expect(response.status).toBe(200); - const body = await response.json(); - expect(body.data?.hello).toBe('World'); - expect(mockRegisterProvider).not.toHaveBeenCalled(); + const actual = exporter.getFinishedSpans(); + expect(actual.length).toBe(4); + expect(actual?.[0]?.name).toBe('graphql.parse'); + expect(actual?.[1]?.name).toBe('graphql.validate'); + expect(actual?.[2]?.name).toBe('graphql.context-building'); + expect(actual?.[3]?.name).toBe('graphql.execute'); }); - }); + + it('query should add attributes', async () => { + const exporter = new InMemorySpanExporter(); + const executor = createTestInstance(exporter); + + await executor({ document: parse(`query ping { ping }`) }); + + const actual = exporter.getFinishedSpans(); + expect(actual.length).toBe(4); + expect(actual?.[3]?.attributes).toEqual({ + [ATTR_GRAPHQL_DOCUMENT]: "query ping {\n ping\n}", + [ATTR_GRAPHQL_OPERATION_NAME]: "ping", + [ATTR_GRAPHQL_OPERATION_TYPE]: "query" + }) + }); + }); diff --git a/yarn.lock b/yarn.lock index 6de74db31..55b2238de 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3496,6 +3496,7 @@ __metadata: "@graphql-mesh/utils": "npm:^0.103.6" "@graphql-tools/utils": "npm:^10.6.2" "@opentelemetry/api": "npm:^1.9.0" + "@opentelemetry/context-async-hooks": "npm:^1.29.0" "@opentelemetry/exporter-trace-otlp-grpc": "npm:^0.56.0" "@opentelemetry/exporter-trace-otlp-http": "npm:^0.56.0" "@opentelemetry/exporter-zipkin": "npm:^1.29.0" @@ -5034,6 +5035,15 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/context-async-hooks@npm:^1.29.0": + version: 1.29.0 + resolution: "@opentelemetry/context-async-hooks@npm:1.29.0" + peerDependencies: + "@opentelemetry/api": ">=1.0.0 <1.10.0" + checksum: 10c0/f7b5c6b4cad60021a0f7815016fda1b4b8d364348ecfa7e04fe07dfe9af90caaf4065fa5f9169a65f28b71aaf961672eed3849c42cd6484a9051dec0e5c9de5c + languageName: node + linkType: hard + "@opentelemetry/core@npm:1.26.0": version: 1.26.0 resolution: "@opentelemetry/core@npm:1.26.0" From 22b03f80c78179bc7de25ef30be608d5931f1583 Mon Sep 17 00:00:00 2001 From: Darren Date: Fri, 13 Dec 2024 12:16:43 +0000 Subject: [PATCH 3/4] add server setup --- packages/plugins/opentelemetry/package.json | 4 + packages/plugins/opentelemetry/src/plugin.ts | 1 - packages/plugins/opentelemetry/src/setup.ts | 67 ++++++++++++++ yarn.lock | 91 +++++++++++++++++++- 4 files changed, 160 insertions(+), 3 deletions(-) create mode 100644 packages/plugins/opentelemetry/src/setup.ts diff --git a/packages/plugins/opentelemetry/package.json b/packages/plugins/opentelemetry/package.json index 42fda0291..4903a41a0 100644 --- a/packages/plugins/opentelemetry/package.json +++ b/packages/plugins/opentelemetry/package.json @@ -55,8 +55,12 @@ "@opentelemetry/exporter-trace-otlp-http": "^0.56.0", "@opentelemetry/exporter-zipkin": "^1.29.0", "@opentelemetry/instrumentation": "^0.56.0", + "@opentelemetry/instrumentation-dns": "^0.42.0", + "@opentelemetry/instrumentation-grpc": "^0.56.0", + "@opentelemetry/instrumentation-http": "^0.56.0", "@opentelemetry/resources": "^1.29.0", "@opentelemetry/sdk-trace-base": "^1.29.0", + "@opentelemetry/sdk-trace-node": "^1.29.0", "@opentelemetry/sdk-trace-web": "^1.29.0", "@opentelemetry/semantic-conventions": "^1.28.0", "@whatwg-node/disposablestack": "^0.0.5", diff --git a/packages/plugins/opentelemetry/src/plugin.ts b/packages/plugins/opentelemetry/src/plugin.ts index 777d8099b..cc3750d04 100644 --- a/packages/plugins/opentelemetry/src/plugin.ts +++ b/packages/plugins/opentelemetry/src/plugin.ts @@ -71,7 +71,6 @@ export function useOpenTelemetry( options.attributes ||= { document: true, operationName: true, operationType: true }; return { - // TODO: on request / on response graphql.request onParse: ({ context, extendContext, parseFn, setParseFn }) => { const span = tracer.startSpan("graphql.parse", {}, api.context.active()) diff --git a/packages/plugins/opentelemetry/src/setup.ts b/packages/plugins/opentelemetry/src/setup.ts new file mode 100644 index 000000000..3d758a128 --- /dev/null +++ b/packages/plugins/opentelemetry/src/setup.ts @@ -0,0 +1,67 @@ +// This must be run first. Node uses patching on implementations to inject telemetry so must run before the packages +// are instantiated. +import * as api from '@opentelemetry/api'; +import { propagation } from '@opentelemetry/api'; +import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks'; +import { CompositePropagator, W3CTraceContextPropagator } from '@opentelemetry/core'; +import { registerInstrumentations } from '@opentelemetry/instrumentation'; +import { DnsInstrumentation } from '@opentelemetry/instrumentation-dns'; +import { GrpcInstrumentation } from '@opentelemetry/instrumentation-grpc'; +import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'; +import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-node'; +import { SDK_INFO } from '@opentelemetry/core'; +import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; +import {Resource} from "@opentelemetry/resources"; +import { + ATTR_SERVICE_NAME, + ATTR_TELEMETRY_SDK_LANGUAGE, ATTR_TELEMETRY_SDK_VERSION +} from "@opentelemetry/semantic-conventions"; +import {OTLPTraceExporter} from "@opentelemetry/exporter-trace-otlp-grpc"; + +const contextManager = new AsyncLocalStorageContextManager().enable(); +api.context.setGlobalContextManager(contextManager); + +const exporter = new OTLPTraceExporter({ + url: "todo:configuration", +}) + +const provider = new NodeTracerProvider({ + resource: new Resource({ + [ATTR_SERVICE_NAME]: "hive-gateway", + // [ATTR_SERVICE_VERSION]: 1, + [ATTR_TELEMETRY_SDK_LANGUAGE]: SDK_INFO[ATTR_TELEMETRY_SDK_LANGUAGE], + [ATTR_TELEMETRY_SDK_VERSION]: SDK_INFO[ATTR_TELEMETRY_SDK_VERSION], + }), + spanProcessors: [ + new BatchSpanProcessor(exporter, { + maxQueueSize: 8192, + maxExportBatchSize: 512, + scheduledDelayMillis: 100, + exportTimeoutMillis: 30_000, + }), + ] +}); + +propagation.setGlobalPropagator( + new CompositePropagator({ + propagators: [new W3CTraceContextPropagator()], + }), +); + +api.trace.setGlobalTracerProvider(provider); + +provider.register(); + +registerInstrumentations({ + tracerProvider: provider, + instrumentations: [ + new DnsInstrumentation(), + new HttpInstrumentation({}), + new GrpcInstrumentation(), + ], +}); + +for (const signal of ['SIGINT', 'SIGTERM']) { + // eslint-disable-next-line no-console + process.on(signal, () => provider.shutdown().catch(console.error)); +} diff --git a/yarn.lock b/yarn.lock index 55b2238de..a7d77ee03 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3501,8 +3501,12 @@ __metadata: "@opentelemetry/exporter-trace-otlp-http": "npm:^0.56.0" "@opentelemetry/exporter-zipkin": "npm:^1.29.0" "@opentelemetry/instrumentation": "npm:^0.56.0" + "@opentelemetry/instrumentation-dns": "npm:^0.42.0" + "@opentelemetry/instrumentation-grpc": "npm:^0.56.0" + "@opentelemetry/instrumentation-http": "npm:^0.56.0" "@opentelemetry/resources": "npm:^1.29.0" "@opentelemetry/sdk-trace-base": "npm:^1.29.0" + "@opentelemetry/sdk-trace-node": "npm:^1.29.0" "@opentelemetry/sdk-trace-web": "npm:^1.29.0" "@opentelemetry/semantic-conventions": "npm:^1.28.0" "@whatwg-node/disposablestack": "npm:^0.0.5" @@ -5035,7 +5039,7 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/context-async-hooks@npm:^1.29.0": +"@opentelemetry/context-async-hooks@npm:1.29.0, @opentelemetry/context-async-hooks@npm:^1.29.0": version: 1.29.0 resolution: "@opentelemetry/context-async-hooks@npm:1.29.0" peerDependencies: @@ -5126,7 +5130,45 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/instrumentation@npm:^0.56.0": +"@opentelemetry/instrumentation-dns@npm:^0.42.0": + version: 0.42.0 + resolution: "@opentelemetry/instrumentation-dns@npm:0.42.0" + dependencies: + "@opentelemetry/instrumentation": "npm:^0.56.0" + peerDependencies: + "@opentelemetry/api": ^1.3.0 + checksum: 10c0/7d5dfd8cfb53475feea5922f9e5c5ffa2c41da9a43ec779b58e125432a2e85c1b6c105ae6b0dd1950825f93b4429cbdb31b73bbc709dba0c64d255261dccee8e + languageName: node + linkType: hard + +"@opentelemetry/instrumentation-grpc@npm:^0.56.0": + version: 0.56.0 + resolution: "@opentelemetry/instrumentation-grpc@npm:0.56.0" + dependencies: + "@opentelemetry/instrumentation": "npm:0.56.0" + "@opentelemetry/semantic-conventions": "npm:1.28.0" + peerDependencies: + "@opentelemetry/api": ^1.3.0 + checksum: 10c0/7caf1321d3e177d98ef4575f53fcf89d013b65d2935e399f87aa8ca65e91fdebe71292a05cb0baa4653a9c328ce51b843dc9d56daf572401019e3094f9737472 + languageName: node + linkType: hard + +"@opentelemetry/instrumentation-http@npm:^0.56.0": + version: 0.56.0 + resolution: "@opentelemetry/instrumentation-http@npm:0.56.0" + dependencies: + "@opentelemetry/core": "npm:1.29.0" + "@opentelemetry/instrumentation": "npm:0.56.0" + "@opentelemetry/semantic-conventions": "npm:1.28.0" + forwarded-parse: "npm:2.1.2" + semver: "npm:^7.5.2" + peerDependencies: + "@opentelemetry/api": ^1.3.0 + checksum: 10c0/b225252476ad049f888b45e03c4b2ce3ccb68e0b226049090eefa4e20b3859abfb84daede4562f75d2c3820a1351400ee996cf89e2665a91209e920d711f2e49 + languageName: node + linkType: hard + +"@opentelemetry/instrumentation@npm:0.56.0, @opentelemetry/instrumentation@npm:^0.56.0": version: 0.56.0 resolution: "@opentelemetry/instrumentation@npm:0.56.0" dependencies: @@ -5197,6 +5239,28 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/propagator-b3@npm:1.29.0": + version: 1.29.0 + resolution: "@opentelemetry/propagator-b3@npm:1.29.0" + dependencies: + "@opentelemetry/core": "npm:1.29.0" + peerDependencies: + "@opentelemetry/api": ">=1.0.0 <1.10.0" + checksum: 10c0/deb04f14906ec72f6eb8e2dffeb836450c35432732776fef40413e56c8738dd6c49c4d084be04fdedc45b3670dc0e94af2808780cf8921305d57c22cc0f24891 + languageName: node + linkType: hard + +"@opentelemetry/propagator-jaeger@npm:1.29.0": + version: 1.29.0 + resolution: "@opentelemetry/propagator-jaeger@npm:1.29.0" + dependencies: + "@opentelemetry/core": "npm:1.29.0" + peerDependencies: + "@opentelemetry/api": ">=1.0.0 <1.10.0" + checksum: 10c0/9d91968ed88d4946d1b7a03cec774be8569ce9cc8a21f735c4c3f044baa4bb7c5732cfb3caf0c03266258dd9df7abaf329f7d79221827cdcd287b9824f0079df + languageName: node + linkType: hard + "@opentelemetry/resources@npm:1.29.0": version: 1.29.0 resolution: "@opentelemetry/resources@npm:1.29.0" @@ -5272,6 +5336,22 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/sdk-trace-node@npm:^1.29.0": + version: 1.29.0 + resolution: "@opentelemetry/sdk-trace-node@npm:1.29.0" + dependencies: + "@opentelemetry/context-async-hooks": "npm:1.29.0" + "@opentelemetry/core": "npm:1.29.0" + "@opentelemetry/propagator-b3": "npm:1.29.0" + "@opentelemetry/propagator-jaeger": "npm:1.29.0" + "@opentelemetry/sdk-trace-base": "npm:1.29.0" + semver: "npm:^7.5.2" + peerDependencies: + "@opentelemetry/api": ">=1.0.0 <1.10.0" + checksum: 10c0/0201340e451fc3a3969df8c5d405706d6b454c28ddb76bb99fce20b58a53f47fee94bb7134ff1591ddf322a48c7641aeb11e58bebcdf82e933ceb85a54157d46 + languageName: node + linkType: hard + "@opentelemetry/sdk-trace-web@npm:^1.29.0": version: 1.29.0 resolution: "@opentelemetry/sdk-trace-web@npm:1.29.0" @@ -10259,6 +10339,13 @@ __metadata: languageName: node linkType: hard +"forwarded-parse@npm:2.1.2": + version: 2.1.2 + resolution: "forwarded-parse@npm:2.1.2" + checksum: 10c0/0c6b4c631775f272b4475e935108635495e8a5b261d1b4a5caef31c47c5a0b04134adc564e655aadfef366a02647fa3ae90a1d3ac19929f3ade47f9bed53036a + languageName: node + linkType: hard + "forwarded@npm:0.2.0": version: 0.2.0 resolution: "forwarded@npm:0.2.0" From c0ef5d557cbf05390386a8b5811449f8aa98057b Mon Sep 17 00:00:00 2001 From: Darren Date: Fri, 13 Dec 2024 13:16:30 +0000 Subject: [PATCH 4/4] push commented code --- packages/plugins/opentelemetry/src/plugin.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/plugins/opentelemetry/src/plugin.ts b/packages/plugins/opentelemetry/src/plugin.ts index cc3750d04..a46434538 100644 --- a/packages/plugins/opentelemetry/src/plugin.ts +++ b/packages/plugins/opentelemetry/src/plugin.ts @@ -229,7 +229,7 @@ export function useOpenTelemetry( }, } }, - onSubgraphExecute: ({executionRequest}) => { + onSubgraphExecute: ({executionRequest, setExecutor, executor}) => { const span = tracer.startSpan("graphql.subgraph.execute", { attributes: { [ATTR_GRAPHQL_OPERATION_NAME]: executionRequest.operationName, @@ -237,6 +237,10 @@ export function useOpenTelemetry( } }, api.context.active()); + // setExecutor((args ) => { + // return api.context.with(api.trace.setSpan(getActiveContext(args.context), span), () => executor(args)) + // }) + return ({ result }) => { if (isAsyncIterable(result)) { return {