From f947d5628afc3a3f8b429e43e9ddcf27e2a625de Mon Sep 17 00:00:00 2001 From: Svetlana Brennan Date: Thu, 13 Jan 2022 18:01:19 -0600 Subject: [PATCH 1/6] feat(otlp-exporter-http): wip add retries Signed-off-by: Svetlana Brennan --- .../src/OTLPExporterBase.ts | 41 +++++++++++++++++-- .../src/export/BatchSpanProcessorBase.ts | 9 ++-- .../src/export/SpanExporter.ts | 7 +++- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts index 2dc095bcc57..21a910c7546 100644 --- a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts +++ b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts @@ -64,8 +64,25 @@ export abstract class OTLPExporterBase< * Export items. * @param items * @param resultCallback + * @param exportTimeoutMillis - optional + * @param onError - optional */ - export(items: ExportItem[], resultCallback: (result: ExportResult) => void): void { + + export(items: ExportItem[], resultCallback: (result: ExportResult) => void, exportTimeoutMillis?: number, onError?: (error: object) => void): void { + const DEFAULT_MAX_ATTEMPTS = 4; + const DEFAULT_INITIAL_BACKOFF = 15000; + const DEFAULT_BACKOFF_MULTIPLIER = 1.5; + + let retryTimer: ReturnType; + + const exportTimer = setTimeout(() => { + clearTimeout(retryTimer); + if (onError !== undefined) { + onError(new Error('Timeout')); + } + }, exportTimeoutMillis); + + if (this._shutdownOnce.isCalled) { resultCallback({ code: ExportResultCode.FAILED, @@ -82,13 +99,25 @@ export abstract class OTLPExporterBase< return; } - this._export(items) + const exportWithRetry = (retries = DEFAULT_MAX_ATTEMPTS, backoffMillis = DEFAULT_INITIAL_BACKOFF) => { + this._export(items) .then(() => { + clearTimeout(exportTimer); resultCallback({ code: ExportResultCode.SUCCESS }); }) .catch((error: ExportServiceError) => { - resultCallback({ code: ExportResultCode.FAILED, error }); + if (this._isRetryable(error.code) && retries > 0) { + retryTimer = setTimeout(() => { + return exportWithRetry(retries - 1, backoffMillis *DEFAULT_BACKOFF_MULTIPLIER); + }, backoffMillis); + } else { + clearTimeout(exportTimer); + resultCallback({ code: ExportResultCode.FAILED, error }); + } }); + }; + + exportWithRetry(); } private _export(items: ExportItem[]): Promise { @@ -102,6 +131,12 @@ export abstract class OTLPExporterBase< }); } + private _isRetryable(statusCode: number): boolean { + const retryCodes = [429, 502, 503, 504]; + + return retryCodes.includes(statusCode); + } + /** * Shutdown the exporter. */ diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 1c32c83d894..cb33e043eae 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -143,10 +143,6 @@ export abstract class BatchSpanProcessorBase implements return Promise.resolve(); } return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - // don't wait anymore for export, this way the next batch can start - reject(new Error('Timeout')); - }, this._exportTimeoutMillis); // prevent downstream exporter calls from generating spans context.with(suppressTracing(context.active()), () => { // Reset the finished spans buffer here because the next invocations of the _flush method @@ -155,7 +151,6 @@ export abstract class BatchSpanProcessorBase implements this._exporter.export( this._finishedSpans.splice(0, this._maxExportBatchSize), result => { - clearTimeout(timer); if (result.code === ExportResultCode.SUCCESS) { resolve(); } else { @@ -164,7 +159,9 @@ export abstract class BatchSpanProcessorBase implements new Error('BatchSpanProcessor: span export failed') ); } - } + }, + this._exportTimeoutMillis, + reject ); }); }); diff --git a/packages/opentelemetry-sdk-trace-base/src/export/SpanExporter.ts b/packages/opentelemetry-sdk-trace-base/src/export/SpanExporter.ts index b3b89d4aa61..f2f69da4e89 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/SpanExporter.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/SpanExporter.ts @@ -28,10 +28,15 @@ export interface SpanExporter { /** * Called to export sampled {@link ReadableSpan}s. * @param spans the list of sampled Spans to be exported. + * @param resultCallback + * @param exportTimeoutMillis optional - currently only used by BatchSpanProcessorBase + * @param onError optional - currently only used by BatchSpanProcessorBase */ export( spans: ReadableSpan[], - resultCallback: (result: ExportResult) => void + resultCallback: (result: ExportResult) => void, + exportTimeoutMillis?: number, + onError?: (error: object) => void, ): void; /** Stops the exporter. */ From 09abc9398ccf22751ef06be5b6f14fa8be599dbb Mon Sep 17 00:00:00 2001 From: Svetlana Brennan Date: Fri, 14 Jan 2022 09:41:54 -0600 Subject: [PATCH 2/6] feat(otlp-exporter-http): fix backoff seconds Signed-off-by: Svetlana Brennan --- packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts index 21a910c7546..7aeb18e4390 100644 --- a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts +++ b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts @@ -70,7 +70,7 @@ export abstract class OTLPExporterBase< export(items: ExportItem[], resultCallback: (result: ExportResult) => void, exportTimeoutMillis?: number, onError?: (error: object) => void): void { const DEFAULT_MAX_ATTEMPTS = 4; - const DEFAULT_INITIAL_BACKOFF = 15000; + const DEFAULT_INITIAL_BACKOFF = 1000; const DEFAULT_BACKOFF_MULTIPLIER = 1.5; let retryTimer: ReturnType; From 6f42474c0abca1603d08455e067415a2d05b6a48 Mon Sep 17 00:00:00 2001 From: Svetlana Brennan Date: Fri, 14 Jan 2022 14:12:49 -0600 Subject: [PATCH 3/6] feat(otlp-exporter-http): fix export timer Signed-off-by: Svetlana Brennan --- .../src/OTLPExporterBase.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts index 7aeb18e4390..8f742d0e390 100644 --- a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts +++ b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts @@ -74,14 +74,16 @@ export abstract class OTLPExporterBase< const DEFAULT_BACKOFF_MULTIPLIER = 1.5; let retryTimer: ReturnType; + let exportTimer: ReturnType; - const exportTimer = setTimeout(() => { - clearTimeout(retryTimer); - if (onError !== undefined) { - onError(new Error('Timeout')); - } - }, exportTimeoutMillis); - + if (exportTimeoutMillis && onError) { + exportTimer = setTimeout(() => { + clearTimeout(retryTimer); + if (onError !== undefined) { + onError(new Error('Timeout')); + } + }, exportTimeoutMillis); + } if (this._shutdownOnce.isCalled) { resultCallback({ From c0c357805bb400a435af7ab74b6258f463f113f5 Mon Sep 17 00:00:00 2001 From: Svetlana Brennan Date: Fri, 14 Jan 2022 14:24:56 -0600 Subject: [PATCH 4/6] feat(otlp-exporter-http): refactor retry properties Signed-off-by: Svetlana Brennan --- .../src/OTLPExporterBase.ts | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts index 8f742d0e390..67cf3264d2e 100644 --- a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts +++ b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts @@ -36,6 +36,10 @@ export abstract class OTLPExporterBase< protected _concurrencyLimit: number; protected _sendingPromises: Promise[] = []; protected _shutdownOnce: BindOnceFuture; + private DEFAULT_MAX_ATTEMPTS = 4; + private DEFAULT_INITIAL_BACKOFF = 1000; + private DEFAULT_BACKOFF_MULTIPLIER = 1.5; + private retryCodes = [429, 502, 503, 504]; /** * @param config @@ -69,9 +73,6 @@ export abstract class OTLPExporterBase< */ export(items: ExportItem[], resultCallback: (result: ExportResult) => void, exportTimeoutMillis?: number, onError?: (error: object) => void): void { - const DEFAULT_MAX_ATTEMPTS = 4; - const DEFAULT_INITIAL_BACKOFF = 1000; - const DEFAULT_BACKOFF_MULTIPLIER = 1.5; let retryTimer: ReturnType; let exportTimer: ReturnType; @@ -101,7 +102,7 @@ export abstract class OTLPExporterBase< return; } - const exportWithRetry = (retries = DEFAULT_MAX_ATTEMPTS, backoffMillis = DEFAULT_INITIAL_BACKOFF) => { + const exportWithRetry = (retries = this.DEFAULT_MAX_ATTEMPTS, backoffMillis = this.DEFAULT_INITIAL_BACKOFF) => { this._export(items) .then(() => { clearTimeout(exportTimer); @@ -110,7 +111,7 @@ export abstract class OTLPExporterBase< .catch((error: ExportServiceError) => { if (this._isRetryable(error.code) && retries > 0) { retryTimer = setTimeout(() => { - return exportWithRetry(retries - 1, backoffMillis *DEFAULT_BACKOFF_MULTIPLIER); + return exportWithRetry(retries - 1, backoffMillis * this.DEFAULT_BACKOFF_MULTIPLIER); }, backoffMillis); } else { clearTimeout(exportTimer); @@ -134,9 +135,7 @@ export abstract class OTLPExporterBase< } private _isRetryable(statusCode: number): boolean { - const retryCodes = [429, 502, 503, 504]; - - return retryCodes.includes(statusCode); + return this.retryCodes.includes(statusCode); } /** From fcfd6fad1acc1c38bdbdf140d9bb8e16f795bf77 Mon Sep 17 00:00:00 2001 From: Svetlana Brennan Date: Wed, 16 Feb 2022 12:46:49 -0600 Subject: [PATCH 5/6] feat(otlp-exporter-http): wip add http request retries Signed-off-by: Svetlana Brennan --- .../src/OTLPExporterBase.ts | 48 ++------ .../src/platform/node/util.ts | 109 +++++++++++++----- 2 files changed, 85 insertions(+), 72 deletions(-) diff --git a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts index 67cf3264d2e..08ee6eb4db3 100644 --- a/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts +++ b/packages/exporter-trace-otlp-http/src/OTLPExporterBase.ts @@ -36,10 +36,6 @@ export abstract class OTLPExporterBase< protected _concurrencyLimit: number; protected _sendingPromises: Promise[] = []; protected _shutdownOnce: BindOnceFuture; - private DEFAULT_MAX_ATTEMPTS = 4; - private DEFAULT_INITIAL_BACKOFF = 1000; - private DEFAULT_BACKOFF_MULTIPLIER = 1.5; - private retryCodes = [429, 502, 503, 504]; /** * @param config @@ -72,19 +68,7 @@ export abstract class OTLPExporterBase< * @param onError - optional */ - export(items: ExportItem[], resultCallback: (result: ExportResult) => void, exportTimeoutMillis?: number, onError?: (error: object) => void): void { - - let retryTimer: ReturnType; - let exportTimer: ReturnType; - - if (exportTimeoutMillis && onError) { - exportTimer = setTimeout(() => { - clearTimeout(retryTimer); - if (onError !== undefined) { - onError(new Error('Timeout')); - } - }, exportTimeoutMillis); - } + export(items: ExportItem[], resultCallback: (result: ExportResult) => void): void { if (this._shutdownOnce.isCalled) { resultCallback({ @@ -102,25 +86,13 @@ export abstract class OTLPExporterBase< return; } - const exportWithRetry = (retries = this.DEFAULT_MAX_ATTEMPTS, backoffMillis = this.DEFAULT_INITIAL_BACKOFF) => { - this._export(items) - .then(() => { - clearTimeout(exportTimer); - resultCallback({ code: ExportResultCode.SUCCESS }); - }) - .catch((error: ExportServiceError) => { - if (this._isRetryable(error.code) && retries > 0) { - retryTimer = setTimeout(() => { - return exportWithRetry(retries - 1, backoffMillis * this.DEFAULT_BACKOFF_MULTIPLIER); - }, backoffMillis); - } else { - clearTimeout(exportTimer); - resultCallback({ code: ExportResultCode.FAILED, error }); - } - }); - }; - - exportWithRetry(); + this._export(items) + .then(() => { + resultCallback({ code: ExportResultCode.SUCCESS }); + }) + .catch((error: ExportServiceError) => { + resultCallback({ code: ExportResultCode.FAILED, error }); + }); } private _export(items: ExportItem[]): Promise { @@ -134,10 +106,6 @@ export abstract class OTLPExporterBase< }); } - private _isRetryable(statusCode: number): boolean { - return this.retryCodes.includes(statusCode); - } - /** * Shutdown the exporter. */ diff --git a/packages/exporter-trace-otlp-http/src/platform/node/util.ts b/packages/exporter-trace-otlp-http/src/platform/node/util.ts index 0e40b585dbf..fac3a525ef0 100644 --- a/packages/exporter-trace-otlp-http/src/platform/node/util.ts +++ b/packages/exporter-trace-otlp-http/src/platform/node/util.ts @@ -25,6 +25,9 @@ import { diag } from '@opentelemetry/api'; import { CompressionAlgorithm } from './types'; let gzip: zlib.Gzip | undefined; +const DEFAULT_MAX_ATTEMPTS = 4; +const DEFAULT_INITIAL_BACKOFF = 1000; +const DEFAULT_BACKOFF_MULTIPLIER = 1.5; /** * Sends data using http @@ -43,6 +46,21 @@ export function sendWithHttp( ): void { const parsedUrl = new url.URL(collector.url); + // temp code - this will be merged from timeout pr + const exporterTimeout = collector._timeoutMillis; + let reqIsDestroyed: boolean; + + let req: http.ClientRequest; + let retryTimer: ReturnType; + + const exporterTimer = setTimeout(() => { + clearTimeout(retryTimer); + reqIsDestroyed = true; + req.destroy(); + // create error here? + // onError(new Error('Request Timeout')); + }, exporterTimeout); + const options: http.RequestOptions | https.RequestOptions = { hostname: parsedUrl.hostname, port: parsedUrl.port, @@ -58,48 +76,75 @@ export function sendWithHttp( const request = parsedUrl.protocol === 'http:' ? http.request : https.request; - const req = request(options, (res: http.IncomingMessage) => { - let responseData = ''; - res.on('data', chunk => (responseData += chunk)); - res.on('end', () => { - if (res.statusCode && res.statusCode < 299) { - diag.debug(`statusCode: ${res.statusCode}`, responseData); - onSuccess(); - } else { - const error = new otlpTypes.OTLPExporterError( - res.statusMessage, - res.statusCode, - responseData + const sendWithRetry = (retries = DEFAULT_MAX_ATTEMPTS, backoffMillis = DEFAULT_INITIAL_BACKOFF) => { + req = request(options, (res: http.IncomingMessage) => { + let responseData = ''; + res.on('data', chunk => (responseData += chunk)); + res.on('end', () => { + if (res.statusCode && res.statusCode < 299) { + diag.debug(`statusCode: ${res.statusCode}`, responseData); + // clear all timers since request was successful and resolve promise + clearTimeout(exporterTimer); + clearTimeout(retryTimer); + onSuccess(); + } else if (res.statusCode && isRetryable(res.statusCode) && retries > 0) { + retryTimer = setTimeout(() => { + return sendWithRetry(retries - 1, backoffMillis * DEFAULT_BACKOFF_MULTIPLIER); + }, backoffMillis); + } else { + const error = new otlpTypes.OTLPExporterError( + res.statusMessage, + res.statusCode, + responseData + ); + // clear all timers since request failed and there are no more retries left + // then reject promise + clearTimeout(exporterTimer); + clearTimeout(retryTimer); + onError(error); + } + }); + }); + + // temp code - this will be merged from timeout pr + req.on('error', (error: Error | any) => { + if (reqIsDestroyed) { + const err = new otlpTypes.OTLPExporterError( + 'Request Timeout', error.code ); + onError(err); + } else { onError(error); } }); - }); - - req.on('error', (error: Error) => { - onError(error); - }); + switch (collector.compression) { + case CompressionAlgorithm.GZIP: { + if (!gzip) { + gzip = zlib.createGzip(); + } + req.setHeader('Content-Encoding', 'gzip'); + const dataStream = readableFromBuffer(data); + dataStream.on('error', onError) + .pipe(gzip).on('error', onError) + .pipe(req); - switch (collector.compression) { - case CompressionAlgorithm.GZIP: { - if (!gzip) { - gzip = zlib.createGzip(); + break; } - req.setHeader('Content-Encoding', 'gzip'); - const dataStream = readableFromBuffer(data); - dataStream.on('error', onError) - .pipe(gzip).on('error', onError) - .pipe(req); + default: + req.write(data); + req.end(); - break; + break; } - default: - req.write(data); - req.end(); + }; + sendWithRetry(); +} - break; - } +function isRetryable(statusCode: number) { + const retryCodes = [429, 502, 503, 504]; + + return retryCodes.includes(statusCode); } function readableFromBuffer(buff: string | Buffer): Readable { From dbce62e52c884da2ea0b44584d254135437bc0ae Mon Sep 17 00:00:00 2001 From: Svetlana Brennan Date: Wed, 16 Feb 2022 12:51:16 -0600 Subject: [PATCH 6/6] feat(otlp-exporter-http): restore bsp timeout logic Signed-off-by: Svetlana Brennan --- .../src/export/BatchSpanProcessorBase.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index 18f33b05ea9..e6417d84afc 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -143,6 +143,10 @@ export abstract class BatchSpanProcessorBase implements return Promise.resolve(); } return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + // don't wait anymore for export, this way the next batch can start + reject(new Error('Timeout')); + }, this._exportTimeoutMillis); // prevent downstream exporter calls from generating spans context.with(suppressTracing(context.active()), () => { // Reset the finished spans buffer here because the next invocations of the _flush method @@ -151,6 +155,7 @@ export abstract class BatchSpanProcessorBase implements this._exporter.export( this._finishedSpans.splice(0, this._maxExportBatchSize), result => { + clearTimeout(timer); if (result.code === ExportResultCode.SUCCESS) { resolve(); } else { @@ -159,9 +164,7 @@ export abstract class BatchSpanProcessorBase implements new Error('BatchSpanProcessor: span export failed') ); } - }, - this._exportTimeoutMillis, - reject + } ); }); });