diff --git a/packages/opentelemetry-exporter-collector-grpc/src/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector-grpc/src/CollectorExporterNodeBase.ts index 1ddac9722d3..da9a1b9d769 100644 --- a/packages/opentelemetry-exporter-collector-grpc/src/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector-grpc/src/CollectorExporterNodeBase.ts @@ -61,25 +61,17 @@ export abstract class CollectorExporterNodeBase< onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { - const promise = new Promise(resolve => { - const _onSuccess = (): void => { - onSuccess(); - _onFinish(); - }; - const _onError = (error: collectorTypes.CollectorExporterError): void => { - onError(error); - _onFinish(); - }; - const _onFinish = () => { - resolve(); - const index = this._sendingPromises.indexOf(promise); - this._sendingPromises.splice(index, 1); - }; - - this._send(this, objects, _onSuccess, _onError); - }); + const promise = new Promise((resolve, reject) => { + this._send(this, objects, resolve, reject); + }) + .then(onSuccess, onError); this._sendingPromises.push(promise); + const popPromise = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + } + promise.then(popPromise, popPromise); } onInit(config: CollectorExporterConfigNode): void { diff --git a/packages/opentelemetry-exporter-collector-grpc/test/CollectorExporterNodeBase.test.ts b/packages/opentelemetry-exporter-collector-grpc/test/CollectorExporterNodeBase.test.ts new file mode 100644 index 00000000000..84042f6ed59 --- /dev/null +++ b/packages/opentelemetry-exporter-collector-grpc/test/CollectorExporterNodeBase.test.ts @@ -0,0 +1,156 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { collectorTypes } from '@opentelemetry/exporter-collector'; +import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; + +import * as assert from 'assert'; +import { CollectorExporterNodeBase } from '../src/CollectorExporterNodeBase'; +import { CollectorExporterConfigNode, ServiceClientType } from '../src/types'; +import { mockedReadableSpan } from './helper'; + +class MockCollectorExporter extends CollectorExporterNodeBase< + ReadableSpan, + ReadableSpan[] +> { + /** + * Callbacks passed to _send() + */ + sendCallbacks: { + onSuccess: () => void; + onError: (error: collectorTypes.CollectorExporterError) => void; + }[] = []; + + getDefaultUrl(config: CollectorExporterConfigNode): string { + return ''; + } + + getDefaultServiceName(config: CollectorExporterConfigNode): string { + return ''; + } + + convert(spans: ReadableSpan[]): ReadableSpan[] { + return spans; + } + + getServiceClientType() { + return ServiceClientType.SPANS; + } + + getServiceProtoPath(): string { + return 'opentelemetry/proto/collector/trace/v1/trace_service.proto'; + } +} + +// Mocked _send which just saves the callbacks for later +MockCollectorExporter.prototype['_send'] = function _sendMock( + self: MockCollectorExporter, + objects: ReadableSpan[], + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void +): void { + self.sendCallbacks.push({ onSuccess, onError }); +}; + +describe('CollectorExporterNodeBase', () => { + let exporter: MockCollectorExporter; + const concurrencyLimit = 5; + + beforeEach(done => { + exporter = new MockCollectorExporter({ concurrencyLimit }); + done(); + }); + + describe('export', () => { + it('should export requests concurrently', async () => { + const spans = [Object.assign({}, mockedReadableSpan)]; + const numToExport = concurrencyLimit; + + for (let i = 0; i < numToExport; ++i) { + exporter.export(spans, () => {}); + } + + assert.strictEqual(exporter['_sendingPromises'].length, numToExport); + const promisesAllDone = Promise.all(exporter['_sendingPromises']); + // Mock that all requests finish sending + exporter.sendCallbacks.forEach(({ onSuccess }) => onSuccess()); + + // All finished promises should be popped off + await promisesAllDone; + assert.strictEqual(exporter['_sendingPromises'].length, 0); + }); + + it('should drop new export requests when already sending at concurrencyLimit', async () => { + const spans = [Object.assign({}, mockedReadableSpan)]; + const numToExport = concurrencyLimit + 5; + + for (let i = 0; i < numToExport; ++i) { + exporter.export(spans, () => {}); + } + + assert.strictEqual(exporter['_sendingPromises'].length, concurrencyLimit); + const promisesAllDone = Promise.all(exporter['_sendingPromises']); + // Mock that all requests finish sending + exporter.sendCallbacks.forEach(({ onSuccess }) => onSuccess()); + + // All finished promises should be popped off + await promisesAllDone; + assert.strictEqual(exporter['_sendingPromises'].length, 0); + }); + + it('should pop export request promises even if they failed', async () => { + const spans = [Object.assign({}, mockedReadableSpan)]; + + exporter.export(spans, () => {}); + assert.strictEqual(exporter['_sendingPromises'].length, 1); + const promisesAllDone = Promise.all(exporter['_sendingPromises']); + // Mock that all requests fail sending + exporter.sendCallbacks.forEach(({ onError }) => + onError(new Error('Failed to send!!')) + ); + + // All finished promises should be popped off + await promisesAllDone; + assert.strictEqual(exporter['_sendingPromises'].length, 0); + }); + + it('should pop export request promises even if success callback throws error', async () => { + const spans = [Object.assign({}, mockedReadableSpan)]; + + exporter['_sendPromise']( + spans, + () => { + throw new Error('Oops'); + }, + () => {} + ); + + assert.strictEqual(exporter['_sendingPromises'].length, 1); + const promisesAllDone = Promise.all(exporter['_sendingPromises']) + // catch expected unhandled exception + .catch(() => {}); + + // Mock that the request finishes sending + exporter.sendCallbacks.forEach(({ onSuccess }) => { + onSuccess(); + }); + + // All finished promises should be popped off + await promisesAllDone; + assert.strictEqual(exporter['_sendingPromises'].length, 0); + }); + }); +}); diff --git a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts index 3f2e4c53bcd..9ff36306b75 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts @@ -47,25 +47,17 @@ export abstract class CollectorExporterNodeBase< onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { - const promise = new Promise(resolve => { - const _onSuccess = (): void => { - onSuccess(); - _onFinish(); - }; - const _onError = (error: collectorTypes.CollectorExporterError): void => { - onError(error); - _onFinish(); - }; - const _onFinish = () => { - resolve(); - const index = this._sendingPromises.indexOf(promise); - this._sendingPromises.splice(index, 1); - }; - - this._send(this, objects, this.compression, _onSuccess, _onError); - }); + const promise = new Promise((resolve, reject) => { + this._send(this, objects, this.compression, resolve, reject); + }) + .then(onSuccess, onError); this._sendingPromises.push(promise); + const popPromise = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + } + promise.then(popPromise, popPromise); } override onInit(config: CollectorExporterNodeConfigBase): void { diff --git a/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorExporterBrowserBase.ts b/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorExporterBrowserBase.ts index 8e9a089c099..bf4af404b99 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorExporterBrowserBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorExporterBrowserBase.ts @@ -76,27 +76,20 @@ export abstract class CollectorExporterBrowserBase< const serviceRequest = this.convert(items); const body = JSON.stringify(serviceRequest); - const promise = new Promise(resolve => { - const _onSuccess = (): void => { - onSuccess(); - _onFinish(); - }; - const _onError = (error: collectorTypes.CollectorExporterError): void => { - onError(error); - _onFinish(); - }; - const _onFinish = () => { - resolve(); - const index = this._sendingPromises.indexOf(promise); - this._sendingPromises.splice(index, 1); - }; - + const promise = new Promise((resolve, reject) => { if (this._useXHR) { - sendWithXhr(body, this.url, this._headers, _onSuccess, _onError); + sendWithXhr(body, this.url, this._headers, resolve, reject); } else { - sendWithBeacon(body, this.url, { type: 'application/json' }, _onSuccess, _onError); + sendWithBeacon(body, this.url, { type: 'application/json' }, resolve, reject); } - }); + }) + .then(onSuccess, onError); + this._sendingPromises.push(promise); + const popPromise = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + } + promise.then(popPromise, popPromise); } } diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts index 10a5bdca196..a60993e84c4 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts @@ -70,30 +70,23 @@ export abstract class CollectorExporterNodeBase< } const serviceRequest = this.convert(objects); - const promise = new Promise(resolve => { - const _onSuccess = (): void => { - onSuccess(); - _onFinish(); - }; - const _onError = (error: collectorTypes.CollectorExporterError): void => { - onError(error); - _onFinish(); - }; - const _onFinish = () => { - resolve(); - const index = this._sendingPromises.indexOf(promise); - this._sendingPromises.splice(index, 1); - }; + const promise = new Promise((resolve, reject) => { sendWithHttp( this, JSON.stringify(serviceRequest), 'application/json', - _onSuccess, - _onError + resolve, + reject ); - }); + }) + .then(onSuccess, onError); this._sendingPromises.push(promise); + const popPromise = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + } + promise.then(popPromise, popPromise); } onShutdown(): void {} diff --git a/packages/opentelemetry-exporter-zipkin/src/zipkin.ts b/packages/opentelemetry-exporter-zipkin/src/zipkin.ts index 17d2eb2bf9f..557c0faef3b 100644 --- a/packages/opentelemetry-exporter-zipkin/src/zipkin.ts +++ b/packages/opentelemetry-exporter-zipkin/src/zipkin.ts @@ -84,11 +84,16 @@ export class ZipkinExporter implements SpanExporter { this._sendSpans(spans, serviceName, result => { resolve(); resultCallback(result); - const index = this._sendingPromises.indexOf(promise); - this._sendingPromises.splice(index, 1); }); }); + + this._sendingPromises.push(promise); + const popPromise = () => { + const index = this._sendingPromises.indexOf(promise); + this._sendingPromises.splice(index, 1); + } + promise.then(popPromise, popPromise); } /**