Skip to content

Commit

Permalink
fix(aws-lambda): BasicTracerProvider not force flushing (#661)
Browse files Browse the repository at this point in the history
Co-authored-by: William Armiros <54150514+willarmiros@users.noreply.github.com>
Co-authored-by: Daniel Dyla <dyladan@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 21, 2021
1 parent 7edf984 commit 76e0d0f
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"@opentelemetry/api": "1.0.2",
"@opentelemetry/core": "0.24.0",
"@opentelemetry/node": "0.24.0",
"@opentelemetry/tracing": "^0.24.0",
"@types/mocha": "7.0.2",
"@types/node": "14.17.9",
"codecov": "3.8.3",
Expand All @@ -64,7 +65,6 @@
"@opentelemetry/propagator-aws-xray": "^0.25.0",
"@opentelemetry/resources": "^0.24.0",
"@opentelemetry/semantic-conventions": "^0.24.0",
"@opentelemetry/tracing": "^0.24.0",
"@types/aws-lambda": "8.10.81"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import {
SemanticAttributes,
SemanticResourceAttributes,
} from '@opentelemetry/semantic-conventions';
import { BasicTracerProvider } from '@opentelemetry/tracing';

import {
APIGatewayProxyEventHeaders,
Expand Down Expand Up @@ -73,7 +72,7 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
export const traceContextEnvironmentKey = '_X_AMZN_TRACE_ID';

export class AwsLambdaInstrumentation extends InstrumentationBase {
private _tracerProvider: TracerProvider | undefined;
private _forceFlush?: () => Promise<void>;

constructor(protected override _config: AwsLambdaInstrumentationConfig = {}) {
super('@opentelemetry/instrumentation-aws-lambda', VERSION, _config);
Expand Down Expand Up @@ -228,7 +227,27 @@ export class AwsLambdaInstrumentation extends InstrumentationBase {

override setTracerProvider(tracerProvider: TracerProvider) {
super.setTracerProvider(tracerProvider);
this._tracerProvider = tracerProvider;
this._forceFlush = this._getForceFlush(tracerProvider);
}

private _getForceFlush(tracerProvider: TracerProvider) {
if (!tracerProvider) return undefined;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let currentProvider: any = tracerProvider;

if (typeof currentProvider.getDelegate === 'function') {
currentProvider = currentProvider.getDelegate();
}

if (typeof currentProvider.getActiveSpanProcessor === 'function') {
const activeSpanProcessor = currentProvider.getActiveSpanProcessor();
if (typeof activeSpanProcessor.forceFlush === 'function') {
return activeSpanProcessor.forceFlush.bind(activeSpanProcessor);
}
}

return undefined;
}

private _wrapCallback(original: Callback, span: Span): Callback {
Expand Down Expand Up @@ -267,15 +286,16 @@ export class AwsLambdaInstrumentation extends InstrumentationBase {
}

span.end();
if (this._tracerProvider instanceof BasicTracerProvider) {
this._tracerProvider
.getActiveSpanProcessor()
.forceFlush()
.then(
() => callback(),
() => callback()
);

if (this._forceFlush) {
this._forceFlush().then(
() => callback(),
() => callback()
);
} else {
diag.error(
'Spans may not be exported for the lambda function because we are not force flushing before callback.'
);
callback();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// We access through node_modules to allow it to be patched.
/* eslint-disable node/no-extraneous-require */

import * as path from 'path';

import { AwsLambdaInstrumentation } from '../../src';
import {
BatchSpanProcessor,
InMemorySpanExporter,
} from '@opentelemetry/tracing';
import { NodeTracerProvider } from '@opentelemetry/node';
import { Context } from 'aws-lambda';
import * as assert from 'assert';
import { ProxyTracerProvider, TracerProvider } from '@opentelemetry/api';

const memoryExporter = new InMemorySpanExporter();

describe('force flush', () => {
let instrumentation: AwsLambdaInstrumentation;

let oldEnv: NodeJS.ProcessEnv;

const ctx = {
functionName: 'my_function',
invokedFunctionArn: 'my_arn',
awsRequestId: 'aws_request_id',
} as Context;

const initializeHandler = (handler: string, provider: TracerProvider) => {
process.env._HANDLER = handler;

instrumentation = new AwsLambdaInstrumentation();
instrumentation.setTracerProvider(provider);
};

const lambdaRequire = (module: string) =>
require(path.resolve(__dirname, '..', module));

beforeEach(() => {
oldEnv = { ...process.env };
process.env.LAMBDA_TASK_ROOT = path.resolve(__dirname, '..');
});

afterEach(() => {
process.env = oldEnv;
instrumentation.disable();

memoryExporter.reset();
});

it('should force flush NodeTracerProvider', async () => {
const provider = new NodeTracerProvider();
provider.addSpanProcessor(new BatchSpanProcessor(memoryExporter));
provider.register();
let forceFlushed = false;
const forceFlush = () =>
new Promise<void>(resolve => {
forceFlushed = true;
resolve();
});
provider.activeSpanProcessor.forceFlush = forceFlush;
initializeHandler('lambda-test/sync.handler', provider);

await new Promise((resolve, reject) => {
lambdaRequire('lambda-test/sync').handler(
'arg',
ctx,
(err: Error, res: any) => {
if (err) {
reject(err);
} else {
resolve(res);
}
}
);
});

assert.strictEqual(forceFlushed, true);
});

it('should force flush ProxyTracerProvider with NodeTracerProvider', async () => {
const nodeTracerProvider = new NodeTracerProvider();
nodeTracerProvider.addSpanProcessor(new BatchSpanProcessor(memoryExporter));
nodeTracerProvider.register();
const provider = new ProxyTracerProvider();
provider.setDelegate(nodeTracerProvider);
let forceFlushed = false;
const forceFlush = () =>
new Promise<void>(resolve => {
forceFlushed = true;
resolve();
});
nodeTracerProvider.activeSpanProcessor.forceFlush = forceFlush;
initializeHandler('lambda-test/sync.handler', provider);

await new Promise((resolve, reject) => {
lambdaRequire('lambda-test/sync').handler(
'arg',
ctx,
(err: Error, res: any) => {
if (err) {
reject(err);
} else {
resolve(res);
}
}
);
});

assert.strictEqual(forceFlushed, true);
});
});

0 comments on commit 76e0d0f

Please sign in to comment.