Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-metrics-base): shutdown and forceflush on MeterProvider #2890

Merged
merged 6 commits into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ All notable changes to experimental packages in this project will be documented
* feat(prometheus): update prometheus exporter with wip metrics sdk #2824 @legendecas
* feat(instrumentation-xhr): add applyCustomAttributesOnSpan hook #2134 @mhennoch
* feat(proto): add @opentelemetry/otlp-transformer package with hand-rolled transformation #2746 @dyladan
* feat(sdk-metrics-base): shutdown and forceflush on MeterProvider #2890 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { Aggregation } from './view/Aggregation';
import { FilteringAttributesProcessor } from './view/AttributesProcessor';
import { InstrumentType } from './InstrumentDescriptor';
import { PatternPredicate } from './view/Predicate';
import { ForceFlushOptions, ShutdownOptions } from './types';

/**
* MeterProviderOptions provides an interface for configuring a MeterProvider.
Expand Down Expand Up @@ -163,59 +164,36 @@ export class MeterProvider implements metrics.MeterProvider {
/**
* Flush all buffered data and shut down the MeterProvider and all registered
* MetricReaders.
* Returns a promise which is resolved when all flushes are complete.
*
* TODO: return errors to caller somehow?
* Returns a promise which is resolved when all flushes are complete.
*/
async shutdown(): Promise<void> {
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#shutdown

async shutdown(options?: ShutdownOptions): Promise<void> {
if (this._shutdown) {
api.diag.warn('shutdown may only be called once per MeterProvider');
return;
}

// TODO add a timeout - spec leaves it up the the SDK if this is configurable
this._shutdown = true;

for (const collector of this._sharedState.metricCollectors) {
try {
await collector.shutdown();
} catch (e) {
// Log all Errors.
if (e instanceof Error) {
api.diag.error(`Error shutting down: ${e.message}`);
}
}
}
await Promise.all(this._sharedState.metricCollectors.map(collector => {
return collector.shutdown(options);
}));
}

/**
* Notifies all registered MetricReaders to flush any buffered data.
* Returns a promise which is resolved when all flushes are complete.
*
* TODO: return errors to caller somehow?
* Returns a promise which is resolved when all flushes are complete.
*/
async forceFlush(): Promise<void> {
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#forceflush

// TODO add a timeout - spec leaves it up the the SDK if this is configurable

async forceFlush(options?: ForceFlushOptions): Promise<void> {
// do not flush after shutdown
if (this._shutdown) {
api.diag.warn('invalid attempt to force flush after shutdown');
api.diag.warn('invalid attempt to force flush after MeterProvider shutdown');
return;
}

for (const collector of this._sharedState.metricCollectors) {
try {
await collector.forceFlush();
} catch (e) {
// Log all Errors.
if (e instanceof Error) {
api.diag.error(`Error flushing: ${e.message}`);
}
}
}
await Promise.all(this._sharedState.metricCollectors.map(collector => {
return collector.forceFlush(options);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,7 @@ import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { ResourceMetrics } from './MetricData';
import { callWithTimeout, Maybe } from '../utils';


export type ReaderOptions = {
timeoutMillis?: number
};

export type ReaderCollectionOptions = ReaderOptions;

export type ReaderShutdownOptions = ReaderOptions;

export type ReaderForceFlushOptions = ReaderOptions;
import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types';

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader

Expand All @@ -53,6 +43,9 @@ export abstract class MetricReader {
* @param metricProducer
*/
setMetricProducer(metricProducer: MetricProducer) {
if (this._metricProducer) {
throw new Error('MetricReader can not be bound to a MeterProvider again.');
}
this._metricProducer = metricProducer;
this.onInitialized();
}
Expand Down Expand Up @@ -92,7 +85,7 @@ export abstract class MetricReader {
/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: ReaderCollectionOptions): Promise<Maybe<ResourceMetrics>> {
async collect(options?: CollectionOptions): Promise<Maybe<ResourceMetrics>> {
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}
Expand All @@ -117,7 +110,7 @@ export abstract class MetricReader {
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async shutdown(options?: ReaderShutdownOptions): Promise<void> {
async shutdown(options?: ShutdownOptions): Promise<void> {
// Do not call shutdown again if it has already been called.
if (this._shutdown) {
api.diag.error('Cannot call shutdown twice.');
Expand All @@ -140,7 +133,7 @@ export abstract class MetricReader {
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async forceFlush(options?: ReaderForceFlushOptions): Promise<void> {
async forceFlush(options?: ForceFlushOptions): Promise<void> {
if (this._shutdown) {
api.diag.warn('Cannot forceFlush on already shutdown MetricReader.');
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { AggregationTemporality } from '../export/AggregationTemporality';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { ForceFlushOptions, ShutdownOptions } from '../types';
import { MeterProviderSharedState } from './MeterProviderSharedState';

/**
Expand Down Expand Up @@ -46,15 +47,15 @@ export class MetricCollector implements MetricProducer {
/**
* Delegates for MetricReader.forceFlush.
*/
async forceFlush(): Promise<void> {
await this._metricReader.forceFlush();
async forceFlush(options?: ForceFlushOptions): Promise<void> {
await this._metricReader.forceFlush(options);
}

/**
* Delegates for MetricReader.shutdown.
*/
async shutdown(): Promise<void> {
await this._metricReader.shutdown();
async shutdown(options?: ShutdownOptions): Promise<void> {
await this._metricReader.shutdown(options);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export type CommonReaderOptions = {
timeoutMillis?: number
};

export type CollectionOptions = CommonReaderOptions;

export type ShutdownOptions = CommonReaderOptions;

export type ForceFlushOptions = CommonReaderOptions;
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ import {
defaultResource
} from './util';
import { TestMetricReader } from './export/TestMetricReader';
import * as sinon from 'sinon';

describe('MeterProvider', () => {
afterEach(() => {
sinon.restore();
});

describe('constructor', () => {
it('should construct without exceptions', () => {
const meterProvider = new MeterProvider();
Expand Down Expand Up @@ -422,4 +427,53 @@ describe('MeterProvider', () => {
});
});
});

describe('shutdown', () => {
it('should shutdown all registered metric readers', async () => {
legendecas marked this conversation as resolved.
Show resolved Hide resolved
const meterProvider = new MeterProvider({ resource: defaultResource });
const reader1 = new TestMetricReader();
const reader2 = new TestMetricReader();
const reader1ShutdownSpy = sinon.spy(reader1, 'shutdown');
const reader2ShutdownSpy = sinon.spy(reader2, 'shutdown');

meterProvider.addMetricReader(reader1);
meterProvider.addMetricReader(reader2);

await meterProvider.shutdown({ timeoutMillis: 1234 });
await meterProvider.shutdown();
await meterProvider.shutdown();

assert.strictEqual(reader1ShutdownSpy.callCount, 1);
assert.deepStrictEqual(reader1ShutdownSpy.args[0][0], { timeoutMillis: 1234 });
assert.strictEqual(reader2ShutdownSpy.callCount, 1);
assert.deepStrictEqual(reader2ShutdownSpy.args[0][0], { timeoutMillis: 1234 });
});
});

describe('forceFlush', () => {
it('should forceFlush all registered metric readers', async () => {
const meterProvider = new MeterProvider({ resource: defaultResource });
const reader1 = new TestMetricReader();
const reader2 = new TestMetricReader();
const reader1ForceFlushSpy = sinon.spy(reader1, 'forceFlush');
const reader2ForceFlushSpy = sinon.spy(reader2, 'forceFlush');

meterProvider.addMetricReader(reader1);
meterProvider.addMetricReader(reader2);

await meterProvider.forceFlush({ timeoutMillis: 1234 });
await meterProvider.forceFlush({ timeoutMillis: 5678 });
assert.strictEqual(reader1ForceFlushSpy.callCount, 2);
assert.deepStrictEqual(reader1ForceFlushSpy.args[0][0], { timeoutMillis: 1234 });
assert.deepStrictEqual(reader1ForceFlushSpy.args[1][0], { timeoutMillis: 5678 });
assert.strictEqual(reader2ForceFlushSpy.callCount, 2);
assert.deepStrictEqual(reader2ForceFlushSpy.args[0][0], { timeoutMillis: 1234 });
assert.deepStrictEqual(reader2ForceFlushSpy.args[1][0], { timeoutMillis: 5678 });

await meterProvider.shutdown();
await meterProvider.forceFlush();
assert.strictEqual(reader1ForceFlushSpy.callCount, 2);
assert.strictEqual(reader2ForceFlushSpy.callCount, 2);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import { MeterProvider } from '../../src/MeterProvider';
import { TestMetricReader } from './TestMetricReader';


describe('MetricReader', () => {
describe('setMetricProducer', () => {
it('The SDK MUST NOT allow a MetricReader instance to be registered on more than one MeterProvider instance', () => {
const reader = new TestMetricReader();
const meterProvider1 = new MeterProvider();
const meterProvider2 = new MeterProvider();

meterProvider1.addMetricReader(reader);
assert.throws(() => meterProvider1.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/);
assert.throws(() => meterProvider2.addMetricReader(reader), /MetricReader can not be bound to a MeterProvider again/);
});
});
});