From b36a1a2af1208e49a3660e404e69ddbbe5145953 Mon Sep 17 00:00:00 2001 From: legendecas Date: Thu, 30 Dec 2021 12:01:51 +0800 Subject: [PATCH] refactor: unifying shutdown once with BindOnceFuture --- .../exporter-trace-otlp-grpc/tsconfig.json | 9 +++ .../src/OTLPExporterBase.ts | 38 ++++------- .../browser/OTLPExporterBrowserBase.ts | 2 +- .../src/platform/node/OTLPExporterNodeBase.ts | 6 +- .../exporter-trace-otlp-http/tsconfig.json | 11 ++++ .../exporter-trace-otlp-proto/tsconfig.json | 9 +++ packages/opentelemetry-core/src/index.ts | 1 + .../opentelemetry-core/src/utils/callback.ts | 54 ++++++++++++++++ .../opentelemetry-core/src/utils/promise.ts | 39 ++++++++++++ .../opentelemetry-core/test/test-utils.ts | 30 +++++++++ .../test/utils/callback.test.ts | 63 +++++++++++++++++++ .../test/utils/promise.test.ts | 41 ++++++++++++ .../src/jaeger.ts | 54 ++++++---------- .../src/export/BatchSpanProcessorBase.ts | 47 +++++++------- .../src/export/SimpleSpanProcessor.ts | 30 ++++----- 15 files changed, 323 insertions(+), 111 deletions(-) create mode 100644 packages/opentelemetry-core/src/utils/callback.ts create mode 100644 packages/opentelemetry-core/src/utils/promise.ts create mode 100644 packages/opentelemetry-core/test/test-utils.ts create mode 100644 packages/opentelemetry-core/test/utils/callback.test.ts create mode 100644 packages/opentelemetry-core/test/utils/promise.test.ts diff --git a/packages/exporter-trace-otlp-grpc/tsconfig.json b/packages/exporter-trace-otlp-grpc/tsconfig.json index e95e7e3c768..573e18d61f0 100644 --- a/packages/exporter-trace-otlp-grpc/tsconfig.json +++ b/packages/exporter-trace-otlp-grpc/tsconfig.json @@ -11,6 +11,15 @@ "references": [ { "path": "../exporter-trace-otlp-http" + }, + { + "path": "../opentelemetry-core" + }, + { + "path": "../opentelemetry-resources" + }, + { + "path": "../opentelemetry-sdk-trace-base" } ] } diff --git a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts index 1cbf94f6334..2dc095bcc57 100644 --- a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts +++ b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts @@ -15,7 +15,7 @@ */ import { SpanAttributes, diag } from '@opentelemetry/api'; -import { ExportResult, ExportResultCode } from '@opentelemetry/core'; +import { ExportResult, ExportResultCode, BindOnceFuture } from '@opentelemetry/core'; import { OTLPExporterError, OTLPExporterConfigBase, @@ -34,9 +34,8 @@ export abstract class OTLPExporterBase< public readonly hostname: string | undefined; public readonly attributes?: SpanAttributes; protected _concurrencyLimit: number; - protected _isShutdown: boolean = false; - private _shuttingDownPromise: Promise = Promise.resolve(); protected _sendingPromises: Promise[] = []; + protected _shutdownOnce: BindOnceFuture; /** * @param config @@ -50,6 +49,7 @@ export abstract class OTLPExporterBase< this.attributes = config.attributes; this.shutdown = this.shutdown.bind(this); + this._shutdownOnce = new BindOnceFuture(this._shutdown, this); this._concurrencyLimit = typeof config.concurrencyLimit === 'number' @@ -66,7 +66,7 @@ export abstract class OTLPExporterBase< * @param resultCallback */ export(items: ExportItem[], resultCallback: (result: ExportResult) => void): void { - if (this._isShutdown) { + if (this._shutdownOnce.isCalled) { resultCallback({ code: ExportResultCode.FAILED, error: new Error('Exporter has been shutdown'), @@ -106,28 +106,16 @@ export abstract class OTLPExporterBase< * Shutdown the exporter. */ shutdown(): Promise { - if (this._isShutdown) { - diag.debug('shutdown already started'); - return this._shuttingDownPromise; - } - this._isShutdown = true; + return this._shutdownOnce.call(); + } + + private _shutdown(): Promise { diag.debug('shutdown started'); - this._shuttingDownPromise = new Promise((resolve, reject) => { - Promise.resolve() - .then(() => { - return this.onShutdown(); - }) - .then(() => { - return Promise.all(this._sendingPromises); - }) - .then(() => { - resolve(); - }) - .catch(e => { - reject(e); - }); - }); - return this._shuttingDownPromise; + this.onShutdown(); + return Promise.all(this._sendingPromises) + .then(() => { + /** ignore resolved values */ + }); } abstract onShutdown(): void; diff --git a/packages/exporter-trace-otlp-http/src/platform/browser/OTLPExporterBrowserBase.ts b/packages/exporter-trace-otlp-http/src/platform/browser/OTLPExporterBrowserBase.ts index 421772fcc24..9e6ff101441 100644 --- a/packages/exporter-trace-otlp-http/src/platform/browser/OTLPExporterBrowserBase.ts +++ b/packages/exporter-trace-otlp-http/src/platform/browser/OTLPExporterBrowserBase.ts @@ -69,7 +69,7 @@ export abstract class OTLPExporterBrowserBase< onSuccess: () => void, onError: (error: otlpTypes.OTLPExporterError) => void ): void { - if (this._isShutdown) { + if (this._shutdownOnce.isCalled) { diag.debug('Shutdown already started. Cannot send objects'); return; } diff --git a/packages/exporter-trace-otlp-http/src/platform/node/OTLPExporterNodeBase.ts b/packages/exporter-trace-otlp-http/src/platform/node/OTLPExporterNodeBase.ts index 920f0ac0434..c48d49fc420 100644 --- a/packages/exporter-trace-otlp-http/src/platform/node/OTLPExporterNodeBase.ts +++ b/packages/exporter-trace-otlp-http/src/platform/node/OTLPExporterNodeBase.ts @@ -56,16 +56,14 @@ export abstract class OTLPExporterNodeBase< this.compression = config.compression || CompressionAlgorithm.NONE; } - onInit(_config: OTLPExporterNodeConfigBase): void { - this._isShutdown = false; - } + onInit(_config: OTLPExporterNodeConfigBase): void {} send( objects: ExportItem[], onSuccess: () => void, onError: (error: otlpTypes.OTLPExporterError) => void ): void { - if (this._isShutdown) { + if (this._shutdownOnce.isCalled) { diag.debug('Shutdown already started. Cannot send objects'); return; } diff --git a/packages/exporter-trace-otlp-http/tsconfig.json b/packages/exporter-trace-otlp-http/tsconfig.json index bdc94d22137..1d7ba827ac5 100644 --- a/packages/exporter-trace-otlp-http/tsconfig.json +++ b/packages/exporter-trace-otlp-http/tsconfig.json @@ -7,5 +7,16 @@ "include": [ "src/**/*.ts", "test/**/*.ts" + ], + "references": [ + { + "path": "../opentelemetry-core" + }, + { + "path": "../opentelemetry-resources" + }, + { + "path": "../opentelemetry-sdk-trace-base" + } ] } diff --git a/packages/exporter-trace-otlp-proto/tsconfig.json b/packages/exporter-trace-otlp-proto/tsconfig.json index e95e7e3c768..573e18d61f0 100644 --- a/packages/exporter-trace-otlp-proto/tsconfig.json +++ b/packages/exporter-trace-otlp-proto/tsconfig.json @@ -11,6 +11,15 @@ "references": [ { "path": "../exporter-trace-otlp-http" + }, + { + "path": "../opentelemetry-core" + }, + { + "path": "../opentelemetry-resources" + }, + { + "path": "../opentelemetry-sdk-trace-base" } ] } diff --git a/packages/opentelemetry-core/src/index.ts b/packages/opentelemetry-core/src/index.ts index ea9e2d2a379..93167747527 100644 --- a/packages/opentelemetry-core/src/index.ts +++ b/packages/opentelemetry-core/src/index.ts @@ -39,4 +39,5 @@ export * from './utils/merge'; export * from './utils/sampling'; export * from './utils/url'; export * from './utils/wrap'; +export * from './utils/callback'; export * from './version'; diff --git a/packages/opentelemetry-core/src/utils/callback.ts b/packages/opentelemetry-core/src/utils/callback.ts new file mode 100644 index 00000000000..1757fcfc1dc --- /dev/null +++ b/packages/opentelemetry-core/src/utils/callback.ts @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Deferred } from './promise'; + +/** + * Bind the callback and only invoke the callback once regardless how many times `BindOnceFuture.call` is invoked. + */ +export class BindOnceFuture< + R, + This = unknown, + T extends (this: This, ...args: unknown[]) => R = () => R +> { + private _isCalled = false; + private _deferred = new Deferred(); + constructor(private _callback: T, private _that: This) {} + + get isCalled() { + return this._isCalled; + } + + get promise() { + return this._deferred.promise; + } + + call(...args: Parameters): Promise { + if (!this._isCalled) { + this._isCalled = true; + try { + Promise.resolve(this._callback.call(this._that, ...args)) + .then( + val => this._deferred.resolve(val), + err => this._deferred.reject(err) + ); + } catch (err) { + this._deferred.reject(err); + } + } + return this._deferred.promise; + } +} diff --git a/packages/opentelemetry-core/src/utils/promise.ts b/packages/opentelemetry-core/src/utils/promise.ts new file mode 100644 index 00000000000..ff0c8e90de8 --- /dev/null +++ b/packages/opentelemetry-core/src/utils/promise.ts @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export class Deferred { + private _promise: Promise; + private _resolve!: (val: T) => void; + private _reject!: (error: unknown) => void; + constructor() { + this._promise = new Promise((resolve, reject) => { + this._resolve = resolve; + this._reject = reject; + }); + } + + get promise() { + return this._promise; + } + + resolve(val: T) { + this._resolve(val); + } + + reject(err: unknown) { + this._reject(err); + } +} diff --git a/packages/opentelemetry-core/test/test-utils.ts b/packages/opentelemetry-core/test/test-utils.ts new file mode 100644 index 00000000000..72ec4277301 --- /dev/null +++ b/packages/opentelemetry-core/test/test-utils.ts @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; + +/** + * Node.js v8.x and browser compatible `assert.rejects`. + */ +export async function assertRejects(promise: any, expect: any) { + try { + await promise; + } catch (err) { + assert.throws(() => { + throw err; + }, expect); + } +} diff --git a/packages/opentelemetry-core/test/utils/callback.test.ts b/packages/opentelemetry-core/test/utils/callback.test.ts new file mode 100644 index 00000000000..72805d0f5e9 --- /dev/null +++ b/packages/opentelemetry-core/test/utils/callback.test.ts @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import { BindOnceFuture } from '../../src'; +import { assertRejects } from '../test-utils'; + +describe('callback', () => { + describe('BindOnceFuture', () => { + it('should call once', async () => { + const stub = sinon.stub(); + const that = {}; + const future = new BindOnceFuture(stub, that); + + await Promise.all([ + future.call(1), + future.call(2), + ]); + await future.call(3); + await future.promise; + + assert.strictEqual(stub.callCount, 1); + assert.deepStrictEqual(stub.firstCall.args, [1]); + assert.deepStrictEqual(stub.firstCall.thisValue, that); + + assert(future.isCalled); + }); + + it('should handle thrown errors', async () => { + const stub = sinon.stub(); + stub.throws(new Error('foo')); + const future = new BindOnceFuture(stub, undefined); + + await assertRejects(future.call(), /foo/); + await assertRejects(future.call(), /foo/); + await assertRejects(future.promise, /foo/); + }); + + it('should handle rejections', async () => { + const stub = sinon.stub(); + stub.rejects(new Error('foo')); + const future = new BindOnceFuture(stub, undefined); + + await assertRejects(future.call(), /foo/); + await assertRejects(future.call(), /foo/); + await assertRejects(future.promise, /foo/); + }); + }); +}); diff --git a/packages/opentelemetry-core/test/utils/promise.test.ts b/packages/opentelemetry-core/test/utils/promise.test.ts new file mode 100644 index 00000000000..cf97fdf3b1b --- /dev/null +++ b/packages/opentelemetry-core/test/utils/promise.test.ts @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { Deferred } from '../../src/utils/promise'; +import { assertRejects } from '../test-utils'; + +describe('promise', () => { + describe('Deferred', () => { + it('should resolve', async () => { + const deferred = new Deferred(); + deferred.resolve(1); + deferred.resolve(2); + deferred.reject(new Error('foo')); + + const ret = await deferred.promise; + assert.strictEqual(ret, 1); + }); + + it('should reject', async () => { + const deferred = new Deferred(); + deferred.reject(new Error('foo')); + deferred.reject(new Error('bar')); + + await assertRejects(deferred.promise, /foo/); + }); + }); +}); diff --git a/packages/opentelemetry-exporter-jaeger/src/jaeger.ts b/packages/opentelemetry-exporter-jaeger/src/jaeger.ts index 319d99ce377..3ae58807100 100644 --- a/packages/opentelemetry-exporter-jaeger/src/jaeger.ts +++ b/packages/opentelemetry-exporter-jaeger/src/jaeger.ts @@ -15,7 +15,7 @@ */ import { diag } from '@opentelemetry/api'; -import { ExportResult, ExportResultCode, getEnv } from '@opentelemetry/core'; +import { BindOnceFuture, ExportResult, ExportResultCode, getEnv } from '@opentelemetry/core'; import { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-base'; import { Socket } from 'dgram'; import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; @@ -28,9 +28,7 @@ import * as jaegerTypes from './types'; export class JaegerExporter implements SpanExporter { private readonly _onShutdownFlushTimeout: number; private readonly _localConfig: jaegerTypes.ExporterConfig; - private _isShutdown = false; - private _shutdownFlushTimeout: NodeJS.Timeout | undefined; - private _shuttingDownPromise: Promise = Promise.resolve(); + private _shutdownOnce: BindOnceFuture; private _sender?: typeof jaegerTypes.UDPSender; @@ -57,6 +55,8 @@ export class JaegerExporter implements SpanExporter { localConfig.host = localConfig.host || env.OTEL_EXPORTER_JAEGER_AGENT_HOST; this._localConfig = localConfig; + + this._shutdownOnce = new BindOnceFuture(this._shutdown, this); } /** Exports a list of spans to Jaeger. */ @@ -64,6 +64,9 @@ export class JaegerExporter implements SpanExporter { spans: ReadableSpan[], resultCallback: (result: ExportResult) => void ): void { + if (this._shutdownOnce.isCalled) { + return; + } if (spans.length === 0) { return resultCallback({ code: ExportResultCode.SUCCESS }); } @@ -75,39 +78,18 @@ export class JaegerExporter implements SpanExporter { /** Shutdown exporter. */ shutdown(): Promise { - if (this._isShutdown) { - return this._shuttingDownPromise; - } - this._isShutdown = true; - - this._shuttingDownPromise = new Promise((resolve, reject) => { - let rejected = false; - this._shutdownFlushTimeout = setTimeout(() => { - rejected = true; - reject('timeout'); - this._sender.close(); - }, this._onShutdownFlushTimeout); - - Promise.resolve() - .then(() => { - // Make an optimistic flush. - return this._flush(); - }) - .then(() => { - if (rejected) { - return; - } else { - this._shutdownFlushTimeout && - clearTimeout(this._shutdownFlushTimeout); - resolve(); - this._sender.close(); - } - }) - .catch(e => { - reject(e); - }); + return this._shutdownOnce.call(); + } + + private _shutdown(): Promise { + return Promise.race([ + new Promise((_resolve, reject) => { + setTimeout(() => reject(new Error('Flush timeout')), this._onShutdownFlushTimeout); + }), + this._flush(), + ]).finally(() => { + this._sender?.close(); }); - return this._shuttingDownPromise; } /** Transform spans and sends to Jaeger service. */ diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 3785a6ea28c..1c32c83d894 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -16,6 +16,7 @@ import { context, TraceFlags } from '@opentelemetry/api'; import { + BindOnceFuture, ExportResultCode, getEnv, globalErrorHandler, @@ -40,8 +41,7 @@ export abstract class BatchSpanProcessorBase implements private _finishedSpans: ReadableSpan[] = []; private _timer: NodeJS.Timeout | undefined; - private _isShutdown = false; - private _shuttingDownPromise: Promise = Promise.resolve(); + private _shutdownOnce: BindOnceFuture; constructor(private readonly _exporter: SpanExporter, config?: T) { const env = getEnv(); @@ -61,11 +61,13 @@ export abstract class BatchSpanProcessorBase implements typeof config?.exportTimeoutMillis === 'number' ? config.exportTimeoutMillis : env.OTEL_BSP_EXPORT_TIMEOUT; + + this._shutdownOnce = new BindOnceFuture(this._shutdown, this); } forceFlush(): Promise { - if (this._isShutdown) { - return this._shuttingDownPromise; + if (this._shutdownOnce.isCalled) { + return this._shutdownOnce.promise; } return this._flushAll(); } @@ -74,7 +76,7 @@ export abstract class BatchSpanProcessorBase implements onStart(_span: Span): void {} onEnd(span: ReadableSpan): void { - if (this._isShutdown) { + if (this._shutdownOnce.isCalled) { return; } @@ -86,27 +88,20 @@ export abstract class BatchSpanProcessorBase implements } shutdown(): Promise { - if (this._isShutdown) { - return this._shuttingDownPromise; - } - this._isShutdown = true; - this._shuttingDownPromise = new Promise((resolve, reject) => { - Promise.resolve() - .then(() => { - return this.onShutdown(); - }) - .then(() => { - return this._flushAll(); - }) - .then(() => { - return this._exporter.shutdown(); - }) - .then(resolve) - .catch(e => { - reject(e); - }); - }); - return this._shuttingDownPromise; + return this._shutdownOnce.call(); + } + + private _shutdown() { + return Promise.resolve() + .then(() => { + return this.onShutdown(); + }) + .then(() => { + return this._flushAll(); + }) + .then(() => { + return this._exporter.shutdown(); + }); } /** Add a span in the buffer. */ diff --git a/packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts b/packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts index 20c096dbbed..c87b8648259 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts @@ -19,6 +19,7 @@ import { ExportResultCode, globalErrorHandler, suppressTracing, + BindOnceFuture, } from '@opentelemetry/core'; import { Span } from '../Span'; import { SpanProcessor } from '../SpanProcessor'; @@ -32,10 +33,11 @@ import { SpanExporter } from './SpanExporter'; * Only spans that are sampled are converted. */ export class SimpleSpanProcessor implements SpanProcessor { - constructor(private readonly _exporter: SpanExporter) {} + private _shutdownOnce: BindOnceFuture; - private _isShutdown = false; - private _shuttingDownPromise: Promise = Promise.resolve(); + constructor(private readonly _exporter: SpanExporter) { + this._shutdownOnce = new BindOnceFuture(this._shutdown, this); + } forceFlush(): Promise { // do nothing as all spans are being exported without waiting @@ -46,7 +48,7 @@ export class SimpleSpanProcessor implements SpanProcessor { onStart(_span: Span): void {} onEnd(span: ReadableSpan): void { - if (this._isShutdown) { + if (this._shutdownOnce.isCalled) { return; } @@ -70,20 +72,10 @@ export class SimpleSpanProcessor implements SpanProcessor { } shutdown(): Promise { - if (this._isShutdown) { - return this._shuttingDownPromise; - } - this._isShutdown = true; - this._shuttingDownPromise = new Promise((resolve, reject) => { - Promise.resolve() - .then(() => { - return this._exporter.shutdown(); - }) - .then(resolve) - .catch(e => { - reject(e); - }); - }); - return this._shuttingDownPromise; + return this._shutdownOnce.call(); + } + + private _shutdown(): Promise { + return this._exporter.shutdown(); } }