From b7360125e3d5fc6b82115a2bf3eef99f606dc101 Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Mon, 11 Nov 2024 11:07:27 +0200 Subject: [PATCH] feat(sdk-metrics): add CardinalitySelector to MetricReader --- .../src/export/CardinalitySelector.ts | 21 ++++ .../sdk-metrics/src/export/MetricReader.ts | 18 ++++ .../src/state/AsyncMetricStorage.ts | 8 +- .../src/state/DeltaMetricProcessor.ts | 43 +++++--- .../sdk-metrics/src/state/MeterSharedState.ts | 7 +- .../sdk-metrics/src/state/MetricCollector.ts | 9 ++ packages/sdk-metrics/src/view/View.ts | 8 +- .../sdk-metrics/test/MeterProvider.test.ts | 99 ++++++++++++++++++- .../test/state/AsyncMetricStorage.test.ts | 2 + .../test/state/DeltaMetricProcessor.test.ts | 29 ++++++ .../test/state/MetricStorageRegistry.test.ts | 6 ++ .../test/state/SyncMetricStorage.test.ts | 2 + .../state/TemporalMetricProcessor.test.ts | 3 + 13 files changed, 231 insertions(+), 24 deletions(-) create mode 100644 packages/sdk-metrics/src/export/CardinalitySelector.ts diff --git a/packages/sdk-metrics/src/export/CardinalitySelector.ts b/packages/sdk-metrics/src/export/CardinalitySelector.ts new file mode 100644 index 00000000000..a274898b9a2 --- /dev/null +++ b/packages/sdk-metrics/src/export/CardinalitySelector.ts @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { InstrumentType } from '../InstrumentDescriptor'; +/** + * Cardinality Limit selector based on metric instrument types. + */ +export type CardinalitySelector = (instrumentType: InstrumentType) => number; diff --git a/packages/sdk-metrics/src/export/MetricReader.ts b/packages/sdk-metrics/src/export/MetricReader.ts index 8aad601d70f..e87d55884db 100644 --- a/packages/sdk-metrics/src/export/MetricReader.ts +++ b/packages/sdk-metrics/src/export/MetricReader.ts @@ -32,6 +32,7 @@ import { DEFAULT_AGGREGATION_SELECTOR, DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR, } from './AggregationSelector'; +import { CardinalitySelector } from './CardinalitySelector'; export interface MetricReaderOptions { /** @@ -45,6 +46,11 @@ export interface MetricReaderOptions { * not configured, cumulative is used for all instruments. */ aggregationTemporalitySelector?: AggregationTemporalitySelector; + /** + * Cardinality selector based on metric instrument types. If not configured, + * a default value is used. + */ + cardinalitySelector?: CardinalitySelector; /** * **Note, this option is experimental**. Additional MetricProducers to use as a source of * aggregated metric data in addition to the SDK's metric data. The resource returned by @@ -68,6 +74,7 @@ export abstract class MetricReader { private _sdkMetricProducer?: MetricProducer; private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector; private readonly _aggregationSelector: AggregationSelector; + private readonly _cardinalitySelector?: CardinalitySelector; constructor(options?: MetricReaderOptions) { this._aggregationSelector = @@ -76,6 +83,7 @@ export abstract class MetricReader { options?.aggregationTemporalitySelector ?? DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR; this._metricProducers = options?.metricProducers ?? []; + this._cardinalitySelector = options?.cardinalitySelector; } /** @@ -116,6 +124,16 @@ export abstract class MetricReader { return this._aggregationTemporalitySelector(instrumentType); } + /** + * Select the cardinality limit for the given {@link InstrumentType} for this + * reader. + */ + selectCardinalityLimit(instrumentType: InstrumentType): number { + return this._cardinalitySelector + ? this._cardinalitySelector(instrumentType) + : 2000; // default value if no selector is provided + } + /** * Handle once the SDK has initialized this {@link MetricReader} * Overriding this method is optional. diff --git a/packages/sdk-metrics/src/state/AsyncMetricStorage.ts b/packages/sdk-metrics/src/state/AsyncMetricStorage.ts index 6bebafdc1f1..be2cf7f25ac 100644 --- a/packages/sdk-metrics/src/state/AsyncMetricStorage.ts +++ b/packages/sdk-metrics/src/state/AsyncMetricStorage.ts @@ -43,10 +43,14 @@ export class AsyncMetricStorage> _instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator, private _attributesProcessor: AttributesProcessor, - collectorHandles: MetricCollectorHandle[] + collectorHandles: MetricCollectorHandle[], + private _aggregationCardinalityLimit?: number ) { super(_instrumentDescriptor); - this._deltaMetricStorage = new DeltaMetricProcessor(aggregator); + this._deltaMetricStorage = new DeltaMetricProcessor( + aggregator, + this._aggregationCardinalityLimit + ); this._temporalMetricStorage = new TemporalMetricProcessor( aggregator, collectorHandles diff --git a/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts b/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts index 679375f65ef..4456792ebc3 100644 --- a/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts +++ b/packages/sdk-metrics/src/state/DeltaMetricProcessor.ts @@ -15,7 +15,7 @@ */ import { Context, HrTime, Attributes } from '@opentelemetry/api'; -import { Maybe } from '../utils'; +import { Maybe, hashAttributes } from '../utils'; import { Accumulation, Aggregator } from '../aggregator/types'; import { AttributeHashMap } from './HashMap'; @@ -45,20 +45,23 @@ export class DeltaMetricProcessor> { _context: Context, collectionTime: HrTime ) { - if (this._activeCollectionStorage.size >= this._cardinalityLimit) { - const overflowAttributes = { 'otel.metric.overflow': true }; - const overflowAccumulation = this._activeCollectionStorage.getOrDefault( - overflowAttributes, - () => this._aggregator.createAccumulation(collectionTime) - ); - overflowAccumulation?.record(value); - return; + let accumulation = this._activeCollectionStorage.get(attributes); + + if (!accumulation) { + if (this._activeCollectionStorage.size >= this._cardinalityLimit) { + const overflowAttributes = { 'otel.metric.overflow': true }; + const overflowAccumulation = this._activeCollectionStorage.getOrDefault( + overflowAttributes, + () => this._aggregator.createAccumulation(collectionTime) + ); + overflowAccumulation?.record(value); + return; + } + + accumulation = this._aggregator.createAccumulation(collectionTime); + this._activeCollectionStorage.set(attributes, accumulation); } - const accumulation = this._activeCollectionStorage.getOrDefault( - attributes, - () => this._aggregator.createAccumulation(collectionTime) - ); accumulation?.record(value); } @@ -81,6 +84,19 @@ export class DeltaMetricProcessor> { hashCode )!; delta = this._aggregator.diff(previous, accumulation); + } else { + // If the cardinality limit is reached, we need to change the attributes + if (this._cumulativeMemoStorage.size >= this._cardinalityLimit) { + attributes = { 'otel.metric.overflow': true }; + hashCode = hashAttributes(attributes); + if (this._cumulativeMemoStorage.has(attributes, hashCode)) { + const previous = this._cumulativeMemoStorage.get( + attributes, + hashCode + )!; + delta = this._aggregator.diff(previous, accumulation); + } + } } // Merge with uncollected active delta. if (this._activeCollectionStorage.has(attributes, hashCode)) { @@ -107,6 +123,7 @@ export class DeltaMetricProcessor> { collect() { const unreportedDelta = this._activeCollectionStorage; this._activeCollectionStorage = new AttributeHashMap(); + return unreportedDelta; } } diff --git a/packages/sdk-metrics/src/state/MeterSharedState.ts b/packages/sdk-metrics/src/state/MeterSharedState.ts index 3737e3c403f..028a43634e5 100644 --- a/packages/sdk-metrics/src/state/MeterSharedState.ts +++ b/packages/sdk-metrics/src/state/MeterSharedState.ts @@ -163,12 +163,17 @@ export class MeterSharedState { if (compatibleStorage != null) { return compatibleStorage; } + const aggregator = aggregation.createAggregator(descriptor); + const cardinalityLimit = collector.selectCardinalityLimit( + descriptor.type + ); const storage = new MetricStorageType( descriptor, aggregator, AttributesProcessor.Noop(), - [collector] + [collector], + cardinalityLimit ) as R; this.metricStorageRegistry.registerForCollector(collector, storage); return storage; diff --git a/packages/sdk-metrics/src/state/MetricCollector.ts b/packages/sdk-metrics/src/state/MetricCollector.ts index f1f1dacdb13..523525377d2 100644 --- a/packages/sdk-metrics/src/state/MetricCollector.ts +++ b/packages/sdk-metrics/src/state/MetricCollector.ts @@ -90,6 +90,14 @@ export class MetricCollector implements MetricProducer { selectAggregation(instrumentType: InstrumentType) { return this._metricReader.selectAggregation(instrumentType); } + + /** + * Select the cardinality limit for the given {@link InstrumentType} for this + * collector. + */ + selectCardinalityLimit(instrumentType: InstrumentType): number { + return this._metricReader.selectCardinalityLimit(instrumentType); + } } /** @@ -98,4 +106,5 @@ export class MetricCollector implements MetricProducer { */ export interface MetricCollectorHandle { selectAggregationTemporality: AggregationTemporalitySelector; + selectCardinalityLimit(instrumentType: InstrumentType): number; } diff --git a/packages/sdk-metrics/src/view/View.ts b/packages/sdk-metrics/src/view/View.ts index 9c8bac9dc43..9a8e7cc60f1 100644 --- a/packages/sdk-metrics/src/view/View.ts +++ b/packages/sdk-metrics/src/view/View.ts @@ -171,6 +171,10 @@ export class View { * Alters the metric stream: * If provided, the attributes that are not in the list will be ignored. * If not provided, all attribute keys will be used by default. + * @param viewOptions.aggregationCardinalityLimit + * Alters the metric stream: + * Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated. + * If not provided, the default limit of 2000 will be used. * @param viewOptions.aggregation * Alters the metric stream: * Alters the {@link Aggregation} of the metric stream. @@ -192,10 +196,6 @@ export class View { * @param viewOptions.meterSchemaUrl * Instrument selection criteria: * The schema URL of the Meter. No wildcard support, schema URL must match exactly. - * @param viewOptions.aggregationCardinalityLimit - * Alters the metric stream: - * Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated. - * If not provided, the default limit of 2000 will be used. * * @example * // Create a view that changes the Instrument 'my.instrument' to use to an diff --git a/packages/sdk-metrics/test/MeterProvider.test.ts b/packages/sdk-metrics/test/MeterProvider.test.ts index 539cc7e8de2..cc534adcb1d 100644 --- a/packages/sdk-metrics/test/MeterProvider.test.ts +++ b/packages/sdk-metrics/test/MeterProvider.test.ts @@ -624,6 +624,54 @@ describe('MeterProvider', () => { counter.add(1, { attr1: 'value1' }); counter.add(1, { attr2: 'value2' }); counter.add(1, { attr3: 'value3' }); + counter.add(1, { attr1: 'value1' }); + + // Perform collection + const { resourceMetrics, errors } = await reader.collect(); + + assert.strictEqual(errors.length, 0); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); + + const metricData = resourceMetrics.scopeMetrics[0].metrics[0]; + assert.strictEqual(metricData.dataPoints.length, 2); + + // Check if the overflow data point is present + const overflowDataPoint = ( + metricData.dataPoints as DataPoint[] + ).find((dataPoint: DataPoint) => + Object.prototype.hasOwnProperty.call( + dataPoint.attributes, + 'otel.metric.overflow' + ) + ); + assert.ok(overflowDataPoint); + assert.strictEqual(overflowDataPoint.value, 2); + }); + + it('should respect the aggregationCardinalityLimit for observable counter', async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ + resource: defaultResource, + readers: [reader], + views: [ + new View({ + instrumentName: 'test-observable-counter', + aggregationCardinalityLimit: 2, // Set cardinality limit to 2 + }), + ], + }); + + const meter = meterProvider.getMeter('meter1', 'v1.0.0'); + const observableCounter = meter.createObservableCounter( + 'test-observable-counter' + ); + observableCounter.addCallback(observableResult => { + observableResult.observe(1, { attr1: 'value1' }); + observableResult.observe(2, { attr2: 'value2' }); + observableResult.observe(3, { attr3: 'value3' }); + observableResult.observe(4, { attr1: 'value1' }); + }); // Perform collection const { resourceMetrics, errors } = await reader.collect(); @@ -633,7 +681,7 @@ describe('MeterProvider', () => { assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); const metricData = resourceMetrics.scopeMetrics[0].metrics[0]; - assert.strictEqual(metricData.dataPoints.length, 3); + assert.strictEqual(metricData.dataPoints.length, 2); // Check if the overflow data point is present const overflowDataPoint = ( @@ -645,7 +693,50 @@ describe('MeterProvider', () => { ) ); assert.ok(overflowDataPoint); - assert.strictEqual(overflowDataPoint.value, 1); + assert.strictEqual(overflowDataPoint.value, 3); + }); + }); + + describe('aggregationCardinalityLimit via MetricReader should apply the cardinality limit', () => { + it('should respect the aggregationCardinalityLimit set via MetricReader', async () => { + const reader = new TestMetricReader({ + cardinalitySelector: (instrumentType: InstrumentType) => 2, // Set cardinality limit to 2 via cardinalitySelector + }); + const meterProvider = new MeterProvider({ + resource: defaultResource, + readers: [reader], + }); + + const meter = meterProvider.getMeter('meter1', 'v1.0.0'); + const counter = meter.createCounter('test-counter'); + + // Add values with different attributes + counter.add(1, { attr1: 'value1' }); + counter.add(1, { attr2: 'value2' }); + counter.add(1, { attr3: 'value3' }); + counter.add(1, { attr1: 'value1' }); + + // Perform collection + const { resourceMetrics, errors } = await reader.collect(); + + assert.strictEqual(errors.length, 0); + assert.strictEqual(resourceMetrics.scopeMetrics.length, 1); + assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); + + const metricData = resourceMetrics.scopeMetrics[0].metrics[0]; + assert.strictEqual(metricData.dataPoints.length, 2); + + // Check if the overflow data point is present + const overflowDataPoint = ( + metricData.dataPoints as DataPoint[] + ).find((dataPoint: DataPoint) => + Object.prototype.hasOwnProperty.call( + dataPoint.attributes, + 'otel.metric.overflow' + ) + ); + assert.ok(overflowDataPoint); + assert.strictEqual(overflowDataPoint.value, 2); }); }); @@ -661,7 +752,7 @@ describe('MeterProvider', () => { const counter = meter.createCounter('test-counter'); // Add values with different attributes - for (let i = 0; i < 2002; i++) { + for (let i = 0; i < 2001; i++) { const attributes = { [`attr${i}`]: `value${i}` }; counter.add(1, attributes); } @@ -674,7 +765,7 @@ describe('MeterProvider', () => { assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1); const metricData = resourceMetrics.scopeMetrics[0].metrics[0]; - assert.strictEqual(metricData.dataPoints.length, 2001); + assert.strictEqual(metricData.dataPoints.length, 2000); // Check if the overflow data point is present const overflowDataPoint = ( diff --git a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts index b4a5df19238..e940c71c18c 100644 --- a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts @@ -34,10 +34,12 @@ import { HrTime } from '@opentelemetry/api'; const deltaCollector: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.DELTA, + selectCardinalityLimit: () => 2000, }; const cumulativeCollector: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.CUMULATIVE, + selectCardinalityLimit: () => 2000, }; describe('AsyncMetricStorage', () => { diff --git a/packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts b/packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts index ec0a3d6fff3..a14f89df53a 100644 --- a/packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts +++ b/packages/sdk-metrics/test/state/DeltaMetricProcessor.test.ts @@ -120,6 +120,35 @@ describe('DeltaMetricProcessor', () => { const accumulation = accumulations.get({}); assert.strictEqual(accumulation?.toPointValue(), 20); }); + + it('should respect the cardinality limit', () => { + const cardinalityLimit = 2; + const metricProcessor = new DeltaMetricProcessor( + new SumAggregator(true), + cardinalityLimit + ); + + { + const measurements = new AttributeHashMap(); + measurements.set({ attribute: '1' }, 10); + measurements.set({ attribute: '2' }, 20); + measurements.set({ attribute: '3' }, 30); + metricProcessor.batchCumulate(measurements, [0, 0]); + } + + const accumulations = metricProcessor.collect(); + assert.strictEqual(accumulations.size, 2); + { + const accumulation = accumulations.get({ attribute: '1' }); + assert.strictEqual(accumulation?.toPointValue(), 10); + } + { + const overflowAccumulation = accumulations.get({ + 'otel.metric.overflow': true, + }); + assert.strictEqual(overflowAccumulation?.toPointValue(), 30); + } + }); }); describe('collect', () => { diff --git a/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts b/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts index 8a1513e351c..35e3faa1fa9 100644 --- a/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts +++ b/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts @@ -58,11 +58,17 @@ describe('MetricStorageRegistry', () => { selectAggregationTemporality: () => { throw new Error('should not be invoked'); }, + selectCardinalityLimit: () => { + throw new Error('should not be invoked'); + }, }; const collectorHandle2: MetricCollectorHandle = { selectAggregationTemporality: () => { throw new Error('should not be invoked'); }, + selectCardinalityLimit: () => { + throw new Error('should not be invoked'); + }, }; describe('register', () => { diff --git a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts index e2e0378a454..e12a291a9d5 100644 --- a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts @@ -33,10 +33,12 @@ import { const deltaCollector: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.DELTA, + selectCardinalityLimit: () => 2000, }; const cumulativeCollector: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.CUMULATIVE, + selectCardinalityLimit: () => 2000, }; describe('SyncMetricStorage', () => { diff --git a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts index 77edc36b17f..932d45c5f15 100644 --- a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts +++ b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts @@ -31,14 +31,17 @@ import { const deltaCollector1: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.DELTA, + selectCardinalityLimit: () => 2000, }; const deltaCollector2: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.DELTA, + selectCardinalityLimit: () => 2000, }; const cumulativeCollector1: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.CUMULATIVE, + selectCardinalityLimit: () => 2000, }; describe('TemporalMetricProcessor', () => {