diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/MemoryMode.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/MemoryMode.java index 5cd9fea40f8..4ebf23a9deb 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/MemoryMode.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/MemoryMode.java @@ -17,6 +17,21 @@ public enum MemoryMode { * *

In this mode, the SDK reuses objects to reduce allocations, at the expense of disallowing * concurrent collections / exports. + * + *

Metric Signal: For DELTA aggregation temporality, the memory used for recording and + * aggregating metric values is kept between MetricReader collect operation, to avoid memory + * allocations. When the configured maximum cardinality of Attributes is reached, unused + * Attributes are cleared from memory during collect operation, at the cost of requiring new + * memory allocations the next time those attributes are used. Allocations can be minimized by + * increasing the configured max cardinality. For example, suppose instrumentation has recorded + * values for 1000 unique Attributes while the max cardinality configured was 2000. If after a + * collection only 100 unique Attributes values are recorded, the MetricReader's collect operation + * would return 100 points, while in memory the Attributes data structure keeps 1000 unique + * Attributes. If a user recorded values for 3000 unique attributes, the values for the first 1999 + * Attributes would be recorded, and the rest of 1001 unique Attributes values would be recorded + * in the CARDINALITY_OVERFLOW Attributes. If after several collect operations, the user now + * records values to only 500 unique attributes, during collect operation, the unused 1500 + * Attributes memory would be cleared from memory. */ REUSABLE_DATA, @@ -25,6 +40,9 @@ public enum MemoryMode { * *

In this mode, the SDK passes immutable objects to exporters / readers, increasing * allocations but ensuring safe concurrent exports. + * + *

Metric Signal: In DELTA aggregation temporality, the memory used for recording and + * aggregating Attributes values is cleared during a MetricReader collect operation. */ IMMUTABLE_DATA } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBuilder.java index b1f60e35b94..26996df4bdf 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBuilder.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBuilder.java @@ -7,6 +7,7 @@ import static io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor.setIncludes; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; @@ -96,6 +97,7 @@ public ViewBuilder setAttributeFilter(Predicate keyFilter) { *

Note: not currently stable but additional attribute processors can be configured via {@link * SdkMeterProviderUtil#appendAllBaggageAttributes(ViewBuilder)}. */ + @SuppressWarnings("unused") ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) { this.processor = this.processor.then(attributesProcessor); return this; @@ -105,7 +107,10 @@ ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) { * Set the cardinality limit. * *

Note: not currently stable but cardinality limit can be configured via - * SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int)}. + * SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int). + * + *

Read {@link MemoryMode} to understand the memory usage behavior of reaching cardinality + * limit. * * @param cardinalityLimit the maximum number of series for a metric */ diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java index 6e3b3f06f53..2bf7c8c4db6 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java @@ -28,6 +28,7 @@ public abstract class AggregatorHandle exemplarReservoir; + private volatile boolean valuesRecorded = false; protected AggregatorHandle(ExemplarReservoir exemplarReservoir) { this.exemplarReservoir = exemplarReservoir; @@ -39,6 +40,10 @@ protected AggregatorHandle(ExemplarReservoir exemplarReservoir) { */ public final T aggregateThenMaybeReset( long startEpochNanos, long epochNanos, Attributes attributes, boolean reset) { + if (reset) { + valuesRecorded = false; + } + return doAggregateThenMaybeReset( startEpochNanos, epochNanos, @@ -69,6 +74,7 @@ public final void recordLong(long value, Attributes attributes, Context context) */ public final void recordLong(long value) { doRecordLong(value); + valuesRecorded = true; } /** @@ -94,6 +100,7 @@ public final void recordDouble(double value, Attributes attributes, Context cont */ public final void recordDouble(double value) { doRecordDouble(value); + valuesRecorded = true; } /** @@ -104,4 +111,13 @@ protected void doRecordDouble(double value) { throw new UnsupportedOperationException( "This aggregator does not support recording double values."); } + + /** + * Checks whether this handle has values recorded. + * + * @return True if values has been recorded to it + */ + public boolean hasRecordedValues() { + return valuesRecorded; + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index cf31f90cb24..191223bfeac 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -5,9 +5,14 @@ package io.opentelemetry.sdk.metrics.internal.state; +import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; +import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; + import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.ExemplarData; @@ -50,6 +55,16 @@ public final class DefaultSynchronousMetricStorage aggregatorHolder = new AggregatorHolder<>(); private final AttributesProcessor attributesProcessor; + private final MemoryMode memoryMode; + + // Only populated if memoryMode == REUSABLE_DATA + private final ArrayList reusableResultList = new ArrayList<>(); + + // Only populated if memoryMode == REUSABLE_DATA and + // aggregationTemporality is DELTA + private volatile ConcurrentHashMap> + previousCollectionAggregatorHandles = new ConcurrentHashMap<>(); + /** * This field is set to 1 less than the actual intended cardinality limit, allowing the last slot * to be filled by the {@link MetricStorage#CARDINALITY_OVERFLOW} series. @@ -74,6 +89,7 @@ public final class DefaultSynchronousMetricStorage getHolderForRecord() { /** * Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to indicate - * that recording is complete and it is safe to collect. + * that recording is complete, and it is safe to collect. */ private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { aggregatorHolder.activeRecordingThreads.addAndGet(-2); @@ -185,16 +201,20 @@ public MetricData collect( InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { - boolean reset = aggregationTemporality == AggregationTemporality.DELTA; + boolean reset = aggregationTemporality == DELTA; long start = - aggregationTemporality == AggregationTemporality.DELTA + aggregationTemporality == DELTA ? registeredReader.getLastCollectEpochNanos() : startEpochNanos; ConcurrentHashMap> aggregatorHandles; if (reset) { AggregatorHolder holder = this.aggregatorHolder; - this.aggregatorHolder = new AggregatorHolder<>(); + this.aggregatorHolder = + (memoryMode == REUSABLE_DATA) + ? new AggregatorHolder<>(previousCollectionAggregatorHandles) + : new AggregatorHolder<>(); + // Increment recordsInProgress by 1, which produces an odd number acting as a signal that // record operations should re-read the volatile this.aggregatorHolder. // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record @@ -208,15 +228,56 @@ public MetricData collect( aggregatorHandles = this.aggregatorHolder.aggregatorHandles; } + List points; + if (memoryMode == REUSABLE_DATA) { + reusableResultList.clear(); + points = reusableResultList; + } else { + points = new ArrayList<>(aggregatorHandles.size()); + } + + // In DELTA aggregation temporality each Attributes is reset to 0 + // every time we perform a collection (by definition of DELTA). + // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles + // (into which the values are recorded) effectively starting from 0 + // for each recorded Attributes. + // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing + // a key-value from a map and putting it again on next recording will cost an allocation, + // we are keeping the aggregator handles in their map, and only reset their value once + // we finish collecting the aggregated value from each one. + // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory, + // hence during collect(), when the map is at full capacity, we try to clear away unused + // aggregator handles, so on next recording cycle using this map, there will be room for newly + // recorded Attributes. This comes at the expanse of memory allocations. This can be avoided + // if the user chooses to increase the maxCardinality. + if (memoryMode == REUSABLE_DATA && reset) { + if (aggregatorHandles.size() >= maxCardinality) { + aggregatorHandles.forEach( + (attribute, handle) -> { + if (!handle.hasRecordedValues()) { + aggregatorHandles.remove(attribute); + } + }); + } + } + // Grab aggregated points. - List points = new ArrayList<>(aggregatorHandles.size()); aggregatorHandles.forEach( (attributes, handle) -> { + if (!handle.hasRecordedValues()) { + return; + } T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset); - if (reset) { + + if (reset && memoryMode == IMMUTABLE_DATA) { // Return the aggregator to the pool. + // The pool is only used in DELTA temporality (since in CUMULATIVE the handler is + // always used as it is the place accumulating the values and never resets) + // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid + // using the pool since it allocates memory internally on each put() or remove() aggregatorHandlePool.offer(handle); } + if (point != null) { points.add(point); } @@ -229,6 +290,10 @@ public MetricData collect( aggregatorHandlePool.poll(); } + if (reset && memoryMode == REUSABLE_DATA) { + previousCollectionAggregatorHandles = aggregatorHandles; + } + if (points.isEmpty()) { return EmptyMetricData.getInstance(); } @@ -243,8 +308,7 @@ public MetricDescriptor getMetricDescriptor() { } private static class AggregatorHolder { - private final ConcurrentHashMap> aggregatorHandles = - new ConcurrentHashMap<>(); + private final ConcurrentHashMap> aggregatorHandles; // Recording threads grab the current interval (AggregatorHolder) and atomically increment // this by 2 before recording against it (and then decrement by two when done). // @@ -260,5 +324,14 @@ private static class AggregatorHolder(); + } + + private AggregatorHolder( + ConcurrentHashMap> aggregatorHandles) { + this.aggregatorHandles = aggregatorHandles; + } } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index f808a0e85f1..2d548acc169 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -20,9 +20,11 @@ import io.opentelemetry.context.Context; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.LongPointData; @@ -39,6 +41,7 @@ import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.assertj.DoubleSumAssert; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; @@ -47,10 +50,12 @@ import java.util.concurrent.CountDownLatch; import java.util.function.BiConsumer; import java.util.stream.Stream; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.event.Level; @@ -75,19 +80,36 @@ public class SynchronousMetricStorageTest { LogCapturer logs = LogCapturer.create().captureForType(DefaultSynchronousMetricStorage.class, Level.DEBUG); - private final RegisteredReader deltaReader = - RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()); - private final RegisteredReader cumulativeReader = - RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create()); + private RegisteredReader deltaReader; + private RegisteredReader cumulativeReader; private final TestClock testClock = TestClock.create(); - private final Aggregator aggregator = - spy( - ((AggregatorFactory) Aggregation.sum()) - .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff())); + private Aggregator aggregator; private final AttributesProcessor attributesProcessor = AttributesProcessor.noop(); - @Test - void recordDouble_NaN() { + private void initialize(MemoryMode memoryMode) { + deltaReader = + RegisteredReader.create( + InMemoryMetricReader.builder() + .setAggregationTemporalitySelector(unused -> AggregationTemporality.DELTA) + .setMemoryMode(memoryMode) + .build(), + ViewRegistry.create()); + + cumulativeReader = + RegisteredReader.create( + InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(), + ViewRegistry.create()); + + aggregator = + spy( + ((AggregatorFactory) Aggregation.sum()) + .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff())); + } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void recordDouble_NaN(MemoryMode memoryMode) { + initialize(memoryMode); DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( cumulativeReader, @@ -105,8 +127,11 @@ void recordDouble_NaN() { .isEqualTo(EmptyMetricData.getInstance()); } - @Test - void attributesProcessor_applied() { + @ParameterizedTest + @EnumSource(MemoryMode.class) + void attributesProcessor_applied(MemoryMode memoryMode) { + initialize(memoryMode); + Attributes attributes = Attributes.builder().put("K", "V").build(); AttributesProcessor attributesProcessor = AttributesProcessor.append(Attributes.builder().put("modifiedK", "modifiedV").build()); @@ -129,8 +154,11 @@ void attributesProcessor_applied() { attributeEntry("K", "V"), attributeEntry("modifiedK", "modifiedV")))); } - @Test - void recordAndCollect_CumulativeDoesNotReset() { + @ParameterizedTest + @EnumSource(MemoryMode.class) + void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) { + initialize(memoryMode); + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( cumulativeReader, @@ -176,7 +204,9 @@ void recordAndCollect_CumulativeDoesNotReset() { } @Test - void recordAndCollect_DeltaResets() { + void recordAndCollect_DeltaResets_ImmutableData() { + initialize(MemoryMode.IMMUTABLE_DATA); + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); @@ -223,7 +253,107 @@ void recordAndCollect_DeltaResets() { } @Test - void recordAndCollect_CumulativeAtLimit() { + void recordAndCollect_DeltaResets_ReusableData() { + initialize(MemoryMode.REUSABLE_DATA); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + // Record measurement and collect at time 10 + storage.recordDouble(3, Attributes.empty(), Context.current()); + verify(aggregator, times(1)).createHandle(); + assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) + .hasDoubleSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3))); + assertThat(storage.getAggregatorHandlePool()).hasSize(0); + + deltaReader.setLastCollectEpochNanos(10); + + // Record measurement and collect at time 30 + storage.recordDouble(3, Attributes.empty(), Context.current()); + + // We're switched to secondary map so a handle will be created + verify(aggregator, times(2)).createHandle(); + assertThat(storage.getAggregatorHandlePool()).hasSize(0); + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) + .hasDoubleSumSatisfying( + sum -> + sum.isDelta() + .hasPointsSatisfying( + point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3))); + assertThat(storage.getAggregatorHandlePool()).hasSize(0); + + deltaReader.setLastCollectEpochNanos(30); + + // Record measurements and collect at time 35 + storage.recordDouble(2, Attributes.empty(), Context.current()); + storage.recordDouble(4, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current()); + + // We don't delete aggregator handles unless max cardinality reached, hence + // aggregator handle is still there, thus no handle was created for empty(), but it will for + // the "foo" + verify(aggregator, times(3)).createHandle(); + assertThat(storage.getAggregatorHandlePool()).hasSize(0); + + MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35); + assertThat(metricData).hasDoubleSumSatisfying(DoubleSumAssert::isDelta); + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .hasSize(2) + .anySatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(30); + assertThat(point.getEpochNanos()).isEqualTo(35); + assertThat(point.getValue()).isEqualTo(2); + assertThat(point.getAttributes()).isEqualTo(Attributes.empty()); + }) + .anySatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(30); + assertThat(point.getEpochNanos()).isEqualTo(35); + assertThat(point.getValue()).isEqualTo(4); + assertThat(point.getAttributes()) + .isEqualTo( + Attributes.of(AttributeKey.stringKey("foo"), "bar")); + }))); + + assertThat(storage.getAggregatorHandlePool()).hasSize(0); + + deltaReader.setLastCollectEpochNanos(40); + storage.recordDouble(6, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current()); + + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 45)) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .hasSize(1) + .allSatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(40); + assertThat(point.getEpochNanos()).isEqualTo(45); + assertThat(point.getValue()).isEqualTo(6); + assertThat(point.getAttributes()) + .isEqualTo( + Attributes.of(AttributeKey.stringKey("foo"), "bar")); + }))); + } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) { + initialize(memoryMode); + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( cumulativeReader, @@ -292,7 +422,9 @@ void recordAndCollect_CumulativeAtLimit() { } @Test - void recordAndCollect_DeltaAtLimit() { + void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { + initialize(MemoryMode.IMMUTABLE_DATA); + DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); @@ -319,6 +451,7 @@ void recordAndCollect_DeltaAtLimit() { assertThat(point.getValue()).isEqualTo(3); }))); assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 1); + assertThat(logs.getEvents()).isEmpty(); deltaReader.setLastCollectEpochNanos(10); @@ -384,6 +517,227 @@ void recordAndCollect_DeltaAtLimit() { logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } + @Test + void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { + initialize(MemoryMode.REUSABLE_DATA); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + // Record measurements for CARDINALITY_LIMIT - 1, since 1 slot is reserved for the overflow + // series + for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(); + + // First collect + MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10); + + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + Assertions.assertThat(sumData.getPoints()) + .hasSize(CARDINALITY_LIMIT - 1) + .allSatisfy( + point -> { + Assertions.assertThat(point.getStartEpochNanos()).isEqualTo(0); + Assertions.assertThat(point.getEpochNanos()).isEqualTo(10); + Assertions.assertThat(point.getValue()).isEqualTo(3); + }))); + + assertThat(logs.getEvents()).isEmpty(); + + deltaReader.setLastCollectEpochNanos(10); + + // Record CARDINALITY_LIMIT measurements, causing one measurement to exceed the cardinality + // limit and fall into the overflow series + for (int i = 0; i < CARDINALITY_LIMIT; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + + // After first collection, we expect the secondary map which is empty to be used, + // hence handle creation will still take place + // The +1 is for the overflow handle + verify(aggregator, times((CARDINALITY_LIMIT - 1) * 2 + 1)).createHandle(); + + // Second collect + metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20); + + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .hasSize(CARDINALITY_LIMIT) + .allSatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(10); + assertThat(point.getEpochNanos()).isEqualTo(20); + assertThat(point.getValue()).isEqualTo(3); + }) + .noneMatch( + point -> + ("value" + CARDINALITY_LIMIT) + .equals( + point + .getAttributes() + .get(AttributeKey.stringKey("key")))) + .satisfiesOnlyOnce( + point -> + assertThat(point.getAttributes()) + .isEqualTo(MetricStorage.CARDINALITY_OVERFLOW)))); + + assertThat(storage.getAggregatorHandlePool()).isEmpty(); + + logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); + } + + @Test + void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() { + initialize(MemoryMode.REUSABLE_DATA); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + // 1st recording: Recording goes to active map + for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + + // This will switch next recordings to the secondary map (which is empty) + // by making it the active map + storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10); + + // 2nd recording + deltaReader.setLastCollectEpochNanos(10); + for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + + // This switches maps again, so next recordings will be to the first map + storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10, 20); + + // 3rd recording: We're recording unseen attributes to a map we know is full, + // since it was filled during 1st recording + deltaReader.setLastCollectEpochNanos(20); + for (int i = CARDINALITY_LIMIT - 1; i < (CARDINALITY_LIMIT - 1) + 15; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + + MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20, 30); + + assertOnlyOverflowWasRecorded(metricData, 20, 30, 15 * 3); + + // 4th recording: We're recording unseen attributes to a map we know is full, + // since it was filled during *2nd* recording + deltaReader.setLastCollectEpochNanos(30); + for (int i = CARDINALITY_LIMIT - 1; i < (CARDINALITY_LIMIT - 1) + 15; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + + metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30, 40); + + assertOnlyOverflowWasRecorded(metricData, 30, 40, 15 * 3); + + // 5th recording: Map should be empty, since all handlers were removed due to + // no recording being done to them + deltaReader.setLastCollectEpochNanos(40); + for (int i = 0; i < 10; i++) { + storage.recordDouble( + 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + + metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 40, 50); + + assertNumberOfPoints(metricData, 10); + assertAllPointsWithValue(metricData, 40, 50, 3); + assertOverflowDoesNotExists(metricData); + + // 6th recording: Map should be empty (we switched to secondary map), since all handlers + // were removed due to no recordings being done to them + deltaReader.setLastCollectEpochNanos(50); + for (int i = 0; i < 12; i++) { + storage.recordDouble( + 4, Attributes.builder().put("key", "value" + i).build(), Context.current()); + } + + metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 50, 60); + + assertNumberOfPoints(metricData, 12); + assertAllPointsWithValue(metricData, 50, 60, 4); + assertOverflowDoesNotExists(metricData); + } + + @SuppressWarnings("SameParameterValue") + private static void assertOnlyOverflowWasRecorded( + MetricData metricData, long startTime, long endTime, double value) { + + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .hasSize(1) + .allSatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(startTime); + assertThat(point.getEpochNanos()).isEqualTo(endTime); + assertThat(point.getValue()).isEqualTo(value); + assertThat(point.getAttributes()) + .isEqualTo(MetricStorage.CARDINALITY_OVERFLOW); + }))); + } + + private static void assertNumberOfPoints(MetricData metricData, int numberOfPoints) { + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies(sumData -> assertThat(sumData.getPoints()).hasSize(numberOfPoints))); + } + + private static void assertAllPointsWithValue( + MetricData metricData, long startTime, long endTime, double value) { + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .allSatisfy( + point -> { + assertThat(point.getStartEpochNanos()).isEqualTo(startTime); + assertThat(point.getEpochNanos()).isEqualTo(endTime); + assertThat(point.getValue()).isEqualTo(value); + }))); + } + + private static void assertOverflowDoesNotExists(MetricData metricData) { + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()) + .noneMatch( + point -> + point + .getAttributes() + .equals(MetricStorage.CARDINALITY_OVERFLOW)))); + } + @ParameterizedTest @MethodSource("concurrentStressTestArguments") void recordAndCollect_concurrentStressTest( @@ -439,29 +793,45 @@ void recordAndCollect_concurrentStressTest( } private static Stream concurrentStressTestArguments() { - Aggregator aggregator = - ((AggregatorFactory) Aggregation.sum()) - .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()); - return Stream.of( - Arguments.of( - // Delta - new DefaultSynchronousMetricStorage<>( - RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()), - METRIC_DESCRIPTOR, - aggregator, - AttributesProcessor.noop(), - CARDINALITY_LIMIT), - (BiConsumer) - (value, cumulativeCount) -> cumulativeCount.addAndGet(value)), - Arguments.of( - // Cumulative - new DefaultSynchronousMetricStorage<>( - RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create()), - METRIC_DESCRIPTOR, - aggregator, - AttributesProcessor.noop(), - CARDINALITY_LIMIT), - (BiConsumer) - (value, cumulativeCount) -> cumulativeCount.set(value))); + List argumentsList = new ArrayList<>(); + + for (MemoryMode memoryMode : MemoryMode.values()) { + Aggregator aggregator = + ((AggregatorFactory) Aggregation.sum()) + .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()); + + argumentsList.add( + Arguments.of( + // Delta + new DefaultSynchronousMetricStorage<>( + RegisteredReader.create( + InMemoryMetricReader.builder() + .setAggregationTemporalitySelector(unused -> AggregationTemporality.DELTA) + .setMemoryMode(memoryMode) + .build(), + ViewRegistry.create()), + METRIC_DESCRIPTOR, + aggregator, + AttributesProcessor.noop(), + CARDINALITY_LIMIT), + (BiConsumer) + (value, cumulativeCount) -> cumulativeCount.addAndGet(value))); + + argumentsList.add( + Arguments.of( + // Cumulative + new DefaultSynchronousMetricStorage<>( + RegisteredReader.create( + InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(), + ViewRegistry.create()), + METRIC_DESCRIPTOR, + aggregator, + AttributesProcessor.noop(), + CARDINALITY_LIMIT), + (BiConsumer) + (value, cumulativeCount) -> cumulativeCount.set(value))); + } + + return argumentsList.stream(); } }