Skip to content

Commit

Permalink
feat(instrumentation-lambda): Flush MeterProvider at end of handler (#…
Browse files Browse the repository at this point in the history
…1370)

* feat(instrumentation-lambda): Flush MeterProvider at end of handler

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

* Fix lint issues

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

---------

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>
Co-authored-by: Amir Blum <amirgiraffe@gmail.com>
  • Loading branch information
Aneurysm9 and blumamir authored Feb 8, 2023
1 parent 33c57cc commit 096129c
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"@opentelemetry/core": "^1.8.0",
"@opentelemetry/sdk-trace-base": "^1.8.0",
"@opentelemetry/sdk-trace-node": "^1.8.0",
"@opentelemetry/sdk-metrics": "^1.8.0",
"@types/mocha": "7.0.2",
"@types/node": "18.11.7",
"gts": "3.1.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
diag,
trace,
propagation,
MeterProvider,
Span,
SpanKind,
SpanStatusCode,
Expand Down Expand Up @@ -70,7 +71,8 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
export const traceContextEnvironmentKey = '_X_AMZN_TRACE_ID';

export class AwsLambdaInstrumentation extends InstrumentationBase {
private _forceFlush?: () => Promise<void>;
private _traceForceFlusher?: () => Promise<void>;
private _metricForceFlusher?: () => Promise<void>;

constructor(protected override _config: AwsLambdaInstrumentationConfig = {}) {
super('@opentelemetry/instrumentation-aws-lambda', VERSION, _config);
Expand Down Expand Up @@ -226,10 +228,10 @@ export class AwsLambdaInstrumentation extends InstrumentationBase {

override setTracerProvider(tracerProvider: TracerProvider) {
super.setTracerProvider(tracerProvider);
this._forceFlush = this._getForceFlush(tracerProvider);
this._traceForceFlusher = this._traceForceFlush(tracerProvider);
}

private _getForceFlush(tracerProvider: TracerProvider) {
private _traceForceFlush(tracerProvider: TracerProvider) {
if (!tracerProvider) return undefined;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -246,6 +248,24 @@ export class AwsLambdaInstrumentation extends InstrumentationBase {
return undefined;
}

override setMeterProvider(meterProvider: MeterProvider) {
super.setMeterProvider(meterProvider);
this._metricForceFlusher = this._metricForceFlush(meterProvider);
}

private _metricForceFlush(meterProvider: MeterProvider) {
if (!meterProvider) return undefined;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const currentProvider: any = meterProvider;

if (typeof currentProvider.forceFlush === 'function') {
return currentProvider.forceFlush.bind(currentProvider);
}

return undefined;
}

private _wrapCallback(original: Callback, span: Span): Callback {
const plugin = this;
return function wrappedCallback(this: never, err, res) {
Expand Down Expand Up @@ -283,8 +303,8 @@ export class AwsLambdaInstrumentation extends InstrumentationBase {

span.end();

if (this._forceFlush) {
this._forceFlush().then(
if (this._traceForceFlusher) {
this._traceForceFlusher().then(
() => callback(),
() => callback()
);
Expand All @@ -294,6 +314,17 @@ export class AwsLambdaInstrumentation extends InstrumentationBase {
);
callback();
}
if (this._metricForceFlusher) {
this._metricForceFlusher().then(
() => callback(),
() => callback()
);
} else {
diag.error(
'Metrics may not be exported for the lambda function because we are not force flushing before callback.'
);
callback();
}
}

private _applyResponseHook(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { Context } from 'aws-lambda';
import * as assert from 'assert';
import { ProxyTracerProvider, TracerProvider } from '@opentelemetry/api';
import {
AggregationTemporality,
InMemoryMetricExporter,
MeterProvider,
PeriodicExportingMetricReader,
} from '@opentelemetry/sdk-metrics';

const memoryExporter = new InMemorySpanExporter();
const traceMemoryExporter = new InMemorySpanExporter();
const metricMemoryExporter = new InMemoryMetricExporter(
AggregationTemporality.CUMULATIVE
);

describe('force flush', () => {
let instrumentation: AwsLambdaInstrumentation;
Expand All @@ -42,13 +51,26 @@ describe('force flush', () => {
awsRequestId: 'aws_request_id',
} as Context;

const initializeHandler = (handler: string, provider: TracerProvider) => {
const initializeHandlerTracing = (
handler: string,
provider: TracerProvider
) => {
process.env._HANDLER = handler;

instrumentation = new AwsLambdaInstrumentation();
instrumentation.setTracerProvider(provider);
};

const initializeHandlerMetrics = (
handler: string,
provider: MeterProvider
) => {
process.env._HANDLER = handler;

instrumentation = new AwsLambdaInstrumentation();
instrumentation.setMeterProvider(provider);
};

const lambdaRequire = (module: string) =>
require(path.resolve(__dirname, '..', module));

Expand All @@ -61,12 +83,13 @@ describe('force flush', () => {
process.env = oldEnv;
instrumentation.disable();

memoryExporter.reset();
traceMemoryExporter.reset();
metricMemoryExporter.reset();
});

it('should force flush NodeTracerProvider', async () => {
const provider = new NodeTracerProvider();
provider.addSpanProcessor(new BatchSpanProcessor(memoryExporter));
provider.addSpanProcessor(new BatchSpanProcessor(traceMemoryExporter));
provider.register();
let forceFlushed = false;
const forceFlush = () =>
Expand All @@ -75,7 +98,7 @@ describe('force flush', () => {
resolve();
});
provider.forceFlush = forceFlush;
initializeHandler('lambda-test/sync.handler', provider);
initializeHandlerTracing('lambda-test/sync.handler', provider);

await new Promise((resolve, reject) => {
lambdaRequire('lambda-test/sync').handler(
Expand All @@ -96,7 +119,9 @@ describe('force flush', () => {

it('should force flush ProxyTracerProvider with NodeTracerProvider', async () => {
const nodeTracerProvider = new NodeTracerProvider();
nodeTracerProvider.addSpanProcessor(new BatchSpanProcessor(memoryExporter));
nodeTracerProvider.addSpanProcessor(
new BatchSpanProcessor(traceMemoryExporter)
);
nodeTracerProvider.register();
const provider = new ProxyTracerProvider();
provider.setDelegate(nodeTracerProvider);
Expand All @@ -107,7 +132,38 @@ describe('force flush', () => {
resolve();
});
nodeTracerProvider.forceFlush = forceFlush;
initializeHandler('lambda-test/sync.handler', provider);
initializeHandlerTracing('lambda-test/sync.handler', provider);

await new Promise((resolve, reject) => {
lambdaRequire('lambda-test/sync').handler(
'arg',
ctx,
(err: Error, res: any) => {
if (err) {
reject(err);
} else {
resolve(res);
}
}
);
});

assert.strictEqual(forceFlushed, true);
});

it('should force flush MeterProvider', async () => {
const provider = new MeterProvider();
provider.addMetricReader(
new PeriodicExportingMetricReader({ exporter: metricMemoryExporter })
);
let forceFlushed = false;
const forceFlush = () =>
new Promise<void>(resolve => {
forceFlushed = true;
resolve();
});
provider.forceFlush = forceFlush;
initializeHandlerMetrics('lambda-test/sync.handler', provider);

await new Promise((resolve, reject) => {
lambdaRequire('lambda-test/sync').handler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ import { AWSXRayPropagator } from '@opentelemetry/propagator-aws-xray';
import { W3CTraceContextPropagator } from '@opentelemetry/core';

const memoryExporter = new InMemorySpanExporter();
const provider = new NodeTracerProvider();
provider.addSpanProcessor(new BatchSpanProcessor(memoryExporter));
provider.register();

const assertSpanSuccess = (span: ReadableSpan) => {
assert.strictEqual(span.kind, SpanKind.SERVER);
Expand Down Expand Up @@ -118,8 +115,14 @@ describe('lambda handler', () => {
) => {
process.env._HANDLER = handler;

const provider = new NodeTracerProvider();
provider.addSpanProcessor(new BatchSpanProcessor(memoryExporter));
provider.register();

instrumentation = new AwsLambdaInstrumentation(config);
instrumentation.setTracerProvider(provider);

return provider;
};

const lambdaRequire = (module: string) =>
Expand Down Expand Up @@ -667,7 +670,7 @@ describe('lambda handler', () => {
return propagation.extract(context.active(), event.contextCarrier);
};

initializeHandler('lambda-test/async.handler', {
const provider = initializeHandler('lambda-test/async.handler', {
disableAwsContextPropagation: true,
eventContextExtractor: customExtractor,
});
Expand Down

0 comments on commit 096129c

Please sign in to comment.