Skip to content

Commit

Permalink
refactor(sdk-metrics-base): add ObservableRegistry and hoist invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Mar 8, 2022
1 parent e1fd1b8 commit 89c0ecd
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ export interface Meter {

/**
* Creates a new `ObservableGauge` metric.
*
* The callback SHOULD be reentrant safe.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand All @@ -92,6 +95,9 @@ export interface Meter {

/**
* Creates a new `ObservableCounter` metric.
*
* The callback SHOULD be reentrant safe.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand All @@ -104,6 +110,9 @@ export interface Meter {

/**
* Creates a new `ObservableUpDownCounter` metric.
*
* The callback SHOULD be reentrant safe.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,38 @@
*/

import { HrTime } from '@opentelemetry/api';
import { ObservableCallback } from '@opentelemetry/api-metrics-wip';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { ObservableResult } from '../ObservableResult';
import { AttributeHashMap } from './HashMap';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
* Internal interface.
*
* Stores and aggregates {@link MetricData} for asynchronous instruments.
*/
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage {
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage implements AsyncWritableMetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _callback: ObservableCallback
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

private _record(measurements: AttributeHashMap<number>) {
record(measurements: AttributeHashMap<number>) {
const processed = new AttributeHashMap<number>();
Array.from(measurements.entries()).forEach(([attributes, value]) => {
processed.set(this._attributesProcessor.process(attributes), value);
Expand All @@ -67,17 +61,12 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
async collect(
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
const observableResult = new ObservableResult();
// TODO: timeout with callback
await this._callback(observableResult);
this._record(observableResult.buffer);

): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
Expand All @@ -89,10 +78,4 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
collectionTime
);
}

static create(view: View, instrument: InstrumentDescriptor, callback: ObservableCallback): AsyncMetricStorage<Maybe<Accumulation>> {
instrument = createInstrumentDescriptorWithView(view, instrument);
const aggregator = view.aggregation.createAggregator(instrument);
return new AsyncMetricStorage(instrument, aggregator, view.attributesProcessor, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import { MeterProviderSharedState } from './MeterProviderSharedState';
import { MetricCollectorHandle } from './MetricCollector';
import { MetricStorageRegistry } from './MetricStorageRegistry';
import { MultiMetricStorage } from './MultiWritableMetricStorage';
import { ObservableRegistry } from './ObservableRegistry';
import { SyncMetricStorage } from './SyncMetricStorage';

/**
* An internal record for shared meter provider states.
*/
export class MeterSharedState {
private _metricStorageRegistry = new MetricStorageRegistry();
private _observableRegistry = new ObservableRegistry();

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {}

Expand All @@ -56,8 +58,12 @@ export class MeterSharedState {
views.forEach(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor, callback);
this._metricStorageRegistry.register(viewStorage);
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor);
const storage = this._metricStorageRegistry.register(viewStorage);
if (storage == null) {
return;
}
this._observableRegistry.addCallback(callback, storage);
});
}

Expand All @@ -71,15 +77,16 @@ export class MeterSharedState {
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
const metricDataList = await Promise.all(Array.from(this._metricStorageRegistry.getStorages())
await this._observableRegistry.observe();
const metricDataList = Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
})
.filter(isNotNullish));
.filter(isNotNullish);

return {
instrumentationLibrary: this._instrumentationLibrary,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export abstract class MetricStorage {
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;
): Maybe<MetricData>;

getInstrumentDescriptor(): InstrumentDescriptor{
return this._instrumentDescriptor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ObservableCallback } from '@opentelemetry/api-metrics-wip';
import { ObservableResult } from '../ObservableResult';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
* An internal state interface for ObservableCallbacks.
*
* An ObservableCallback can be bound to multiple AsyncMetricStorage at once
* for batch observations. And an AsyncMetricStorage may be bound to multiple
* callbacks too.
*
* However an ObservableCallback must not be called multiple times during a
* single collection operation.
*/
export class ObservableRegistry {
private _callbacks: [ObservableCallback, AsyncWritableMetricStorage][] = [];

addCallback(callback: ObservableCallback, metricStorage: AsyncWritableMetricStorage) {
this._callbacks.push([callback, metricStorage]);
}

async observe(): Promise<void> {
// TODO: batch observables
// https://github.com/open-telemetry/opentelemetry-specification/pull/2363
const promise = Promise.all(this._callbacks
.map(async ([observableCallback, metricStorage]) => {
const observableResult = new ObservableResult();
// TODO: timeout with callback
// https://github.com/open-telemetry/opentelemetry-specification/issues/2295
await observableCallback(observableResult);
metricStorage.record(observableResult.buffer);
})
);

await promise;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ import { Context, HrTime } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics-wip';
import { WritableMetricStorage } from './WritableMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { MetricData } from '../export/MetricData';
Expand Down Expand Up @@ -61,12 +57,12 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStor
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
async collect(
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
Expand All @@ -78,10 +74,4 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStor
collectionTime
);
}

static create(view: View, instrument: InstrumentDescriptor): SyncMetricStorage<Maybe<Accumulation>> {
instrument = createInstrumentDescriptorWithView(view, instrument);
const aggregator = view.aggregation.createAggregator(instrument);
return new SyncMetricStorage(instrument, aggregator, view.attributesProcessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@

import { Context } from '@opentelemetry/api';
import { Attributes } from '@opentelemetry/api-metrics-wip';
import { AttributeHashMap } from './HashMap';

/**
* Internal interface.
* Internal interface. Stores measurements and allows synchronous writes of
* measurements.
*
* Stores {@link MetricData} and allows synchronous writes of measurements.
* An interface representing SyncMetricStorage with type parameters removed.
*/
export interface WritableMetricStorage {
/** Records a measurement. */
record(value: number, attributes: Attributes, context: Context): void;
}

export class NoopWritableMetricStorage implements WritableMetricStorage {
record(_value: number, _attributes: Attributes, _context: Context): void {}
/**
* Internal interface. Stores measurements and allows asynchronous writes of
* measurements.
*
* An interface representing AsyncMetricStorage with type parameters removed.
*/
export interface AsyncWritableMetricStorage {
/** Records a batch of measurements. */
record(measurements: AttributeHashMap<number>): void;
}
Loading

0 comments on commit 89c0ecd

Please sign in to comment.