diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index 38d9b83974d..fa0d69b5026 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -81,6 +81,7 @@ All notable changes to experimental packages in this project will be documented * fix(sdk-node): use resource interface instead of concrete class [#3803](https://github.com/open-telemetry/opentelemetry-js/pull/3803) @blumamir * fix(sdk-logs): remove includeTraceContext configuration and use LogRecord context when available [#3817](https://github.com/open-telemetry/opentelemetry-js/pull/3817) @hectorhdzg +* fix(instrumentation-grpc): instrument @grpc/grpc-js Client methods [#3804](https://github.com/open-telemetry/opentelemetry-js/pull/3804) @pichlermarc ## 0.39.1 diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/.eslintignore b/experimental/packages/opentelemetry-instrumentation-grpc/.eslintignore index 378eac25d31..cd163f08bef 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/.eslintignore +++ b/experimental/packages/opentelemetry-instrumentation-grpc/.eslintignore @@ -1 +1,2 @@ build +test/proto diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/package.json b/experimental/packages/opentelemetry-instrumentation-grpc/package.json index 51b63587597..f3e705dd626 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/package.json +++ b/experimental/packages/opentelemetry-instrumentation-grpc/package.json @@ -9,7 +9,7 @@ "prepublishOnly": "npm run compile", "compile": "tsc --build", "clean": "tsc --build --clean", - "test": "nyc ts-mocha -p tsconfig.json test/**/*.test.ts", + "test": "npm run protos:generate && nyc ts-mocha -p tsconfig.json test/**/*.test.ts", "tdd": "npm run test -- --watch-extensions ts --watch", "lint": "eslint . --ext .ts", "lint:fix": "eslint . --ext .ts --fix", @@ -18,7 +18,8 @@ "watch": "tsc --build --watch", "precompile": "cross-var lerna run version --scope $npm_package_name --include-dependencies", "prewatch": "node ../../../scripts/version-update.js", - "peer-api-check": "node ../../../scripts/peer-api-check.js" + "peer-api-check": "node ../../../scripts/peer-api-check.js", + "protos:generate": "cd test/fixtures && buf generate" }, "keywords": [ "opentelemetry", @@ -45,6 +46,10 @@ "access": "public" }, "devDependencies": { + "@bufbuild/buf": "1.21.0-1", + "@protobuf-ts/grpc-transport": "2.9.0", + "@protobuf-ts/runtime-rpc": "2.9.0", + "@protobuf-ts/runtime": "2.9.0", "@grpc/grpc-js": "^1.7.1", "@grpc/proto-loader": "^0.7.3", "@opentelemetry/api": "1.4.1", diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts index c61edf771b8..9e26062d1c8 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts @@ -22,9 +22,8 @@ import type { GrpcJsInstrumentation } from './'; import type { GrpcClientFunc, SendUnaryDataCallback } from './types'; import type { metadataCaptureType } from '../internal-types'; -import { SpanStatusCode, propagation, context } from '@opentelemetry/api'; +import { propagation, context } from '@opentelemetry/api'; import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; -import { CALL_SPAN_ENDED } from './serverUtils'; import { AttributeNames } from '../enums/AttributeNames'; import { GRPC_STATUS_CODE_OK } from '../status-code'; import { @@ -32,6 +31,7 @@ import { _grpcStatusCodeToOpenTelemetryStatusCode, _methodIsIgnored, } from '../utils'; +import { errorMonitor } from 'events'; /** * Parse a package method list and return a list of methods to patch @@ -63,6 +63,91 @@ export function getMethodsToWrap( return methodList; } +/** + * Patches a callback so that the current span for this trace is also ended + * when the callback is invoked. + */ +export function patchedCallback( + span: Span, + callback: SendUnaryDataCallback +) { + const wrappedFn: SendUnaryDataCallback = ( + err: grpcJs.ServiceError | null, + res?: ResponseType + ) => { + if (err) { + if (err.code) { + span.setStatus(_grpcStatusCodeToSpanStatus(err.code)); + span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, err.code); + } + span.setAttributes({ + [AttributeNames.GRPC_ERROR_NAME]: err.name, + [AttributeNames.GRPC_ERROR_MESSAGE]: err.message, + }); + } else { + span.setAttribute( + SemanticAttributes.RPC_GRPC_STATUS_CODE, + GRPC_STATUS_CODE_OK + ); + } + + span.end(); + callback(err, res); + }; + return context.bind(context.active(), wrappedFn); +} + +export function patchResponseMetadataEvent( + span: Span, + call: EventEmitter, + metadataCapture: metadataCaptureType +) { + call.on('metadata', (responseMetadata: any) => { + metadataCapture.client.captureResponseMetadata(span, responseMetadata); + }); +} + +export function patchResponseStreamEvents(span: Span, call: EventEmitter) { + // Both error and status events can be emitted + // the first one emitted set spanEnded to true + let spanEnded = false; + const endSpan = () => { + if (!spanEnded) { + span.end(); + spanEnded = true; + } + }; + context.bind(context.active(), call); + call.on(errorMonitor, (err: ServiceError) => { + if (spanEnded) { + return; + } + + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.setAttributes({ + [AttributeNames.GRPC_ERROR_NAME]: err.name, + [AttributeNames.GRPC_ERROR_MESSAGE]: err.message, + [SemanticAttributes.RPC_GRPC_STATUS_CODE]: err.code, + }); + + endSpan(); + }); + + call.on('status', (status: SpanStatus) => { + if (spanEnded) { + return; + } + + span.setStatus(_grpcStatusCodeToSpanStatus(status.code)); + span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, status.code); + + endSpan(); + }); +} + /** * Execute grpc client call. Apply completitionspan properties and end the * span on callback or receiving an emitted event. @@ -71,44 +156,9 @@ export function makeGrpcClientRemoteCall( metadataCapture: metadataCaptureType, original: GrpcClientFunc, args: unknown[], - metadata: Metadata, - self: Client + metadata: grpcJs.Metadata, + self: grpcJs.Client ): (span: Span) => EventEmitter { - /** - * Patches a callback so that the current span for this trace is also ended - * when the callback is invoked. - */ - function patchedCallback( - span: Span, - callback: SendUnaryDataCallback - ) { - const wrappedFn: SendUnaryDataCallback = ( - err: ServiceError | null, - res?: ResponseType - ) => { - if (err) { - if (err.code) { - span.setStatus(_grpcStatusCodeToSpanStatus(err.code)); - span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, err.code); - } - span.setAttributes({ - [AttributeNames.GRPC_ERROR_NAME]: err.name, - [AttributeNames.GRPC_ERROR_MESSAGE]: err.message, - }); - } else { - span.setStatus({ code: SpanStatusCode.UNSET }); - span.setAttribute( - SemanticAttributes.RPC_GRPC_STATUS_CODE, - GRPC_STATUS_CODE_OK - ); - } - - span.end(); - callback(err, res); - }; - return context.bind(context.active(), wrappedFn); - } - return (span: Span) => { // if unary or clientStream if (!original.responseStream) { @@ -132,67 +182,18 @@ export function makeGrpcClientRemoteCall( // if server stream or bidi if (original.responseStream) { - // Both error and status events can be emitted - // the first one emitted set spanEnded to true - let spanEnded = false; - const endSpan = () => { - if (!spanEnded) { - span.end(); - spanEnded = true; - } - }; - context.bind(context.active(), call); - call.on('error', (err: ServiceError) => { - if (call[CALL_SPAN_ENDED]) { - return; - } - call[CALL_SPAN_ENDED] = true; - - span.setStatus({ - code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), - message: err.message, - }); - span.setAttributes({ - [AttributeNames.GRPC_ERROR_NAME]: err.name, - [AttributeNames.GRPC_ERROR_MESSAGE]: err.message, - [SemanticAttributes.RPC_GRPC_STATUS_CODE]: err.code, - }); - - endSpan(); - }); - - call.on('status', (status: SpanStatus) => { - if (call[CALL_SPAN_ENDED]) { - return; - } - call[CALL_SPAN_ENDED] = true; - - span.setStatus(_grpcStatusCodeToSpanStatus(status.code)); - span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, status.code); - - endSpan(); - }); + patchResponseStreamEvents(span, call); } return call; }; } -/** - * Returns the metadata argument from user provided arguments (`args`) - */ -export function getMetadata( - this: GrpcJsInstrumentation, - original: GrpcClientFunc, - grpcClient: typeof grpcJs, - args: Array -): Metadata { - let metadata: Metadata; - +export function getMetadataIndex(args: Array): number { // This finds an instance of Metadata among the arguments. // A possible issue that could occur is if the 'options' parameter from // the user contains an '_internal_repr' as well as a 'getMap' function, // but this is an extremely rare case. - let metadataIndex = args.findIndex((arg: unknown | Metadata) => { + return args.findIndex((arg: unknown | Metadata) => { return ( arg && typeof arg === 'object' && @@ -200,22 +201,45 @@ export function getMetadata( typeof (arg as Metadata).getMap === 'function' ); }); +} + +/** + * Returns the metadata argument from user provided arguments (`args`) + * If no metadata is provided in `args`: adds empty metadata to `args` and returns that empty metadata + */ +export function extractMetadataOrSplice( + grpcLib: typeof grpcJs, + args: Array, + spliceIndex: number +) { + let metadata: grpcJs.Metadata; + const metadataIndex = getMetadataIndex(args); if (metadataIndex === -1) { - metadata = new grpcClient.Metadata(); - if (!original.requestStream) { - // unary or server stream - metadataIndex = 1; - } else { - // client stream or bidi - metadataIndex = 0; - } - args.splice(metadataIndex, 0, metadata); + // Create metadata if it does not exist + metadata = new grpcLib.Metadata(); + args.splice(spliceIndex, 0, metadata); } else { metadata = args[metadataIndex] as Metadata; } return metadata; } +/** + * Returns the metadata argument from user provided arguments (`args`) + * Adds empty metadata to arguments if the default is used. + */ +export function extractMetadataOrSpliceDefault( + grpcClient: typeof grpcJs, + original: GrpcClientFunc, + args: Array +): grpcJs.Metadata { + return extractMetadataOrSplice( + grpcClient, + args, + original.requestStream ? 0 : 1 + ); +} + /** * Inject opentelemetry trace context into `metadata` for use by another * grpc receiver diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts index 72ad45e3bd2..36be79fd074 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts @@ -26,6 +26,7 @@ import type { loadPackageDefinition, GrpcObject, } from '@grpc/grpc-js'; + import type * as grpcJs from '@grpc/grpc-js'; import type { @@ -36,6 +37,7 @@ import type { MakeClientConstructorFunction, PackageDefinition, GrpcClientFunc, + ClientRequestFunction, } from './types'; import type { GrpcInstrumentationConfig } from '../types'; import type { metadataCaptureType } from '../internal-types'; @@ -47,6 +49,7 @@ import { SpanOptions, SpanKind, trace, + Span, } from '@opentelemetry/api'; import { InstrumentationNodeModuleDefinition, @@ -63,9 +66,19 @@ import { import { getMethodsToWrap, makeGrpcClientRemoteCall, - getMetadata, + extractMetadataOrSpliceDefault, + setSpanContext, + patchedCallback, + patchResponseStreamEvents, + patchResponseMetadataEvent, + extractMetadataOrSplice, } from './clientUtils'; -import { _extractMethodAndService, metadataCapture, URI_REGEX } from '../utils'; +import { + _extractMethodAndService, + metadataCapture, + URI_REGEX, + _methodIsIgnored, +} from '../utils'; import { AttributeValues } from '../enums/AttributeValues'; export class GrpcJsInstrumentation extends InstrumentationBase { @@ -121,6 +134,41 @@ export class GrpcJsInstrumentation extends InstrumentationBase { 'loadPackageDefinition', this._patchLoadPackageDefinition(moduleExports) ); + if (isWrapped(moduleExports.Client.prototype)) { + this._unwrap(moduleExports.Client.prototype, 'makeUnaryRequest'); + this._unwrap( + moduleExports.Client.prototype, + 'makeClientStreamRequest' + ); + this._unwrap( + moduleExports.Client.prototype, + 'makeServerStreamRequest' + ); + this._unwrap( + moduleExports.Client.prototype, + 'makeBidiStreamRequest' + ); + } + this._wrap( + moduleExports.Client.prototype, + 'makeUnaryRequest', + this._patchClientRequestMethod(moduleExports, false) as any + ); + this._wrap( + moduleExports.Client.prototype, + 'makeClientStreamRequest', + this._patchClientRequestMethod(moduleExports, false) as any + ); + this._wrap( + moduleExports.Client.prototype, + 'makeServerStreamRequest', + this._patchClientRequestMethod(moduleExports, true) as any + ); + this._wrap( + moduleExports.Client.prototype, + 'makeBidiStreamRequest', + this._patchClientRequestMethod(moduleExports, true) as any + ); return moduleExports; }, (moduleExports, version) => { @@ -131,6 +179,16 @@ export class GrpcJsInstrumentation extends InstrumentationBase { this._unwrap(moduleExports, 'makeClientConstructor'); this._unwrap(moduleExports, 'makeGenericClientConstructor'); this._unwrap(moduleExports, 'loadPackageDefinition'); + this._unwrap(moduleExports.Client.prototype, 'makeUnaryRequest'); + this._unwrap( + moduleExports.Client.prototype, + 'makeClientStreamRequest' + ); + this._unwrap( + moduleExports.Client.prototype, + 'makeServerStreamRequest' + ); + this._unwrap(moduleExports.Client.prototype, 'makeBidiStreamRequest'); } ), ]; @@ -257,6 +315,84 @@ export class GrpcJsInstrumentation extends InstrumentationBase { }; } + /** + * Patch for grpc.Client.make*Request(...) functions. + * Provides auto-instrumentation for client requests when using a Client without + * makeGenericClientConstructor/makeClientConstructor + */ + private _patchClientRequestMethod( + grpcLib: typeof grpcJs, + hasResponseStream: boolean + ): ( + original: ClientRequestFunction + ) => ClientRequestFunction { + const instrumentation = this; + return (original: ClientRequestFunction) => { + instrumentation._diag.debug( + 'patched makeClientStreamRequest on grpc client' + ); + + return function makeClientStreamRequest(this: grpcJs.Client) { + // method must always be at first position + const method = arguments[0]; + const { name, service, methodAttributeValue } = + instrumentation._splitMethodString(method); + + // Do not attempt to trace/inject context if method is ignored + if ( + method != null && + _methodIsIgnored( + methodAttributeValue, + instrumentation.getConfig().ignoreGrpcMethods + ) + ) { + return original.apply(this, [...arguments]); + } + + const modifiedArgs = [...arguments]; + const metadata = extractMetadataOrSplice(grpcLib, modifiedArgs, 4); + + const span = instrumentation.createClientSpan( + name, + methodAttributeValue, + service, + metadata + ); + instrumentation.extractNetMetadata(this, span); + + // Callback is only present when there is no responseStream + if (!hasResponseStream) { + // Replace the callback with the patched one if it is there. + // If the callback arg is not a function on the last position then the client will throw + // and never call the callback -> so there's nothing to patch + const lastArgIndex = modifiedArgs.length - 1; + const callback = modifiedArgs[lastArgIndex]; + if (typeof callback === 'function') { + modifiedArgs[lastArgIndex] = patchedCallback(span, callback); + } + } + + return context.with(trace.setSpan(context.active(), span), () => { + setSpanContext(metadata); + + const call = original.apply(this, [...modifiedArgs]); + patchResponseMetadataEvent( + span, + call, + instrumentation._metadataCapture + ); + + // Subscribe to response stream events when there's a response stream. + if (hasResponseStream) { + patchResponseStreamEvents(span, call); + } + + return call; + }); + }; + }; + } + /** * Entry point for applying client patches to `grpc.makeClientConstructor(...)` equivalents * @param this GrpcJsPlugin @@ -320,10 +456,10 @@ export class GrpcJsInstrumentation extends InstrumentationBase { function clientMethodTrace(this: Client) { const name = `grpc.${original.path.replace('/', '')}`; const args = [...arguments]; - const metadata = getMetadata.call( + const metadata = extractMetadataOrSpliceDefault.call( instrumentation, - original, grpcClient, + original, args ); const { service, method } = _extractMethodAndService(original.path); @@ -335,18 +471,7 @@ export class GrpcJsInstrumentation extends InstrumentationBase { [SemanticAttributes.RPC_METHOD]: method, [SemanticAttributes.RPC_SERVICE]: service, }); - // set net.peer.* from target (e.g., "dns:otel-productcatalogservice:8080") as a hint to APMs - const parsedUri = URI_REGEX.exec(this.getChannel().getTarget()); - if (parsedUri != null && parsedUri.groups != null) { - span.setAttribute( - SemanticAttributes.NET_PEER_NAME, - parsedUri.groups['name'] - ); - span.setAttribute( - SemanticAttributes.NET_PEER_PORT, - parseInt(parsedUri.groups['port']) - ); - } + instrumentation.extractNetMetadata(this, span); instrumentation._metadataCapture.client.captureRequestMetadata( span, @@ -368,6 +493,51 @@ export class GrpcJsInstrumentation extends InstrumentationBase { }; } + private _splitMethodString(method: string) { + if (method == null) { + return { name: '', service: '', methodAttributeValue: '' }; + } + const name = `grpc.${method.replace('/', '')}`; + const { service, method: methodAttributeValue } = + _extractMethodAndService(method); + return { name, service, methodAttributeValue }; + } + + private createClientSpan( + name: string, + methodAttributeValue: string, + service: string, + metadata?: grpcJs.Metadata + ) { + const span = this.tracer + .startSpan(name, { kind: SpanKind.CLIENT }) + .setAttributes({ + [SemanticAttributes.RPC_SYSTEM]: 'grpc', + [SemanticAttributes.RPC_METHOD]: methodAttributeValue, + [SemanticAttributes.RPC_SERVICE]: service, + }); + + if (metadata != null) { + this._metadataCapture.client.captureRequestMetadata(span, metadata); + } + return span; + } + + private extractNetMetadata(client: grpcJs.Client, span: Span) { + // set net.peer.* from target (e.g., "dns:otel-productcatalogservice:8080") as a hint to APMs + const parsedUri = URI_REGEX.exec(client.getChannel().getTarget()); + if (parsedUri != null && parsedUri.groups != null) { + span.setAttribute( + SemanticAttributes.NET_PEER_NAME, + parsedUri.groups['name'] + ); + span.setAttribute( + SemanticAttributes.NET_PEER_PORT, + parseInt(parsedUri.groups['port']) + ); + } + } + /** * Utility function to patch *all* functions loaded through a proto file. * Recursively searches for Client classes and patches all methods, reversing the diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts index e9f15b45ab0..06ca7c0b61c 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts @@ -64,6 +64,10 @@ export type GrpcClientFunc = ((...args: unknown[]) => GrpcEmitter) & { export type ServerRegisterFunction = typeof Server.prototype.register; +export type ClientRequestFunction = ( + ...args: unknown[] +) => ReturnType; + export type MakeClientConstructorFunction = typeof makeGenericClientConstructor; export type { HandleCall } from '@grpc/grpc-js/build/src/server-call'; diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/test/.gitignore b/experimental/packages/opentelemetry-instrumentation-grpc/test/.gitignore new file mode 100644 index 00000000000..c9438a302c0 --- /dev/null +++ b/experimental/packages/opentelemetry-instrumentation-grpc/test/.gitignore @@ -0,0 +1 @@ +/proto diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.gen.yaml b/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.gen.yaml new file mode 100644 index 00000000000..a0aba044f1d --- /dev/null +++ b/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.gen.yaml @@ -0,0 +1,9 @@ +version: v1 +plugins: + - plugin: buf.build/community/timostamm-protobuf-ts:v2.9.0 + out: ../../test/proto/ts/fixtures + opt: + - long_type_string + - generate_dependencies + - ts_nocheck + diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.lock b/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.lock new file mode 100644 index 00000000000..65d8f1f75c5 --- /dev/null +++ b/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.lock @@ -0,0 +1,8 @@ +# Generated by buf. DO NOT EDIT. +version: v1 +deps: + - remote: buf.build + owner: googleapis + repository: googleapis + commit: cc916c31859748a68fd229a3c8d7a2e8 + digest: shake256:469b049d0eb04203d5272062636c078decefc96fec69739159c25d85349c50c34c7706918a8b216c5c27f76939df48452148cff8c5c3ae77fa6ba5c25c1b8bf8 diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.yaml b/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.yaml new file mode 100644 index 00000000000..dc977e439a7 --- /dev/null +++ b/experimental/packages/opentelemetry-instrumentation-grpc/test/fixtures/buf.yaml @@ -0,0 +1,9 @@ +version: v1 +lint: + use: + - DEFAULT +breaking: + use: + - FILE +deps: + - buf.build/googleapis/googleapis diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/test/grpc-protobuf-ts.test.ts b/experimental/packages/opentelemetry-instrumentation-grpc/test/grpc-protobuf-ts.test.ts new file mode 100644 index 00000000000..8e238a6acc1 --- /dev/null +++ b/experimental/packages/opentelemetry-instrumentation-grpc/test/grpc-protobuf-ts.test.ts @@ -0,0 +1,831 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { GrpcInstrumentation } from '../src'; + +const instrumentation = new GrpcInstrumentation(); +instrumentation.enable(); +instrumentation.disable(); + +import { GrpcTransport } from '@protobuf-ts/grpc-transport'; +import * as grpc from '@grpc/grpc-js'; +import { GrpcTesterClient } from './proto/ts/fixtures/grpc-test.client'; +import { + InMemorySpanExporter, + NodeTracerProvider, + SimpleSpanProcessor, +} from '@opentelemetry/sdk-trace-node'; +import * as protoLoader from '@grpc/proto-loader'; +import * as path from 'path'; +import * as assert from 'assert'; +import { + context, + ContextManager, + propagation, + SpanKind, + trace, +} from '@opentelemetry/api'; +import { W3CTraceContextPropagator } from '@opentelemetry/core'; +import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; +import { startServer } from './helper'; +import { + assertExportedSpans, + assertNoSpansExported, + SpanAssertionFunction, + TestFunction, +} from './protobuf-ts-utils'; + +const memoryExporter = new InMemorySpanExporter(); +const PROTO_PATH = path.resolve(__dirname, './fixtures/grpc-test.proto'); +const NO_ERROR = grpc.status.UNAUTHENTICATED + 1; + +/** + * Creates a client generated via protobuf-ts that is using the {@link grpc.Client} class + * directly. + */ +function createClient() { + return new GrpcTesterClient( + new GrpcTransport({ + host: 'localhost:3333', + channelCredentials: grpc.credentials.createInsecure(), + }) + ); +} + +/** + * Loads the server from proto and starts it on port 3333. + */ +async function loadAndStartServer() { + const options = { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + }; + + // Reloading from proto is needed as only servers loaded after the + // instrumentation is enabled will be instrumented. + const packageDefinition = await protoLoader.load(PROTO_PATH, options); + const proto = grpc.loadPackageDefinition(packageDefinition).pkg_test; + return startServer(proto, 3333); +} + +/** + * Creates a list of test data that includes all possible cases of status codes + * returned by the server, the input for the server to provoke the status codes, + * and the expected result code that should be present on the span. + */ +function getStatusCodeTestData(): { + // Name of the key used in the test (OK, UNAUTHENTICATED, DATA_LOSS, ...) + key: string; + // Input for the server implementation that will provoke the status code from 'key' + input: number; + // The result code that should be present on the created span + expectedResultCode: number; +}[] { + const codes = Object.keys(grpc.status) + .filter(key => !isNaN(Number(grpc.status[key as any]))) + // Remove 'OK' as the test server has special behavior to provoke an 'OK' response + .filter(key => key !== 'OK') + // Create the test data + .map(key => { + return { + key: key, + input: Number(grpc.status[key as any]), + expectedResultCode: Number(grpc.status[key as any]), + }; + }); + + // Push 'OK' with special input to provoke 'OK' response from test-server + codes.push({ + key: 'OK', + input: NO_ERROR, + expectedResultCode: grpc.status.OK, + }); + + return codes; +} + +/** + * Creates tests that assert that no spans are created. + * @param statusCodeTestWithRootSpan function that creates tests that include a root span + * @param statusCodeTestNoRootSpan function that creates tests that do not include a root span + */ +function shouldNotCreateSpans( + statusCodeTestWithRootSpan: TestFunction, + statusCodeTestNoRootSpan: TestFunction +) { + describe('with root span', () => { + getStatusCodeTestData().forEach(({ key, input, expectedResultCode }) => { + statusCodeTestWithRootSpan( + input, + key, + expectedResultCode, + assertNoSpansExported + ); + }); + }); + + describe('without root span', () => { + getStatusCodeTestData().forEach(({ key, input, expectedResultCode }) => { + statusCodeTestNoRootSpan( + input, + key, + expectedResultCode, + assertNoSpansExported + ); + }); + }); +} + +describe('#grpc-protobuf', () => { + let client: GrpcTesterClient; + let server: grpc.Server; + const provider = new NodeTracerProvider(); + let contextManager: ContextManager; + provider.addSpanProcessor(new SimpleSpanProcessor(memoryExporter)); + + before(() => { + propagation.setGlobalPropagator(new W3CTraceContextPropagator()); + instrumentation.setTracerProvider(provider); + }); + + beforeEach(() => { + memoryExporter.reset(); + contextManager = new AsyncHooksContextManager().enable(); + context.setGlobalContextManager(contextManager); + }); + + describe('client', async () => { + beforeEach(async () => { + instrumentation.enable(); + client = createClient(); + + server = await loadAndStartServer(); + }); + + afterEach(done => { + context.disable(); + server.tryShutdown(() => { + instrumentation.disable(); + done(); + }); + }); + + describe('makeUnaryRequest()', async () => { + async function act(status: number) { + return client.unaryMethod({ + num: status, + }); + } + + function statusCodeTestNoRootSpan( + input: number, + key: string, + expectedSpanStatus: number, + assertSpans: SpanAssertionFunction + ) { + it('with status code: ' + key, async () => { + // Act + try { + const request = await act(input); + // Assert success results + assert.strictEqual(request.response.num, input); + } catch (e) { + // Assert failure results + assert.strictEqual(e.code, key); + } + // Assert span data + assertSpans( + memoryExporter, + 'pkg_test.GrpcTester', + 'UnaryMethod', + expectedSpanStatus + ); + }); + } + + function statusCodeTestWithRootSpan( + input: number, + key: string, + expectedResultCode: number, + assertSpans: SpanAssertionFunction + ) { + it('with status code: ' + key, async () => { + // Arrange + const span = provider + .getTracer('default') + .startSpan('TestSpan', { kind: SpanKind.PRODUCER }); + return context.with( + trace.setSpan(context.active(), span), + async () => { + const rootSpan = trace.getSpan(context.active()); + assert.ok(rootSpan != null); + assert.deepStrictEqual(rootSpan, span); + + // Act + try { + const request = await act(input); + // Assert success results + assert.strictEqual(request.response.num, input); + } catch (e) { + // Assert failure results + assert.strictEqual(e.code, key); + } + + // Assert + assertSpans( + memoryExporter, + 'pkg_test.GrpcTester', + 'UnaryMethod', + expectedResultCode, + rootSpan + ); + } + ); + }); + } + + describe('should create root client span and server child span', () => { + getStatusCodeTestData().forEach( + ({ key, input, expectedResultCode }) => { + statusCodeTestNoRootSpan( + input, + key, + expectedResultCode, + assertExportedSpans + ); + } + ); + }); + + describe('should create child client span when parent span exists', () => { + getStatusCodeTestData().forEach( + ({ key, input, expectedResultCode }) => { + statusCodeTestWithRootSpan( + input, + key, + expectedResultCode, + assertExportedSpans + ); + } + ); + }); + + describe('should not create any spans when disabled', () => { + beforeEach(done => { + instrumentation.disable(); + server.tryShutdown(() => { + loadAndStartServer().then(loadedServer => { + server = loadedServer; + done(); + }); + }); + }); + + afterEach(() => { + instrumentation.enable(); + }); + + shouldNotCreateSpans( + statusCodeTestWithRootSpan, + statusCodeTestNoRootSpan + ); + }); + }); + + describe('makeClientStreamRequest()', () => { + async function act(input: number) { + const call = client.clientStreamMethod({ + num: input, + }); + + await call.requests.send({ num: input }); + await call.requests.send({ num: input }); + await call.requests.complete(); + + return await call.response; + } + + async function statusCodeTestNoRootSpan( + input: number, + key: string, + expectedSpanStatus: number, + assertSpans: SpanAssertionFunction + ) { + it('with status code: ' + key, async () => { + // Act + try { + const response = await act(input); + // Assert success results + assert.strictEqual(response.num, input * 2); + } catch (e) { + // Assert failure results + assert.strictEqual(e.code, key); + } + + assertSpans( + memoryExporter, + 'pkg_test.GrpcTester', + 'ClientStreamMethod', + expectedSpanStatus + ); + }); + } + + async function statusCodeTestWithRootSpan( + input: number, + key: string, + expectedSpanStatus: number, + assertSpans: SpanAssertionFunction + ) { + it('with status code: ' + key, async () => { + // Arrange + const span = provider + .getTracer('default') + .startSpan('TestSpan', { kind: SpanKind.PRODUCER }); + return context.with( + trace.setSpan(context.active(), span), + async () => { + const rootSpan = trace.getSpan(context.active()); + assert.ok(rootSpan != null); + assert.deepStrictEqual(rootSpan, span); + + // Act + try { + const response = await act(input); + // Assert success results + assert.strictEqual(response.num, input * 2); + } catch (e) { + // Assert failure results + assert.strictEqual(e.code, key); + } + + // Assert + assertSpans( + memoryExporter, + 'pkg_test.GrpcTester', + 'ClientStreamMethod', + expectedSpanStatus, + rootSpan + ); + } + ); + }); + } + + describe('should create root client span and server child span', () => { + getStatusCodeTestData().forEach( + ({ key, input, expectedResultCode }) => { + statusCodeTestNoRootSpan( + input, + key, + expectedResultCode, + assertExportedSpans + ); + } + ); + }); + + describe('should create child client span when parent span exists', () => { + getStatusCodeTestData().forEach( + ({ key, input, expectedResultCode }) => { + statusCodeTestWithRootSpan( + input, + key, + expectedResultCode, + assertExportedSpans + ); + } + ); + }); + + describe('should not create any spans when disabled', () => { + beforeEach(done => { + instrumentation.disable(); + server.tryShutdown(() => { + loadAndStartServer().then(loadedServer => { + server = loadedServer; + done(); + }); + }); + }); + + afterEach(() => { + instrumentation.enable(); + }); + + shouldNotCreateSpans( + statusCodeTestWithRootSpan, + statusCodeTestNoRootSpan + ); + }); + }); + + describe('makeServerStreamRequest()', () => { + function statusCodeTestNoRootSpan( + input: number, + key: string, + expectedSpanStatus: number, + assertSpans: SpanAssertionFunction + ) { + it('with status code: ' + key, done => { + const serverStream = client.serverStreamMethod({ + num: input, + }); + + serverStream.responses.onMessage(message => { + assert.strictEqual(message.num, input); + }); + + function completeCallback() { + try { + assertSpans( + memoryExporter, + 'pkg_test.GrpcTester', + 'ServerStreamMethod', + expectedSpanStatus + ); + } catch (err) { + // catch error and call done() to ensure an error message + // is shown in the test results instead of a test timeout + done(err); + return; + } + done(); + } + + serverStream.responses.onError(completeCallback); + serverStream.responses.onComplete(completeCallback); + }); + } + + function statusCodeTestWithRootSpan( + input: number, + key: string, + expectedSpanStatus: number, + assertSpans: SpanAssertionFunction + ) { + it('with status code: ' + key, done => { + // Arrange + const span = provider + .getTracer('default') + .startSpan('TestSpan', { kind: SpanKind.PRODUCER }); + context.with(trace.setSpan(context.active(), span), async () => { + const rootSpan = trace.getSpan(context.active()); + assert.ok(rootSpan != null); + assert.deepStrictEqual(rootSpan, span); + + // Act + const serverStream = client.serverStreamMethod({ + num: input, + }); + + serverStream.responses.onMessage(message => { + assert.strictEqual(message.num, input); + }); + + function completeCallback() { + try { + // Assert + assertSpans( + memoryExporter, + 'pkg_test.GrpcTester', + 'ServerStreamMethod', + expectedSpanStatus, + rootSpan + ); + } catch (err) { + // catch error and call done() to ensure an error message + // is shown in the test results instead of a test timeout + done(err); + return; + } + done(); + } + + serverStream.responses.onError(completeCallback); + serverStream.responses.onComplete(completeCallback); + }); + }); + } + + describe('should create root client span and server child span', () => { + getStatusCodeTestData().forEach( + ({ key, input, expectedResultCode }) => { + statusCodeTestNoRootSpan( + input, + key, + expectedResultCode, + assertExportedSpans + ); + } + ); + }); + + describe('should create child client span when parent span exists', () => { + getStatusCodeTestData().forEach( + ({ key, input, expectedResultCode }) => { + statusCodeTestWithRootSpan( + input, + key, + expectedResultCode, + assertExportedSpans + ); + } + ); + }); + + describe('should not create any spans when disabled', () => { + beforeEach(done => { + instrumentation.disable(); + server.tryShutdown(() => { + loadAndStartServer().then(loadedServer => { + server = loadedServer; + done(); + }); + }); + }); + + afterEach(() => { + instrumentation.enable(); + }); + + shouldNotCreateSpans( + statusCodeTestWithRootSpan, + statusCodeTestNoRootSpan + ); + }); + }); + + describe('makeBidiStreamRequest()', () => { + function statusCodeTestNoRootSpan( + input: number, + key: string, + expectedSpanStatus: number, + assertSpans: SpanAssertionFunction + ) { + it('with status code: ' + key, done => { + const bidiStream = client.bidiStreamMethod(); + + bidiStream.responses.onMessage(message => { + assert.strictEqual(message.num, input); + }); + + function completeHandler() { + try { + assertSpans( + memoryExporter, + 'pkg_test.GrpcTester', + 'BidiStreamMethod', + expectedSpanStatus + ); + } catch (err) { + // catch error and call done() to ensure an error message + // is shown in the test results instead of a test timeout + done(err); + return; + } + done(); + } + + bidiStream.responses.onError(completeHandler); + bidiStream.responses.onComplete(completeHandler); + + bidiStream.requests.send({ + num: input, + }); + bidiStream.requests.send({ + num: input, + }); + bidiStream.requests.complete(); + }); + } + + function statusCodeTestWithRootSpan( + input: number, + key: string, + expectedSpanStatus: number, + assertSpans: SpanAssertionFunction + ) { + it('with status code: ' + key, done => { + // Arrange + const span = provider + .getTracer('default') + .startSpan('TestSpan', { kind: SpanKind.PRODUCER }); + context.with(trace.setSpan(context.active(), span), () => { + const rootSpan = trace.getSpan(context.active()); + assert.ok(rootSpan != null); + assert.deepStrictEqual(rootSpan, span); + + // Act + const bidiStream = client.bidiStreamMethod(); + + function completeHandler() { + try { + assertSpans( + memoryExporter, + 'pkg_test.GrpcTester', + 'BidiStreamMethod', + expectedSpanStatus, + rootSpan + ); + } catch (err) { + // catch error and call done() to ensure an error message + // is shown in the test results instead of a test timeout + done(err); + return; + } + done(); + } + + bidiStream.responses.onMessage(message => { + assert.strictEqual(message.num, input); + }); + + bidiStream.responses.onError(completeHandler); + bidiStream.responses.onComplete(completeHandler); + + bidiStream.requests.send({ + num: input, + }); + bidiStream.requests.send({ + num: input, + }); + bidiStream.requests.complete(); + }); + }); + } + + describe('should create root client span and server child span', () => { + getStatusCodeTestData().forEach( + ({ key, input, expectedResultCode }) => { + statusCodeTestNoRootSpan( + input, + key, + expectedResultCode, + assertExportedSpans + ); + } + ); + }); + + describe('should create child client span when parent span exists', () => { + getStatusCodeTestData().forEach( + ({ key, input, expectedResultCode }) => { + statusCodeTestWithRootSpan( + input, + key, + expectedResultCode, + assertExportedSpans + ); + } + ); + }); + + describe('should not create any spans when disabled', () => { + beforeEach(done => { + instrumentation.disable(); + server.tryShutdown(() => { + loadAndStartServer().then(loadedServer => { + server = loadedServer; + done(); + }); + }); + }); + + afterEach(() => { + instrumentation.enable(); + }); + + shouldNotCreateSpans( + statusCodeTestWithRootSpan, + statusCodeTestNoRootSpan + ); + }); + }); + }); + + describe('should not produce telemetry when ignored via config', () => { + beforeEach(async () => { + instrumentation.disable(); + instrumentation.setConfig({ + ignoreGrpcMethods: [ + 'UnaryMethod', + new RegExp(/^camel.*Method$/), + (str: string) => str === 'BidiStreamMethod', + ], + }); + instrumentation.enable(); + client = createClient(); + + server = await loadAndStartServer(); + }); + + it('when filtered by exact string', async () => { + await client.unaryMethod({ num: NO_ERROR }); + assertNoSpansExported( + memoryExporter, + 'pkg_test.GrpcTester', + 'UnaryMethod', + grpc.status.OK + ); + }); + + it('when filtered by RegExp', async () => { + await client.camelCaseMethod({ num: NO_ERROR }); + assertNoSpansExported( + memoryExporter, + 'pkg_test.GrpcTester', + 'UnaryMethod', + grpc.status.OK + ); + }); + + it('when filtered by predicate', done => { + const stream = client.bidiStreamMethod({ num: NO_ERROR }); + stream.requests.send({ + num: NO_ERROR, + }); + stream.requests.complete(); + + stream.responses.onComplete(() => { + assertNoSpansExported( + memoryExporter, + 'pkg_test.GrpcTester', + 'UnaryMethod', + grpc.status.OK + ); + done(); + }); + }); + + afterEach(done => { + instrumentation.setConfig({}); + context.disable(); + server.tryShutdown(() => { + instrumentation.disable(); + done(); + }); + }); + }); + + describe('should capture metadata when set up in config', () => { + beforeEach(async () => { + instrumentation.setConfig({ + metadataToSpanAttributes: { + client: { + requestMetadata: ['client_metadata_key'], + responseMetadata: ['server_metadata_key'], + }, + server: { + requestMetadata: ['client_metadata_key'], + responseMetadata: ['server_metadata_key'], + }, + }, + }); + instrumentation.enable(); + client = createClient(); + + server = await loadAndStartServer(); + }); + + it('should capture client metadata', async () => { + await client.unaryMethod( + { num: NO_ERROR }, + { + meta: { + client_metadata_key: 'client_metadata_value', + }, + } + ); + const spans = memoryExporter.getFinishedSpans(); + assert.equal( + spans[0].attributes['rpc.request.metadata.client_metadata_key'], + 'client_metadata_value' + ); + }); + + afterEach(done => { + instrumentation.setConfig({}); + context.disable(); + server.tryShutdown(() => { + instrumentation.disable(); + done(); + }); + }); + }); +}); diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/test/helper.ts b/experimental/packages/opentelemetry-instrumentation-grpc/test/helper.ts index 693cdeab463..9d861fd6512 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/test/helper.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/test/helper.ts @@ -34,18 +34,18 @@ import * as assert from 'assert'; import * as protoLoader from '@grpc/proto-loader'; import { status as GrpcStatus, - requestCallback, ServerUnaryCall, + requestCallback, ServerReadableStream, - ServerWritableStream, ServerDuplexStream, + ServerWritableStream, Client, Metadata, ServiceError, Server, - ServerCredentials, credentials, loadPackageDefinition, + ServerCredentials, } from '@grpc/grpc-js'; import { assertPropagation, assertSpan } from './utils/assertionUtils'; import { promisify } from 'util'; @@ -109,13 +109,149 @@ const checkEqual = ? requestEqual(x)(y) : false; +const replicate = (request: TestRequestResponse) => { + const result: TestRequestResponse[] = []; + for (let i = 0; i < request.num; i++) { + result.push(request); + } + return result; +}; + +export async function startServer(proto: any, port: number) { + const MAX_ERROR_STATUS = GrpcStatus.UNAUTHENTICATED; + const server = new Server(); + + function getError(msg: string, code: number): ServiceError | null { + const err: ServiceError = { + ...new Error(msg), + name: msg, + message: msg, + code, + details: msg, + metadata: new Metadata(), + }; + return err; + } + + server.addService(proto.GrpcTester.service, { + // An error is emitted every time + // request.num <= MAX_ERROR_STATUS = (status.UNAUTHENTICATED) + // in those cases, erro.code = request.num + + // This method returns the request + // This method returns the request + unaryMethodWithMetadata( + call: ServerUnaryCall, + callback: requestCallback + ) { + const serverMetadata: any = new Metadata(); + serverMetadata.add('server_metadata_key', 'server_metadata_value'); + + call.sendMetadata(serverMetadata); + + call.request.num <= MAX_ERROR_STATUS + ? callback( + getError( + 'Unary Method with Metadata Error', + call.request.num + ) as ServiceError + ) + : callback(null, { num: call.request.num }); + }, + + // This method returns the request + unaryMethod( + call: ServerUnaryCall, + callback: requestCallback + ) { + call.request.num <= MAX_ERROR_STATUS + ? callback( + getError('Unary Method Error', call.request.num) as ServiceError + ) + : callback(null, { num: call.request.num }); + }, + + // This method returns the request + camelCaseMethod( + call: ServerUnaryCall, + callback: requestCallback + ) { + call.request.num <= MAX_ERROR_STATUS + ? callback( + getError('Unary Method Error', call.request.num) as ServiceError + ) + : callback(null, { num: call.request.num }); + }, + + // This method sums the requests + clientStreamMethod( + call: ServerReadableStream, + callback: requestCallback + ) { + let sum = 0; + let hasError = false; + let code = GrpcStatus.OK; + call.on('data', (data: TestRequestResponse) => { + sum += data.num; + if (data.num <= MAX_ERROR_STATUS) { + hasError = true; + code = data.num; + } + }); + call.on('end', () => { + hasError + ? callback(getError('Client Stream Method Error', code) as any) + : callback(null, { num: sum }); + }); + }, + + // This method returns an array that replicates the request, request.num of + // times + serverStreamMethod: (call: ServerWritableStream) => { + const result = replicate(call.request); + + if (call.request.num <= MAX_ERROR_STATUS) { + call.emit( + 'error', + getError('Server Stream Method Error', call.request.num) + ); + } else { + result.forEach(element => { + call.write(element); + }); + } + call.end(); + }, + + // This method returns the request + bidiStreamMethod: (call: ServerDuplexStream) => { + call.on('data', (data: TestRequestResponse) => { + if (data.num <= MAX_ERROR_STATUS) { + call.emit('error', getError('Server Stream Method Error', data.num)); + } else { + call.write(data); + } + }); + call.on('end', () => { + call.end(); + }); + }, + }); + const bindAwait = promisify(server.bindAsync); + await bindAwait.call( + server, + 'localhost:' + port, + ServerCredentials.createInsecure() + ); + server.start(); + return server; +} + export const runTests = ( plugin: GrpcInstrumentation, moduleName: string, grpcPort: number ) => { - const MAX_ERROR_STATUS = GrpcStatus.UNAUTHENTICATED; - const grpcClient = { unaryMethodWithMetadata: ( client: TestGrpcClient, @@ -275,138 +411,6 @@ export const runTests = ( let server: Server; let client: Client; - const replicate = (request: TestRequestResponse) => { - const result: TestRequestResponse[] = []; - for (let i = 0; i < request.num; i++) { - result.push(request); - } - return result; - }; - - async function startServer(proto: any) { - const server = new Server(); - - function getError(msg: string, code: number): ServiceError | null { - const err: ServiceError = { - ...new Error(msg), - name: msg, - message: msg, - code, - details: msg, - metadata: new Metadata(), - }; - return err; - } - - server.addService(proto.GrpcTester.service, { - // An error is emitted every time - // request.num <= MAX_ERROR_STATUS = (status.UNAUTHENTICATED) - // in those cases, erro.code = request.num - - // This method returns the request - unaryMethodWithMetadata( - call: ServerUnaryCall, - callback: requestCallback - ) { - const serverMetadata = new Metadata(); - serverMetadata.add('server_metadata_key', 'server_metadata_value'); - - call.sendMetadata(serverMetadata); - - call.request.num <= MAX_ERROR_STATUS - ? callback( - getError('Unary Method with Metadata Error', call.request.num) - ) - : callback(null, { num: call.request.num }); - }, - - // This method returns the request - unaryMethod( - call: ServerUnaryCall, - callback: requestCallback - ) { - call.request.num <= MAX_ERROR_STATUS - ? callback(getError('Unary Method Error', call.request.num)) - : callback(null, { num: call.request.num }); - }, - - // This method returns the request - camelCaseMethod( - call: ServerUnaryCall, - callback: requestCallback - ) { - call.request.num <= MAX_ERROR_STATUS - ? callback(getError('Unary Method Error', call.request.num)) - : callback(null, { num: call.request.num }); - }, - - // This method sums the requests - clientStreamMethod( - call: ServerReadableStream, - callback: requestCallback - ) { - let sum = 0; - let hasError = false; - let code = GrpcStatus.OK; - call.on('data', (data: TestRequestResponse) => { - sum += data.num; - if (data.num <= MAX_ERROR_STATUS) { - hasError = true; - code = data.num; - } - }); - call.on('end', () => { - hasError - ? callback(getError('Client Stream Method Error', code) as any) - : callback(null, { num: sum }); - }); - }, - - // This method returns an array that replicates the request, request.num of - // times - serverStreamMethod: (call: ServerWritableStream) => { - const result = replicate(call.request); - - if (call.request.num <= MAX_ERROR_STATUS) { - call.emit( - 'error', - getError('Server Stream Method Error', call.request.num) - ); - } else { - result.forEach(element => { - call.write(element); - }); - } - call.end(); - }, - - // This method returns the request - bidiStreamMethod: (call: ServerDuplexStream) => { - call.on('data', (data: TestRequestResponse) => { - if (data.num <= MAX_ERROR_STATUS) { - call.emit( - 'error', - getError('Server Stream Method Error', data.num) - ); - } else { - call.write(data); - } - }); - call.on('end', () => { - call.end(); - }); - }, - }); - const bindAwait = promisify(server.bindAsync); - await bindAwait.call( - server, - 'localhost:' + grpcPort, - ServerCredentials.createInsecure() - ); - server.start(); - return server; - } - function createClient(proto: any) { return new proto.GrpcTester( 'localhost:' + grpcPort, @@ -590,9 +594,7 @@ export const runTests = ( .startSpan('TestSpan', { kind: SpanKind.PRODUCER }); return context.with(trace.setSpan(context.active(), span), async () => { const rootSpan = trace.getSpan(context.active()); - if (!rootSpan) { - return assert.ok(false); - } + assert.ok(rootSpan != null); assert.deepStrictEqual(rootSpan, span); const args = [client, method.request, method.metadata]; @@ -706,9 +708,7 @@ export const runTests = ( .startSpan('TestSpan', { kind: SpanKind.PRODUCER }); return context.with(trace.setSpan(context.active(), span), async () => { const rootSpan = trace.getSpan(context.active()); - if (!rootSpan) { - return assert.ok(false); - } + assert.ok(rootSpan != null); assert.deepStrictEqual(rootSpan, span); const args = [client, insertError(method.request)(errorCode)]; @@ -768,7 +768,7 @@ export const runTests = ( const packageDefinition = await protoLoader.load(PROTO_PATH, options); const proto = loadPackageDefinition(packageDefinition).pkg_test; - server = await startServer(proto); + server = await startServer(proto, grpcPort); client = createClient(proto); }); @@ -811,7 +811,7 @@ export const runTests = ( const packageDefinition = await protoLoader.load(PROTO_PATH, options); const proto = loadPackageDefinition(packageDefinition).pkg_test; - server = await startServer(proto); + server = await startServer(proto, grpcPort); client = createClient(proto); }); @@ -845,7 +845,7 @@ export const runTests = ( const packageDefinition = await protoLoader.load(PROTO_PATH, options); const proto = loadPackageDefinition(packageDefinition).pkg_test; - server = await startServer(proto); + server = await startServer(proto, grpcPort); client = createClient(proto); }); @@ -889,7 +889,7 @@ export const runTests = ( const packageDefinition = await protoLoader.load(PROTO_PATH, options); const proto = loadPackageDefinition(packageDefinition).pkg_test; - server = await startServer(proto); + server = await startServer(proto, grpcPort); client = createClient(proto); }); @@ -976,7 +976,7 @@ export const runTests = ( const packageDefinition = await protoLoader.load(PROTO_PATH, options); const proto = loadPackageDefinition(packageDefinition).pkg_test; - server = await startServer(proto); + server = await startServer(proto, grpcPort); client = createClient(proto); }); diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/test/protobuf-ts-utils.ts b/experimental/packages/opentelemetry-instrumentation-grpc/test/protobuf-ts-utils.ts new file mode 100644 index 00000000000..2fc2a3d1bec --- /dev/null +++ b/experimental/packages/opentelemetry-instrumentation-grpc/test/protobuf-ts-utils.ts @@ -0,0 +1,111 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { Span, SpanKind } from '@opentelemetry/api'; +import * as assert from 'assert'; + +import { + InMemorySpanExporter, + ReadableSpan, +} from '@opentelemetry/sdk-trace-base'; +import { assertPropagation, assertSpan } from './utils/assertionUtils'; +import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; + +export type SpanAssertionFunction = ( + exporter: InMemorySpanExporter, + rpcService: string, + rpcMethod: string, + expectedSpanStatus: number, + rootSpan?: Span +) => void; + +export type TestFunction = ( + input: number, + errorKey: string, + expectedResultCode: number, + assertSpans: SpanAssertionFunction +) => void; + +function validateSpans( + clientSpan: ReadableSpan, + serverSpan: ReadableSpan, + rpcService: string, + rpcMethod: string, + status: number +) { + const validations = { + name: `grpc.${rpcService}/${rpcMethod}`, + netPeerName: 'localhost', + status: status, + netPeerPort: 3333, + }; + + assert.strictEqual( + clientSpan.spanContext().traceId, + serverSpan.spanContext().traceId + ); + assertPropagation(serverSpan, clientSpan); + + assertSpan('grpc', serverSpan, SpanKind.SERVER, validations); + assertSpan('grpc', clientSpan, SpanKind.CLIENT, validations); + assert.strictEqual( + clientSpan.attributes[SemanticAttributes.RPC_METHOD], + rpcMethod + ); + assert.strictEqual( + clientSpan.attributes[SemanticAttributes.RPC_SERVICE], + rpcService + ); +} + +export function assertNoSpansExported( + exporter: InMemorySpanExporter, + _rpcService: string, + _rpcMethod: string, + _expectedSpanStatus: number, + _rootSpan?: Span +) { + const spans = exporter.getFinishedSpans(); + assert.strictEqual(spans.length, 0); +} + +export function assertExportedSpans( + exporter: InMemorySpanExporter, + rpcService: string, + rpcMethod: string, + expectedSpanStatus: number, + rootSpan?: Span +) { + const spans = exporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2); + const serverSpan = spans[0]; + const clientSpan = spans[1]; + + validateSpans( + clientSpan, + serverSpan, + rpcService, + rpcMethod, + expectedSpanStatus + ); + + if (rootSpan) { + assert.strictEqual( + rootSpan?.spanContext().traceId, + serverSpan.spanContext().traceId + ); + assert.strictEqual(rootSpan?.spanContext().spanId, clientSpan.parentSpanId); + } +}