diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index 3d812038623..afa38e8cbd7 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -53,6 +53,7 @@ All notable changes to experimental packages in this project will be documented ### :house: (Internal) * chore: move trace exporters back to experimental #2835 @dyladan +* refactor(sdk-metrics-base): meter shared states #2821 @legendecas ## v0.27.0 diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts index bd298c73350..96ca69b0ec1 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts @@ -16,27 +16,19 @@ import * as metrics from '@opentelemetry/api-metrics'; import { InstrumentationLibrary } from '@opentelemetry/core'; -import { createInstrumentDescriptor, InstrumentDescriptor, InstrumentType } from './InstrumentDescriptor'; +import { createInstrumentDescriptor, InstrumentType } from './InstrumentDescriptor'; import { CounterInstrument, HistogramInstrument, UpDownCounterInstrument } from './Instruments'; import { MeterProviderSharedState } from './state/MeterProviderSharedState'; -import { MultiMetricStorage } from './state/MultiWritableMetricStorage'; -import { SyncMetricStorage } from './state/SyncMetricStorage'; -import { InstrumentationLibraryMetrics } from './export/MetricData'; -import { isNotNullish } from './utils'; -import { MetricCollectorHandle } from './state/MetricCollector'; -import { HrTime } from '@opentelemetry/api'; -import { AsyncMetricStorage } from './state/AsyncMetricStorage'; -import { WritableMetricStorage } from './state/WritableMetricStorage'; -import { MetricStorageRegistry } from './state/MetricStorageRegistry'; +import { MeterSharedState } from './state/MeterSharedState'; /** * This class implements the {@link metrics.Meter} interface. */ export class Meter implements metrics.Meter { - private _metricStorageRegistry = new MetricStorageRegistry(); + private _meterSharedState: MeterSharedState; - constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) { - this._meterProviderSharedState.meters.push(this); + constructor(meterProviderSharedState: MeterProviderSharedState, instrumentationLibrary: InstrumentationLibrary) { + this._meterSharedState = meterProviderSharedState.getMeterSharedState(instrumentationLibrary); } /** @@ -44,7 +36,7 @@ export class Meter implements metrics.Meter { */ createHistogram(name: string, options?: metrics.HistogramOptions): metrics.Histogram { const descriptor = createInstrumentDescriptor(name, InstrumentType.HISTOGRAM, options); - const storage = this._registerMetricStorage(descriptor); + const storage = this._meterSharedState.registerMetricStorage(descriptor); return new HistogramInstrument(storage, descriptor); } @@ -53,7 +45,7 @@ export class Meter implements metrics.Meter { */ createCounter(name: string, options?: metrics.CounterOptions): metrics.Counter { const descriptor = createInstrumentDescriptor(name, InstrumentType.COUNTER, options); - const storage = this._registerMetricStorage(descriptor); + const storage = this._meterSharedState.registerMetricStorage(descriptor); return new CounterInstrument(storage, descriptor); } @@ -62,7 +54,7 @@ export class Meter implements metrics.Meter { */ createUpDownCounter(name: string, options?: metrics.UpDownCounterOptions): metrics.UpDownCounter { const descriptor = createInstrumentDescriptor(name, InstrumentType.UP_DOWN_COUNTER, options); - const storage = this._registerMetricStorage(descriptor); + const storage = this._meterSharedState.registerMetricStorage(descriptor); return new UpDownCounterInstrument(storage, descriptor); } @@ -75,7 +67,7 @@ export class Meter implements metrics.Meter { options?: metrics.ObservableGaugeOptions, ): void { const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_GAUGE, options); - this._registerAsyncMetricStorage(descriptor, callback); + this._meterSharedState.registerAsyncMetricStorage(descriptor, callback); } /** @@ -87,7 +79,7 @@ export class Meter implements metrics.Meter { options?: metrics.ObservableCounterOptions, ): void { const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_COUNTER, options); - this._registerAsyncMetricStorage(descriptor, callback); + this._meterSharedState.registerAsyncMetricStorage(descriptor, callback); } /** @@ -99,47 +91,6 @@ export class Meter implements metrics.Meter { options?: metrics.ObservableUpDownCounterOptions, ): void { const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, options); - this._registerAsyncMetricStorage(descriptor, callback); - } - - private _registerMetricStorage(descriptor: InstrumentDescriptor): WritableMetricStorage { - const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary); - const storages = views.map(view => this._metricStorageRegistry.register(SyncMetricStorage.create(view, descriptor))) - .filter(isNotNullish); - - if (storages.length === 1) { - return storages[0]; - } - - // This will be a no-op WritableMetricStorage when length is null. - return new MultiMetricStorage(storages); - } - - private _registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) { - const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary); - views.forEach(view => { - this._metricStorageRegistry.register(AsyncMetricStorage.create(view, descriptor, callback)); - }); - } - - /** - * @internal - * @param collector opaque handle of {@link MetricCollector} which initiated the collection. - * @param collectionTime the HrTime at which the collection was initiated. - * @returns the list of {@link MetricData} collected. - */ - async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise { - const metricData = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => { - return metricStorage.collect( - collector, - this._meterProviderSharedState.metricCollectors, - this._meterProviderSharedState.sdkStartTime, - collectionTime); - })); - - return { - instrumentationLibrary: this._instrumentationLibrary, - metrics: metricData.filter(isNotNullish), - }; + this._meterSharedState.registerAsyncMetricStorage(descriptor, callback); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts index f6d81a82f94..6c35e650bf8 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts @@ -15,10 +15,10 @@ */ import { HrTime } from '@opentelemetry/api'; -import { hrTime } from '@opentelemetry/core'; +import { hrTime, InstrumentationLibrary } from '@opentelemetry/core'; import { Resource } from '@opentelemetry/resources'; -import { Meter } from '../Meter'; import { ViewRegistry } from '../view/ViewRegistry'; +import { MeterSharedState } from './MeterSharedState'; import { MetricCollector } from './MetricCollector'; /** @@ -30,7 +30,15 @@ export class MeterProviderSharedState { metricCollectors: MetricCollector[] = []; - meters: Meter[] = []; + meterSharedStates: MeterSharedState[] = []; constructor(public resource: Resource) {} + + getMeterSharedState(instrumentationLibrary: InstrumentationLibrary) { + // TODO: meter identity + // https://github.com/open-telemetry/opentelemetry-js/issues/2593 + const meterSharedState = new MeterSharedState(this, instrumentationLibrary); + this.meterSharedStates.push(meterSharedState); + return meterSharedState; + } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts new file mode 100644 index 00000000000..912cf54557c --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts @@ -0,0 +1,89 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HrTime } from '@opentelemetry/api'; +import * as metrics from '@opentelemetry/api-metrics'; +import { InstrumentationLibrary } from '@opentelemetry/core'; +import { InstrumentationLibraryMetrics } from '../export/MetricData'; +import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor'; +import { isNotNullish } from '../utils'; +import { AsyncMetricStorage } from './AsyncMetricStorage'; +import { MeterProviderSharedState } from './MeterProviderSharedState'; +import { MetricCollectorHandle } from './MetricCollector'; +import { MetricStorageRegistry } from './MetricStorageRegistry'; +import { MultiMetricStorage } from './MultiWritableMetricStorage'; +import { SyncMetricStorage } from './SyncMetricStorage'; + +/** + * An internal record for shared meter provider states. + */ +export class MeterSharedState { + private _metricStorageRegistry = new MetricStorageRegistry(); + + constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {} + + registerMetricStorage(descriptor: InstrumentDescriptor) { + const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary); + const storages = views + .map(view => { + const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor); + const aggregator = view.aggregation.createAggregator(viewDescriptor); + const storage = new SyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor); + return this._metricStorageRegistry.register(storage); + }) + .filter(isNotNullish); + if (storages.length === 1) { + return storages[0]; + } + return new MultiMetricStorage(storages); + } + + registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) { + const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary); + views.forEach(view => { + const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor); + const aggregator = view.aggregation.createAggregator(viewDescriptor); + const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor, callback); + this._metricStorageRegistry.register(viewStorage); + }); + } + + /** + * @param collector opaque handle of {@link MetricCollector} which initiated the collection. + * @param collectionTime the HrTime at which the collection was initiated. + * @returns the list of {@link MetricData} collected. + */ + async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise { + /** + * 1. Call all observable callbacks first. + * 2. Collect metric result for the collector. + */ + const metricDataList = await Promise.all(Array.from(this._metricStorageRegistry.getStorages()) + .map(metricStorage => { + return metricStorage.collect( + collector, + this._meterProviderSharedState.metricCollectors, + this._meterProviderSharedState.sdkStartTime, + collectionTime); + }) + .filter(isNotNullish)); + + return { + instrumentationLibrary: this._instrumentationLibrary, + metrics: metricDataList.filter(isNotNullish), + }; + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts index 6be9022b990..0cff46838a3 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts @@ -34,8 +34,8 @@ export class MetricCollector implements MetricProducer { async collect(): Promise { const collectionTime = hrTime(); - const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meters - .map(meter => meter.collect(this, collectionTime)))); + const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meterSharedStates + .map(meterSharedState => meterSharedState.collect(this, collectionTime)))); return { resource: this._sharedState.resource, diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricExporter.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricExporter.ts new file mode 100644 index 00000000000..43df025473d --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/export/TestMetricExporter.ts @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ExportResult, ExportResultCode } from '@opentelemetry/core'; +import { AggregationTemporality, PushMetricExporter, ResourceMetrics } from '../../src'; + +export class TestMetricExporter implements PushMetricExporter { + resourceMetricsList: ResourceMetrics[] = []; + export(resourceMetrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void { + this.resourceMetricsList.push(resourceMetrics); + process.nextTick(() => resultCallback({ code: ExportResultCode.SUCCESS })); + } + + async forceFlush(): Promise {} + async shutdown(): Promise {} + + getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.CUMULATIVE; + } +} + +export class TestDeltaMetricExporter extends TestMetricExporter { + override getPreferredAggregationTemporality(): AggregationTemporality { + return AggregationTemporality.DELTA; + } +} diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts new file mode 100644 index 00000000000..1e0cd68f99c --- /dev/null +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MeterSharedState.test.ts @@ -0,0 +1,97 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import { Meter, MeterProvider, DataPointType } from '../../src'; +import { assertMetricData, defaultInstrumentationLibrary, defaultResource } from '../util'; +import { TestMetricReader } from '../export/TestMetricReader'; +import { TestDeltaMetricExporter, TestMetricExporter } from '../export/TestMetricExporter'; +import { MeterSharedState } from '../../src/state/MeterSharedState'; + +describe('MeterSharedState', () => { + afterEach(() => { + sinon.restore(); + }); + + describe('collect', () => { + function setupInstruments() { + const meterProvider = new MeterProvider({ resource: defaultResource }); + + const cumulativeReader = new TestMetricReader(new TestMetricExporter().getPreferredAggregationTemporality()); + meterProvider.addMetricReader(cumulativeReader); + const cumulativeCollector = cumulativeReader.getMetricCollector(); + + const deltaReader = new TestMetricReader(new TestDeltaMetricExporter().getPreferredAggregationTemporality()); + meterProvider.addMetricReader(deltaReader); + const deltaCollector = deltaReader.getMetricCollector(); + + const metricCollectors = [cumulativeCollector, deltaCollector]; + + const meter = meterProvider.getMeter(defaultInstrumentationLibrary.name, defaultInstrumentationLibrary.version, { + schemaUrl: defaultInstrumentationLibrary.schemaUrl, + }) as Meter; + const meterSharedState = meter['_meterSharedState'] as MeterSharedState; + + return { metricCollectors, cumulativeCollector, deltaCollector, meter, meterSharedState, meterProvider }; + } + + it('should collect sync metrics', async () => { + /** preparing test instrumentations */ + const { metricCollectors, meter } = setupInstruments(); + + /** creating metric events */ + const counter = meter.createCounter('test'); + + /** collect metrics */ + counter.add(1); + await Promise.all(metricCollectors.map(async collector => { + const result = await collector.collect(); + assert.strictEqual(result.instrumentationLibraryMetrics.length, 1); + assert.strictEqual(result.instrumentationLibraryMetrics[0].metrics.length, 1); + assertMetricData(result.instrumentationLibraryMetrics[0].metrics[0], DataPointType.SINGULAR, { + name: 'test', + }); + })); + }); + + it('should collect sync metrics with views', async () => { + /** preparing test instrumentations */ + const { metricCollectors, meter, meterProvider } = setupInstruments(); + + /** creating metric events */ + meterProvider.addView({ name: 'foo' }, { instrument: { name: 'test' } }); + meterProvider.addView({ name: 'bar' }, { instrument: { name: 'test' } }); + + const counter = meter.createCounter('test'); + + /** collect metrics */ + counter.add(1); + await Promise.all(metricCollectors.map(async collector => { + const result = await collector.collect(); + assert.strictEqual(result.instrumentationLibraryMetrics.length, 1); + assert.strictEqual(result.instrumentationLibraryMetrics[0].metrics.length, 2); + assertMetricData(result.instrumentationLibraryMetrics[0].metrics[0], DataPointType.SINGULAR, { + name: 'foo', + }); + assertMetricData(result.instrumentationLibraryMetrics[0].metrics[1], DataPointType.SINGULAR, { + name: 'bar', + }); + })); + }); + + }); +}); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts index 5c8ba97c329..79d29f85603 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricCollector.test.ts @@ -17,34 +17,13 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; import { MeterProvider } from '../../src'; -import { AggregationTemporality } from '../../src/export/AggregationTemporality'; -import { DataPointType, ResourceMetrics } from '../../src/export/MetricData'; +import { DataPointType } from '../../src/export/MetricData'; import { PushMetricExporter } from '../../src/export/MetricExporter'; import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState'; import { MetricCollector } from '../../src/state/MetricCollector'; import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint } from '../util'; import { TestMetricReader } from '../export/TestMetricReader'; -import { ExportResult, ExportResultCode } from '@opentelemetry/core'; - -class TestMetricExporter implements PushMetricExporter { - async export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): Promise { - resultCallback({code: ExportResultCode.SUCCESS}); - } - - async shutdown(): Promise {} - - async forceFlush(): Promise {} - - getPreferredAggregationTemporality(): AggregationTemporality { - return AggregationTemporality.CUMULATIVE; - } -} - -class TestDeltaMetricExporter extends TestMetricExporter { - override getPreferredAggregationTemporality(): AggregationTemporality { - return AggregationTemporality.DELTA; - } -} +import { TestDeltaMetricExporter, TestMetricExporter } from '../export/TestMetricExporter'; describe('MetricCollector', () => { afterEach(() => { diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts index 8306dc12a26..e01c72b018b 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts @@ -54,7 +54,7 @@ export const commonAttributes: Attributes[] = [{}, { 1: '1' }, { a: '2' }, new ( })]; export const sleep = (time: number) => - new Promise(resolve => { + new Promise(resolve => { return setTimeout(resolve, time); });