Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-metrics-base): hoist async instrument callback invocations #2822

Merged
merged 8 commits into from
May 6, 2022
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ All notable changes to experimental packages in this project will be documented
* feat(proto): add @opentelemetry/otlp-transformer package with hand-rolled transformation #2746 @dyladan
* feat(sdk-metrics-base): shutdown and forceflush on MeterProvider #2890 @legendecas
* feat(sdk-metrics-base): return the same meter for identical input to getMeter #2901 @legendecas
* feat(sdk-metrics-base): hoist async instrument callback invocations #2822 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ export interface Meter {

/**
* Creates a new `ObservableGauge` metric.
*
* The callback SHOULD be safe to be invoked concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand All @@ -92,6 +95,9 @@ export interface Meter {

/**
* Creates a new `ObservableCounter` metric.
*
* The callback SHOULD be safe to be invoked concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand All @@ -104,6 +110,9 @@ export interface Meter {

/**
* Creates a new `ObservableUpDownCounter` metric.
*
* The callback SHOULD be safe to be invoked concurrently.
*
* @param name the name of the metric.
* @param callback the observable callback
* @param [options] the metric options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,38 @@
*/

import { HrTime } from '@opentelemetry/api';
import { ObservableCallback } from '@opentelemetry/api-metrics';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { MetricData } from '../export/MetricData';
import { DeltaMetricProcessor } from './DeltaMetricProcessor';
import { TemporalMetricProcessor } from './TemporalMetricProcessor';
import { Maybe } from '../utils';
import { MetricCollectorHandle } from './MetricCollector';
import { ObservableResult } from '../ObservableResult';
import { AttributeHashMap } from './HashMap';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
* Internal interface.
*
* Stores and aggregates {@link MetricData} for asynchronous instruments.
*/
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage {
export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStorage implements AsyncWritableMetricStorage {
private _deltaMetricStorage: DeltaMetricProcessor<T>;
private _temporalMetricStorage: TemporalMetricProcessor<T>;

constructor(
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
private _callback: ObservableCallback
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

private _record(measurements: AttributeHashMap<number>) {
record(measurements: AttributeHashMap<number>) {
const processed = new AttributeHashMap<number>();
Array.from(measurements.entries()).forEach(([attributes, value]) => {
processed.set(this._attributesProcessor.process(attributes), value);
Expand All @@ -67,17 +61,12 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
async collect(
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
const observableResult = new ObservableResult();
// TODO: timeout with callback
await this._callback(observableResult);
this._record(observableResult.buffer);

): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
Expand All @@ -89,10 +78,4 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
collectionTime
);
}

static create(view: View, instrument: InstrumentDescriptor, callback: ObservableCallback): AsyncMetricStorage<Maybe<Accumulation>> {
instrument = createInstrumentDescriptorWithView(view, instrument);
const aggregator = view.aggregation.createAggregator(instrument);
return new AsyncMetricStorage(instrument, aggregator, view.attributesProcessor, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import { MeterProviderSharedState } from './MeterProviderSharedState';
import { MetricCollectorHandle } from './MetricCollector';
import { MetricStorageRegistry } from './MetricStorageRegistry';
import { MultiMetricStorage } from './MultiWritableMetricStorage';
import { ObservableRegistry } from './ObservableRegistry';
import { SyncMetricStorage } from './SyncMetricStorage';

/**
* An internal record for shared meter provider states.
*/
export class MeterSharedState {
private _metricStorageRegistry = new MetricStorageRegistry();
private _observableRegistry = new ObservableRegistry();
meter: Meter;

constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {
Expand Down Expand Up @@ -60,8 +62,12 @@ export class MeterSharedState {
views.forEach(view => {
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
const aggregator = view.aggregation.createAggregator(viewDescriptor);
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor, callback);
this._metricStorageRegistry.register(viewStorage);
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor);
const storage = this._metricStorageRegistry.register(viewStorage);
if (storage == null) {
return;
}
this._observableRegistry.addCallback(callback, storage);
});
}

Expand All @@ -75,15 +81,16 @@ export class MeterSharedState {
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
const metricDataList = await Promise.all(Array.from(this._metricStorageRegistry.getStorages())
await this._observableRegistry.observe();
const metricDataList = Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
this._meterProviderSharedState.sdkStartTime,
collectionTime);
})
.filter(isNotNullish));
.filter(isNotNullish);

return {
instrumentationLibrary: this._instrumentationLibrary,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export abstract class MetricStorage {
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>>;
): Maybe<MetricData>;

getInstrumentDescriptor(): InstrumentDescriptor{
return this._instrumentDescriptor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableResult } from '../ObservableResult';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
* An internal state interface for ObservableCallbacks.
*
* An ObservableCallback can be bound to multiple AsyncMetricStorage at once
* for batch observations. And an AsyncMetricStorage may be bound to multiple
* callbacks too.
*
* However an ObservableCallback must not be called multiple times during a
* single collection operation.
*/
export class ObservableRegistry {
private _callbacks: [ObservableCallback, AsyncWritableMetricStorage][] = [];

addCallback(callback: ObservableCallback, metricStorage: AsyncWritableMetricStorage) {
this._callbacks.push([callback, metricStorage]);
}

async observe(): Promise<void> {
// TODO: batch observables
// https://github.com/open-telemetry/opentelemetry-specification/pull/2363
const promise = Promise.all(this._callbacks
.map(async ([observableCallback, metricStorage]) => {
const observableResult = new ObservableResult();
// TODO: timeout with callback
// https://github.com/open-telemetry/opentelemetry-specification/issues/2295
await observableCallback(observableResult);
metricStorage.record(observableResult.buffer);
})
);

await promise;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ import { Context, HrTime } from '@opentelemetry/api';
import { MetricAttributes } from '@opentelemetry/api-metrics';
import { WritableMetricStorage } from './WritableMetricStorage';
import { Accumulation, Aggregator } from '../aggregator/types';
import { View } from '../view/View';
import {
createInstrumentDescriptorWithView,
InstrumentDescriptor
} from '../InstrumentDescriptor';
import { InstrumentDescriptor } from '../InstrumentDescriptor';
import { AttributesProcessor } from '../view/AttributesProcessor';
import { MetricStorage } from './MetricStorage';
import { MetricData } from '../export/MetricData';
Expand Down Expand Up @@ -61,12 +57,12 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStor
* Note: This is a stateful operation and may reset any interval-related
* state for the MetricCollector.
*/
async collect(
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Promise<Maybe<MetricData>> {
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
Expand All @@ -78,10 +74,4 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>> extends MetricStor
collectionTime
);
}

static create(view: View, instrument: InstrumentDescriptor): SyncMetricStorage<Maybe<Accumulation>> {
instrument = createInstrumentDescriptorWithView(view, instrument);
const aggregator = view.aggregation.createAggregator(instrument);
return new SyncMetricStorage(instrument, aggregator, view.attributesProcessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@

import { Context } from '@opentelemetry/api';
import { MetricAttributes } from '@opentelemetry/api-metrics';
import { AttributeHashMap } from './HashMap';

/**
* Internal interface.
* Internal interface. Stores measurements and allows synchronous writes of
* measurements.
*
* Stores {@link MetricData} and allows synchronous writes of measurements.
* An interface representing SyncMetricStorage with type parameters removed.
*/
export interface WritableMetricStorage {
/** Records a measurement. */
record(value: number, attributes: MetricAttributes, context: Context): void;
}

export class NoopWritableMetricStorage implements WritableMetricStorage {
record(_value: number, _attributes: MetricAttributes, _context: Context): void {}
/**
* Internal interface. Stores measurements and allows asynchronous writes of
* measurements.
*
* An interface representing AsyncMetricStorage with type parameters removed.
*/
export interface AsyncWritableMetricStorage {
/** Records a batch of measurements. */
record(measurements: AttributeHashMap<number>): void;
}
Loading