Skip to content

Commit

Permalink
fix(instrumentation-grpc): instrument @grpc/grpc-js Client methods (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
pichlermarc authored Aug 1, 2023
1 parent 1a7488e commit 87fff2e
Show file tree
Hide file tree
Showing 13 changed files with 1,440 additions and 266 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ All notable changes to experimental packages in this project will be documented

* fix(sdk-node): use resource interface instead of concrete class [#3803](https://github.com/open-telemetry/opentelemetry-js/pull/3803) @blumamir
* fix(sdk-logs): remove includeTraceContext configuration and use LogRecord context when available [#3817](https://github.com/open-telemetry/opentelemetry-js/pull/3817) @hectorhdzg
* fix(instrumentation-grpc): instrument @grpc/grpc-js Client methods [#3804](https://github.com/open-telemetry/opentelemetry-js/pull/3804) @pichlermarc

## 0.39.1

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
build
test/proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"prepublishOnly": "npm run compile",
"compile": "tsc --build",
"clean": "tsc --build --clean",
"test": "nyc ts-mocha -p tsconfig.json test/**/*.test.ts",
"test": "npm run protos:generate && nyc ts-mocha -p tsconfig.json test/**/*.test.ts",
"tdd": "npm run test -- --watch-extensions ts --watch",
"lint": "eslint . --ext .ts",
"lint:fix": "eslint . --ext .ts --fix",
Expand All @@ -18,7 +18,8 @@
"watch": "tsc --build --watch",
"precompile": "cross-var lerna run version --scope $npm_package_name --include-dependencies",
"prewatch": "node ../../../scripts/version-update.js",
"peer-api-check": "node ../../../scripts/peer-api-check.js"
"peer-api-check": "node ../../../scripts/peer-api-check.js",
"protos:generate": "cd test/fixtures && buf generate"
},
"keywords": [
"opentelemetry",
Expand All @@ -45,6 +46,10 @@
"access": "public"
},
"devDependencies": {
"@bufbuild/buf": "1.21.0-1",
"@protobuf-ts/grpc-transport": "2.9.0",
"@protobuf-ts/runtime-rpc": "2.9.0",
"@protobuf-ts/runtime": "2.9.0",
"@grpc/grpc-js": "^1.7.1",
"@grpc/proto-loader": "^0.7.3",
"@opentelemetry/api": "1.4.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ import type { GrpcJsInstrumentation } from './';
import type { GrpcClientFunc, SendUnaryDataCallback } from './types';
import type { metadataCaptureType } from '../internal-types';

import { SpanStatusCode, propagation, context } from '@opentelemetry/api';
import { propagation, context } from '@opentelemetry/api';
import { SemanticAttributes } from '@opentelemetry/semantic-conventions';
import { CALL_SPAN_ENDED } from './serverUtils';
import { AttributeNames } from '../enums/AttributeNames';
import { GRPC_STATUS_CODE_OK } from '../status-code';
import {
_grpcStatusCodeToSpanStatus,
_grpcStatusCodeToOpenTelemetryStatusCode,
_methodIsIgnored,
} from '../utils';
import { errorMonitor } from 'events';

/**
* Parse a package method list and return a list of methods to patch
Expand Down Expand Up @@ -63,6 +63,91 @@ export function getMethodsToWrap(
return methodList;
}

/**
* Patches a callback so that the current span for this trace is also ended
* when the callback is invoked.
*/
export function patchedCallback(
span: Span,
callback: SendUnaryDataCallback<ResponseType>
) {
const wrappedFn: SendUnaryDataCallback<ResponseType> = (
err: grpcJs.ServiceError | null,
res?: ResponseType
) => {
if (err) {
if (err.code) {
span.setStatus(_grpcStatusCodeToSpanStatus(err.code));
span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, err.code);
}
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
[AttributeNames.GRPC_ERROR_MESSAGE]: err.message,
});
} else {
span.setAttribute(
SemanticAttributes.RPC_GRPC_STATUS_CODE,
GRPC_STATUS_CODE_OK
);
}

span.end();
callback(err, res);
};
return context.bind(context.active(), wrappedFn);
}

export function patchResponseMetadataEvent(
span: Span,
call: EventEmitter,
metadataCapture: metadataCaptureType
) {
call.on('metadata', (responseMetadata: any) => {
metadataCapture.client.captureResponseMetadata(span, responseMetadata);
});
}

export function patchResponseStreamEvents(span: Span, call: EventEmitter) {
// Both error and status events can be emitted
// the first one emitted set spanEnded to true
let spanEnded = false;
const endSpan = () => {
if (!spanEnded) {
span.end();
spanEnded = true;
}
};
context.bind(context.active(), call);
call.on(errorMonitor, (err: ServiceError) => {
if (spanEnded) {
return;
}

span.setStatus({
code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code),
message: err.message,
});
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
[AttributeNames.GRPC_ERROR_MESSAGE]: err.message,
[SemanticAttributes.RPC_GRPC_STATUS_CODE]: err.code,
});

endSpan();
});

call.on('status', (status: SpanStatus) => {
if (spanEnded) {
return;
}

span.setStatus(_grpcStatusCodeToSpanStatus(status.code));
span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, status.code);

endSpan();
});
}

/**
* Execute grpc client call. Apply completitionspan properties and end the
* span on callback or receiving an emitted event.
Expand All @@ -71,44 +156,9 @@ export function makeGrpcClientRemoteCall(
metadataCapture: metadataCaptureType,
original: GrpcClientFunc,
args: unknown[],
metadata: Metadata,
self: Client
metadata: grpcJs.Metadata,
self: grpcJs.Client
): (span: Span) => EventEmitter {
/**
* Patches a callback so that the current span for this trace is also ended
* when the callback is invoked.
*/
function patchedCallback(
span: Span,
callback: SendUnaryDataCallback<ResponseType>
) {
const wrappedFn: SendUnaryDataCallback<ResponseType> = (
err: ServiceError | null,
res?: ResponseType
) => {
if (err) {
if (err.code) {
span.setStatus(_grpcStatusCodeToSpanStatus(err.code));
span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, err.code);
}
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
[AttributeNames.GRPC_ERROR_MESSAGE]: err.message,
});
} else {
span.setStatus({ code: SpanStatusCode.UNSET });
span.setAttribute(
SemanticAttributes.RPC_GRPC_STATUS_CODE,
GRPC_STATUS_CODE_OK
);
}

span.end();
callback(err, res);
};
return context.bind(context.active(), wrappedFn);
}

return (span: Span) => {
// if unary or clientStream
if (!original.responseStream) {
Expand All @@ -132,90 +182,64 @@ export function makeGrpcClientRemoteCall(

// if server stream or bidi
if (original.responseStream) {
// Both error and status events can be emitted
// the first one emitted set spanEnded to true
let spanEnded = false;
const endSpan = () => {
if (!spanEnded) {
span.end();
spanEnded = true;
}
};
context.bind(context.active(), call);
call.on('error', (err: ServiceError) => {
if (call[CALL_SPAN_ENDED]) {
return;
}
call[CALL_SPAN_ENDED] = true;

span.setStatus({
code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code),
message: err.message,
});
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
[AttributeNames.GRPC_ERROR_MESSAGE]: err.message,
[SemanticAttributes.RPC_GRPC_STATUS_CODE]: err.code,
});

endSpan();
});

call.on('status', (status: SpanStatus) => {
if (call[CALL_SPAN_ENDED]) {
return;
}
call[CALL_SPAN_ENDED] = true;

span.setStatus(_grpcStatusCodeToSpanStatus(status.code));
span.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE, status.code);

endSpan();
});
patchResponseStreamEvents(span, call);
}
return call;
};
}

/**
* Returns the metadata argument from user provided arguments (`args`)
*/
export function getMetadata(
this: GrpcJsInstrumentation,
original: GrpcClientFunc,
grpcClient: typeof grpcJs,
args: Array<unknown | Metadata>
): Metadata {
let metadata: Metadata;

export function getMetadataIndex(args: Array<unknown | Metadata>): number {
// This finds an instance of Metadata among the arguments.
// A possible issue that could occur is if the 'options' parameter from
// the user contains an '_internal_repr' as well as a 'getMap' function,
// but this is an extremely rare case.
let metadataIndex = args.findIndex((arg: unknown | Metadata) => {
return args.findIndex((arg: unknown | Metadata) => {
return (
arg &&
typeof arg === 'object' &&
(arg as Metadata)['internalRepr'] && // changed from _internal_repr in grpc --> @grpc/grpc-js https://github.com/grpc/grpc-node/blob/95289edcaf36979cccf12797cc27335da8d01f03/packages/grpc-js/src/metadata.ts#L88
typeof (arg as Metadata).getMap === 'function'
);
});
}

/**
* Returns the metadata argument from user provided arguments (`args`)
* If no metadata is provided in `args`: adds empty metadata to `args` and returns that empty metadata
*/
export function extractMetadataOrSplice(
grpcLib: typeof grpcJs,
args: Array<unknown | grpcJs.Metadata>,
spliceIndex: number
) {
let metadata: grpcJs.Metadata;
const metadataIndex = getMetadataIndex(args);
if (metadataIndex === -1) {
metadata = new grpcClient.Metadata();
if (!original.requestStream) {
// unary or server stream
metadataIndex = 1;
} else {
// client stream or bidi
metadataIndex = 0;
}
args.splice(metadataIndex, 0, metadata);
// Create metadata if it does not exist
metadata = new grpcLib.Metadata();
args.splice(spliceIndex, 0, metadata);
} else {
metadata = args[metadataIndex] as Metadata;
}
return metadata;
}

/**
* Returns the metadata argument from user provided arguments (`args`)
* Adds empty metadata to arguments if the default is used.
*/
export function extractMetadataOrSpliceDefault(
grpcClient: typeof grpcJs,
original: GrpcClientFunc,
args: Array<unknown | grpcJs.Metadata>
): grpcJs.Metadata {
return extractMetadataOrSplice(
grpcClient,
args,
original.requestStream ? 0 : 1
);
}

/**
* Inject opentelemetry trace context into `metadata` for use by another
* grpc receiver
Expand Down
Loading

0 comments on commit 87fff2e

Please sign in to comment.