Skip to content

Commit

Permalink
fix(instrumentation-grpc): splice metadata if it does not exist
Browse files Browse the repository at this point in the history
  • Loading branch information
pichlermarc committed Jun 26, 2023
1 parent a20258b commit 6f3af1b
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,31 @@
* limitations under the License.
*/

import {GrpcJsInstrumentation} from './';
import type {GrpcClientFunc, GrpcEmitter, SendUnaryDataCallback,} from './types';
import {context, propagation, Span, SpanStatus, SpanStatusCode,} from '@opentelemetry/api';
import { GrpcJsInstrumentation } from './';
import type {
GrpcClientFunc,
GrpcEmitter,
SendUnaryDataCallback,
} from './types';
import {
context,
propagation,
Span,
SpanStatus,
SpanStatusCode,
} from '@opentelemetry/api';
import type * as grpcJs from '@grpc/grpc-js';
import {_grpcStatusCodeToOpenTelemetryStatusCode, _grpcStatusCodeToSpanStatus, _methodIsIgnored,} from '../utils';
import {CALL_SPAN_ENDED} from './serverUtils';
import {EventEmitter} from 'events';
import {AttributeNames} from '../enums/AttributeNames';
import {SemanticAttributes} from '@opentelemetry/semantic-conventions';
import {metadataCaptureType} from '../internal-types';
import {GRPC_STATUS_CODE_OK} from '../status-code';
import {
_grpcStatusCodeToOpenTelemetryStatusCode,
_grpcStatusCodeToSpanStatus,
_methodIsIgnored,
} from '../utils';
import { CALL_SPAN_ENDED } from './serverUtils';
import { EventEmitter } from 'events';
import { AttributeNames } from '../enums/AttributeNames';
import { SemanticAttributes } from '@opentelemetry/semantic-conventions';
import { metadataCaptureType } from '../internal-types';
import { GRPC_STATUS_CODE_OK } from '../status-code';

/**
* Parse a package method list and return a list of methods to patch
Expand Down Expand Up @@ -96,7 +110,7 @@ export function patchResponseMetadataEvent(
call: GrpcEmitter,
metadataCapture: metadataCaptureType
) {
call.on('metadata', responseMetadata => {
call.on('metadata', (responseMetadata: any) => {
metadataCapture.client.captureResponseMetadata(span, responseMetadata);
});
}
Expand Down Expand Up @@ -201,42 +215,43 @@ export function getMetadataIndex(
});
}

export function getMetadata(
args: Array<unknown | grpcJs.Metadata>
): grpcJs.Metadata | undefined {
const index = getMetadataIndex(args);
if (index === -1) {
return undefined;
/**
* Returns the metadata argument from user provided arguments (`args`)
* If no metadata is provided in `args`: adds empty metadata to `args` and returns that empty metadata
*/
export function extractMetadataOrSplice(
grpcLib: typeof grpcJs,
args: Array<unknown | grpcJs.Metadata>,
spliceIndex: number
) {
let metadata: grpcJs.Metadata;
const metadataIndex = getMetadataIndex(args);
if (metadataIndex === -1) {
// Create metadata if it does not exist
metadata = new grpcLib.Metadata();
args.splice(spliceIndex, 0, metadata);
} else {
metadata = args[metadataIndex] as grpcJs.Metadata;
}
return args[index] as grpcJs.Metadata;
return metadata;
}

/**
* Returns the metadata argument from user provided arguments (`args`)
* Adds empty metadata to arguments if the default is used.
*/
export function extractMetadataOrSpliceDefault(
this: GrpcJsInstrumentation,
grpcClient: typeof grpcJs,
original: GrpcClientFunc,
args: Array<unknown | grpcJs.Metadata>
): grpcJs.Metadata {
let metadata: grpcJs.Metadata;
let metadataIndex = getMetadataIndex(args);
if (metadataIndex === -1) {
metadata = new grpcClient.Metadata();
if (!original.requestStream) {
// unary or server stream
metadataIndex = 1;
} else {
// client stream or bidi
metadataIndex = 0;
}
args.splice(metadataIndex, 0, metadata);
if (!original.requestStream) {
// unary or server stream
return extractMetadataOrSplice(grpcClient, args, 1);
} else {
metadata = args[metadataIndex] as grpcJs.Metadata;
// client stream or bidi
return extractMetadataOrSplice(grpcClient, args, 0);
}
return metadata;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import {
setSpanContext,
patchedCallback,
patchResponseStreamEvents,
patchResponseMetadataEvent, getMetadata,
patchResponseMetadataEvent, extractMetadataOrSplice
} from './clientUtils';
import { EventEmitter } from 'events';
import {
Expand Down Expand Up @@ -139,12 +139,12 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
this._wrap(
moduleExports.Client.prototype,
'makeUnaryRequest',
this._patchMakeUnaryRequest() as any
this._patchMakeUnaryRequest(moduleExports) as any
);
this._wrap(
moduleExports.Client.prototype,
'makeClientStreamRequest',
this._patchMakeClientStreamRequest() as any
this._patchMakeClientStreamRequest(moduleExports) as any
);
this._wrap(
moduleExports.Client.prototype,
Expand Down Expand Up @@ -313,16 +313,17 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
* Patch for grpc.Client.makeUnaryRequest(...) function. Provides auto-instrumentation for
* unary requests when using a Client without makeGenericClientConstructor/makeClientConstructor
*/
private _patchMakeUnaryRequest(): (
originalMakeUnaryRequest: UnaryRequestFunction
) => UnaryRequestFunction {
private _patchMakeUnaryRequest(
grpcLib: typeof grpcJs
): (originalMakeUnaryRequest: UnaryRequestFunction) => UnaryRequestFunction {
const instrumentation = this;
return (originalMakeUnaryRequest: UnaryRequestFunction) => {
return (original: UnaryRequestFunction) => {
instrumentation._diag.debug('patched makeUnaryRequest on grpc client');

return function makeUnaryRequest(
this: grpcJs.Client
): grpcJs.ClientUnaryCall {
// method must always be at first position
const method = arguments[0];
const { name, service, methodAttributeValue } =
instrumentation._splitMethodString(method);
Expand All @@ -334,10 +335,12 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
instrumentation.getConfig().ignoreGrpcMethods
)
) {
return originalMakeUnaryRequest.apply(this, [...arguments]);
return original.apply(this, [...arguments]);
}

const metadata = getMetadata([...arguments]);
const modifiedArgs = [...arguments];
const metadata = extractMetadataOrSplice(grpcLib, modifiedArgs, 4);

const span = instrumentation.createClientSpan(
name,
methodAttributeValue,
Expand All @@ -347,15 +350,18 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
instrumentation.extractNetMetadata(this, span);

return context.with(trace.setSpan(context.active(), span), () => {
if (metadata) {
// TODO: no span context added when metadata does not exist.
setSpanContext(metadata);
setSpanContext(metadata);

// Replace the callback with the patched one if it is there.
// If the callback arg is not a function on the last position then the client will throw
// and never call the callback -> so there's nothing to patch
const lastArgIndex = modifiedArgs.length - 1;
const callback = modifiedArgs[lastArgIndex];
if (typeof callback === 'function') {
modifiedArgs[lastArgIndex] = patchedCallback(span, callback);
}
const callback = arguments[6];
const modifiedArgs = [...arguments];
modifiedArgs[6] = patchedCallback(span, callback);

const call = originalMakeUnaryRequest.apply(this, [...modifiedArgs]);
const call = original.apply(this, [...modifiedArgs]);
patchResponseMetadataEvent(
span,
call,
Expand All @@ -371,24 +377,22 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
* Patch for grpc.Client.makeClientStreamRequest(...) function. Provides auto-instrumentation for
* client stream requests when using a Client without makeGenericClientConstructor/makeClientConstructor
*/
private _patchMakeClientStreamRequest(): (
private _patchMakeClientStreamRequest(
grpcLib: typeof grpcJs
): (
originalMakeClientStreamRequest: MakeClientStreamFunction
) => MakeClientStreamFunction {
const instrumentation = this;
return (originalMakeClientStreamRequest: MakeClientStreamFunction) => {
return (original: MakeClientStreamFunction) => {
instrumentation._diag.debug(
'patched makeClientStreamRequest on grpc client'
);

return function makeClientStreamRequest(
this: grpcJs.Client,
method: string,
serialize: (value: any) => Buffer,
deserialize: (value: Buffer) => any,
metadata: grpcJs.Metadata,
options: grpcJs.CallOptions,
callback: grpcJs.requestCallback<any>
this: grpcJs.Client
): grpcJs.ClientWritableStream<any> {
// method must always be at first position
const method = arguments[0];
const { name, service, methodAttributeValue } =
instrumentation._splitMethodString(method);

Expand All @@ -399,17 +403,12 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
instrumentation.getConfig().ignoreGrpcMethods
)
) {
return originalMakeClientStreamRequest.call(
this,
method,
serialize,
deserialize,
metadata,
options,
callback
);
return original.apply(this, [...arguments]);
}

const modifiedArgs = [...arguments];
const metadata = extractMetadataOrSplice(grpcLib, modifiedArgs, 4);

const span = instrumentation.createClientSpan(
name,
methodAttributeValue,
Expand All @@ -421,15 +420,16 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
return context.with(trace.setSpan(context.active(), span), () => {
setSpanContext(metadata);

const call = originalMakeClientStreamRequest.call(
this,
method,
serialize,
deserialize,
metadata,
options,
patchedCallback(span, callback)
);
// Replace the callback with the patched one if it is there.
// If the callback arg is not a function on the last position then the client will throw
// and never call the callback -> so there's nothing to patch
const lastArgIndex = modifiedArgs.length - 1;
const callback = modifiedArgs[lastArgIndex];
if (typeof callback === 'function') {
modifiedArgs[lastArgIndex] = patchedCallback(span, callback);
}

const call = original.apply(this, [...modifiedArgs]);
patchResponseMetadataEvent(
span,
call,
Expand Down Expand Up @@ -714,7 +714,7 @@ export class GrpcJsInstrumentation extends InstrumentationBase {
[SemanticAttributes.RPC_SERVICE]: service,
});

if(metadata != null) {
if (metadata != null) {
this._metadataCapture.client.captureRequestMetadata(span, metadata);
}
return span;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,7 @@ export type UnaryRequestFunction = (
) => grpcJs.ClientUnaryCall;

export type MakeClientStreamFunction = (
this: grpcJs.Client,
method: string,
serialize: (value: any) => Buffer,
deserialize: (value: Buffer) => any,
metadata: grpcJs.Metadata,
options: grpcJs.CallOptions,
callback: grpcJs.requestCallback<any>
...args: unknown[]
) => grpcJs.ClientWritableStream<any>;

export type MakeServerStreamRequestFunction = (
Expand Down

0 comments on commit 6f3af1b

Please sign in to comment.