Skip to content

Commit

Permalink
feat(sdk-metrics): add CardinalitySelector to MetricReader
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Nov 11, 2024
1 parent d133c01 commit cc6235c
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 24 deletions.
21 changes: 21 additions & 0 deletions packages/sdk-metrics/src/export/CardinalitySelector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { InstrumentType } from '../InstrumentDescriptor';
/**
* Cardinality Limit selector based on metric instrument types.
*/
export type CardinalitySelector = (instrumentType: InstrumentType) => number;
18 changes: 18 additions & 0 deletions packages/sdk-metrics/src/export/MetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR,
} from './AggregationSelector';
import { CardinalitySelector } from './CardinalitySelector';

export interface MetricReaderOptions {
/**
Expand All @@ -45,6 +46,11 @@ export interface MetricReaderOptions {
* not configured, cumulative is used for all instruments.
*/
aggregationTemporalitySelector?: AggregationTemporalitySelector;
/**
* Cardinality selector based on metric instrument types. If not configured,
* a default value is used.
*/
cardinalitySelector?: CardinalitySelector;
/**
* **Note, this option is experimental**. Additional MetricProducers to use as a source of
* aggregated metric data in addition to the SDK's metric data. The resource returned by
Expand All @@ -68,6 +74,7 @@ export abstract class MetricReader {
private _sdkMetricProducer?: MetricProducer;
private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector;
private readonly _aggregationSelector: AggregationSelector;
private readonly _cardinalitySelector?: CardinalitySelector;

constructor(options?: MetricReaderOptions) {
this._aggregationSelector =
Expand All @@ -76,6 +83,7 @@ export abstract class MetricReader {
options?.aggregationTemporalitySelector ??
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;
this._metricProducers = options?.metricProducers ?? [];
this._cardinalitySelector = options?.cardinalitySelector;
}

/**
Expand Down Expand Up @@ -116,6 +124,16 @@ export abstract class MetricReader {
return this._aggregationTemporalitySelector(instrumentType);
}

/**
* Select the cardinality limit for the given {@link InstrumentType} for this
* reader.
*/
selectCardinalityLimit(instrumentType: InstrumentType): number {
return this._cardinalitySelector
? this._cardinalitySelector(instrumentType)
: 2000; // default value if no selector is provided
}

/**
* Handle once the SDK has initialized this {@link MetricReader}
* Overriding this method is optional.
Expand Down
8 changes: 6 additions & 2 deletions packages/sdk-metrics/src/state/AsyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>>
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
collectorHandles: MetricCollectorHandle[]
collectorHandles: MetricCollectorHandle[],
private _aggregationCardinalityLimit?: number
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._deltaMetricStorage = new DeltaMetricProcessor(
aggregator,
this._aggregationCardinalityLimit
);
this._temporalMetricStorage = new TemporalMetricProcessor(
aggregator,
collectorHandles
Expand Down
43 changes: 30 additions & 13 deletions packages/sdk-metrics/src/state/DeltaMetricProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { Context, HrTime, Attributes } from '@opentelemetry/api';
import { Maybe } from '../utils';
import { Maybe, hashAttributes } from '../utils';
import { Accumulation, Aggregator } from '../aggregator/types';
import { AttributeHashMap } from './HashMap';

Expand Down Expand Up @@ -45,20 +45,23 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
_context: Context,
collectionTime: HrTime
) {
if (this._activeCollectionStorage.size >= this._cardinalityLimit) {
const overflowAttributes = { 'otel.metric.overflow': true };
const overflowAccumulation = this._activeCollectionStorage.getOrDefault(
overflowAttributes,
() => this._aggregator.createAccumulation(collectionTime)
);
overflowAccumulation?.record(value);
return;
let accumulation = this._activeCollectionStorage.get(attributes);

if (!accumulation) {
if (this._activeCollectionStorage.size >= this._cardinalityLimit) {
const overflowAttributes = { 'otel.metric.overflow': true };
const overflowAccumulation = this._activeCollectionStorage.getOrDefault(
overflowAttributes,
() => this._aggregator.createAccumulation(collectionTime)
);
overflowAccumulation?.record(value);
return;
}

accumulation = this._aggregator.createAccumulation(collectionTime);
this._activeCollectionStorage.set(attributes, accumulation);
}

const accumulation = this._activeCollectionStorage.getOrDefault(
attributes,
() => this._aggregator.createAccumulation(collectionTime)
);
accumulation?.record(value);
}

Expand All @@ -81,6 +84,19 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
hashCode
)!;
delta = this._aggregator.diff(previous, accumulation);
} else {
// If the cardinality limit is reached, we need to change the attributes
if (this._cumulativeMemoStorage.size >= this._cardinalityLimit) {
attributes = { 'otel.metric.overflow': true };
hashCode = hashAttributes(attributes);
if (this._cumulativeMemoStorage.has(attributes, hashCode)) {
const previous = this._cumulativeMemoStorage.get(
attributes,
hashCode
)!;
delta = this._aggregator.diff(previous, accumulation);
}
}
}
// Merge with uncollected active delta.
if (this._activeCollectionStorage.has(attributes, hashCode)) {
Expand All @@ -107,6 +123,7 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
collect() {
const unreportedDelta = this._activeCollectionStorage;
this._activeCollectionStorage = new AttributeHashMap();

return unreportedDelta;
}
}
7 changes: 6 additions & 1 deletion packages/sdk-metrics/src/state/MeterSharedState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,17 @@ export class MeterSharedState {
if (compatibleStorage != null) {
return compatibleStorage;
}

const aggregator = aggregation.createAggregator(descriptor);
const cardinalityLimit = collector.selectCardinalityLimit(
descriptor.type
);
const storage = new MetricStorageType(
descriptor,
aggregator,
AttributesProcessor.Noop(),
[collector]
[collector],
cardinalityLimit
) as R;
this.metricStorageRegistry.registerForCollector(collector, storage);
return storage;
Expand Down
9 changes: 9 additions & 0 deletions packages/sdk-metrics/src/state/MetricCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ export class MetricCollector implements MetricProducer {
selectAggregation(instrumentType: InstrumentType) {
return this._metricReader.selectAggregation(instrumentType);
}

/**
* Select the cardinality limit for the given {@link InstrumentType} for this
* collector.
*/
selectCardinalityLimit(instrumentType: InstrumentType): number {
return this._metricReader.selectCardinalityLimit(instrumentType);
}
}

/**
Expand All @@ -98,4 +106,5 @@ export class MetricCollector implements MetricProducer {
*/
export interface MetricCollectorHandle {
selectAggregationTemporality: AggregationTemporalitySelector;
selectCardinalityLimit(instrumentType: InstrumentType): number;
}
8 changes: 4 additions & 4 deletions packages/sdk-metrics/src/view/View.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ export class View {
* Alters the metric stream:
* If provided, the attributes that are not in the list will be ignored.
* If not provided, all attribute keys will be used by default.
* @param viewOptions.aggregationCardinalityLimit
* Alters the metric stream:
* Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated.
* If not provided, the default limit of 2000 will be used.
* @param viewOptions.aggregation
* Alters the metric stream:
* Alters the {@link Aggregation} of the metric stream.
Expand All @@ -192,10 +196,6 @@ export class View {
* @param viewOptions.meterSchemaUrl
* Instrument selection criteria:
* The schema URL of the Meter. No wildcard support, schema URL must match exactly.
* @param viewOptions.aggregationCardinalityLimit
* Alters the metric stream:
* Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated.
* If not provided, the default limit of 2000 will be used.
*
* @example
* // Create a view that changes the Instrument 'my.instrument' to use to an
Expand Down
99 changes: 95 additions & 4 deletions packages/sdk-metrics/test/MeterProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,54 @@ describe('MeterProvider', () => {
counter.add(1, { attr1: 'value1' });
counter.add(1, { attr2: 'value2' });
counter.add(1, { attr3: 'value3' });
counter.add(1, { attr1: 'value1' });

// Perform collection
const { resourceMetrics, errors } = await reader.collect();

assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);

const metricData = resourceMetrics.scopeMetrics[0].metrics[0];
assert.strictEqual(metricData.dataPoints.length, 2);

// Check if the overflow data point is present
const overflowDataPoint = (
metricData.dataPoints as DataPoint<number>[]
).find((dataPoint: DataPoint<number>) =>
Object.prototype.hasOwnProperty.call(
dataPoint.attributes,
'otel.metric.overflow'
)
);
assert.ok(overflowDataPoint);
assert.strictEqual(overflowDataPoint.value, 2);
});

it('should respect the aggregationCardinalityLimit for observable counter', async () => {
const reader = new TestMetricReader();
const meterProvider = new MeterProvider({
resource: defaultResource,
readers: [reader],
views: [
new View({
instrumentName: 'test-observable-counter',
aggregationCardinalityLimit: 2, // Set cardinality limit to 2
}),
],
});

const meter = meterProvider.getMeter('meter1', 'v1.0.0');
const observableCounter = meter.createObservableCounter(
'test-observable-counter'
);
observableCounter.addCallback(observableResult => {
observableResult.observe(1, { attr1: 'value1' });
observableResult.observe(2, { attr2: 'value2' });
observableResult.observe(3, { attr3: 'value3' });
observableResult.observe(4, { attr1: 'value1' });
});

// Perform collection
const { resourceMetrics, errors } = await reader.collect();
Expand All @@ -572,7 +620,7 @@ describe('MeterProvider', () => {
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);

const metricData = resourceMetrics.scopeMetrics[0].metrics[0];
assert.strictEqual(metricData.dataPoints.length, 3);
assert.strictEqual(metricData.dataPoints.length, 2);

// Check if the overflow data point is present
const overflowDataPoint = (
Expand All @@ -584,7 +632,50 @@ describe('MeterProvider', () => {
)
);
assert.ok(overflowDataPoint);
assert.strictEqual(overflowDataPoint.value, 1);
assert.strictEqual(overflowDataPoint.value, 3);
});
});

describe('aggregationCardinalityLimit via MetricReader should apply the cardinality limit', () => {
it('should respect the aggregationCardinalityLimit set via MetricReader', async () => {
const reader = new TestMetricReader({
cardinalitySelector: (instrumentType: InstrumentType) => 2, // Set cardinality limit to 2 via cardinalitySelector
});
const meterProvider = new MeterProvider({
resource: defaultResource,
readers: [reader],
});

const meter = meterProvider.getMeter('meter1', 'v1.0.0');
const counter = meter.createCounter('test-counter');

// Add values with different attributes
counter.add(1, { attr1: 'value1' });
counter.add(1, { attr2: 'value2' });
counter.add(1, { attr3: 'value3' });
counter.add(1, { attr1: 'value1' });

// Perform collection
const { resourceMetrics, errors } = await reader.collect();

assert.strictEqual(errors.length, 0);
assert.strictEqual(resourceMetrics.scopeMetrics.length, 1);
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);

const metricData = resourceMetrics.scopeMetrics[0].metrics[0];
assert.strictEqual(metricData.dataPoints.length, 2);

// Check if the overflow data point is present
const overflowDataPoint = (
metricData.dataPoints as DataPoint<number>[]
).find((dataPoint: DataPoint<number>) =>
Object.prototype.hasOwnProperty.call(
dataPoint.attributes,
'otel.metric.overflow'
)
);
assert.ok(overflowDataPoint);
assert.strictEqual(overflowDataPoint.value, 2);
});
});

Expand All @@ -600,7 +691,7 @@ describe('MeterProvider', () => {
const counter = meter.createCounter('test-counter');

// Add values with different attributes
for (let i = 0; i < 2002; i++) {
for (let i = 0; i < 2001; i++) {
const attributes = { [`attr${i}`]: `value${i}` };
counter.add(1, attributes);
}
Expand All @@ -613,7 +704,7 @@ describe('MeterProvider', () => {
assert.strictEqual(resourceMetrics.scopeMetrics[0].metrics.length, 1);

const metricData = resourceMetrics.scopeMetrics[0].metrics[0];
assert.strictEqual(metricData.dataPoints.length, 2001);
assert.strictEqual(metricData.dataPoints.length, 2000);

// Check if the overflow data point is present
const overflowDataPoint = (
Expand Down
2 changes: 2 additions & 0 deletions packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ import { HrTime } from '@opentelemetry/api';

const deltaCollector: MetricCollectorHandle = {
selectAggregationTemporality: () => AggregationTemporality.DELTA,
selectCardinalityLimit: () => 2000,
};

const cumulativeCollector: MetricCollectorHandle = {
selectAggregationTemporality: () => AggregationTemporality.CUMULATIVE,
selectCardinalityLimit: () => 2000,
};

describe('AsyncMetricStorage', () => {
Expand Down
Loading

0 comments on commit cc6235c

Please sign in to comment.