Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metric-reader): add metric-reader #2681

Merged
merged 22 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6c4f989
feat(metric-reader): add metric-reader.
pichlermarc Dec 20, 2021
4a767a7
refactor(metric-reader): use callbacks
pichlermarc Dec 21, 2021
fd04ff4
refactor(metric-reader): add options so that timeout can be omitted.
pichlermarc Dec 22, 2021
df2af4a
refactor(metric-reader): combine TestMetricExporter with WaitingMetri…
pichlermarc Dec 22, 2021
8390e4a
fix(metric-reader): update shutdown and forceFlush usages, fix style.
pichlermarc Dec 22, 2021
78d582d
refactor(metric-reader): add default implementation for onInitialized…
pichlermarc Dec 23, 2021
381c8a0
refactor(metric-reader): use promise pattern instead of callbacks
pichlermarc Jan 4, 2022
f9ec116
refactor(metric-reader): update metric collector to use the promise p…
pichlermarc Jan 4, 2022
fcfe6b7
fix(metric-reader): pass function instead of Promise to assert.reject…
pichlermarc Jan 4, 2022
6d2690b
fix(metric-reader): do not collect and export before force-flushing t…
pichlermarc Jan 4, 2022
2346295
refactor(metric-reader): remove unused ReaderResult
pichlermarc Jan 5, 2022
59697cf
fix(metric-reader): do not throw on multiple shutdown calls.
pichlermarc Jan 5, 2022
5fa2f39
refactor(metric-reader): move callWithTimeout and TimeoutError to src…
pichlermarc Jan 5, 2022
5081c9c
docs(metric-reader): add link to describe why the prototype is set ma…
pichlermarc Jan 5, 2022
3158422
fix(metric-reader): fix switched-out reader and force-flush options.
pichlermarc Jan 5, 2022
282a75d
fix(metric-reader): do not use default timeouts.
pichlermarc Jan 5, 2022
9a4b24b
fix(metric-reader): make options argument optional, cleanup.
pichlermarc Jan 7, 2022
0ff2c92
fix(metric-reader): actually add batch to _batches in TestMetricExporter
pichlermarc Jan 7, 2022
471d5d5
fix(metric-reader): add test-case for timed-out export.
pichlermarc Jan 7, 2022
a3b6796
docs(metric-reader): add TODO comment for BindOncePromise.
pichlermarc Jan 7, 2022
2de4fc9
fix(metric-reader): do not throw on collect and forceFlush when insta…
pichlermarc Jan 7, 2022
056ea19
refactor(metric-reader): remove empty options from calls in MetricCol…
pichlermarc Jan 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,143 @@
* limitations under the License.
*/

import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricExporter } from './MetricExporter';
import { MetricProducer } from './MetricProducer';
import { MetricData } from './MetricData';
import { callWithTimeout } from '../utils';

export type ReaderOptions = {
timeoutMillis?: number
}

export type ReaderCollectionOptions = ReaderOptions;

export type ReaderShutdownOptions = ReaderOptions;

export type ReaderForceFlushOptions = ReaderOptions;

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader

/**
* A registered reader of metrics that, when linked to a {@link MetricProducer}, offers global
* control over metrics.
*/
export abstract class MetricReader {
// Tracks the shutdown state.
// TODO: use BindOncePromise here once a new version of @opentelemetry/core is available.
private _shutdown = false;
// MetricProducer used by this instance.
private _metricProducer?: MetricProducer;

constructor(private _exporter: MetricExporter) {}
constructor(private readonly _preferredAggregationTemporality = AggregationTemporality.CUMULATIVE) {
}

/**
* Set the {@link MetricProducer} used by this instance.
*
* @param metricProducer
*/
setMetricProducer(metricProducer: MetricProducer) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, is this taking the Java route of passing a metric-producer rather than hard-coding collect to talk to static registry of metrics.

I'm wondering if we should update the specification so that this is the "default" specified mechanism and the .NET hook is an alternative option to make it more clear to implementers that wish to a hard-link between metric-reader implementation + the sdk. Did you find this confusing at all?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is confusing on the spec side. I've previously submitted issues like open-telemetry/opentelemetry-specification#2072 for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it is somewhat confusing on the spec side, when I was reading it the same question described in the issue above popped into my mind. I think updating the spec would help in that regard.

this._metricProducer = metricProducer;
this.onInitialized();
}

/**
* Get the {@link AggregationTemporality} preferred by this {@link MetricReader}
*/
getPreferredAggregationTemporality(): AggregationTemporality {
return this._exporter.getPreferredAggregationTemporality();
return this._preferredAggregationTemporality;
}

/**
* Handle once the SDK has initialized this {@link MetricReader}
* Overriding this method is optional.
*/
protected onInitialized(): void {
// Default implementation is empty.
}

async collect(): Promise<void> {
/**
* Handle a shutdown signal by the SDK.
*
* <p> For push exporters, this should shut down any intervals and close any open connections.
* @protected
*/
protected abstract onShutdown(): Promise<void>;

/**
* Handle a force flush signal by the SDK.
*
* <p> In all scenarios metrics should be collected via {@link collect()}.
* <p> For push exporters, this should collect and report metrics.
* @protected
*/
protected abstract onForceFlush(): Promise<void>;

/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: ReaderCollectionOptions): Promise<MetricData[]> {
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MeterProvider');
throw new Error('MetricReader is not bound to a MetricProducer');
}
const metrics = await this._metricProducer.collect();

// errors thrown to caller
await this._exporter.export(metrics);
// Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls.
if (this._shutdown) {
api.diag.warn('Collection is not allowed after shutdown');
return [];
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
}

async shutdown(): Promise<void> {
/**
* Shuts down the metric reader, the promise will reject after the optional timeout or resolve after completion.
*
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async shutdown(options?: ReaderShutdownOptions): Promise<void> {
// Do not call shutdown again if it has already been called.
if (this._shutdown) {
api.diag.error('Cannot call shutdown twice.');
return;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
await this.onShutdown();
} else {
await callWithTimeout(this.onShutdown(), options.timeoutMillis);
}

this._shutdown = true;
// errors thrown to caller
await this._exporter.shutdown();
}

async forceFlush(): Promise<void> {
/**
* Flushes metrics read by this reader, the promise will reject after the optional timeout or resolve after completion.
*
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async forceFlush(options?: ReaderForceFlushOptions): Promise<void> {
if (this._shutdown) {
api.diag.warn('Cannot forceFlush on already shutdown MetricReader.');
return;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
await this.onForceFlush();
return;
}

// errors thrown to caller
await this._exporter.forceFlush();
await callWithTimeout(this.onForceFlush(), options.timeoutMillis);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as api from '@opentelemetry/api';
import { MetricReader } from './MetricReader';
import { MetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';

export type PeriodicExportingMetricReaderOptions = {
exporter: MetricExporter
exportIntervalMillis?: number,
exportTimeoutMillis?: number
}

/**
* {@link MetricReader} which collects metrics based on a user-configurable time interval, and passes the metrics to
* the configured {@link MetricExporter}
*/
export class PeriodicExportingMetricReader extends MetricReader {
private _interval?: ReturnType<typeof setInterval>;

private _exporter: MetricExporter;

private readonly _exportInterval: number;

private readonly _exportTimeout: number;

constructor(options: PeriodicExportingMetricReaderOptions) {
super(options.exporter.getPreferredAggregationTemporality());

if (options.exportIntervalMillis !== undefined && options.exportIntervalMillis <= 0) {
throw Error('exportIntervalMillis must be greater than 0');
}

if (options.exportTimeoutMillis !== undefined && options.exportTimeoutMillis <= 0) {
throw Error('exportTimeoutMillis must be greater than 0');
}

if (options.exportTimeoutMillis !== undefined &&
options.exportIntervalMillis !== undefined &&
options.exportIntervalMillis < options.exportTimeoutMillis) {
throw Error('exportIntervalMillis must be greater than or equal to exportTimeoutMillis');
}

this._exportInterval = options.exportIntervalMillis ?? 60000;
this._exportTimeout = options.exportTimeoutMillis ?? 30000;
this._exporter = options.exporter;
}

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});
await this._exporter.export(metrics);
}

protected override onInitialized(): void {
// start running the interval as soon as this reader is initialized and keep handle for shutdown.
this._interval = setInterval(async () => {
try {
await callWithTimeout(this._runOnce(), this._exportTimeout);
} catch (err) {
if (err instanceof TimeoutError) {
api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout);
return;
}

api.diag.error('Unexpected error during export: %s', err);
}
}, this._exportInterval);
}

protected async onForceFlush(): Promise<void> {
await this._exporter.forceFlush();
}

protected async onShutdown(): Promise<void> {
if (this._interval) {
clearInterval(this._interval);
}

await this._exporter.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ export class MetricCollector implements MetricProducer {
* Delegates for MetricReader.forceFlush.
*/
async forceFlush(): Promise<void> {
return this._metricReader.forceFlush();
await this._metricReader.forceFlush();
}

/**
* Delegates for MetricReader.shutdown.
*/
async shutdown(): Promise<void> {
return this._metricReader.shutdown();
await this._metricReader.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,47 @@ export function hashAttributes(attributes: Attributes): string {
return (result += key + ':' + attributes[key]);
}, '|#');
}

/**
* Error that is thrown on timeouts.
*/
export class TimeoutError extends Error {
constructor(message?: string) {
super(message);

// manually adjust prototype to retain `instanceof` functionality when targeting ES5, see:
// https://github.com/Microsoft/TypeScript-wiki/blob/main/Breaking-Changes.md#extending-built-ins-like-error-array-and-map-may-no-longer-work
Object.setPrototypeOf(this, TimeoutError.prototype);
}
}

/**
* Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise
* rejects, and resolves if the specified promise resolves.
*
* <p> NOTE: this operation will continue even after it throws a {@link TimeoutError}.
*
* @param promise promise to use with timeout.
* @param timeout the timeout in milliseconds until the returned promise is rejected.
*/
export function callWithTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
let timeoutHandle: ReturnType<typeof setTimeout>;

const timeoutPromise = new Promise<never>(function timeoutFunction(_resolve, reject) {
timeoutHandle = setTimeout(
function timeoutHandler() {
reject(new TimeoutError('Operation timed out.'));
},
timeout
);
});

return Promise.race([promise, timeoutPromise]).then(result => {
clearTimeout(timeoutHandle);
return result;
},
reason => {
clearTimeout(timeoutHandle);
throw reason;
});
}
Loading