Skip to content

Commit

Permalink
chore: consistent and typed use of instrumentation config (#2289)
Browse files Browse the repository at this point in the history
* refactor(amqplib): use generic instrumentation config type

* chore: consistent and typed use of instrumentation config

* fix(amqplib): read config on each invocation of interval

* fix(graphql): ts error

* fix(undici): tests

* chore: lint fix

* fix(graphql): remove uneeded non-null assertion operator

* fix(graphql): move public type to right file

* chore: lint fix

---------

Co-authored-by: Trent Mick <trentm@gmail.com>
  • Loading branch information
blumamir and trentm authored Jul 23, 2024
1 parent 8bcb561 commit 6dfe93c
Show file tree
Hide file tree
Showing 37 changed files with 282 additions and 397 deletions.
47 changes: 23 additions & 24 deletions plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,13 @@ import { PACKAGE_NAME, PACKAGE_VERSION } from './version';

const supportedVersions = ['>=0.5.5 <1'];

export class AmqplibInstrumentation extends InstrumentationBase {
protected override _config!: AmqplibInstrumentationConfig;

export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumentationConfig> {
constructor(config: AmqplibInstrumentationConfig = {}) {
super(
PACKAGE_NAME,
PACKAGE_VERSION,
Object.assign({}, DEFAULT_CONFIG, config)
);
super(PACKAGE_NAME, PACKAGE_VERSION, { ...DEFAULT_CONFIG, ...config });
}

override setConfig(config: AmqplibInstrumentationConfig = {}) {
this._config = Object.assign({}, DEFAULT_CONFIG, config);
super.setConfig({ ...DEFAULT_CONFIG, ...config });
}

protected init() {
Expand Down Expand Up @@ -394,10 +388,11 @@ export class AmqplibInstrumentation extends InstrumentationBase {
if (
!Object.prototype.hasOwnProperty.call(channel, CHANNEL_SPANS_NOT_ENDED)
) {
if (self._config.consumeTimeoutMs) {
const { consumeTimeoutMs } = self.getConfig();
if (consumeTimeoutMs) {
const timer = setInterval(() => {
self.checkConsumeTimeoutOnChannel(channel);
}, self._config.consumeTimeoutMs);
}, consumeTimeoutMs);
timer.unref();
channel[CHANNEL_CONSUME_TIMEOUT_TIMER] = timer;
}
Expand Down Expand Up @@ -455,9 +450,10 @@ export class AmqplibInstrumentation extends InstrumentationBase {
parentContext
);

if (self._config.consumeHook) {
const { consumeHook } = self.getConfig();
if (consumeHook) {
safeExecuteInTheMiddle(
() => self._config.consumeHook!(span, { moduleVersion, msg }),
() => consumeHook(span, { moduleVersion, msg }),
e => {
if (e) {
diag.error('amqplib instrumentation: consumerHook error', e);
Expand Down Expand Up @@ -516,10 +512,11 @@ export class AmqplibInstrumentation extends InstrumentationBase {
options
);

if (self._config.publishHook) {
const { publishHook } = self.getConfig();
if (publishHook) {
safeExecuteInTheMiddle(
() =>
self._config.publishHook!(span, {
publishHook(span, {
moduleVersion,
exchange,
routingKey,
Expand All @@ -544,10 +541,11 @@ export class AmqplibInstrumentation extends InstrumentationBase {
try {
callback?.call(this, err, ok);
} finally {
if (self._config.publishConfirmHook) {
const { publishConfirmHook } = self.getConfig();
if (publishConfirmHook) {
safeExecuteInTheMiddle(
() =>
self._config.publishConfirmHook!(span, {
publishConfirmHook(span, {
moduleVersion,
exchange,
routingKey,
Expand Down Expand Up @@ -616,10 +614,11 @@ export class AmqplibInstrumentation extends InstrumentationBase {
options
);

if (self._config.publishHook) {
const { publishHook } = self.getConfig();
if (publishHook) {
safeExecuteInTheMiddle(
() =>
self._config.publishHook!(span, {
publishHook(span, {
moduleVersion,
exchange,
routingKey,
Expand Down Expand Up @@ -731,10 +730,11 @@ export class AmqplibInstrumentation extends InstrumentationBase {
rejected: boolean | null,
endOperation: EndOperation
) {
if (!this._config.consumeEndHook) return;
const { consumeEndHook } = this.getConfig();
if (!consumeEndHook) return;

safeExecuteInTheMiddle(
() => this._config.consumeEndHook!(span, { msg, rejected, endOperation }),
() => consumeEndHook(span, { msg, rejected, endOperation }),
e => {
if (e) {
diag.error('amqplib instrumentation: consumerEndHook error', e);
Expand All @@ -748,15 +748,14 @@ export class AmqplibInstrumentation extends InstrumentationBase {
const currentTime = hrTime();
const spansNotEnded = channel[CHANNEL_SPANS_NOT_ENDED] ?? [];
let i: number;
const { consumeTimeoutMs } = this.getConfig();
for (i = 0; i < spansNotEnded.length; i++) {
const currMessage = spansNotEnded[i];
const timeFromConsume = hrTimeDuration(
currMessage.timeOfConsume,
currentTime
);
if (
hrTimeToMilliseconds(timeFromConsume) < this._config.consumeTimeoutMs!
) {
if (hrTimeToMilliseconds(timeFromConsume) < consumeTimeoutMs!) {
break;
}
this.endConsumerSpan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Step = (typeof steps)[number];

const supportedVersions = ['>=8.0.0 <11'];

export class CucumberInstrumentation extends InstrumentationBase {
export class CucumberInstrumentation extends InstrumentationBase<CucumberInstrumentationConfig> {
private module: Cucumber | undefined;

constructor(config: CucumberInstrumentationConfig = {}) {
Expand Down
10 changes: 1 addition & 9 deletions plugins/node/instrumentation-dataloader/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type DataloaderInternal = typeof Dataloader.prototype & {
type LoadFn = (typeof Dataloader.prototype)['load'];
type LoadManyFn = (typeof Dataloader.prototype)['loadMany'];

export class DataloaderInstrumentation extends InstrumentationBase {
export class DataloaderInstrumentation extends InstrumentationBase<DataloaderInstrumentationConfig> {
constructor(config: DataloaderInstrumentationConfig = {}) {
super(PACKAGE_NAME, PACKAGE_VERSION, config);
}
Expand Down Expand Up @@ -72,14 +72,6 @@ export class DataloaderInstrumentation extends InstrumentationBase {
];
}

override getConfig(): DataloaderInstrumentationConfig {
return this._config;
}

override setConfig(config: DataloaderInstrumentationConfig = {}) {
this._config = config;
}

private shouldCreateSpans(): boolean {
const config = this.getConfig();
const hasParentSpan = trace.getSpan(context.active()) !== undefined;
Expand Down
8 changes: 4 additions & 4 deletions plugins/node/instrumentation-fs/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function patchedFunctionWithOriginalProperties<
return Object.assign(patchedFunction, original);
}

export class FsInstrumentation extends InstrumentationBase {
export class FsInstrumentation extends InstrumentationBase<FsInstrumentationConfig> {
constructor(config: FsInstrumentationConfig = {}) {
super(PACKAGE_NAME, PACKAGE_VERSION, config);
}
Expand Down Expand Up @@ -438,7 +438,7 @@ export class FsInstrumentation extends InstrumentationBase {
protected _runCreateHook(
...args: Parameters<CreateHook>
): ReturnType<CreateHook> {
const { createHook } = this.getConfig() as FsInstrumentationConfig;
const { createHook } = this.getConfig();
if (typeof createHook === 'function') {
try {
return createHook(...args);
Expand All @@ -450,7 +450,7 @@ export class FsInstrumentation extends InstrumentationBase {
}

protected _runEndHook(...args: Parameters<EndHook>): ReturnType<EndHook> {
const { endHook } = this.getConfig() as FsInstrumentationConfig;
const { endHook } = this.getConfig();
if (typeof endHook === 'function') {
try {
endHook(...args);
Expand All @@ -467,7 +467,7 @@ export class FsInstrumentation extends InstrumentationBase {
return false;
}

const { requireParentSpan } = this.getConfig() as FsInstrumentationConfig;
const { requireParentSpan } = this.getConfig();
if (requireParentSpan) {
const parentSpan = api.trace.getSpan(context);
if (parentSpan == null) {
Expand Down
14 changes: 7 additions & 7 deletions plugins/node/instrumentation-kafkajs/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ import {
isWrapped,
} from '@opentelemetry/instrumentation';

export class KafkaJsInstrumentation extends InstrumentationBase {
protected override _config!: KafkaJsInstrumentationConfig;

export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumentationConfig> {
constructor(config: KafkaJsInstrumentationConfig = {}) {
super(PACKAGE_NAME, PACKAGE_VERSION, config);
}
Expand Down Expand Up @@ -367,9 +365,10 @@ export class KafkaJsInstrumentation extends InstrumentationBase {
context
);

if (this._config?.consumerHook && message) {
const { consumerHook } = this.getConfig();
if (consumerHook && message) {
safeExecuteInTheMiddle(
() => this._config.consumerHook!(span, { topic, message }),
() => consumerHook(span, { topic, message }),
e => {
if (e) this._diag.error('consumerHook error', e);
},
Expand All @@ -392,9 +391,10 @@ export class KafkaJsInstrumentation extends InstrumentationBase {
message.headers = message.headers ?? {};
propagation.inject(trace.setSpan(context.active(), span), message.headers);

if (this._config?.producerHook) {
const { producerHook } = this.getConfig();
if (producerHook) {
safeExecuteInTheMiddle(
() => this._config.producerHook!(span, { topic, message }),
() => producerHook(span, { topic, message }),
e => {
if (e) this._diag.error('producerHook error', e);
},
Expand Down
56 changes: 28 additions & 28 deletions plugins/node/instrumentation-mongoose/src/mongoose.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,11 @@ const contextCaptureFunctions = [
// calls. this bypass the unlinked spans issue on thenables await operations.
export const _STORED_PARENT_SPAN: unique symbol = Symbol('stored-parent-span');

export class MongooseInstrumentation extends InstrumentationBase {
protected override _config!: MongooseInstrumentationConfig;

export class MongooseInstrumentation extends InstrumentationBase<MongooseInstrumentationConfig> {
constructor(config: MongooseInstrumentationConfig = {}) {
super(PACKAGE_NAME, PACKAGE_VERSION, config);
}

override setConfig(config: MongooseInstrumentationConfig = {}) {
this._config = Object.assign({}, config);
}

protected init(): InstrumentationModuleDefinition {
const module = new InstrumentationNodeModuleDefinition(
'mongoose',
Expand Down Expand Up @@ -140,20 +134,23 @@ export class MongooseInstrumentation extends InstrumentationBase {
return (originalAggregate: Function) => {
return function exec(this: any, callback?: Function) {
if (
self._config.requireParentSpan &&
self.getConfig().requireParentSpan &&
trace.getSpan(context.active()) === undefined
) {
return originalAggregate.apply(this, arguments);
}

const parentSpan = this[_STORED_PARENT_SPAN];
const attributes: Attributes = {};
if (self._config.dbStatementSerializer) {
attributes[SEMATTRS_DB_STATEMENT] =
self._config.dbStatementSerializer('aggregate', {
const { dbStatementSerializer } = self.getConfig();
if (dbStatementSerializer) {
attributes[SEMATTRS_DB_STATEMENT] = dbStatementSerializer(
'aggregate',
{
options: this.options,
aggregatePipeline: this._pipeline,
});
}
);
}

const span = self._startSpan(
Expand Down Expand Up @@ -181,22 +178,22 @@ export class MongooseInstrumentation extends InstrumentationBase {
return (originalExec: Function) => {
return function exec(this: any, callback?: Function) {
if (
self._config.requireParentSpan &&
self.getConfig().requireParentSpan &&
trace.getSpan(context.active()) === undefined
) {
return originalExec.apply(this, arguments);
}

const parentSpan = this[_STORED_PARENT_SPAN];
const attributes: Attributes = {};
if (self._config.dbStatementSerializer) {
attributes[SEMATTRS_DB_STATEMENT] =
self._config.dbStatementSerializer(this.op, {
condition: this._conditions,
updates: this._update,
options: this.options,
fields: this._fields,
});
const { dbStatementSerializer } = self.getConfig();
if (dbStatementSerializer) {
attributes[SEMATTRS_DB_STATEMENT] = dbStatementSerializer(this.op, {
condition: this._conditions,
updates: this._update,
options: this.options,
fields: this._fields,
});
}
const span = self._startSpan(
this.mongooseCollection,
Expand All @@ -223,7 +220,7 @@ export class MongooseInstrumentation extends InstrumentationBase {
return (originalOnModelFunction: Function) => {
return function method(this: any, options?: any, callback?: Function) {
if (
self._config.requireParentSpan &&
self.getConfig().requireParentSpan &&
trace.getSpan(context.active()) === undefined
) {
return originalOnModelFunction.apply(this, arguments);
Expand All @@ -234,9 +231,12 @@ export class MongooseInstrumentation extends InstrumentationBase {
serializePayload.options = options;
}
const attributes: Attributes = {};
if (self._config.dbStatementSerializer) {
attributes[SEMATTRS_DB_STATEMENT] =
self._config.dbStatementSerializer(op, serializePayload);
const { dbStatementSerializer } = self.getConfig();
if (dbStatementSerializer) {
attributes[SEMATTRS_DB_STATEMENT] = dbStatementSerializer(
op,
serializePayload
);
}
const span = self._startSpan(
this.constructor.collection,
Expand Down Expand Up @@ -331,7 +331,7 @@ export class MongooseInstrumentation extends InstrumentationBase {
originalThis,
span,
args,
self._config.responseHook,
self.getConfig().responseHook,
moduleVersion
)
);
Expand All @@ -342,14 +342,14 @@ export class MongooseInstrumentation extends InstrumentationBase {
return handlePromiseResponse(
response,
span,
self._config.responseHook,
self.getConfig().responseHook,
moduleVersion
);
}
}

private _callOriginalFunction<T>(originalFunction: (...args: any[]) => T): T {
if (this._config?.suppressInternalInstrumentation) {
if (this.getConfig().suppressInternalInstrumentation) {
return context.with(suppressTracing(context.active()), originalFunction);
} else {
return originalFunction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const DEFAULT_CONFIG: RuntimeNodeInstrumentationConfig = {
eventLoopUtilizationMeasurementInterval: 5000,
};

export class RuntimeNodeInstrumentation extends InstrumentationBase {
export class RuntimeNodeInstrumentation extends InstrumentationBase<RuntimeNodeInstrumentationConfig> {
private _ELUs: EventLoopUtilization[] = [];
private _interval: NodeJS.Timeout | undefined;

Expand Down Expand Up @@ -79,8 +79,7 @@ export class RuntimeNodeInstrumentation extends InstrumentationBase {
clearInterval(this._interval);
this._interval = setInterval(
() => this._addELU(),
(this._config as RuntimeNodeInstrumentationConfig)
.eventLoopUtilizationMeasurementInterval
this.getConfig().eventLoopUtilizationMeasurementInterval
);

// unref so that it does not keep the process running if disable() is never called
Expand Down
Loading

0 comments on commit 6dfe93c

Please sign in to comment.