Skip to content

Commit

Permalink
feat(sdk-metrics-base): shutdown and forceflush on MeterProvider (#2890)
Browse files Browse the repository at this point in the history
Co-authored-by: Valentin Marchaud <contact@vmarchaud.fr>
  • Loading branch information
legendecas and vmarchaud authored Apr 15, 2022
1 parent 322dabe commit 7086d5a
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 52 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ All notable changes to experimental packages in this project will be documented
* feat(prometheus): update prometheus exporter with wip metrics sdk #2824 @legendecas
* feat(instrumentation-xhr): add applyCustomAttributesOnSpan hook #2134 @mhennoch
* feat(proto): add @opentelemetry/otlp-transformer package with hand-rolled transformation #2746 @dyladan
* feat(sdk-metrics-base): shutdown and forceflush on MeterProvider #2890 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { Aggregation } from './view/Aggregation';
import { FilteringAttributesProcessor } from './view/AttributesProcessor';
import { InstrumentType } from './InstrumentDescriptor';
import { PatternPredicate } from './view/Predicate';
import { ForceFlushOptions, ShutdownOptions } from './types';

/**
* MeterProviderOptions provides an interface for configuring a MeterProvider.
Expand Down Expand Up @@ -163,59 +164,36 @@ export class MeterProvider implements metrics.MeterProvider {
/**
* Flush all buffered data and shut down the MeterProvider and all registered
* MetricReaders.
* Returns a promise which is resolved when all flushes are complete.
*
* TODO: return errors to caller somehow?
* Returns a promise which is resolved when all flushes are complete.
*/
async shutdown(): Promise<void> {
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#shutdown

async shutdown(options?: ShutdownOptions): Promise<void> {
if (this._shutdown) {
api.diag.warn('shutdown may only be called once per MeterProvider');
return;
}

// TODO add a timeout - spec leaves it up the the SDK if this is configurable
this._shutdown = true;

for (const collector of this._sharedState.metricCollectors) {
try {
await collector.shutdown();
} catch (e) {
// Log all Errors.
if (e instanceof Error) {
api.diag.error(`Error shutting down: ${e.message}`);
}
}
}
await Promise.all(this._sharedState.metricCollectors.map(collector => {
return collector.shutdown(options);
}));
}

/**
* Notifies all registered MetricReaders to flush any buffered data.
* Returns a promise which is resolved when all flushes are complete.
*
* TODO: return errors to caller somehow?
* Returns a promise which is resolved when all flushes are complete.
*/
async forceFlush(): Promise<void> {
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#forceflush

// TODO add a timeout - spec leaves it up the the SDK if this is configurable

async forceFlush(options?: ForceFlushOptions): Promise<void> {
// do not flush after shutdown
if (this._shutdown) {
api.diag.warn('invalid attempt to force flush after shutdown');
api.diag.warn('invalid attempt to force flush after MeterProvider shutdown');
return;
}

for (const collector of this._sharedState.metricCollectors) {
try {
await collector.forceFlush();
} catch (e) {
// Log all Errors.
if (e instanceof Error) {
api.diag.error(`Error flushing: ${e.message}`);
}
}
}
await Promise.all(this._sharedState.metricCollectors.map(collector => {
return collector.forceFlush(options);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,7 @@ import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { ResourceMetrics } from './MetricData';
import { callWithTimeout, Maybe } from '../utils';


export type ReaderOptions = {
timeoutMillis?: number
};

export type ReaderCollectionOptions = ReaderOptions;

export type ReaderShutdownOptions = ReaderOptions;

export type ReaderForceFlushOptions = ReaderOptions;
import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types';

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader

Expand All @@ -53,6 +43,9 @@ export abstract class MetricReader {
* @param metricProducer
*/
setMetricProducer(metricProducer: MetricProducer) {
if (this._metricProducer) {
throw new Error('MetricReader can not be bound to a MeterProvider again.');
}
this._metricProducer = metricProducer;
this.onInitialized();
}
Expand Down Expand Up @@ -92,7 +85,7 @@ export abstract class MetricReader {
/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: ReaderCollectionOptions): Promise<Maybe<ResourceMetrics>> {
async collect(options?: CollectionOptions): Promise<Maybe<ResourceMetrics>> {
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}
Expand All @@ -117,7 +110,7 @@ export abstract class MetricReader {
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async shutdown(options?: ReaderShutdownOptions): Promise<void> {
async shutdown(options?: ShutdownOptions): Promise<void> {
// Do not call shutdown again if it has already been called.
if (this._shutdown) {
api.diag.error('Cannot call shutdown twice.');
Expand All @@ -140,7 +133,7 @@ export abstract class MetricReader {
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async forceFlush(options?: ReaderForceFlushOptions): Promise<void> {
async forceFlush(options?: ForceFlushOptions): Promise<void> {
if (this._shutdown) {
api.diag.warn('Cannot forceFlush on already shutdown MetricReader.');
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { AggregationTemporality } from '../export/AggregationTemporality';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { ForceFlushOptions, ShutdownOptions } from '../types';
import { MeterProviderSharedState } from './MeterProviderSharedState';

/**
Expand Down Expand Up @@ -46,15 +47,15 @@ export class MetricCollector implements MetricProducer {
/**
* Delegates for MetricReader.forceFlush.
*/
async forceFlush(): Promise<void> {
await this._metricReader.forceFlush();
async forceFlush(options?: ForceFlushOptions): Promise<void> {
await this._metricReader.forceFlush(options);
}

/**
* Delegates for MetricReader.shutdown.
*/
async shutdown(): Promise<void> {
await this._metricReader.shutdown();
async shutdown(options?: ShutdownOptions): Promise<void> {
await this._metricReader.shutdown(options);
}
}

Expand Down
25 changes: 25 additions & 0 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export type CommonReaderOptions = {
timeoutMillis?: number
};

export type CollectionOptions = CommonReaderOptions;

export type ShutdownOptions = CommonReaderOptions;

export type ForceFlushOptions = CommonReaderOptions;
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ import {
defaultResource
} from './util';
import { TestMetricReader } from './export/TestMetricReader';
import * as sinon from 'sinon';

describe('MeterProvider', () => {
afterEach(() => {
sinon.restore();
});

describe('constructor', () => {
it('should construct without exceptions', () => {
const meterProvider = new MeterProvider();
Expand Down Expand Up @@ -422,4 +427,53 @@ describe('MeterProvider', () => {
});
});
});

describe('shutdown', () => {
it('should shutdown all registered metric readers', async () => {
const meterProvider = new MeterProvider({ resource: defaultResource });
const reader1 = new TestMetricReader();
const reader2 = new TestMetricReader();
const reader1ShutdownSpy = sinon.spy(reader1, 'shutdown');
const reader2ShutdownSpy = sinon.spy(reader2, 'shutdown');

meterProvider.addMetricReader(reader1);
meterProvider.addMetricReader(reader2);

await meterProvider.shutdown({ timeoutMillis: 1234 });
await meterProvider.shutdown();
await meterProvider.shutdown();

assert.strictEqual(reader1ShutdownSpy.callCount, 1);
assert.deepStrictEqual(reader1ShutdownSpy.args[0][0], { timeoutMillis: 1234 });
assert.strictEqual(reader2ShutdownSpy.callCount, 1);
assert.deepStrictEqual(reader2ShutdownSpy.args[0][0], { timeoutMillis: 1234 });
});
});

describe('forceFlush', () => {
it('should forceFlush all registered metric readers', async () => {
const meterProvider = new MeterProvider({ resource: defaultResource });
const reader1 = new TestMetricReader();
const reader2 = new TestMetricReader();
const reader1ForceFlushSpy = sinon.spy(reader1, 'forceFlush');
const reader2ForceFlushSpy = sinon.spy(reader2, 'forceFlush');

meterProvider.addMetricReader(reader1);
meterProvider.addMetricReader(reader2);

await meterProvider.forceFlush({ timeoutMillis: 1234 });
await meterProvider.forceFlush({ timeoutMillis: 5678 });
assert.strictEqual(reader1ForceFlushSpy.callCount, 2);
assert.deepStrictEqual(reader1ForceFlushSpy.args[0][0], { timeoutMillis: 1234 });
assert.deepStrictEqual(reader1ForceFlushSpy.args[1][0], { timeoutMillis: 5678 });
assert.strictEqual(reader2ForceFlushSpy.callCount, 2);
assert.deepStrictEqual(reader2ForceFlushSpy.args[0][0], { timeoutMillis: 1234 });
assert.deepStrictEqual(reader2ForceFlushSpy.args[1][0], { timeoutMillis: 5678 });

await meterProvider.shutdown();
await meterProvider.forceFlush();
assert.strictEqual(reader1ForceFlushSpy.callCount, 2);
assert.strictEqual(reader2ForceFlushSpy.callCount, 2);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import { MeterProvider } from '../../src/MeterProvider';
import { TestMetricReader } from './TestMetricReader';


describe('MetricReader', () => {
describe('setMetricProducer', () => {
it('The SDK MUST NOT allow a MetricReader instance to be registered on more than one MeterProvider instance', () => {
const reader = new TestMetricReader();
const meterProvider1 = new MeterProvider();
const meterProvider2 = new MeterProvider();

meterProvider1.addMetricReader(reader);
assert.throws(() => meterProvider1.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/);
assert.throws(() => meterProvider2.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/);
});
});
});

0 comments on commit 7086d5a

Please sign in to comment.