Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: adding metric observable to be able to support async update #964

Merged
merged 8 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions examples/metrics/metrics/observer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const { MeterProvider } = require('@opentelemetry/metrics');
const { MeterProvider, MetricObservable } = require('@opentelemetry/metrics');
const { PrometheusExporter } = require('@opentelemetry/exporter-prometheus');

const exporter = new PrometheusExporter(
Expand All @@ -14,7 +14,7 @@ const exporter = new PrometheusExporter(

const meter = new MeterProvider({
exporter,
interval: 1000,
interval: 2000,
}).getMeter('example-observer');

const otelCpuUsage = meter.createObserver('metric_observer', {
Expand All @@ -27,9 +27,16 @@ function getCpuUsage() {
return Math.random();
}

const observable = new MetricObservable();

setInterval(() => {
observable.next(getCpuUsage());
}, 5000);

otelCpuUsage.setCallback((observerResult) => {
observerResult.observe(getCpuUsage, { pid: process.pid, core: '1' });
observerResult.observe(getCpuUsage, { pid: process.pid, core: '2' });
observerResult.observe(getCpuUsage, { pid: process.pid, core: '3' });
observerResult.observe(getCpuUsage, { pid: process.pid, core: '4' });
observerResult.observe(observable, { pid: process.pid, core: '5' });
});
Binary file modified examples/metrics/metrics/observer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions packages/opentelemetry-api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export * from './metrics/BoundInstrument';
export * from './metrics/Meter';
export * from './metrics/MeterProvider';
export * from './metrics/Metric';
export * from './metrics/MetricObservable';
export * from './metrics/NoopMeter';
export * from './metrics/NoopMeterProvider';
export * from './metrics/ObserverResult';
Expand Down
36 changes: 36 additions & 0 deletions packages/opentelemetry-api/src/metrics/MetricObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*!
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Metric Observable class to handle asynchronous metrics
*/
export interface MetricObservable {
/**
* Sets the next value for observable metric
* @param value
*/
next: (value: number) => void;
/**
* Subscribes for every value change
* @param callback
*/
subscribe: (callback: (value: number) => void) => void;
/**
* Removes the subscriber
* @param [callback]
*/
unsubscribe: (callback?: (value: number) => void) => void;
}
4 changes: 2 additions & 2 deletions packages/opentelemetry-api/src/metrics/ObserverResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/

import { Labels } from './Metric';
import { MetricObservable } from './MetricObservable';

/**
* Interface that is being used in function setCallback for Observer Metric
*/
export interface ObserverResult {
observers: Map<Labels, Function>;
dyladan marked this conversation as resolved.
Show resolved Hide resolved
observe(callback: Function, labels: Labels): void;
observe(callback: Function | MetricObservable, labels: Labels): void;
}
2 changes: 1 addition & 1 deletion packages/opentelemetry-exporter-collector/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"ts-node": "^8.6.2",
"tslint-consistent-codestyle": "^1.16.0",
"tslint-microsoft-contrib": "^6.2.0",
"typescript": "3.6.4",
"typescript": "3.7.2",
"webpack": "^4.35.2",
"webpack-cli": "^3.3.9",
"webpack-merge": "^4.2.2"
Expand Down
39 changes: 37 additions & 2 deletions packages/opentelemetry-metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,50 @@ const { MeterProvider } = require('@opentelemetry/metrics');
const meter = new MeterProvider().getMeter('your-meter-name');

const counter = meter.createCounter('metric_name', {
labelKeys: ["pid"],
description: "Example of a counter"
labelKeys: ['pid'],
description: 'Example of a counter'
});

const labels = { pid: process.pid };

// Create a BoundInstrument associated with specified label values.
const boundCounter = counter.bind(labels);
boundCounter.add(10);

```

### Observable
Choose this kind of metric when only last value is important without worry about aggregation

```js
const { MeterProvider, MetricObservable } = require('@opentelemetry/metrics');

// Initialize the Meter to capture measurements in various ways.
const meter = new MeterProvider().getMeter('your-meter-name');

const observer = meter.createObserver('metric_name', {
labelKeys: ['pid', 'core'],
description: 'Example of a observer'
});

function getCpuUsage() {
return Math.random();
}

const metricObservable = new MetricObservable();

observer.setCallback((observerResult) => {
// synchronous callback
observerResult.observe(getCpuUsage, { pid: process.pid, core: '1' });
// asynchronous callback
observerResult.observe(metricObservable, { pid: process.pid, core: '2' });
});

// simulate asynchronous operation
setInterval(()=> {
metricObservable.next(getCpuUsage());
}, 2000)

```

See [examples/prometheus](https://github.com/open-telemetry/opentelemetry-js/tree/master/examples/prometheus) for a short example.
Expand Down
42 changes: 24 additions & 18 deletions packages/opentelemetry-metrics/src/Metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import * as types from '@opentelemetry/api';
import * as api from '@opentelemetry/api';
import { Resource } from '@opentelemetry/resources';
import {
BoundCounter,
Expand All @@ -30,11 +30,11 @@ import { hashLabels } from './Utils';

/** This is a SDK implementation of {@link Metric} interface. */
export abstract class Metric<T extends BaseBoundInstrument>
implements types.Metric<T> {
implements api.Metric<T> {
protected readonly _monotonic: boolean;
protected readonly _disabled: boolean;
protected readonly _valueType: types.ValueType;
protected readonly _logger: types.Logger;
protected readonly _valueType: api.ValueType;
protected readonly _logger: api.Logger;
private readonly _descriptor: MetricDescriptor;
private readonly _instruments: Map<string, T> = new Map();

Expand All @@ -58,7 +58,7 @@ export abstract class Metric<T extends BaseBoundInstrument>
* @param labels key-values pairs that are associated with a specific metric
* that you want to record.
*/
bind(labels: types.Labels): T {
bind(labels: api.Labels): T {
const hash = hashLabels(labels);
if (this._instruments.has(hash)) return this._instruments.get(hash)!;

Expand All @@ -71,7 +71,7 @@ export abstract class Metric<T extends BaseBoundInstrument>
* Removes the Instrument from the metric, if it is present.
* @param labels key-values pairs that are associated with a specific metric.
*/
unbind(labels: types.Labels): void {
unbind(labels: api.Labels): void {
this._instruments.delete(hashLabels(labels));
}

Expand Down Expand Up @@ -102,12 +102,12 @@ export abstract class Metric<T extends BaseBoundInstrument>
};
}

protected abstract _makeInstrument(labels: types.Labels): T;
protected abstract _makeInstrument(labels: api.Labels): T;
}

/** This is a SDK implementation of Counter Metric. */
export class CounterMetric extends Metric<BoundCounter>
implements Pick<types.MetricUtils, 'add'> {
implements Pick<api.MetricUtils, 'add'> {
constructor(
name: string,
options: MetricOptions,
Expand All @@ -116,7 +116,7 @@ export class CounterMetric extends Metric<BoundCounter>
) {
super(name, options, MetricKind.COUNTER, resource);
}
protected _makeInstrument(labels: types.Labels): BoundCounter {
protected _makeInstrument(labels: api.Labels): BoundCounter {
return new BoundCounter(
labels,
this._disabled,
Expand All @@ -134,13 +134,13 @@ export class CounterMetric extends Metric<BoundCounter>
* @param labels key-values pairs that are associated with a specific metric
* that you want to record.
*/
add(value: number, labels: types.Labels) {
add(value: number, labels: api.Labels) {
this.bind(labels).add(value);
}
}

export class MeasureMetric extends Metric<BoundMeasure>
implements Pick<types.MetricUtils, 'record'> {
implements Pick<api.MetricUtils, 'record'> {
protected readonly _absolute: boolean;

constructor(
Expand All @@ -153,7 +153,7 @@ export class MeasureMetric extends Metric<BoundMeasure>

this._absolute = options.absolute !== undefined ? options.absolute : true; // Absolute default is true
}
protected _makeInstrument(labels: types.Labels): BoundMeasure {
protected _makeInstrument(labels: api.Labels): BoundMeasure {
return new BoundMeasure(
labels,
this._disabled,
Expand All @@ -165,15 +165,15 @@ export class MeasureMetric extends Metric<BoundMeasure>
);
}

record(value: number, labels: types.Labels) {
record(value: number, labels: api.Labels) {
this.bind(labels).record(value);
}
}

/** This is a SDK implementation of Observer Metric. */
export class ObserverMetric extends Metric<BoundObserver>
implements Pick<types.MetricUtils, 'setCallback'> {
private _observerResult: types.ObserverResult = new ObserverResult();
implements Pick<api.MetricUtils, 'setCallback'> {
private _observerResult = new ObserverResult();

constructor(
name: string,
Expand All @@ -184,7 +184,7 @@ export class ObserverMetric extends Metric<BoundObserver>
super(name, options, MetricKind.OBSERVER, resource);
}

protected _makeInstrument(labels: types.Labels): BoundObserver {
protected _makeInstrument(labels: api.Labels): BoundObserver {
return new BoundObserver(
labels,
this._disabled,
Expand All @@ -196,7 +196,7 @@ export class ObserverMetric extends Metric<BoundObserver>
}

getMetricRecord(): MetricRecord[] {
this._observerResult.observers.forEach((callback, labels) => {
this._observerResult.callbackObservers.forEach((callback, labels) => {
const instrument = this.bind(labels);
instrument.update(callback());
});
Expand All @@ -207,7 +207,13 @@ export class ObserverMetric extends Metric<BoundObserver>
* Sets a callback where user can observe value for certain labels
* @param callback
*/
setCallback(callback: (observerResult: types.ObserverResult) => void): void {
setCallback(callback: (observerResult: api.ObserverResult) => void): void {
callback(this._observerResult);
this._observerResult.observers.forEach((observer, labels) => {
observer.subscribe(value => {
const instrument = this.bind(labels);
instrument.update(value);
});
});
}
}
51 changes: 51 additions & 0 deletions packages/opentelemetry-metrics/src/MetricObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*!
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as api from '@opentelemetry/api';

type Subscriber = (value?: number) => void;

/**
* Implements the Metric Observable pattern
*/
export class MetricObservable implements api.MetricObservable {
private _subscribers: Subscriber[] = [];

next(value: number) {
for (const subscriber of this._subscribers) {
subscriber(value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use setImmediate(subscriber, value) to avoid calling subscribers synchronously

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do think it would be a problem if we call them synchronously. The common pattern for such things is to make it synchronous. Also if user is calling "next" I would assume that this is the moment it should be updated, including the time as well, using setImmediate the time won't be accurate. I also think that this might cause some unexpected behaviour, if for any reason you would like to also call "collect" after you update values. I would be rather against that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to avoid blocking the event loop for too long if there are many subscribers to update, or if the subscribers take too long to process a value. A user may also subscribe to updates, not just us, and the user is unlikely to know their callback is in a performance sensitive context

Copy link
Member Author

@obecny obecny Apr 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "hidden" async operation will prevent user from calling collect immediately after updating the values, and time for updating the value will also be different then moment of calling next. Besides that "blocking loop" would last as long as callback in "standard way" anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mayurkale22 @vmarchaud whats your thoughts on that ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vmarchaud setImmediate will delay update - it won't be updated asap as spec says

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't find any requirement on specific time requirement, could you give a link please ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vmarchaud you wrote the spec state that the update path should be as fast as possible so setImmediate won't update asap but it will update in next cycle

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the whole spec state:

Metric updates made via a bound instrument, when used with an Aggregator defined by simple atomic operations, should follow a very short code path

I interpret short code path from a performance requirement and not as a time requirement. So using setImmediate would be a better solution for me

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm strongly against it but if majority want's setImmediate with it's all consequences fine for me, @mayurkale22 you also vote for setImmediate ?

}
}

subscribe(subscriber: Function) {
if (typeof subscriber === 'function') {
this._subscribers.push(subscriber as Subscriber);
}
}

unsubscribe(subscriber?: Function) {
if (typeof subscriber === 'function') {
for (let i = 0, j = this._subscribers.length; i < j; i++) {
if (this._subscribers[i] === subscriber) {
this._subscribers.splice(i, 1);
break;
}
}
} else {
this._subscribers = [];
}
}
}
16 changes: 13 additions & 3 deletions packages/opentelemetry-metrics/src/ObserverResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import {
MetricObservable,
ObserverResult as TypeObserverResult,
Labels,
} from '@opentelemetry/api';
Expand All @@ -23,8 +24,17 @@ import {
* Implementation of {@link TypeObserverResult}
*/
export class ObserverResult implements TypeObserverResult {
observers = new Map<Labels, Function>();
observe(callback: any, labels: Labels): void {
this.observers.set(labels, callback);
callbackObservers: Map<Labels, Function> = new Map<Labels, Function>();
observers: Map<Labels, MetricObservable> = new Map<
Labels,
MetricObservable
>();

observe(callback: Function | MetricObservable, labels: Labels): void {
if (typeof callback === 'function') {
this.callbackObservers.set(labels, callback);
} else {
this.observers.set(labels, callback);
}
}
}
4 changes: 2 additions & 2 deletions packages/opentelemetry-metrics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

export * from './BoundInstrument';
export * from './Meter';
export * from './Metric';
export * from './MeterProvider';
export * from './Metric';
export * from './MetricObservable';
export * from './export/Aggregator';
export * from './export/ConsoleMetricExporter';
export * from './export/types';
export * from './export/Aggregator';
Loading