From 6c4f9891eed69d8c2916d85954c56d85983d61ad Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Mon, 20 Dec 2021 10:00:08 +0100 Subject: [PATCH 01/22] feat(metric-reader): add metric-reader. --- .../src/export/MetricReader.ts | 187 +++++++++- .../export/PeriodicExportingMetricReader.ts | 89 +++++ .../PeriodicExportingMetricReader.test.ts | 344 ++++++++++++++++++ .../test/state/MetricCollector.test.ts | 19 +- 4 files changed, 621 insertions(+), 18 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 2d4cd5c0a76..52689f7cecc 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -15,51 +15,206 @@ */ import { AggregationTemporality } from './AggregationTemporality'; -import { MetricExporter } from './MetricExporter'; import { MetricProducer } from './MetricProducer'; +import { MetricData } from './MetricData'; + +export type ReaderErrorOptions = { + message: string, + error?: Error; +} + +// TODO: result callback, look at trace implementation +/** + * Base error thrown by the reader. + */ +export class ReaderError extends Error { + // Optional error that caused this error. + public readonly innerError?: Error; + + /** + * Creates a new instance of the ReaderError class. + * @param options + */ + constructor(options: ReaderErrorOptions) { + super(options.message); + this.innerError = options.error; + } +} + +/** + * Error that is thrown on timeouts (i.e. timeout on forceFlush or shutdown) + */ +export class ReaderTimeoutError extends ReaderError { +} + +/** + * Error that is thrown on failures (i.e. unhandled exceptions during forceFlush or shutdown) + */ +export class ReaderFailureError extends ReaderError { +} // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader +/** + * A registered reader of metrics that, when linked to a {@link MetricProducer} offers, global + * control over metrics. + */ export abstract class MetricReader { + // Tracks the shutdown state. private _shutdown = false; + // MetricProducer used by this instance. private _metricProducer?: MetricProducer; - constructor(private _exporter: MetricExporter) {} + constructor(private readonly _preferredAggregationTemporality = AggregationTemporality.CUMULATIVE) { + } + /** + * Set the {@link MetricProducer} used by this instance. + * + * @param metricProducer + */ setMetricProducer(metricProducer: MetricProducer) { this._metricProducer = metricProducer; + this.onInitialized(); } + /** + * Get the {@link AggregationTemporality} preferred by this {@link MetricReader} + */ getPreferredAggregationTemporality(): AggregationTemporality { - return this._exporter.getPreferredAggregationTemporality(); + return this._preferredAggregationTemporality; } - async collect(): Promise { + /** + * Handle once the SDK has initialized this {@link MetricReader} + * + * @protected + */ + protected abstract onInitialized(): void; + + /** + * Handle a shutdown signal by the SDK. + * + *

For push exporters, this should shut down any intervals and close any open connections. + * @protected + */ + protected abstract onShutdown(): Promise; + + /** + * Handle a force flush signal by the SDK. + * + *

In all scenarios metrics should be collected via {@link collect()}. + *

For push exporters, this should collect and report metrics. + * @protected + */ + protected abstract onForceFlush(): Promise; + + /** + * Collect all metrics from the associated {@link MetricProducer} + */ + async collect(): Promise { if (this._metricProducer === undefined) { - throw new Error('MetricReader is not bound to a MeterProvider'); + throw new ReaderFailureError({ message: 'MetricReader is not bound to a MetricProducer' }); } - const metrics = await this._metricProducer.collect(); - // errors thrown to caller - await this._exporter.export(metrics); + // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. + if (this._shutdown) { + throw new ReaderFailureError({ message: 'Collection is not allowed after shutdown' }); + } + + return await this._metricProducer.collect(); + } + + /** + * Calls an async function with a timeout. Will reject if the async function passed to this does not complete + * before the timeout is reached. + * @param promise promise to use with timeout. + * @param timeout timeout in milliseconds until the returned promise is rejected. + * @protected + */ + protected callWithTimeout(promise: Promise, timeout: number): Promise { + let timeoutHandle: ReturnType; + + const timeoutPromise = new Promise(function timeoutFunction(_resolve, reject) { + timeoutHandle = setTimeout( + function timeoutHandler() { + // TODO: This does not produce an adequate stacktrace. + reject(new ReaderTimeoutError({ message: 'Operation timed out.' })) + }, + timeout + ); + }); + + return Promise.race([promise, timeoutPromise]).then(result => { + clearTimeout(timeoutHandle); + return result; + }, + reason => { + clearTimeout(timeoutHandle); + throw reason; + }); + } + + + private async _tryShutdown() { + try { + await this.onShutdown(); + } catch (err) { + // Re-throw timeout errors. + if (err instanceof ReaderTimeoutError) { + throw err; + } + + throw new ReaderFailureError({ + message: 'Unexpected error during shutdown.', + error: err + }); + + } } - async shutdown(): Promise { + /** + * Shuts down the metric reader + * @param shutdownTimeout timeout for shutdown (default: 10000ms) + */ + // TODO: function will continue. + async shutdown(shutdownTimeout = 10000): Promise { + // Do not call shutdown again if it has already been called. if (this._shutdown) { - return; + throw new ReaderFailureError({ message: 'Cannot call shutdown twice.' }) } + await this.callWithTimeout(this._tryShutdown(), shutdownTimeout); + + // TODO: (spec) can i not call shutdown twice even on shutdown failure? this._shutdown = true; - // errors thrown to caller - await this._exporter.shutdown(); } - async forceFlush(): Promise { + private async _tryForceFlush() { + try { + await this.onForceFlush(); + } catch (err) { + // Re-throw timeout errors. + if (err instanceof ReaderTimeoutError) { + throw err; + } + + throw new ReaderFailureError({ + message: 'Unexpected error during forceFlush.', + error: err + }); + } + } + + /** + * Flushes metrics read by this reader. + * @param forceFlushTimeout timeout for force-flush (default: 10000 ms) + */ + async forceFlush(forceFlushTimeout = 10000): Promise { if (this._shutdown) { - return; + throw new ReaderFailureError({ message: 'Cannot forceFlush on already shutdown MetricReader' }) } - // errors thrown to caller - await this._exporter.forceFlush(); + await this.callWithTimeout(this._tryForceFlush(), forceFlushTimeout); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts new file mode 100644 index 00000000000..3043037f7dd --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -0,0 +1,89 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as api from '@opentelemetry/api'; +import { MetricReader } from './MetricReader'; +import { MetricExporter } from './MetricExporter'; + +export type PeriodicExportingMetricReaderOptions = { + exporter: MetricExporter + exportIntervalMillis?: number, + exportTimeoutMillis?: number +} + +/** + * {@link MetricReader} which collects metrics based on a user-configurable time interval, and passes the metrics to + * the configured {@link MetricExporter} + */ +export class PeriodicExportingMetricReader extends MetricReader { + private _interval?: ReturnType; + + private _exporter: MetricExporter; + + private readonly _exportInterval: number; + + private readonly _exportTimeout: number; + + constructor(options: PeriodicExportingMetricReaderOptions) { + super(options.exporter.getPreferredAggregationTemporality()); + + if (options.exportIntervalMillis !== undefined && options.exportIntervalMillis <= 0) { + throw Error('exportIntervalMillis must be greater than 0'); + } + + if (options.exportTimeoutMillis !== undefined && options.exportTimeoutMillis <= 0) { + throw Error('exportTimeoutMillis must be greater than 0'); + } + + if (options.exportTimeoutMillis !== undefined && + options.exportIntervalMillis !== undefined && + options.exportIntervalMillis < options.exportTimeoutMillis) { + throw Error('exportIntervalMillis must be greater than or equal to exportTimeoutMillis'); + } + + this._exportInterval = options.exportIntervalMillis ?? 60000; + this._exportTimeout = options.exportTimeoutMillis ?? 30000; + this._exporter = options.exporter; + } + + private async _runOnce(): Promise { + const metrics = await this.collect(); + await this._exporter.export(metrics); + } + + protected onInitialized(): void { + this._interval = setInterval(async () => { + try { + await this.callWithTimeout(this._runOnce(), this._exportTimeout); + } catch (err) { + api.diag.error('Unexpected error during export: %s', err) + } + }, this._exportInterval); + } + + protected async onForceFlush(): Promise { + await this._runOnce(); + } + + protected async onShutdown(): Promise { + if (this._interval) { + clearInterval(this._interval); + } + + await this._runOnce(); + await this._exporter.shutdown(); + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts new file mode 100644 index 00000000000..32970b96394 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -0,0 +1,344 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; +import { AggregationTemporality } from '../../src/export/AggregationTemporality'; +import { MetricExporter, ReaderFailureError, ReaderTimeoutError } from '../../src'; +import { MetricData } from '../../src/export/MetricData'; +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import { MetricProducer } from '../../src/export/MetricProducer'; + +const MAX_32_BIT_INT = 2 ** 31 - 1 + +class TestMetricExporter extends MetricExporter { + metricDataList: MetricData[] = [] + public exportTime = 0; + public throwException = false; + + async export(batch: MetricData[]): Promise { + this.metricDataList.push(...batch); + if (this.throwException) { + throw new Error('Error during export'); + } + await new Promise(resolve => setTimeout(resolve, this.exportTime)); + } + + async forceFlush(): Promise { + } + + getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.CUMULATIVE; + } +} + +class TestDeltaMetricExporter extends TestMetricExporter { + override getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.DELTA; + } +} + +class WaitingMetricExporter extends MetricExporter { + private _batches: MetricData[][]; + public throwException: boolean; + + constructor() { + super(); + this._batches = []; + this.throwException = false; + } + + async export(batch: MetricData[]): Promise { + this._batches.push([]); + + if (this.throwException) { + throw new Error('Error during export'); + } + } + + async forceFlush(): Promise { + } + + async waitForNumberOfExports(numberOfExports: number): Promise { + if (numberOfExports <= 0) { + throw new Error('numberOfExports must be greater than or equal to 0'); + } + + while (this._batches.length < numberOfExports) { + await new Promise(resolve => setTimeout(resolve, 20)); + } + return this._batches.slice(0, numberOfExports); + } + + getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.CUMULATIVE; + } +} + +class TestMetricProducer implements MetricProducer { + async collect(): Promise { + return []; + } +} + +describe('PeriodicExportingMetricReader', () => { + afterEach(() => { + sinon.restore(); + }); + + describe('constructor', () => { + it('should construct PeriodicExportingMetricReader without exceptions', () => { + const exporter = new TestDeltaMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 4000, + exportTimeoutMillis: 3000 + } + ); + assert.strictEqual(reader.getPreferredAggregationTemporality(), exporter.getPreferredAggregationTemporality()); + }) + + it('should throw when interval less or equal to 0', () => { + const exporter = new TestDeltaMetricExporter(); + assert.throws(() => new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 0, + exportTimeoutMillis: 0 + }), new Error('exportIntervalMillis must be greater than 0')); + }) + + it('should throw when timeout less or equal to 0', () => { + const exporter = new TestDeltaMetricExporter(); + assert.throws(() => new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 1, + exportTimeoutMillis: 0 + }), new Error('exportTimeoutMillis must be greater than 0')); + }) + + it('should throw when timeout less or equal to interval', () => { + const exporter = new TestDeltaMetricExporter(); + assert.throws(() => new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 100, + exportTimeoutMillis: 200 + }), new Error('exportIntervalMillis must be greater than or equal to exportTimeoutMillis')); + }) + + it('should not start exporting', async () => { + const exporter = new TestDeltaMetricExporter(); + const exporterMock = sinon.mock(exporter); + exporterMock.expects('export').never(); + + new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 1, + exportTimeoutMillis: 1 + }); + await new Promise(resolve => setTimeout(resolve, 100)); + + exporterMock.verify(); + }) + }); + + describe('setMetricProducer', () => { + it('should start exporting periodically', async () => { + const exporter = new WaitingMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 50, + exportTimeoutMillis: 20 + }); + + reader.setMetricProducer(new TestMetricProducer()); + const result = await exporter.waitForNumberOfExports(2); + + assert.deepEqual(result, [[], []]); + await reader.shutdown(); + }).timeout(1000); + }); + + describe('periodic export', () => { + it('should keep running on export errors', async () => { + const exporter = new WaitingMetricExporter(); + exporter.throwException = true; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 50, + exportTimeoutMillis: 20 + }); + + reader.setMetricProducer(new TestMetricProducer()); + + const result = await exporter.waitForNumberOfExports(2); + assert.deepEqual(result, [[], []]); + + exporter.throwException = false; + await reader.shutdown(); + }); + }); + + describe('forceFlush', () => { + afterEach(() => { + sinon.restore(); + }); + + it('should force export', async () => { + const exporter = new TestMetricExporter(); + const exporterMock = sinon.mock(exporter); + exporterMock.expects('export').calledOnceWithExactly([]); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80 + }); + + reader.setMetricProducer(new TestMetricProducer()); + await reader.forceFlush(); + + sinon.verify(); + await reader.shutdown(); + }); + + it('should throw ReaderTimeoutError when export takes too long', async () => { + const exporter = new TestMetricExporter(); + exporter.exportTime = 1000; + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(new TestMetricProducer()); + await assert.rejects(reader.forceFlush(20)); + + exporter.exportTime = 0; + await reader.shutdown(20); + }).timeout(1000); + + it('should throw when handler throws', async () => { + const exporter = new TestMetricExporter(); + exporter.throwException = true; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + await assert.rejects(reader.forceFlush(), + thrown => thrown instanceof ReaderFailureError, + 'Unexpected error during shutdown.'); + + exporter.throwException = false; + await exporter.shutdown(); + }); + + it('should throw after shutdown', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(new TestMetricProducer()); + + await reader.shutdown(); + await assert.rejects(reader.forceFlush(), + thrown => thrown instanceof ReaderFailureError, + ); + }); + }); + + describe('shutdown', () => { + it('should export one last time', async () => { + const exporter = new TestMetricExporter(); + const exporterMock = sinon.mock(exporter); + exporterMock.expects('export').calledOnceWithExactly([]); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80 + }); + + reader.setMetricProducer(new TestMetricProducer()); + await reader.shutdown(); + + exporterMock.verify(); + }); + + it('should throw ReaderTimeoutError when export takes too long', async () => { + const exporter = new TestMetricExporter(); + exporter.exportTime = 1000; + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(new TestMetricProducer()); + await assert.rejects(reader.shutdown(20), + thrown => thrown instanceof ReaderTimeoutError, + ); + }).timeout(1000); + + it('should throw when called twice', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80 + }); + + reader.setMetricProducer(new TestMetricProducer()); + await reader.shutdown(); + await assert.rejects(reader.shutdown(), + thrown => thrown instanceof ReaderFailureError, + ); + }); + + it('should throw when handler throws', async () => { + const exporter = new TestMetricExporter(); + exporter.throwException = true; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + await assert.rejects(reader.shutdown(), + thrown => thrown instanceof ReaderFailureError, + 'Unexpected error during shutdown.'); + }); + }); + + describe('collect', () => { + it('should throw on non-initialized instance', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + await assert.rejects(reader.collect(), + thrown => thrown instanceof ReaderFailureError, + 'MetricReader is not bound to a MetricProducer'); + }); + }) +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 900225cdf3a..f984a8a8f08 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -48,6 +48,21 @@ class TestMetricReader extends MetricReader { getMetricCollector(): MetricCollector { return this['_metricProducer'] as MetricCollector; } + + protected handleCollectedMetrics(metrics: MetricData[]): Promise { + return Promise.resolve(undefined); + } + + protected onForceFlush(): Promise { + return Promise.resolve(undefined); + } + + protected onShutdown(): Promise { + return Promise.resolve(undefined); + } + + protected onInitialized(): void { + } } describe('MetricCollector', () => { @@ -60,7 +75,7 @@ describe('MetricCollector', () => { const meterProviderSharedState = new MeterProviderSharedState(defaultResource); const exporters = [ new TestMetricExporter(), new TestDeltaMetricExporter() ]; for (const exporter of exporters) { - const reader = new TestMetricReader(exporter); + const reader = new TestMetricReader(exporter.getPreferredAggregationTemporality()); const metricCollector = new MetricCollector(meterProviderSharedState, reader); assert.strictEqual(metricCollector.aggregatorTemporality, exporter.getPreferredAggregationTemporality()); @@ -73,7 +88,7 @@ describe('MetricCollector', () => { // TODO(legendecas): setup with MeterProvider when meter identity was settled. const meterProviderSharedState = new MeterProviderSharedState(defaultResource); - const reader = new TestMetricReader(exporter); + const reader = new TestMetricReader(exporter.getPreferredAggregationTemporality()); const metricCollector = new MetricCollector(meterProviderSharedState, reader); meterProviderSharedState.metricCollectors.push(metricCollector); From 4a767a71a24c578c6affe245d6f9282a00da183d Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Tue, 21 Dec 2021 18:48:32 +0100 Subject: [PATCH 02/22] refactor(metric-reader): use callbacks --- .../src/export/MetricReader.ts | 184 ++++++++++-------- .../export/PeriodicExportingMetricReader.ts | 35 +++- .../PeriodicExportingMetricReader.test.ts | 111 +++++++---- 3 files changed, 195 insertions(+), 135 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 52689f7cecc..24d1d2a9fcb 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -18,45 +18,23 @@ import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; import { MetricData } from './MetricData'; -export type ReaderErrorOptions = { - message: string, - error?: Error; -} - -// TODO: result callback, look at trace implementation -/** - * Base error thrown by the reader. - */ -export class ReaderError extends Error { - // Optional error that caused this error. - public readonly innerError?: Error; - - /** - * Creates a new instance of the ReaderError class. - * @param options - */ - constructor(options: ReaderErrorOptions) { - super(options.message); - this.innerError = options.error; - } -} -/** - * Error that is thrown on timeouts (i.e. timeout on forceFlush or shutdown) - */ -export class ReaderTimeoutError extends ReaderError { +export interface ReaderResult { + code: ReaderResultCode; + error?: Error; + returnValue?: T; } -/** - * Error that is thrown on failures (i.e. unhandled exceptions during forceFlush or shutdown) - */ -export class ReaderFailureError extends ReaderError { +export enum ReaderResultCode { + SUCCESS, + FAILED, + TIMED_OUT } // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader /** - * A registered reader of metrics that, when linked to a {@link MetricProducer} offers, global + * A registered reader of metrics that, when linked to a {@link MetricProducer}, offers global * control over metrics. */ export abstract class MetricReader { @@ -112,109 +90,143 @@ export abstract class MetricReader { /** * Collect all metrics from the associated {@link MetricProducer} */ - async collect(): Promise { + collect(timeoutMillis: number, done?: (result: ReaderResult) => void): void { if (this._metricProducer === undefined) { - throw new ReaderFailureError({ message: 'MetricReader is not bound to a MetricProducer' }); + if (done) { + done({ + code: ReaderResultCode.FAILED, + error: new Error('MetricReader is not bound to a MetricProducer'), + }); + } + return; } - // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. - if (this._shutdown) { - throw new ReaderFailureError({ message: 'Collection is not allowed after shutdown' }); + if(done) { + // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. + if (this._shutdown) { + if (done) { + done({ + code: ReaderResultCode.FAILED, + error: new Error('Collection is not allowed after shutdown'), + }); + } + return; + } } - return await this._metricProducer.collect(); + this._metricProducer.collect().then( + result => { + if(done){ + done({ + code: ReaderResultCode.SUCCESS, + returnValue: result + }); + } + }, + reason => { + if(done){ + done({ + code: ReaderResultCode.FAILED, + error: reason + }) + } + } + ) } /** - * Calls an async function with a timeout. Will reject if the async function passed to this does not complete + * Adds a timeout to a promise. Will reject if the async function passed to this does not complete * before the timeout is reached. * @param promise promise to use with timeout. * @param timeout timeout in milliseconds until the returned promise is rejected. + * @param done * @protected */ - protected callWithTimeout(promise: Promise, timeout: number): Promise { + protected static promiseWithTimeout(promise: Promise, timeout: number, done?: (result: ReaderResult) => void): void { let timeoutHandle: ReturnType; - const timeoutPromise = new Promise(function timeoutFunction(_resolve, reject) { + const timeoutPromise = new Promise>(function timeoutFunction(resolve) { timeoutHandle = setTimeout( function timeoutHandler() { - // TODO: This does not produce an adequate stacktrace. - reject(new ReaderTimeoutError({ message: 'Operation timed out.' })) + resolve({ + code: ReaderResultCode.TIMED_OUT, + error: new Error('Operation timed out.') + }) }, timeout ); }); - return Promise.race([promise, timeoutPromise]).then(result => { + const resultCodePromise = promise.then(result => { + return { code: ReaderResultCode.SUCCESS, returnValue: result } + }) + + Promise.race([resultCodePromise, timeoutPromise]).then(result => { + // Clear timeout on success and return result. clearTimeout(timeoutHandle); - return result; + if (done) { + done(result); + } }, reason => { + // Clear timeout on rejection and return failure. clearTimeout(timeoutHandle); - throw reason; - }); - } - - - private async _tryShutdown() { - try { - await this.onShutdown(); - } catch (err) { - // Re-throw timeout errors. - if (err instanceof ReaderTimeoutError) { - throw err; - } - - throw new ReaderFailureError({ - message: 'Unexpected error during shutdown.', - error: err + if (done) { + done({ + code: ReaderResultCode.FAILED, + error: reason + }); + } }); - - } } /** * Shuts down the metric reader * @param shutdownTimeout timeout for shutdown (default: 10000ms) + * @param done */ // TODO: function will continue. - async shutdown(shutdownTimeout = 10000): Promise { + shutdown(shutdownTimeout = 10000, done?: (result: ReaderResult) => void): void { // Do not call shutdown again if it has already been called. if (this._shutdown) { - throw new ReaderFailureError({ message: 'Cannot call shutdown twice.' }) + if (done) { + done({ + code: ReaderResultCode.FAILED, + error: new Error('Cannot call shutdown twice.') + }); + return; + } } - await this.callWithTimeout(this._tryShutdown(), shutdownTimeout); + MetricReader.promiseWithTimeout(this.onShutdown(), shutdownTimeout, (result => { + if (result.code === ReaderResultCode.SUCCESS) { + this._shutdown = true; + } - // TODO: (spec) can i not call shutdown twice even on shutdown failure? - this._shutdown = true; - } + if (done) { + done(result); + } + }) + ); - private async _tryForceFlush() { - try { - await this.onForceFlush(); - } catch (err) { - // Re-throw timeout errors. - if (err instanceof ReaderTimeoutError) { - throw err; - } - - throw new ReaderFailureError({ - message: 'Unexpected error during forceFlush.', - error: err - }); - } } /** * Flushes metrics read by this reader. * @param forceFlushTimeout timeout for force-flush (default: 10000 ms) + * @param done */ - async forceFlush(forceFlushTimeout = 10000): Promise { + forceFlush(forceFlushTimeout = 10000, done ?: (result: ReaderResult) => void): void { if (this._shutdown) { - throw new ReaderFailureError({ message: 'Cannot forceFlush on already shutdown MetricReader' }) + if (done) { + done({ + code: ReaderResultCode.FAILED, + error: new Error('Cannot forceFlush on already shutdown MetricReader') + }); + return; + } } - await this.callWithTimeout(this._tryForceFlush(), forceFlushTimeout); + MetricReader.promiseWithTimeout(this.onForceFlush(), forceFlushTimeout, done); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index 3043037f7dd..a15024fd264 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -15,8 +15,9 @@ */ import * as api from '@opentelemetry/api'; -import { MetricReader } from './MetricReader'; +import { MetricReader, ReaderResult, ReaderResultCode } from './MetricReader'; import { MetricExporter } from './MetricExporter'; +import { MetricData } from './MetricData'; export type PeriodicExportingMetricReaderOptions = { exporter: MetricExporter @@ -60,17 +61,35 @@ export class PeriodicExportingMetricReader extends MetricReader { } private async _runOnce(): Promise { - const metrics = await this.collect(); - await this._exporter.export(metrics); + const collectionResult = await new Promise>(resolve => { + this.collect(100, (result => { + resolve(result); + })) + }); + + if (collectionResult.code !== ReaderResultCode.SUCCESS) { + throw collectionResult.error ?? new Error('Unknown error occurred during collection.'); + } + + if (collectionResult.returnValue == null) { + throw new Error('Unknown error occurred during collection.'); + } + + await this._exporter.export(collectionResult.returnValue); } protected onInitialized(): void { this._interval = setInterval(async () => { - try { - await this.callWithTimeout(this._runOnce(), this._exportTimeout); - } catch (err) { - api.diag.error('Unexpected error during export: %s', err) - } + MetricReader.promiseWithTimeout(this._runOnce(), this._exportTimeout, (result => { + if (result.code === ReaderResultCode.TIMED_OUT) { + api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout); + return; + } + if (result.code === ReaderResultCode.FAILED) { + api.diag.error('Unexpected error during export: %s', result.error); + return; + } + })); }, this._exportInterval); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 32970b96394..7906ceaf4a0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -16,7 +16,7 @@ import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; -import { MetricExporter, ReaderFailureError, ReaderTimeoutError } from '../../src'; +import { MetricExporter, ReaderResultCode } from '../../src'; import { MetricData } from '../../src/export/MetricData'; import * as assert from 'assert'; import * as sinon from 'sinon'; @@ -196,7 +196,7 @@ describe('PeriodicExportingMetricReader', () => { sinon.restore(); }); - it('should force export', async () => { + it('should force export', done => { const exporter = new TestMetricExporter(); const exporterMock = sinon.mock(exporter); exporterMock.expects('export').calledOnceWithExactly([]); @@ -207,13 +207,15 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await reader.forceFlush(); - - sinon.verify(); - await reader.shutdown(); + reader.forceFlush(30, _result => { + exporterMock.verify(); + reader.shutdown(20, _result => { + done(); + }); + }); }); - it('should throw ReaderTimeoutError when export takes too long', async () => { + it('should return TIMED_OUT when export takes too long', done => { const exporter = new TestMetricExporter(); exporter.exportTime = 1000; @@ -224,13 +226,18 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await assert.rejects(reader.forceFlush(20)); - - exporter.exportTime = 0; - await reader.shutdown(20); + reader.forceFlush(20, (result => { + assert.strictEqual(result.code, ReaderResultCode.TIMED_OUT); + + // cleanup. + exporter.exportTime = 0; + reader.shutdown(20, _result => { + done(); + }) + })); }).timeout(1000); - it('should throw when handler throws', async () => { + it('should return FAILED when handler throws', done => { const exporter = new TestMetricExporter(); exporter.throwException = true; const reader = new PeriodicExportingMetricReader({ @@ -239,15 +246,18 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(reader.forceFlush(), - thrown => thrown instanceof ReaderFailureError, - 'Unexpected error during shutdown.'); + reader.forceFlush(20, (result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); - exporter.throwException = false; - await exporter.shutdown(); + // cleanup. + exporter.throwException = false; + reader.shutdown(100, _result => { + done(); + }) + })); }); - it('should throw after shutdown', async () => { + it('should return FAILED after shutdown', done => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -257,15 +267,18 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); - await reader.shutdown(); - await assert.rejects(reader.forceFlush(), - thrown => thrown instanceof ReaderFailureError, - ); + reader.shutdown(20, _result => { + reader.forceFlush(20, (result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); + done(); + })); + }); + }); }); describe('shutdown', () => { - it('should export one last time', async () => { + it('should export one last time', done => { const exporter = new TestMetricExporter(); const exporterMock = sinon.mock(exporter); exporterMock.expects('export').calledOnceWithExactly([]); @@ -276,12 +289,13 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await reader.shutdown(); - - exporterMock.verify(); + reader.shutdown(20, () => { + exporterMock.verify(); + done() + }); }); - it('should throw ReaderTimeoutError when export takes too long', async () => { + it('should return TIMED_OUT when export takes too long', done => { const exporter = new TestMetricExporter(); exporter.exportTime = 1000; @@ -292,12 +306,14 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await assert.rejects(reader.shutdown(20), - thrown => thrown instanceof ReaderTimeoutError, + reader.shutdown(20, result => { + assert.strictEqual(result.code, ReaderResultCode.TIMED_OUT); + done(); + }, ); }).timeout(1000); - it('should throw when called twice', async () => { + it('should return FAILED when called twice', done => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -306,13 +322,23 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await reader.shutdown(); - await assert.rejects(reader.shutdown(), - thrown => thrown instanceof ReaderFailureError, + + // first call should succeed. + reader.shutdown(20, result => { + assert.strictEqual(result.code, ReaderResultCode.SUCCESS); + + // second call should fail. + reader.shutdown(20, result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); + done(); + }, + ); + }, ); + }); - it('should throw when handler throws', async () => { + it('should return FAILED when shutdown-handler throws.', done => { const exporter = new TestMetricExporter(); exporter.throwException = true; const reader = new PeriodicExportingMetricReader({ @@ -321,14 +347,16 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(reader.shutdown(), - thrown => thrown instanceof ReaderFailureError, - 'Unexpected error during shutdown.'); + reader.shutdown(20, result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); + done(); + }, + ); }); }); describe('collect', () => { - it('should throw on non-initialized instance', async () => { + it('should return FAILED on non-initialized instance', done => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -336,9 +364,10 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(reader.collect(), - thrown => thrown instanceof ReaderFailureError, - 'MetricReader is not bound to a MetricProducer'); + reader.collect(20, (result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); + done(); + })); }); }) }); From fd04ff40b3562d25d47e6c7c6808571ce2216ac6 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 22 Dec 2021 12:46:49 +0100 Subject: [PATCH 03/22] refactor(metric-reader): add options so that timeout can be omitted. --- .../src/export/MetricReader.ts | 226 +++++++++--------- .../export/PeriodicExportingMetricReader.ts | 17 +- .../src/export/ReaderResult.ts | 28 +++ .../src/index.ts | 1 + .../PeriodicExportingMetricReader.test.ts | 142 +++++++---- 5 files changed, 234 insertions(+), 180 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/export/ReaderResult.ts diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 24d1d2a9fcb..d2b844e97a3 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -17,18 +17,69 @@ import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; import { MetricData } from './MetricData'; +import { ReaderResult, ReaderResultCode } from './ReaderResult'; - -export interface ReaderResult { - code: ReaderResultCode; - error?: Error; - returnValue?: T; +export type ReaderOptions = { + done?: (result: ReaderResult) => void + timeoutMillis?: number } -export enum ReaderResultCode { - SUCCESS, - FAILED, - TIMED_OUT +export type ReaderCollectionOptions = ReaderOptions; + +export type ReaderShutdownOptions = ReaderOptions; + +export type ReaderForceFlushOptions = ReaderOptions; + + +/** + * Adds a timeout to a promise and executes the callback if the specified timeout has elapsed, or the promise + * has resolved or rejected. + * + *

NOTE: this operation will continue even after the timeout fires the callback. + * + * @param promise promise to use with timeout. + * @param timeout the timeout in milliseconds until the returned promise is rejected. + * @param done the callback once the promise has resolved or rejected. + */ +export function promiseWithTimeout(promise: Promise, timeout: number, done: (result: ReaderResult) => void): void { + // keep handle so that we can clear it later. + let timeoutHandle: ReturnType; + + // Set up a promise to handle the timeout. + const timeoutPromise = new Promise>(function timeoutFunction(resolve) { + timeoutHandle = setTimeout( + function timeoutHandler() { + resolve({ + code: ReaderResultCode.TIMED_OUT, + error: new Error('Operation timed out.') + }) + }, + timeout + ); + }); + + // Wrap to promise to get a result code with the result if it does not reject. + const resultCodePromise = promise.then(result => { + return { code: ReaderResultCode.SUCCESS, returnValue: result } + }) + + Promise.race([resultCodePromise, timeoutPromise]).then(result => { + // Clear timeout on success and return result. + clearTimeout(timeoutHandle); + if (done) { + done(result); + } + }, + reason => { + // Clear timeout on rejection and return failure. + clearTimeout(timeoutHandle); + if (done) { + done({ + code: ReaderResultCode.FAILED, + error: reason + }); + } + }); } // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader @@ -90,143 +141,80 @@ export abstract class MetricReader { /** * Collect all metrics from the associated {@link MetricProducer} */ - collect(timeoutMillis: number, done?: (result: ReaderResult) => void): void { + collect(options: ReaderCollectionOptions): void { + const timeout = options.timeoutMillis ?? 10000; + const done = options.done ?? (_result => { + }); + if (this._metricProducer === undefined) { - if (done) { - done({ - code: ReaderResultCode.FAILED, - error: new Error('MetricReader is not bound to a MetricProducer'), - }); - } + done({ + code: ReaderResultCode.FAILED, + error: new Error('MetricReader is not bound to a MetricProducer'), + }); return; } - if(done) { - // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. - if (this._shutdown) { - if (done) { - done({ - code: ReaderResultCode.FAILED, - error: new Error('Collection is not allowed after shutdown'), - }); - } - return; - } + // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. + if (this._shutdown) { + done({ + code: ReaderResultCode.FAILED, + error: new Error('Collection is not allowed after shutdown'), + }); + return; } - this._metricProducer.collect().then( - result => { - if(done){ - done({ - code: ReaderResultCode.SUCCESS, - returnValue: result - }); - } - }, - reason => { - if(done){ - done({ - code: ReaderResultCode.FAILED, - error: reason - }) - } - } - ) + promiseWithTimeout(this._metricProducer.collect(), timeout, done); } /** - * Adds a timeout to a promise. Will reject if the async function passed to this does not complete - * before the timeout is reached. - * @param promise promise to use with timeout. - * @param timeout timeout in milliseconds until the returned promise is rejected. - * @param done - * @protected + * Shuts down the metric reader, the callback will fire after the specified timeout or after completion. + * + *

NOTE: this operation will continue even after the timeout fires the callback. + * @param options */ - protected static promiseWithTimeout(promise: Promise, timeout: number, done?: (result: ReaderResult) => void): void { - let timeoutHandle: ReturnType; - - const timeoutPromise = new Promise>(function timeoutFunction(resolve) { - timeoutHandle = setTimeout( - function timeoutHandler() { - resolve({ - code: ReaderResultCode.TIMED_OUT, - error: new Error('Operation timed out.') - }) - }, - timeout - ); + shutdown(options: ReaderForceFlushOptions): void { + const timeout = options.timeoutMillis ?? 10000; + const done = options.done ?? (_result => { }); - const resultCodePromise = promise.then(result => { - return { code: ReaderResultCode.SUCCESS, returnValue: result } - }) - - Promise.race([resultCodePromise, timeoutPromise]).then(result => { - // Clear timeout on success and return result. - clearTimeout(timeoutHandle); - if (done) { - done(result); - } - }, - reason => { - // Clear timeout on rejection and return failure. - clearTimeout(timeoutHandle); - if (done) { - done({ - code: ReaderResultCode.FAILED, - error: reason - }); - } - }); - } - - /** - * Shuts down the metric reader - * @param shutdownTimeout timeout for shutdown (default: 10000ms) - * @param done - */ - // TODO: function will continue. - shutdown(shutdownTimeout = 10000, done?: (result: ReaderResult) => void): void { // Do not call shutdown again if it has already been called. if (this._shutdown) { - if (done) { - done({ - code: ReaderResultCode.FAILED, - error: new Error('Cannot call shutdown twice.') - }); - return; - } + done({ + code: ReaderResultCode.FAILED, + error: new Error('Cannot call shutdown twice.') + }); + return; } - MetricReader.promiseWithTimeout(this.onShutdown(), shutdownTimeout, (result => { + promiseWithTimeout(this.onShutdown(), timeout, result => { if (result.code === ReaderResultCode.SUCCESS) { this._shutdown = true; } - - if (done) { - done(result); - } - }) + done(result); + } ); } /** - * Flushes metrics read by this reader. - * @param forceFlushTimeout timeout for force-flush (default: 10000 ms) - * @param done + * Flushes metrics read by this reader, the callback will fire after the specified timeout or after completion. + * + *

NOTE: this operation will continue even after the timeout fires the callback. + * @param options options with timeout (default: 10000ms) and a result callback. */ - forceFlush(forceFlushTimeout = 10000, done ?: (result: ReaderResult) => void): void { + forceFlush(options: ReaderShutdownOptions): void { + const timeout = options.timeoutMillis ?? 10000; + const done = options.done ?? (_result => { + }); + if (this._shutdown) { - if (done) { - done({ - code: ReaderResultCode.FAILED, - error: new Error('Cannot forceFlush on already shutdown MetricReader') - }); - return; - } + done({ + code: ReaderResultCode.FAILED, + error: new Error('Cannot forceFlush on already shutdown MetricReader') + }); + return; } - MetricReader.promiseWithTimeout(this.onForceFlush(), forceFlushTimeout, done); + promiseWithTimeout(this.onForceFlush(), timeout, done); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index a15024fd264..a9cc66c59d0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -15,9 +15,10 @@ */ import * as api from '@opentelemetry/api'; -import { MetricReader, ReaderResult, ReaderResultCode } from './MetricReader'; +import { MetricReader, promiseWithTimeout } from './MetricReader'; import { MetricExporter } from './MetricExporter'; import { MetricData } from './MetricData'; +import { ReaderResult, ReaderResultCode } from './ReaderResult'; export type PeriodicExportingMetricReaderOptions = { exporter: MetricExporter @@ -61,16 +62,15 @@ export class PeriodicExportingMetricReader extends MetricReader { } private async _runOnce(): Promise { - const collectionResult = await new Promise>(resolve => { - this.collect(100, (result => { - resolve(result); - })) - }); + const collectionResult = await new Promise>(resolve => + this.collect({ done: resolve })); + // Throw on any code that does not indicate success. if (collectionResult.code !== ReaderResultCode.SUCCESS) { throw collectionResult.error ?? new Error('Unknown error occurred during collection.'); } + // This case should never occur. if (collectionResult.returnValue == null) { throw new Error('Unknown error occurred during collection.'); } @@ -79,8 +79,9 @@ export class PeriodicExportingMetricReader extends MetricReader { } protected onInitialized(): void { + // start running the interval as soon as this reader is initialized and keep handle for shutdown. this._interval = setInterval(async () => { - MetricReader.promiseWithTimeout(this._runOnce(), this._exportTimeout, (result => { + promiseWithTimeout(this._runOnce(), this._exportTimeout, (result => { if (result.code === ReaderResultCode.TIMED_OUT) { api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout); return; @@ -102,7 +103,7 @@ export class PeriodicExportingMetricReader extends MetricReader { clearInterval(this._interval); } - await this._runOnce(); + await this.onForceFlush(); await this._exporter.shutdown(); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/ReaderResult.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/ReaderResult.ts new file mode 100644 index 00000000000..20517a30575 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/ReaderResult.ts @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export interface ReaderResult { + code: ReaderResultCode; + error?: Error; + returnValue?: T; +} + +export enum ReaderResultCode { + SUCCESS, + FAILED, + TIMED_OUT +} + diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts index 35622792a2f..1f75248f0e4 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts @@ -17,3 +17,4 @@ export { MeterProvider, MeterProviderOptions } from './MeterProvider'; export * from './export/MetricExporter'; export * from './export/MetricReader'; +export { ReaderResult } from "./export/ReaderResult"; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 7906ceaf4a0..5c54c3a72d8 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -16,11 +16,12 @@ import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; -import { MetricExporter, ReaderResultCode } from '../../src'; +import { MetricExporter } from '../../src'; import { MetricData } from '../../src/export/MetricData'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { MetricProducer } from '../../src/export/MetricProducer'; +import { ReaderResultCode } from '../../src/export/ReaderResult'; const MAX_32_BIT_INT = 2 ** 31 - 1 @@ -167,7 +168,7 @@ describe('PeriodicExportingMetricReader', () => { const result = await exporter.waitForNumberOfExports(2); assert.deepEqual(result, [[], []]); - await reader.shutdown(); + await new Promise(resolve => reader.shutdown({ done: resolve })); }).timeout(1000); }); @@ -187,7 +188,7 @@ describe('PeriodicExportingMetricReader', () => { assert.deepEqual(result, [[], []]); exporter.throwException = false; - await reader.shutdown(); + await new Promise(resolve => reader.shutdown({ done: resolve })); }); }); @@ -207,11 +208,11 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.forceFlush(30, _result => { - exporterMock.verify(); - reader.shutdown(20, _result => { - done(); - }); + reader.forceFlush({ + done: _result => { + exporterMock.verify(); + reader.shutdown({ done: _result => done() }); + } }); }); @@ -226,15 +227,16 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.forceFlush(20, (result => { - assert.strictEqual(result.code, ReaderResultCode.TIMED_OUT); + reader.forceFlush({ + timeoutMillis: 20, + done: result => { + assert.strictEqual(result.code, ReaderResultCode.TIMED_OUT); - // cleanup. - exporter.exportTime = 0; - reader.shutdown(20, _result => { - done(); - }) - })); + // cleanup. + exporter.exportTime = 0; + reader.shutdown({ done: _result => done() }); + } + }); }).timeout(1000); it('should return FAILED when handler throws', done => { @@ -246,15 +248,15 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - reader.forceFlush(20, (result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); + reader.forceFlush({ + done: result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); - // cleanup. - exporter.throwException = false; - reader.shutdown(100, _result => { - done(); - }) - })); + // cleanup. + exporter.throwException = false; + reader.shutdown({ done: _result => done() }); + } + }) }); it('should return FAILED after shutdown', done => { @@ -266,14 +268,17 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - - reader.shutdown(20, _result => { - reader.forceFlush(20, (result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); - done(); - })); + reader.shutdown({ + done: result => { + assert.strictEqual(result.code, ReaderResultCode.SUCCESS); + reader.forceFlush({ + done: forceFlushResult => { + assert.strictEqual(forceFlushResult.code, ReaderResultCode.FAILED); + done(); + } + }) + } }); - }); }); @@ -289,9 +294,11 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.shutdown(20, () => { - exporterMock.verify(); - done() + reader.shutdown({ + done: _result => { + exporterMock.verify(); + done(); + } }); }); @@ -306,11 +313,12 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.shutdown(20, result => { + reader.shutdown({ + timeoutMillis: 20, done: result => { assert.strictEqual(result.code, ReaderResultCode.TIMED_OUT); done(); - }, - ); + } + }); }).timeout(1000); it('should return FAILED when called twice', done => { @@ -324,18 +332,19 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); // first call should succeed. - reader.shutdown(20, result => { + reader.shutdown({ + done: result => { assert.strictEqual(result.code, ReaderResultCode.SUCCESS); - // second call should fail. - reader.shutdown(20, result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); + //second call should fail + reader.shutdown({ + done: secondResult => { + assert.strictEqual(secondResult.code, ReaderResultCode.FAILED); done(); - }, - ); - }, - ); - + } + }) + } + }); }); it('should return FAILED when shutdown-handler throws.', done => { @@ -347,9 +356,11 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - reader.shutdown(20, result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); - done(); + reader.shutdown({ + done: result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); + done(); + } }, ); }); @@ -364,10 +375,35 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - reader.collect(20, (result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); - done(); - })); + reader.collect({ + done: (result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); + done(); + }) + }); }); + + it('should return FAILED on shut-down instance', done => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer(new TestMetricProducer()); + + reader.shutdown({ + done: _result => { + reader.collect({ + done: (result => { + assert.strictEqual(result.code, ReaderResultCode.FAILED); + done(); + }) + }); + } + }) + + }) }) }); From df2af4a0acf47a1b918caa3040599fbd11d08fe2 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 22 Dec 2021 13:45:16 +0100 Subject: [PATCH 04/22] refactor(metric-reader): combine TestMetricExporter with WaitingMetricExporter --- .../PeriodicExportingMetricReader.test.ts | 56 +++++-------------- 1 file changed, 15 insertions(+), 41 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 5c54c3a72d8..187666e9745 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -26,41 +26,9 @@ import { ReaderResultCode } from '../../src/export/ReaderResult'; const MAX_32_BIT_INT = 2 ** 31 - 1 class TestMetricExporter extends MetricExporter { - metricDataList: MetricData[] = [] public exportTime = 0; public throwException = false; - - async export(batch: MetricData[]): Promise { - this.metricDataList.push(...batch); - if (this.throwException) { - throw new Error('Error during export'); - } - await new Promise(resolve => setTimeout(resolve, this.exportTime)); - } - - async forceFlush(): Promise { - } - - getPreferredAggregationTemporality(): AggregationTemporality { - return AggregationTemporality.CUMULATIVE; - } -} - -class TestDeltaMetricExporter extends TestMetricExporter { - override getPreferredAggregationTemporality(): AggregationTemporality { - return AggregationTemporality.DELTA; - } -} - -class WaitingMetricExporter extends MetricExporter { - private _batches: MetricData[][]; - public throwException: boolean; - - constructor() { - super(); - this._batches = []; - this.throwException = false; - } + private _batches: MetricData[][] = []; async export(batch: MetricData[]): Promise { this._batches.push([]); @@ -68,6 +36,7 @@ class WaitingMetricExporter extends MetricExporter { if (this.throwException) { throw new Error('Error during export'); } + await new Promise(resolve => setTimeout(resolve, this.exportTime)); } async forceFlush(): Promise { @@ -89,6 +58,12 @@ class WaitingMetricExporter extends MetricExporter { } } +class TestDeltaMetricExporter extends TestMetricExporter { + override getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.DELTA; + } +} + class TestMetricProducer implements MetricProducer { async collect(): Promise { return []; @@ -157,7 +132,7 @@ describe('PeriodicExportingMetricReader', () => { describe('setMetricProducer', () => { it('should start exporting periodically', async () => { - const exporter = new WaitingMetricExporter(); + const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: 50, @@ -169,12 +144,12 @@ describe('PeriodicExportingMetricReader', () => { assert.deepEqual(result, [[], []]); await new Promise(resolve => reader.shutdown({ done: resolve })); - }).timeout(1000); + }); }); describe('periodic export', () => { it('should keep running on export errors', async () => { - const exporter = new WaitingMetricExporter(); + const exporter = new TestMetricExporter(); exporter.throwException = true; const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -237,7 +212,7 @@ describe('PeriodicExportingMetricReader', () => { reader.shutdown({ done: _result => done() }); } }); - }).timeout(1000); + }); it('should return FAILED when handler throws', done => { const exporter = new TestMetricExporter(); @@ -319,7 +294,7 @@ describe('PeriodicExportingMetricReader', () => { done(); } }); - }).timeout(1000); + }); it('should return FAILED when called twice', done => { const exporter = new TestMetricExporter(); @@ -402,8 +377,7 @@ describe('PeriodicExportingMetricReader', () => { }) }); } - }) - - }) + }); + }); }) }); From 8390e4a1f95756ae8cda371d33acf304b9428fa3 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 22 Dec 2021 16:12:28 +0100 Subject: [PATCH 05/22] fix(metric-reader): update shutdown and forceFlush usages, fix style. --- .../packages/opentelemetry-sdk-metrics-base/src/index.ts | 2 +- .../src/state/MetricCollector.ts | 4 ++-- .../test/state/MetricCollector.test.ts | 6 +----- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts index 1f75248f0e4..82903c18e21 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts @@ -17,4 +17,4 @@ export { MeterProvider, MeterProviderOptions } from './MeterProvider'; export * from './export/MetricExporter'; export * from './export/MetricReader'; -export { ReaderResult } from "./export/ReaderResult"; +export * from './export/ReaderResult'; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index c1bea5e3694..4e082c45ca6 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -44,14 +44,14 @@ export class MetricCollector implements MetricProducer { * Delegates for MetricReader.forceFlush. */ async forceFlush(): Promise { - return this._metricReader.forceFlush(); + await new Promise(resolve => this._metricReader.forceFlush({done: resolve})); } /** * Delegates for MetricReader.shutdown. */ async shutdown(): Promise { - return this._metricReader.shutdown(); + await new Promise(resolve => this._metricReader.shutdown({done: resolve})); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index f984a8a8f08..cb88346985a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -45,11 +45,7 @@ class TestDeltaMetricExporter extends TestMetricExporter { } class TestMetricReader extends MetricReader { - getMetricCollector(): MetricCollector { - return this['_metricProducer'] as MetricCollector; - } - - protected handleCollectedMetrics(metrics: MetricData[]): Promise { + protected handleCollectedMetrics(_metrics: MetricData[]): Promise { return Promise.resolve(undefined); } From 78d582de75d2a21c3ac4b3a6760e5baabd8a90bf Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Thu, 23 Dec 2021 09:58:32 +0100 Subject: [PATCH 06/22] refactor(metric-reader): add default implementation for onInitialized(), add TestMetricReader --- .../src/export/MetricReader.ts | 7 +++-- .../export/PeriodicExportingMetricReader.ts | 2 +- .../test/export/TestMetricReader.ts | 30 +++++++++++++++++++ .../test/state/MetricCollector.test.ts | 19 +----------- 4 files changed, 36 insertions(+), 22 deletions(-) create mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricReader.ts diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index d2b844e97a3..032a3285dcf 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -116,10 +116,11 @@ export abstract class MetricReader { /** * Handle once the SDK has initialized this {@link MetricReader} - * - * @protected + * Overriding this method is optional. */ - protected abstract onInitialized(): void; + protected onInitialized(): void { + // Default implementation is empty. + } /** * Handle a shutdown signal by the SDK. diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index a9cc66c59d0..a622168a8a0 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -78,7 +78,7 @@ export class PeriodicExportingMetricReader extends MetricReader { await this._exporter.export(collectionResult.returnValue); } - protected onInitialized(): void { + protected override onInitialized(): void { // start running the interval as soon as this reader is initialized and keep handle for shutdown. this._interval = setInterval(async () => { promiseWithTimeout(this._runOnce(), this._exportTimeout, (result => { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricReader.ts new file mode 100644 index 00000000000..02c076e6752 --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricReader.ts @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { MetricReader } from '../../src'; + +/** + * A test metric reader that implements no-op onForceFlush() and onShutdown() handlers. + */ +export class TestMetricReader extends MetricReader { + protected onForceFlush(): Promise { + return Promise.resolve(undefined); + } + + protected onShutdown(): Promise { + return Promise.resolve(undefined); + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index cb88346985a..ce94c080cbc 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -19,11 +19,11 @@ import * as sinon from 'sinon'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; import { MetricData, PointDataType } from '../../src/export/MetricData'; import { MetricExporter } from '../../src/export/MetricExporter'; -import { MetricReader } from '../../src/export/MetricReader'; import { Meter } from '../../src/Meter'; import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState'; import { MetricCollector } from '../../src/state/MetricCollector'; import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData } from '../util'; +import { TestMetricReader } from '../export/TestMetricReader'; class TestMetricExporter extends MetricExporter { metricDataList: MetricData[] = [] @@ -44,23 +44,6 @@ class TestDeltaMetricExporter extends TestMetricExporter { } } -class TestMetricReader extends MetricReader { - protected handleCollectedMetrics(_metrics: MetricData[]): Promise { - return Promise.resolve(undefined); - } - - protected onForceFlush(): Promise { - return Promise.resolve(undefined); - } - - protected onShutdown(): Promise { - return Promise.resolve(undefined); - } - - protected onInitialized(): void { - } -} - describe('MetricCollector', () => { afterEach(() => { sinon.restore(); From 381c8a0c971e211761deae90bc721ae4a5e1c441 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Tue, 4 Jan 2022 16:25:53 +0100 Subject: [PATCH 07/22] refactor(metric-reader): use promise pattern instead of callbacks --- .../src/export/MetricReader.ts | 125 +++++---------- .../export/PeriodicExportingMetricReader.ts | 34 ++-- .../PeriodicExportingMetricReader.test.ts | 149 ++++++------------ 3 files changed, 93 insertions(+), 215 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 032a3285dcf..c541b6ebe87 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -17,68 +17,55 @@ import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; import { MetricData } from './MetricData'; -import { ReaderResult, ReaderResultCode } from './ReaderResult'; -export type ReaderOptions = { - done?: (result: ReaderResult) => void +export type ReaderOptions = { timeoutMillis?: number } -export type ReaderCollectionOptions = ReaderOptions; +export type ReaderCollectionOptions = ReaderOptions; -export type ReaderShutdownOptions = ReaderOptions; +export type ReaderShutdownOptions = ReaderOptions; -export type ReaderForceFlushOptions = ReaderOptions; +export type ReaderForceFlushOptions = ReaderOptions; +/** + * Error that is thrown on timeouts (i.e. timeout on forceFlush or shutdown) + */ +export class ReaderTimeoutError extends Error { + constructor(message?: string) { + super(message); + Object.setPrototypeOf(this, ReaderTimeoutError.prototype); + } +} /** - * Adds a timeout to a promise and executes the callback if the specified timeout has elapsed, or the promise - * has resolved or rejected. + * Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise + * rejects, and resolves if the specified promise resolves. * - *

NOTE: this operation will continue even after the timeout fires the callback. + *

NOTE: this operation will continue even after it throws a {@link ReaderTimeoutError}. * * @param promise promise to use with timeout. * @param timeout the timeout in milliseconds until the returned promise is rejected. - * @param done the callback once the promise has resolved or rejected. */ -export function promiseWithTimeout(promise: Promise, timeout: number, done: (result: ReaderResult) => void): void { - // keep handle so that we can clear it later. +export function callWithTimeout(promise: Promise, timeout: number): Promise { let timeoutHandle: ReturnType; - // Set up a promise to handle the timeout. - const timeoutPromise = new Promise>(function timeoutFunction(resolve) { + const timeoutPromise = new Promise(function timeoutFunction(_resolve, reject) { timeoutHandle = setTimeout( function timeoutHandler() { - resolve({ - code: ReaderResultCode.TIMED_OUT, - error: new Error('Operation timed out.') - }) + reject(new ReaderTimeoutError('Operation timed out.')); }, timeout ); }); - // Wrap to promise to get a result code with the result if it does not reject. - const resultCodePromise = promise.then(result => { - return { code: ReaderResultCode.SUCCESS, returnValue: result } - }) - - Promise.race([resultCodePromise, timeoutPromise]).then(result => { - // Clear timeout on success and return result. + return Promise.race([promise, timeoutPromise]).then(result => { clearTimeout(timeoutHandle); - if (done) { - done(result); - } + return result; }, reason => { - // Clear timeout on rejection and return failure. clearTimeout(timeoutHandle); - if (done) { - done({ - code: ReaderResultCode.FAILED, - error: reason - }); - } + throw reason; }); } @@ -142,80 +129,46 @@ export abstract class MetricReader { /** * Collect all metrics from the associated {@link MetricProducer} */ - collect(options: ReaderCollectionOptions): void { - const timeout = options.timeoutMillis ?? 10000; - const done = options.done ?? (_result => { - }); - + async collect(options: ReaderCollectionOptions): Promise { if (this._metricProducer === undefined) { - done({ - code: ReaderResultCode.FAILED, - error: new Error('MetricReader is not bound to a MetricProducer'), - }); - return; + throw new Error('MetricReader is not bound to a MetricProducer'); } // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. if (this._shutdown) { - done({ - code: ReaderResultCode.FAILED, - error: new Error('Collection is not allowed after shutdown'), - }); - return; + throw new Error('Collection is not allowed after shutdown'); } - promiseWithTimeout(this._metricProducer.collect(), timeout, done); + return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis ?? 10000); } /** - * Shuts down the metric reader, the callback will fire after the specified timeout or after completion. + * Shuts down the metric reader, the promise will reject after the specified timeout or resolve after completion. * - *

NOTE: this operation will continue even after the timeout fires the callback. - * @param options + *

NOTE: this operation will continue even after the promise rejects due to a timeout. + * @param options options with timeout (default: 10000ms). */ - shutdown(options: ReaderForceFlushOptions): void { - const timeout = options.timeoutMillis ?? 10000; - const done = options.done ?? (_result => { - }); - + async shutdown(options: ReaderForceFlushOptions): Promise { // Do not call shutdown again if it has already been called. if (this._shutdown) { - done({ - code: ReaderResultCode.FAILED, - error: new Error('Cannot call shutdown twice.') - }); - return; + throw new Error('Cannot call shutdown twice.'); } - promiseWithTimeout(this.onShutdown(), timeout, result => { - if (result.code === ReaderResultCode.SUCCESS) { - this._shutdown = true; - } - done(result); - } - ); - + await callWithTimeout(this.onShutdown(), options.timeoutMillis ?? 10000); + this._shutdown = true; } /** - * Flushes metrics read by this reader, the callback will fire after the specified timeout or after completion. + * Flushes metrics read by this reader, the promise will reject after the specified timeout or resolve after completion. * - *

NOTE: this operation will continue even after the timeout fires the callback. - * @param options options with timeout (default: 10000ms) and a result callback. + *

NOTE: this operation will continue even after the promise rejects due to a timeout. + * @param options options with timeout (default: 10000ms). */ - forceFlush(options: ReaderShutdownOptions): void { - const timeout = options.timeoutMillis ?? 10000; - const done = options.done ?? (_result => { - }); - + async forceFlush(options: ReaderShutdownOptions): Promise { if (this._shutdown) { - done({ - code: ReaderResultCode.FAILED, - error: new Error('Cannot forceFlush on already shutdown MetricReader') - }); - return; + throw new Error('Cannot forceFlush on already shutdown MetricReader.'); } - promiseWithTimeout(this.onForceFlush(), timeout, done); + await callWithTimeout(this.onForceFlush(), options.timeoutMillis ?? 10000); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index a622168a8a0..fa0ed3aead7 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -15,10 +15,8 @@ */ import * as api from '@opentelemetry/api'; -import { MetricReader, promiseWithTimeout } from './MetricReader'; +import { callWithTimeout, MetricReader, ReaderTimeoutError } from './MetricReader'; import { MetricExporter } from './MetricExporter'; -import { MetricData } from './MetricData'; -import { ReaderResult, ReaderResultCode } from './ReaderResult'; export type PeriodicExportingMetricReaderOptions = { exporter: MetricExporter @@ -62,35 +60,23 @@ export class PeriodicExportingMetricReader extends MetricReader { } private async _runOnce(): Promise { - const collectionResult = await new Promise>(resolve => - this.collect({ done: resolve })); - - // Throw on any code that does not indicate success. - if (collectionResult.code !== ReaderResultCode.SUCCESS) { - throw collectionResult.error ?? new Error('Unknown error occurred during collection.'); - } - - // This case should never occur. - if (collectionResult.returnValue == null) { - throw new Error('Unknown error occurred during collection.'); - } - - await this._exporter.export(collectionResult.returnValue); + const metrics = await this.collect({}); + await this._exporter.export(metrics); } protected override onInitialized(): void { // start running the interval as soon as this reader is initialized and keep handle for shutdown. this._interval = setInterval(async () => { - promiseWithTimeout(this._runOnce(), this._exportTimeout, (result => { - if (result.code === ReaderResultCode.TIMED_OUT) { + try { + await callWithTimeout(this._runOnce(), this._exportTimeout); + } catch (err) { + if (err instanceof ReaderTimeoutError) { api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout); return; } - if (result.code === ReaderResultCode.FAILED) { - api.diag.error('Unexpected error during export: %s', result.error); - return; - } - })); + + api.diag.error('Unexpected error during export: %s', err); + } }, this._exportInterval); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 187666e9745..6902d887f30 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -16,12 +16,11 @@ import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; -import { MetricExporter } from '../../src'; +import { MetricExporter, ReaderTimeoutError } from '../../src'; import { MetricData } from '../../src/export/MetricData'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { MetricProducer } from '../../src/export/MetricProducer'; -import { ReaderResultCode } from '../../src/export/ReaderResult'; const MAX_32_BIT_INT = 2 ** 31 - 1 @@ -124,7 +123,7 @@ describe('PeriodicExportingMetricReader', () => { exportIntervalMillis: 1, exportTimeoutMillis: 1 }); - await new Promise(resolve => setTimeout(resolve, 100)); + await new Promise(resolve => setTimeout(resolve, 50)); exporterMock.verify(); }) @@ -135,7 +134,7 @@ describe('PeriodicExportingMetricReader', () => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, - exportIntervalMillis: 50, + exportIntervalMillis: 30, exportTimeoutMillis: 20 }); @@ -143,7 +142,7 @@ describe('PeriodicExportingMetricReader', () => { const result = await exporter.waitForNumberOfExports(2); assert.deepEqual(result, [[], []]); - await new Promise(resolve => reader.shutdown({ done: resolve })); + await reader.shutdown({}); }); }); @@ -153,7 +152,7 @@ describe('PeriodicExportingMetricReader', () => { exporter.throwException = true; const reader = new PeriodicExportingMetricReader({ exporter: exporter, - exportIntervalMillis: 50, + exportIntervalMillis: 30, exportTimeoutMillis: 20 }); @@ -163,7 +162,7 @@ describe('PeriodicExportingMetricReader', () => { assert.deepEqual(result, [[], []]); exporter.throwException = false; - await new Promise(resolve => reader.shutdown({ done: resolve })); + await reader.shutdown({}); }); }); @@ -172,7 +171,7 @@ describe('PeriodicExportingMetricReader', () => { sinon.restore(); }); - it('should force export', done => { + it('should force export', async () => { const exporter = new TestMetricExporter(); const exporterMock = sinon.mock(exporter); exporterMock.expects('export').calledOnceWithExactly([]); @@ -183,17 +182,14 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.forceFlush({ - done: _result => { - exporterMock.verify(); - reader.shutdown({ done: _result => done() }); - } - }); + await reader.forceFlush({}); + exporterMock.verify(); + await reader.shutdown({}); }); - it('should return TIMED_OUT when export takes too long', done => { + it('should throw ReaderTimeoutError when export takes too long', async () => { const exporter = new TestMetricExporter(); - exporter.exportTime = 1000; + exporter.exportTime = 60; const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -202,19 +198,12 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.forceFlush({ - timeoutMillis: 20, - done: result => { - assert.strictEqual(result.code, ReaderResultCode.TIMED_OUT); - - // cleanup. - exporter.exportTime = 0; - reader.shutdown({ done: _result => done() }); - } - }); + await assert.rejects(reader.forceFlush({ timeoutMillis: 20 }), + thrown => thrown instanceof ReaderTimeoutError); + await reader.shutdown({}); }); - it('should return FAILED when handler throws', done => { + it('should throw when handler throws', async () => { const exporter = new TestMetricExporter(); exporter.throwException = true; const reader = new PeriodicExportingMetricReader({ @@ -223,18 +212,10 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - reader.forceFlush({ - done: result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); - - // cleanup. - exporter.throwException = false; - reader.shutdown({ done: _result => done() }); - } - }) + await assert.rejects(reader.forceFlush({})); }); - it('should return FAILED after shutdown', done => { + it('should throw after shutdown', async () => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -243,25 +224,20 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.shutdown({ - done: result => { - assert.strictEqual(result.code, ReaderResultCode.SUCCESS); - reader.forceFlush({ - done: forceFlushResult => { - assert.strictEqual(forceFlushResult.code, ReaderResultCode.FAILED); - done(); - } - }) - } - }); + await reader.shutdown({}); + await assert.rejects(reader.shutdown({})); }); }); describe('shutdown', () => { - it('should export one last time', done => { + afterEach(() => { + sinon.restore(); + }); + + it('should forceFlush', async () => { const exporter = new TestMetricExporter(); const exporterMock = sinon.mock(exporter); - exporterMock.expects('export').calledOnceWithExactly([]); + exporterMock.expects('forceFlush').calledOnceWithExactly([]); const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: MAX_32_BIT_INT, @@ -269,15 +245,11 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.shutdown({ - done: _result => { - exporterMock.verify(); - done(); - } - }); + await reader.shutdown({}); + exporterMock.verify(); }); - it('should return TIMED_OUT when export takes too long', done => { + it('should throw ReaderTimeoutError when export takes too long', async () => { const exporter = new TestMetricExporter(); exporter.exportTime = 1000; @@ -288,15 +260,11 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - reader.shutdown({ - timeoutMillis: 20, done: result => { - assert.strictEqual(result.code, ReaderResultCode.TIMED_OUT); - done(); - } - }); + await assert.rejects(reader.shutdown({ timeoutMillis: 20 }), + thrown => thrown instanceof ReaderTimeoutError); }); - it('should return FAILED when called twice', done => { + it('should throw when called twice', async () => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -307,22 +275,11 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); // first call should succeed. - reader.shutdown({ - done: result => { - assert.strictEqual(result.code, ReaderResultCode.SUCCESS); - - //second call should fail - reader.shutdown({ - done: secondResult => { - assert.strictEqual(secondResult.code, ReaderResultCode.FAILED); - done(); - } - }) - } - }); + await reader.shutdown({}); + await assert.rejects(reader.shutdown({})); }); - it('should return FAILED when shutdown-handler throws.', done => { + it('should throw on non-initialized instance.', async () => { const exporter = new TestMetricExporter(); exporter.throwException = true; const reader = new PeriodicExportingMetricReader({ @@ -331,18 +288,13 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - reader.shutdown({ - done: result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); - done(); - } - }, - ); + await assert.rejects(reader.shutdown({})); }); - }); + }) + ; describe('collect', () => { - it('should return FAILED on non-initialized instance', done => { + it('should throw on non-initialized instance', async () => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -350,15 +302,10 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - reader.collect({ - done: (result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); - done(); - }) - }); + await assert.rejects(reader.collect({})); }); - it('should return FAILED on shut-down instance', done => { + it('should throw on shut-down instance', async () => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -368,16 +315,8 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); - reader.shutdown({ - done: _result => { - reader.collect({ - done: (result => { - assert.strictEqual(result.code, ReaderResultCode.FAILED); - done(); - }) - }); - } - }); + await reader.shutdown({}); + await assert.rejects(reader.collect({})); }); - }) + }); }); From f9ec116e5dd502e5d1501a276adbccc2521a0e51 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Tue, 4 Jan 2022 16:51:06 +0100 Subject: [PATCH 08/22] refactor(metric-reader): update metric collector to use the promise pattern. --- .../src/state/MetricCollector.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index 4e082c45ca6..b230fd54d6f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -44,14 +44,14 @@ export class MetricCollector implements MetricProducer { * Delegates for MetricReader.forceFlush. */ async forceFlush(): Promise { - await new Promise(resolve => this._metricReader.forceFlush({done: resolve})); + await this._metricReader.forceFlush({}); } /** * Delegates for MetricReader.shutdown. */ async shutdown(): Promise { - await new Promise(resolve => this._metricReader.shutdown({done: resolve})); + await this._metricReader.shutdown({}); } } From fcfe6b7b649c78fb7cd9995ad8ab2219b24612ac Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Tue, 4 Jan 2022 17:09:17 +0100 Subject: [PATCH 09/22] fix(metric-reader): pass function instead of Promise to assert.rejects to keep node 8 compatibility. --- .../export/PeriodicExportingMetricReader.test.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 6902d887f30..2384465c44b 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -198,7 +198,7 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await assert.rejects(reader.forceFlush({ timeoutMillis: 20 }), + await assert.rejects(() => reader.forceFlush({ timeoutMillis: 20 }), thrown => thrown instanceof ReaderTimeoutError); await reader.shutdown({}); }); @@ -212,7 +212,7 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(reader.forceFlush({})); + await assert.rejects(() => reader.forceFlush({})); }); it('should throw after shutdown', async () => { @@ -225,7 +225,7 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await reader.shutdown({}); - await assert.rejects(reader.shutdown({})); + await assert.rejects(() => reader.shutdown({})); }); }); @@ -260,7 +260,7 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await assert.rejects(reader.shutdown({ timeoutMillis: 20 }), + await assert.rejects(() => reader.shutdown({ timeoutMillis: 20 }), thrown => thrown instanceof ReaderTimeoutError); }); @@ -276,7 +276,7 @@ describe('PeriodicExportingMetricReader', () => { // first call should succeed. await reader.shutdown({}); - await assert.rejects(reader.shutdown({})); + await assert.rejects(() => reader.shutdown({})); }); it('should throw on non-initialized instance.', async () => { @@ -288,7 +288,7 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(reader.shutdown({})); + await assert.rejects(() => reader.shutdown({})); }); }) ; @@ -302,7 +302,7 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(reader.collect({})); + await assert.rejects(() => reader.collect({})); }); it('should throw on shut-down instance', async () => { @@ -316,7 +316,7 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await reader.shutdown({}); - await assert.rejects(reader.collect({})); + await assert.rejects(() => reader.collect({})); }); }); }); From 6d2690b0d57138f0ac25018346cf7ea1ee7d1eda Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Tue, 4 Jan 2022 18:05:04 +0100 Subject: [PATCH 10/22] fix(metric-reader): do not collect and export before force-flushing the exporter. --- .../export/PeriodicExportingMetricReader.ts | 3 +-- .../PeriodicExportingMetricReader.test.ts | 22 ++++++++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index fa0ed3aead7..7bb0945ae3c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -81,7 +81,7 @@ export class PeriodicExportingMetricReader extends MetricReader { } protected async onForceFlush(): Promise { - await this._runOnce(); + await this._exporter.forceFlush(); } protected async onShutdown(): Promise { @@ -89,7 +89,6 @@ export class PeriodicExportingMetricReader extends MetricReader { clearInterval(this._interval); } - await this.onForceFlush(); await this._exporter.shutdown(); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 2384465c44b..e12a08bbed4 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -26,6 +26,7 @@ const MAX_32_BIT_INT = 2 ** 31 - 1 class TestMetricExporter extends MetricExporter { public exportTime = 0; + public forceFlushTime = 0; public throwException = false; private _batches: MetricData[][] = []; @@ -39,6 +40,11 @@ class TestMetricExporter extends MetricExporter { } async forceFlush(): Promise { + if (this.throwException) { + throw new Error('Error during forceFlush'); + } + + await new Promise(resolve => setTimeout(resolve, this.forceFlushTime)); } async waitForNumberOfExports(numberOfExports: number): Promise { @@ -171,10 +177,10 @@ describe('PeriodicExportingMetricReader', () => { sinon.restore(); }); - it('should force export', async () => { + it('should forceFlush exporter', async () => { const exporter = new TestMetricExporter(); const exporterMock = sinon.mock(exporter); - exporterMock.expects('export').calledOnceWithExactly([]); + exporterMock.expects('forceFlush').calledOnceWithExactly(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: MAX_32_BIT_INT, @@ -187,9 +193,9 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown({}); }); - it('should throw ReaderTimeoutError when export takes too long', async () => { + it('should throw ReaderTimeoutError when forceFlush takes too long', async () => { const exporter = new TestMetricExporter(); - exporter.exportTime = 60; + exporter.forceFlushTime = 60; const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -203,7 +209,7 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown({}); }); - it('should throw when handler throws', async () => { + it('should throw when exporter throws', async () => { const exporter = new TestMetricExporter(); exporter.throwException = true; const reader = new PeriodicExportingMetricReader({ @@ -237,7 +243,7 @@ describe('PeriodicExportingMetricReader', () => { it('should forceFlush', async () => { const exporter = new TestMetricExporter(); const exporterMock = sinon.mock(exporter); - exporterMock.expects('forceFlush').calledOnceWithExactly([]); + exporterMock.expects('forceFlush').calledOnceWithExactly(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: MAX_32_BIT_INT, @@ -249,9 +255,9 @@ describe('PeriodicExportingMetricReader', () => { exporterMock.verify(); }); - it('should throw ReaderTimeoutError when export takes too long', async () => { + it('should throw ReaderTimeoutError when forceFlush takes too long', async () => { const exporter = new TestMetricExporter(); - exporter.exportTime = 1000; + exporter.forceFlushTime = 1000; const reader = new PeriodicExportingMetricReader({ exporter: exporter, From 2346295a2fec665e997c82ecf1d861b293824019 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 5 Jan 2022 10:59:28 +0100 Subject: [PATCH 11/22] refactor(metric-reader): remove unused ReaderResult --- .../src/export/ReaderResult.ts | 28 ------------------- 1 file changed, 28 deletions(-) delete mode 100644 experimental/packages/opentelemetry-sdk-metrics-base/src/export/ReaderResult.ts diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/ReaderResult.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/ReaderResult.ts deleted file mode 100644 index 20517a30575..00000000000 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/ReaderResult.ts +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -export interface ReaderResult { - code: ReaderResultCode; - error?: Error; - returnValue?: T; -} - -export enum ReaderResultCode { - SUCCESS, - FAILED, - TIMED_OUT -} - From 59697cfd2de52c30f2a51c2cbdd0ffa643cef9cf Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 5 Jan 2022 11:05:52 +0100 Subject: [PATCH 12/22] fix(metric-reader): do not throw on multiple shutdown calls. --- .../src/export/MetricReader.ts | 4 +++- .../opentelemetry-sdk-metrics-base/src/index.ts | 1 - .../export/PeriodicExportingMetricReader.test.ts | 12 ++++++++---- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index c541b6ebe87..6b50547956b 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +import * as api from '@opentelemetry/api'; import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; import { MetricData } from './MetricData'; @@ -151,7 +152,8 @@ export abstract class MetricReader { async shutdown(options: ReaderForceFlushOptions): Promise { // Do not call shutdown again if it has already been called. if (this._shutdown) { - throw new Error('Cannot call shutdown twice.'); + api.diag.error('Cannot call shutdown twice.'); + return; } await callWithTimeout(this.onShutdown(), options.timeoutMillis ?? 10000); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts index 82903c18e21..35622792a2f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/index.ts @@ -17,4 +17,3 @@ export { MeterProvider, MeterProviderOptions } from './MeterProvider'; export * from './export/MetricExporter'; export * from './export/MetricReader'; -export * from './export/ReaderResult'; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index e12a08bbed4..8fff1793052 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -231,7 +231,7 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await reader.shutdown({}); - await assert.rejects(() => reader.shutdown({})); + await assert.rejects(() => reader.forceFlush({})); }); }); @@ -270,8 +270,10 @@ describe('PeriodicExportingMetricReader', () => { thrown => thrown instanceof ReaderTimeoutError); }); - it('should throw when called twice', async () => { + it('called twice should call export shutdown only once', async () => { const exporter = new TestMetricExporter(); + const exporterMock = sinon.mock(exporter); + exporterMock.expects('shutdown').calledOnceWithExactly(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: MAX_32_BIT_INT, @@ -280,9 +282,11 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); - // first call should succeed. + // call twice, the exporter's shutdown must only be called once. await reader.shutdown({}); - await assert.rejects(() => reader.shutdown({})); + await reader.shutdown({}); + + exporterMock.verify(); }); it('should throw on non-initialized instance.', async () => { From 5fa2f3984a71c8c7651d51b88f772d70072b720c Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 5 Jan 2022 11:34:59 +0100 Subject: [PATCH 13/22] refactor(metric-reader): move callWithTimeout and TimeoutError to src/utils.ts --- .../src/export/MetricReader.ts | 42 +------------------ .../export/PeriodicExportingMetricReader.ts | 5 ++- .../src/utils.ts | 41 ++++++++++++++++++ .../PeriodicExportingMetricReader.test.ts | 7 ++-- 4 files changed, 49 insertions(+), 46 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 6b50547956b..3bc13c8564b 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -18,6 +18,7 @@ import * as api from '@opentelemetry/api'; import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; import { MetricData } from './MetricData'; +import { callWithTimeout } from '../utils'; export type ReaderOptions = { timeoutMillis?: number @@ -29,47 +30,6 @@ export type ReaderShutdownOptions = ReaderOptions; export type ReaderForceFlushOptions = ReaderOptions; -/** - * Error that is thrown on timeouts (i.e. timeout on forceFlush or shutdown) - */ -export class ReaderTimeoutError extends Error { - constructor(message?: string) { - super(message); - Object.setPrototypeOf(this, ReaderTimeoutError.prototype); - } -} - -/** - * Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise - * rejects, and resolves if the specified promise resolves. - * - *

NOTE: this operation will continue even after it throws a {@link ReaderTimeoutError}. - * - * @param promise promise to use with timeout. - * @param timeout the timeout in milliseconds until the returned promise is rejected. - */ -export function callWithTimeout(promise: Promise, timeout: number): Promise { - let timeoutHandle: ReturnType; - - const timeoutPromise = new Promise(function timeoutFunction(_resolve, reject) { - timeoutHandle = setTimeout( - function timeoutHandler() { - reject(new ReaderTimeoutError('Operation timed out.')); - }, - timeout - ); - }); - - return Promise.race([promise, timeoutPromise]).then(result => { - clearTimeout(timeoutHandle); - return result; - }, - reason => { - clearTimeout(timeoutHandle); - throw reason; - }); -} - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader /** diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts index 7bb0945ae3c..f7e54530765 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/PeriodicExportingMetricReader.ts @@ -15,8 +15,9 @@ */ import * as api from '@opentelemetry/api'; -import { callWithTimeout, MetricReader, ReaderTimeoutError } from './MetricReader'; +import { MetricReader } from './MetricReader'; import { MetricExporter } from './MetricExporter'; +import { callWithTimeout, TimeoutError } from '../utils'; export type PeriodicExportingMetricReaderOptions = { exporter: MetricExporter @@ -70,7 +71,7 @@ export class PeriodicExportingMetricReader extends MetricReader { try { await callWithTimeout(this._runOnce(), this._exportTimeout); } catch (err) { - if (err instanceof ReaderTimeoutError) { + if (err instanceof TimeoutError) { api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout); return; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index 7f269755f20..4240ddee6c2 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -38,3 +38,44 @@ export function hashAttributes(attributes: Attributes): string { return (result += key + ':' + attributes[key]); }, '|#'); } + +/** + * Error that is thrown on timeouts. + */ +export class TimeoutError extends Error { + constructor(message?: string) { + super(message); + Object.setPrototypeOf(this, TimeoutError.prototype); + } +} + +/** + * Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise + * rejects, and resolves if the specified promise resolves. + * + *

NOTE: this operation will continue even after it throws a {@link TimeoutError}. + * + * @param promise promise to use with timeout. + * @param timeout the timeout in milliseconds until the returned promise is rejected. + */ +export function callWithTimeout(promise: Promise, timeout: number): Promise { + let timeoutHandle: ReturnType; + + const timeoutPromise = new Promise(function timeoutFunction(_resolve, reject) { + timeoutHandle = setTimeout( + function timeoutHandler() { + reject(new TimeoutError('Operation timed out.')); + }, + timeout + ); + }); + + return Promise.race([promise, timeoutPromise]).then(result => { + clearTimeout(timeoutHandle); + return result; + }, + reason => { + clearTimeout(timeoutHandle); + throw reason; + }); +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 8fff1793052..fc54324f1a2 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -16,11 +16,12 @@ import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; -import { MetricExporter, ReaderTimeoutError } from '../../src'; +import { MetricExporter } from '../../src'; import { MetricData } from '../../src/export/MetricData'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { MetricProducer } from '../../src/export/MetricProducer'; +import { TimeoutError } from '../../src/utils'; const MAX_32_BIT_INT = 2 ** 31 - 1 @@ -205,7 +206,7 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await assert.rejects(() => reader.forceFlush({ timeoutMillis: 20 }), - thrown => thrown instanceof ReaderTimeoutError); + thrown => thrown instanceof TimeoutError); await reader.shutdown({}); }); @@ -267,7 +268,7 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await assert.rejects(() => reader.shutdown({ timeoutMillis: 20 }), - thrown => thrown instanceof ReaderTimeoutError); + thrown => thrown instanceof TimeoutError); }); it('called twice should call export shutdown only once', async () => { From 5081c9c8cb5042e0386834cb77f259b1ccc84ca3 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 5 Jan 2022 12:50:36 +0100 Subject: [PATCH 14/22] docs(metric-reader): add link to describe why the prototype is set manually in TimeoutError --- .../packages/opentelemetry-sdk-metrics-base/src/utils.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts index 4240ddee6c2..5ae4e0ee102 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts @@ -45,6 +45,9 @@ export function hashAttributes(attributes: Attributes): string { export class TimeoutError extends Error { constructor(message?: string) { super(message); + + // manually adjust prototype to retain `instanceof` functionality when targeting ES5, see: + // https://github.com/Microsoft/TypeScript-wiki/blob/main/Breaking-Changes.md#extending-built-ins-like-error-array-and-map-may-no-longer-work Object.setPrototypeOf(this, TimeoutError.prototype); } } From 315842251caf50ebb43415a1af9e946dd44f06f6 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 5 Jan 2022 16:36:56 +0100 Subject: [PATCH 15/22] fix(metric-reader): fix switched-out reader and force-flush options. --- .../opentelemetry-sdk-metrics-base/src/export/MetricReader.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 3bc13c8564b..b5bfa5f29bb 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -109,7 +109,7 @@ export abstract class MetricReader { *

NOTE: this operation will continue even after the promise rejects due to a timeout. * @param options options with timeout (default: 10000ms). */ - async shutdown(options: ReaderForceFlushOptions): Promise { + async shutdown(options: ReaderShutdownOptions): Promise { // Do not call shutdown again if it has already been called. if (this._shutdown) { api.diag.error('Cannot call shutdown twice.'); @@ -126,7 +126,7 @@ export abstract class MetricReader { *

NOTE: this operation will continue even after the promise rejects due to a timeout. * @param options options with timeout (default: 10000ms). */ - async forceFlush(options: ReaderShutdownOptions): Promise { + async forceFlush(options: ReaderForceFlushOptions): Promise { if (this._shutdown) { throw new Error('Cannot forceFlush on already shutdown MetricReader.'); } From 282a75d5bf2bfcce7c4daff0876e03aade8ddc78 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Wed, 5 Jan 2022 17:02:54 +0100 Subject: [PATCH 16/22] fix(metric-reader): do not use default timeouts. --- .../src/export/MetricReader.ts | 31 ++++++++++++++----- .../PeriodicExportingMetricReader.test.ts | 22 +++++++++++++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index b5bfa5f29bb..7695fe07d6d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -100,14 +100,19 @@ export abstract class MetricReader { throw new Error('Collection is not allowed after shutdown'); } - return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis ?? 10000); + // No timeout if timeoutMillis is undefined or null. + if (options.timeoutMillis == null) { + return await this._metricProducer.collect(); + } + + return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis); } /** - * Shuts down the metric reader, the promise will reject after the specified timeout or resolve after completion. + * Shuts down the metric reader, the promise will reject after the optional timeout or resolve after completion. * *

NOTE: this operation will continue even after the promise rejects due to a timeout. - * @param options options with timeout (default: 10000ms). + * @param options options with timeout. */ async shutdown(options: ReaderShutdownOptions): Promise { // Do not call shutdown again if it has already been called. @@ -116,21 +121,33 @@ export abstract class MetricReader { return; } - await callWithTimeout(this.onShutdown(), options.timeoutMillis ?? 10000); + // No timeout if timeoutMillis is undefined or null. + if (options.timeoutMillis == null) { + await this.onShutdown(); + } else { + await callWithTimeout(this.onShutdown(), options.timeoutMillis); + } + this._shutdown = true; } /** - * Flushes metrics read by this reader, the promise will reject after the specified timeout or resolve after completion. + * Flushes metrics read by this reader, the promise will reject after the optional timeout or resolve after completion. * *

NOTE: this operation will continue even after the promise rejects due to a timeout. - * @param options options with timeout (default: 10000ms). + * @param options options with timeout. */ async forceFlush(options: ReaderForceFlushOptions): Promise { if (this._shutdown) { throw new Error('Cannot forceFlush on already shutdown MetricReader.'); } - await callWithTimeout(this.onForceFlush(), options.timeoutMillis ?? 10000); + // No timeout if timeoutMillis is undefined or null. + if (options.timeoutMillis == null) { + await this.onForceFlush(); + return; + } + + await callWithTimeout(this.onForceFlush(), options.timeoutMillis); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index fc54324f1a2..b1bf7f8d111 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -71,7 +71,10 @@ class TestDeltaMetricExporter extends TestMetricExporter { } class TestMetricProducer implements MetricProducer { + public collectionTime = 0; + async collect(): Promise { + await new Promise(resolve => setTimeout(resolve, this.collectionTime)); return []; } } @@ -329,5 +332,24 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown({}); await assert.rejects(() => reader.collect({})); }); + + it('should time out when timeoutMillis is set', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + const producer = new TestMetricProducer(); + producer.collectionTime = 40; + reader.setMetricProducer(producer); + + await assert.rejects( + () => reader.collect({ timeoutMillis: 20 }), + thrown => thrown instanceof TimeoutError + ); + + await reader.shutdown({}); + }); }); }); From 9a4b24b35797e75f0a79d2dd8af3515cf762822d Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Fri, 7 Jan 2022 10:34:19 +0100 Subject: [PATCH 17/22] fix(metric-reader): make options argument optional, cleanup. --- .../src/export/MetricReader.ts | 12 +++--- .../PeriodicExportingMetricReader.test.ts | 42 +++++++++---------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 7695fe07d6d..b25df51c36c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -90,7 +90,7 @@ export abstract class MetricReader { /** * Collect all metrics from the associated {@link MetricProducer} */ - async collect(options: ReaderCollectionOptions): Promise { + async collect(options?: ReaderCollectionOptions): Promise { if (this._metricProducer === undefined) { throw new Error('MetricReader is not bound to a MetricProducer'); } @@ -101,7 +101,7 @@ export abstract class MetricReader { } // No timeout if timeoutMillis is undefined or null. - if (options.timeoutMillis == null) { + if (options?.timeoutMillis == null) { return await this._metricProducer.collect(); } @@ -114,7 +114,7 @@ export abstract class MetricReader { *

NOTE: this operation will continue even after the promise rejects due to a timeout. * @param options options with timeout. */ - async shutdown(options: ReaderShutdownOptions): Promise { + async shutdown(options?: ReaderShutdownOptions): Promise { // Do not call shutdown again if it has already been called. if (this._shutdown) { api.diag.error('Cannot call shutdown twice.'); @@ -122,7 +122,7 @@ export abstract class MetricReader { } // No timeout if timeoutMillis is undefined or null. - if (options.timeoutMillis == null) { + if (options?.timeoutMillis == null) { await this.onShutdown(); } else { await callWithTimeout(this.onShutdown(), options.timeoutMillis); @@ -137,13 +137,13 @@ export abstract class MetricReader { *

NOTE: this operation will continue even after the promise rejects due to a timeout. * @param options options with timeout. */ - async forceFlush(options: ReaderForceFlushOptions): Promise { + async forceFlush(options?: ReaderForceFlushOptions): Promise { if (this._shutdown) { throw new Error('Cannot forceFlush on already shutdown MetricReader.'); } // No timeout if timeoutMillis is undefined or null. - if (options.timeoutMillis == null) { + if (options?.timeoutMillis == null) { await this.onForceFlush(); return; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index b1bf7f8d111..a29431ab0f8 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -152,7 +152,7 @@ describe('PeriodicExportingMetricReader', () => { const result = await exporter.waitForNumberOfExports(2); assert.deepEqual(result, [[], []]); - await reader.shutdown({}); + await reader.shutdown(); }); }); @@ -172,7 +172,7 @@ describe('PeriodicExportingMetricReader', () => { assert.deepEqual(result, [[], []]); exporter.throwException = false; - await reader.shutdown({}); + await reader.shutdown(); }); }); @@ -192,12 +192,12 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await reader.forceFlush({}); + await reader.forceFlush(); exporterMock.verify(); - await reader.shutdown({}); + await reader.shutdown(); }); - it('should throw ReaderTimeoutError when forceFlush takes too long', async () => { + it('should throw TimeoutError when forceFlush takes too long', async () => { const exporter = new TestMetricExporter(); exporter.forceFlushTime = 60; @@ -209,8 +209,8 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await assert.rejects(() => reader.forceFlush({ timeoutMillis: 20 }), - thrown => thrown instanceof TimeoutError); - await reader.shutdown({}); + TimeoutError); + await reader.shutdown(); }); it('should throw when exporter throws', async () => { @@ -222,7 +222,7 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(() => reader.forceFlush({})); + await assert.rejects(() => reader.forceFlush()); }); it('should throw after shutdown', async () => { @@ -234,8 +234,8 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await reader.shutdown({}); - await assert.rejects(() => reader.forceFlush({})); + await reader.shutdown(); + await assert.rejects(() => reader.forceFlush()); }); }); @@ -255,11 +255,11 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(new TestMetricProducer()); - await reader.shutdown({}); + await reader.shutdown(); exporterMock.verify(); }); - it('should throw ReaderTimeoutError when forceFlush takes too long', async () => { + it('should throw TimeoutError when forceFlush takes too long', async () => { const exporter = new TestMetricExporter(); exporter.forceFlushTime = 1000; @@ -271,7 +271,7 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await assert.rejects(() => reader.shutdown({ timeoutMillis: 20 }), - thrown => thrown instanceof TimeoutError); + TimeoutError); }); it('called twice should call export shutdown only once', async () => { @@ -287,8 +287,8 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); // call twice, the exporter's shutdown must only be called once. - await reader.shutdown({}); - await reader.shutdown({}); + await reader.shutdown(); + await reader.shutdown(); exporterMock.verify(); }); @@ -302,7 +302,7 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(() => reader.shutdown({})); + await assert.rejects(() => reader.shutdown()); }); }) ; @@ -316,7 +316,7 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 80, }); - await assert.rejects(() => reader.collect({})); + await assert.rejects(() => reader.collect()); }); it('should throw on shut-down instance', async () => { @@ -329,8 +329,8 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); - await reader.shutdown({}); - await assert.rejects(() => reader.collect({})); + await reader.shutdown(); + await assert.rejects(() => reader.collect()); }); it('should time out when timeoutMillis is set', async () => { @@ -346,10 +346,10 @@ describe('PeriodicExportingMetricReader', () => { await assert.rejects( () => reader.collect({ timeoutMillis: 20 }), - thrown => thrown instanceof TimeoutError + TimeoutError ); - await reader.shutdown({}); + await reader.shutdown(); }); }); }); From 0ff2c92f07c8f56b759761a050ddf7de52defa82 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Fri, 7 Jan 2022 10:37:02 +0100 Subject: [PATCH 18/22] fix(metric-reader): actually add batch to _batches in TestMetricExporter --- .../test/export/PeriodicExportingMetricReader.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index a29431ab0f8..39348669926 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -32,7 +32,7 @@ class TestMetricExporter extends MetricExporter { private _batches: MetricData[][] = []; async export(batch: MetricData[]): Promise { - this._batches.push([]); + this._batches.push(batch); if (this.throwException) { throw new Error('Error during export'); From 471d5d5fa0214b5bf13741d60b485810c808a894 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Fri, 7 Jan 2022 15:16:36 +0100 Subject: [PATCH 19/22] fix(metric-reader): add test-case for timed-out export. --- .../PeriodicExportingMetricReader.test.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index 39348669926..c6477ccde52 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -174,6 +174,25 @@ describe('PeriodicExportingMetricReader', () => { exporter.throwException = false; await reader.shutdown(); }); + + it('should keep exporting on export timeouts', async () => { + const exporter = new TestMetricExporter(); + // set time longer than timeout. + exporter.exportTime = 40; + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 30, + exportTimeoutMillis: 20 + }); + + reader.setMetricProducer(new TestMetricProducer()); + + const result = await exporter.waitForNumberOfExports(2); + assert.deepEqual(result, [[], []]); + + exporter.throwException = false; + await reader.shutdown(); + }); }); describe('forceFlush', () => { From a3b6796807422ba19668f2578ac2b66432fd1fb3 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Fri, 7 Jan 2022 16:31:32 +0100 Subject: [PATCH 20/22] docs(metric-reader): add TODO comment for BindOncePromise. --- .../opentelemetry-sdk-metrics-base/src/export/MetricReader.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index b25df51c36c..7e023850e6f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -38,6 +38,7 @@ export type ReaderForceFlushOptions = ReaderOptions; */ export abstract class MetricReader { // Tracks the shutdown state. + // TODO: use BindOncePromise here once a new version of @opentelemetry/core is available. private _shutdown = false; // MetricProducer used by this instance. private _metricProducer?: MetricProducer; From 2de4fc91bc744c88ed697098197289b58cc44095 Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Fri, 7 Jan 2022 19:20:03 +0100 Subject: [PATCH 21/22] fix(metric-reader): do not throw on collect and forceFlush when instance is shut down --- .../src/export/MetricReader.ts | 6 ++++-- .../export/PeriodicExportingMetricReader.test.ts | 13 +++++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts index 7e023850e6f..b67876c9b11 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/export/MetricReader.ts @@ -98,7 +98,8 @@ export abstract class MetricReader { // Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls. if (this._shutdown) { - throw new Error('Collection is not allowed after shutdown'); + api.diag.warn('Collection is not allowed after shutdown'); + return []; } // No timeout if timeoutMillis is undefined or null. @@ -140,7 +141,8 @@ export abstract class MetricReader { */ async forceFlush(options?: ReaderForceFlushOptions): Promise { if (this._shutdown) { - throw new Error('Cannot forceFlush on already shutdown MetricReader.'); + api.diag.warn('Cannot forceFlush on already shutdown MetricReader.'); + return; } // No timeout if timeoutMillis is undefined or null. diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts index c6477ccde52..ca426e29081 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/PeriodicExportingMetricReader.test.ts @@ -244,8 +244,11 @@ describe('PeriodicExportingMetricReader', () => { await assert.rejects(() => reader.forceFlush()); }); - it('should throw after shutdown', async () => { + it('should not forceFlush exporter after shutdown', async () => { const exporter = new TestMetricExporter(); + const exporterMock = sinon.mock(exporter); + // expect once on shutdown. + exporterMock.expects('forceFlush').once(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, exportIntervalMillis: MAX_32_BIT_INT, @@ -254,7 +257,9 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await reader.shutdown(); - await assert.rejects(() => reader.forceFlush()); + await reader.forceFlush(); + + exporterMock.verify(); }); }); @@ -338,7 +343,7 @@ describe('PeriodicExportingMetricReader', () => { await assert.rejects(() => reader.collect()); }); - it('should throw on shut-down instance', async () => { + it('should return empty on shut-down instance', async () => { const exporter = new TestMetricExporter(); const reader = new PeriodicExportingMetricReader({ exporter: exporter, @@ -349,7 +354,7 @@ describe('PeriodicExportingMetricReader', () => { reader.setMetricProducer(new TestMetricProducer()); await reader.shutdown(); - await assert.rejects(() => reader.collect()); + assert.deepEqual([], await reader.collect()); }); it('should time out when timeoutMillis is set', async () => { From 056ea1991d7311abf3b992e072b57de72711078c Mon Sep 17 00:00:00 2001 From: "marc.pichler" Date: Fri, 7 Jan 2022 19:39:33 +0100 Subject: [PATCH 22/22] refactor(metric-reader): remove empty options from calls in MetricCollector. --- .../src/state/MetricCollector.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index b230fd54d6f..3841354e4ba 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -44,14 +44,14 @@ export class MetricCollector implements MetricProducer { * Delegates for MetricReader.forceFlush. */ async forceFlush(): Promise { - await this._metricReader.forceFlush({}); + await this._metricReader.forceFlush(); } /** * Delegates for MetricReader.shutdown. */ async shutdown(): Promise { - await this._metricReader.shutdown({}); + await this._metricReader.shutdown(); } }