diff --git a/experimental/packages/otlp-exporter-base/src/exporter-transport.ts b/experimental/packages/otlp-exporter-base/src/exporter-transport.ts index bb9deac834d..0123b26db99 100644 --- a/experimental/packages/otlp-exporter-base/src/exporter-transport.ts +++ b/experimental/packages/otlp-exporter-base/src/exporter-transport.ts @@ -17,6 +17,6 @@ import { ExportResponse } from './export-response'; export interface IExporterTransport { - send(data: Uint8Array): Promise; + send(data: Uint8Array, timeoutMillis: number): Promise; shutdown(): void; } diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts b/experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts index 5ab20427f06..2c66d8bca9f 100644 --- a/experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts +++ b/experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts @@ -23,7 +23,7 @@ import { ISerializer } from '@opentelemetry/otlp-transformer'; import { IExporterTransport } from '../../exporter-transport'; import { createHttpExporterTransport } from './http-exporter-transport'; import { OTLPExporterError } from '../../types'; -import { createRetryingTransport } from '../../retryable-transport'; +import { createRetryingTransport } from '../../retrying-transport'; /** * Collector Metric Exporter abstract base class @@ -76,7 +76,6 @@ export abstract class OTLPExporterNodeBase< signalSpecificHeaders ), url: this.url, - timeoutMillis: this.timeoutMillis, }), }); } @@ -100,16 +99,19 @@ export abstract class OTLPExporterNodeBase< return; } - const promise = this._transport.send(data).then(response => { - if (response.status === 'success') { - onSuccess(); - return; - } - if (response.status === 'failure' && response.error) { - onError(response.error); - } - onError(new OTLPExporterError('Export failed with unknown error')); - }, onError); + const promise = this._transport + .send(data, this.timeoutMillis) + .then(response => { + if (response.status === 'success') { + onSuccess(); + } else if (response.status === 'failure' && response.error) { + onError(response.error); + } else if (response.status === 'retryable') { + onError(new OTLPExporterError('Export failed with retryable status')); + } else { + onError(new OTLPExporterError('Export failed with unknown error')); + } + }, onError); this._sendingPromises.push(promise); const popPromise = () => { diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/http-exporter-transport.ts b/experimental/packages/otlp-exporter-base/src/platform/node/http-exporter-transport.ts index 79647f1d9b3..1b638f7c41d 100644 --- a/experimental/packages/otlp-exporter-base/src/platform/node/http-exporter-transport.ts +++ b/experimental/packages/otlp-exporter-base/src/platform/node/http-exporter-transport.ts @@ -32,7 +32,7 @@ class HttpExporterTransport implements IExporterTransport { constructor(private _parameters: HttpRequestParameters) {} - async send(data: Uint8Array): Promise { + async send(data: Uint8Array, timeoutMillis: number): Promise { if (this._send == null) { // Lazy require to ensure that http/https is not required before instrumentations can wrap it. const { @@ -50,9 +50,15 @@ class HttpExporterTransport implements IExporterTransport { return new Promise(resolve => { // this will always be defined // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this._send?.(this._parameters, this._agent!, data, result => { - resolve(result); - }); + this._send?.( + this._parameters, + this._agent!, + data, + result => { + resolve(result); + }, + timeoutMillis + ); }); } shutdown() { diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-types.ts b/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-types.ts index da33d02cd93..1a041aedf22 100644 --- a/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-types.ts +++ b/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-types.ts @@ -22,11 +22,11 @@ export type sendWithHttp = ( params: HttpRequestParameters, agent: http.Agent | https.Agent, data: Uint8Array, - onDone: (response: ExportResponse) => void + onDone: (response: ExportResponse) => void, + timeoutMillis: number ) => void; export interface HttpRequestParameters { - timeoutMillis: number; url: string; headers: Record; compression: 'gzip' | 'none'; diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-utils.ts b/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-utils.ts index e1c13855a55..30c11caa94e 100644 --- a/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-utils.ts +++ b/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-utils.ts @@ -40,7 +40,8 @@ export function sendWithHttp( params: HttpRequestParameters, agent: http.Agent | https.Agent, data: Uint8Array, - onDone: (response: ExportResponse) => void + onDone: (response: ExportResponse) => void, + timeoutMillis: number ): void { const parsedUrl = new URL(params.url); const nodeVersion = Number(process.versions.node.split('.')[0]); @@ -86,7 +87,7 @@ export function sendWithHttp( }); }); - req.setTimeout(params.timeoutMillis, () => { + req.setTimeout(timeoutMillis, () => { req.destroy(); onDone({ status: 'failure', diff --git a/experimental/packages/otlp-exporter-base/src/retryable-transport.ts b/experimental/packages/otlp-exporter-base/src/retrying-transport.ts similarity index 66% rename from experimental/packages/otlp-exporter-base/src/retryable-transport.ts rename to experimental/packages/otlp-exporter-base/src/retrying-transport.ts index e48192c1fd2..c85ae8e07e9 100644 --- a/experimental/packages/otlp-exporter-base/src/retryable-transport.ts +++ b/experimental/packages/otlp-exporter-base/src/retrying-transport.ts @@ -33,24 +33,42 @@ function getJitter() { class RetryingTransport implements IExporterTransport { constructor(private _transport: IExporterTransport) {} - private retry(data: Uint8Array, inMillis: number): Promise { + private retry( + data: Uint8Array, + timeoutMillis: number, + inMillis: number + ): Promise { return new Promise((resolve, reject) => { setTimeout(() => { - this._transport.send(data).then(resolve, reject); + this._transport.send(data, timeoutMillis).then(resolve, reject); }, inMillis); }); } - async send(data: Uint8Array): Promise { - let result = await this._transport.send(data); + async send(data: Uint8Array, timeoutMillis: number): Promise { + const deadline = Date.now() + timeoutMillis; + let result = await this._transport.send(data, timeoutMillis); let attempts = MAX_ATTEMPTS; let nextBackoff = INITIAL_BACKOFF; while (result.status === 'retryable' && attempts > 0) { attempts--; - const backoff = Math.min(nextBackoff, MAX_BACKOFF) + getJitter(); + + // use maximum of computed backoff and 0 to avoid negative timeouts + const backoff = Math.max( + Math.min(nextBackoff, MAX_BACKOFF) + getJitter(), + 0 + ); nextBackoff = nextBackoff * BACKOFF_MULTIPLIER; - result = await this.retry(data, result.retryInMillis ?? backoff); + const retryInMillis = result.retryInMillis ?? backoff; + + // return when expected retry time is after the export deadline. + const remainingTimeoutMillis = deadline - Date.now(); + if (retryInMillis > remainingTimeoutMillis) { + return result; + } + + result = await this.retry(data, remainingTimeoutMillis, retryInMillis); } return result; diff --git a/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts b/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts index cd7be6ebbbe..d3af1fdd688 100644 --- a/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts +++ b/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts @@ -17,10 +17,12 @@ import * as sinon from 'sinon'; import * as assert from 'assert'; import { IExporterTransport } from '../../src'; -import { createRetryingTransport } from '../../src/retryable-transport'; +import { createRetryingTransport } from '../../src/retrying-transport'; import { ExportResponse } from '../../src'; import { assertRejects } from '../testHelper'; +const timeoutMillis = 1000000; + describe('RetryingTransport', function () { describe('send', function () { it('does not retry when underlying transport succeeds', async function () { @@ -39,10 +41,14 @@ describe('RetryingTransport', function () { const transport = createRetryingTransport({ transport: mockTransport }); // act - const actualResponse = await transport.send(mockData); + const actualResponse = await transport.send(mockData, timeoutMillis); // assert - sinon.assert.calledOnceWithExactly(transportStubs.send, mockData); + sinon.assert.calledOnceWithExactly( + transportStubs.send, + mockData, + timeoutMillis + ); assert.deepEqual(actualResponse, expectedResponse); }); @@ -63,10 +69,14 @@ describe('RetryingTransport', function () { const transport = createRetryingTransport({ transport: mockTransport }); // act - const actualResponse = await transport.send(mockData); + const actualResponse = await transport.send(mockData, timeoutMillis); // assert - sinon.assert.calledOnceWithExactly(transportStubs.send, mockData); + sinon.assert.calledOnceWithExactly( + transportStubs.send, + mockData, + timeoutMillis + ); assert.deepEqual(actualResponse, expectedResponse); }); @@ -84,10 +94,14 @@ describe('RetryingTransport', function () { const transport = createRetryingTransport({ transport: mockTransport }); // act - await assertRejects(() => transport.send(mockData)); + await assertRejects(() => transport.send(mockData, timeoutMillis)); // assert - sinon.assert.calledOnceWithExactly(transportStubs.send, mockData); + sinon.assert.calledOnceWithExactly( + transportStubs.send, + mockData, + timeoutMillis + ); }); it('does retry when the underlying transport returns retryable', async function () { @@ -113,11 +127,19 @@ describe('RetryingTransport', function () { const transport = createRetryingTransport({ transport: mockTransport }); // act - const actualResponse = await transport.send(mockData); + const actualResponse = await transport.send(mockData, timeoutMillis); // assert sinon.assert.calledTwice(transportStubs.send); - sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData); + sinon.assert.alwaysCalledWithMatch( + transportStubs.send, + mockData, + sinon.match.number.and( + sinon.match(value => { + return value <= timeoutMillis; + }) + ) + ); assert.deepEqual(actualResponse, successResponse); }); @@ -143,11 +165,19 @@ describe('RetryingTransport', function () { const transport = createRetryingTransport({ transport: mockTransport }); // act - await assertRejects(() => transport.send(mockData)); + await assertRejects(() => transport.send(mockData, timeoutMillis)); // assert sinon.assert.calledTwice(transportStubs.send); - sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData); + sinon.assert.alwaysCalledWithMatch( + transportStubs.send, + mockData, + sinon.match.number.and( + sinon.match(value => { + return value <= timeoutMillis; + }) + ) + ); }); it('does retry 5 times, then resolves as retryable', async function () { @@ -169,11 +199,48 @@ describe('RetryingTransport', function () { const transport = createRetryingTransport({ transport: mockTransport }); // act - const result = await transport.send(mockData); + const result = await transport.send(mockData, timeoutMillis); // assert sinon.assert.callCount(transportStubs.send, 6); // 1 initial try and 5 retries - sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData); + sinon.assert.alwaysCalledWithMatch( + transportStubs.send, + mockData, + sinon.match.number.and( + sinon.match(value => { + return value <= timeoutMillis; + }) + ) + ); + assert.strictEqual(result, retryResponse); + }); + + it('does not retry when retryInMillis takes place after timeoutMillis', async function () { + // arrange + const retryResponse: ExportResponse = { + status: 'retryable', + retryInMillis: timeoutMillis + 100, + }; + + const mockData = Uint8Array.from([1, 2, 3]); + + const transportStubs = { + send: sinon.stub().resolves(retryResponse), + shutdown: sinon.stub(), + }; + const mockTransport = transportStubs; + const transport = createRetryingTransport({ transport: mockTransport }); + + // act + const result = await transport.send(mockData, timeoutMillis); + + // assert + // initial try, no retries. + sinon.assert.calledOnceWithExactly( + transportStubs.send, + mockData, + timeoutMillis + ); assert.strictEqual(result, retryResponse); }); }); diff --git a/experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts b/experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts index 646e674bf5c..6ad33ec77cc 100644 --- a/experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts +++ b/experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts @@ -97,7 +97,6 @@ export abstract class OTLPGRPCExporterNodeBase< grpcName: grpcName, grpcPath: grpcPath, metadata: metadataProvider, - timeoutMillis: this.timeoutMillis, }); } @@ -126,16 +125,19 @@ export abstract class OTLPGRPCExporterNodeBase< return; } - const promise = this._transport.send(data).then(response => { - if (response.status === 'success') { - onSuccess(); - return; - } - if (response.status === 'failure' && response.error) { - onError(response.error); - } - onError(new OTLPExporterError('Export failed with unknown error')); - }, onError); + const promise = this._transport + .send(data, this.timeoutMillis) + .then(response => { + if (response.status === 'success') { + onSuccess(); + } else if (response.status === 'failure' && response.error) { + onError(response.error); + } else if (response.status === 'retryable') { + onError(new OTLPExporterError('Export failed with retryable status')); + } else { + onError(new OTLPExporterError('Export failed with unknown error')); + } + }, onError); this._sendingPromises.push(promise); const popPromise = () => { diff --git a/experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts b/experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts index 342e83d91e6..d3f3b193e63 100644 --- a/experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts +++ b/experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts @@ -88,7 +88,6 @@ export interface GrpcExporterTransportParameters { */ metadata: () => Metadata; compression: 'gzip' | 'none'; - timeoutMillis: number; } export class GrpcExporterTransport implements IExporterTransport { @@ -101,7 +100,7 @@ export class GrpcExporterTransport implements IExporterTransport { this._client?.close(); } - send(data: Uint8Array): Promise { + send(data: Uint8Array, timeoutMillis: number): Promise { // We need to make a for gRPC const buffer = Buffer.from(data); @@ -145,9 +144,7 @@ export class GrpcExporterTransport implements IExporterTransport { } return new Promise(resolve => { - // this will always be defined - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const deadline = Date.now() + this._parameters.timeoutMillis; + const deadline = Date.now() + timeoutMillis; // this should never happen if (this._metadata == null) { diff --git a/experimental/packages/otlp-grpc-exporter-base/test/grpc-exporter-transport.test.ts b/experimental/packages/otlp-grpc-exporter-base/test/grpc-exporter-transport.test.ts index edd98bcfd6a..28acc9cba15 100644 --- a/experimental/packages/otlp-grpc-exporter-base/test/grpc-exporter-transport.test.ts +++ b/experimental/packages/otlp-grpc-exporter-base/test/grpc-exporter-transport.test.ts @@ -57,7 +57,6 @@ const simpleClientConfig: GrpcExporterTransportParameters = { metadata.set('foo', 'bar'); return metadata; }, - timeoutMillis: 100, grpcPath: '/test/Export', grpcName: 'name', credentials: createInsecureCredentials, @@ -65,6 +64,8 @@ const simpleClientConfig: GrpcExporterTransportParameters = { address: 'localhost:1234', }; +const timeoutMillis = 100; + interface ExportedData { request: Buffer; metadata: Metadata; @@ -185,7 +186,7 @@ describe('GrpcExporterTransport', function () { it('calls _client.close() if client is defined', async function () { const transport = new GrpcExporterTransport(simpleClientConfig); // send something so that client is defined - await transport.send(Buffer.from([1, 2, 3])); + await transport.send(Buffer.from([1, 2, 3]), timeoutMillis); assert.ok(transport['_client'], '_client is not defined after send()'); const closeSpy = sinon.spy(transport['_client'], 'close'); @@ -223,7 +224,8 @@ describe('GrpcExporterTransport', function () { const transport = new GrpcExporterTransport(simpleClientConfig); const result = (await transport.send( - Buffer.from([1, 2, 3]) + Buffer.from([1, 2, 3]), + timeoutMillis )) as ExportResponseSuccess; assert.strictEqual(result.status, 'success'); @@ -250,7 +252,8 @@ describe('GrpcExporterTransport', function () { const transport = new GrpcExporterTransport(simpleClientConfig); const result = (await transport.send( - Buffer.from([]) + Buffer.from([]), + timeoutMillis )) as ExportResponseSuccess; assert.strictEqual(result.status, 'success'); @@ -267,7 +270,8 @@ describe('GrpcExporterTransport', function () { const transport = new GrpcExporterTransport(simpleClientConfig); const result = (await transport.send( - Buffer.from([]) + Buffer.from([]), + timeoutMillis )) as ExportResponseFailure; assert.strictEqual(result.status, 'failure'); @@ -281,7 +285,8 @@ describe('GrpcExporterTransport', function () { const transport = new GrpcExporterTransport(simpleClientConfig); const result = (await transport.send( - Buffer.from([]) + Buffer.from([]), + timeoutMillis )) as ExportResponseFailure; assert.strictEqual(result.status, 'failure'); assert.ok(types.isNativeError(result.error)); @@ -297,7 +302,8 @@ describe('GrpcExporterTransport', function () { const transport = new GrpcExporterTransport(config); const result = (await transport.send( - Buffer.from([]) + Buffer.from([]), + timeoutMillis )) as ExportResponseFailure; assert.strictEqual(result.status, 'failure'); assert.strictEqual(result.error, expectedError); @@ -313,7 +319,8 @@ describe('GrpcExporterTransport', function () { const transport = new GrpcExporterTransport(config); const result = (await transport.send( - Buffer.from([]) + Buffer.from([]), + timeoutMillis )) as ExportResponseFailure; assert.strictEqual(result.status, 'failure'); assert.deepEqual(result.error, expectedError); @@ -329,7 +336,8 @@ describe('GrpcExporterTransport', function () { const transport = new GrpcExporterTransport(config); const result = (await transport.send( - Buffer.from([]) + Buffer.from([]), + timeoutMillis )) as ExportResponseFailure; assert.strictEqual(result.status, 'failure'); assert.strictEqual(result.error, expectedError);