diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8171d76655b..b599a32fb3b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -8,7 +8,7 @@ requirements and recommendations. If you want to add new features or change behavior, please make sure your changes follow the [OpenTelemetry Specification](https://github.com/open-telemetry/opentelemetry-specification). -Otherwise file an issue or submit a PR to the specification repo first. +Otherwise, file an issue or submit a PR to the specification repo first. Make sure to review the projects [license](LICENSE) and sign the [CNCF CLA](https://identity.linuxfoundation.org/projects/cncf). A signed CLA will be enforced by an @@ -52,7 +52,8 @@ $ ./gradlew check Note: this gradle task will potentially generate changes to files in the `docs/apidiffs/current_vs_latest` -directory. Please make sure to include any changes to these files in your pull request. +directory. Please make sure to include any changes to these files in your pull request (i.e. +add those files to your commits in the PR). ## PR Review diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index 8c27fb4232e..46aa12a1343 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -45,6 +45,7 @@ val DEPENDENCIES = listOf( "io.opencensus:opencensus-contrib-exemplar-util:${opencensusVersion}", "org.openjdk.jmh:jmh-core:${jmhVersion}", "org.openjdk.jmh:jmh-generator-bytecode:${jmhVersion}", + "org.openjdk.jmh:jmh-generator-annprocess:${jmhVersion}", "org.mockito:mockito-core:${mockitoVersion}", "org.mockito:mockito-junit-jupiter:${mockitoVersion}", "org.slf4j:slf4j-simple:${slf4jVersion}", diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt index df26146497b..cc95503822e 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt @@ -1,2 +1,11 @@ Comparing source compatibility of against -No changes. \ No newline at end of file ++++ NEW ENUM: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.common.export.MemoryMode (compatible) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW INTERFACE: java.lang.constant.Constable + +++ NEW INTERFACE: java.lang.Comparable + +++ NEW INTERFACE: java.io.Serializable + +++ NEW SUPERCLASS: java.lang.Enum + +++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) io.opentelemetry.sdk.common.export.MemoryMode REUSABLE_DATA + +++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) io.opentelemetry.sdk.common.export.MemoryMode IMMUTABLE_DATA + +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.common.export.MemoryMode valueOf(java.lang.String) + +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.common.export.MemoryMode[] values() diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt index df26146497b..5753f2fee9b 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt @@ -1,2 +1,10 @@ Comparing source compatibility of against -No changes. \ No newline at end of file +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.metrics.export.MetricExporter (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode() +*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.metrics.export.MetricReader (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode() +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.metrics.export.PeriodicMetricReader (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode() diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt index df26146497b..24ed11618d8 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt @@ -1,2 +1,12 @@ Comparing source compatibility of against -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder builder() + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode() ++++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader build() + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder setAggregationTemporalitySelector(io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder setDefaultAggregationSelector(io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder setMemoryMode(io.opentelemetry.sdk.common.export.MemoryMode) diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/MemoryMode.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/MemoryMode.java new file mode 100644 index 00000000000..24c87d13de6 --- /dev/null +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/MemoryMode.java @@ -0,0 +1,26 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.common.export; + +/** The memory semantics of the SDK. */ +public enum MemoryMode { + + /** + * Reuses objects to reduce allocations. + * + *

In this mode, the SDK reuses objects to reduce allocations, at the expense of disallowing + * concurrent collections / exports. + */ + REUSABLE_DATA, + + /** + * Uses immutable data structures. + * + *

In this mode, the SDK passes immutable objects to exporters / readers, increasing + * allocations but ensuring safe concurrent exports. + */ + IMMUTABLE_DATA +} diff --git a/sdk/metrics/build.gradle.kts b/sdk/metrics/build.gradle.kts index 326f1750a61..010291e3d72 100644 --- a/sdk/metrics/build.gradle.kts +++ b/sdk/metrics/build.gradle.kts @@ -41,6 +41,13 @@ testing { } } } + register("jmhBasedTest") { + dependencies { + implementation("org.openjdk.jmh:jmh-core") + implementation("org.openjdk.jmh:jmh-generator-bytecode") + annotationProcessor("org.openjdk.jmh:jmh-generator-annprocess") + } + } } } diff --git a/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageGarbageCollectionBenchmark.java b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageGarbageCollectionBenchmark.java new file mode 100644 index 00000000000..b1d845773dd --- /dev/null +++ b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageGarbageCollectionBenchmark.java @@ -0,0 +1,128 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; +import java.time.Duration; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Run this through {@link AsynchronousMetricStorageGarbageCollectionBenchmarkTest}, as it runs it + * embedded with the GC profiler which what this test designed for (No need for command line run) + * + *

This test creates 10 asynchronous counters (any asynchronous instrument will do as the code + * path is almost the same for all async instrument types), and 1000 attribute sets. Each time the + * test runs, it calls `flush` which effectively calls the callback for each counter. Each such + * callback records a random number for each of the 1000 attribute sets. The result list ends up in + * {@link NoopMetricExporter} which does nothing with it. + * + *

This is repeated 100 times, collectively called Operation in the statistics and each such + * operation is repeated 20 times - known as Iteration. + * + *

Each such test is repeated, with a brand new JVM, for all combinations of {@link MemoryMode} + * and {@link AggregationTemporality}. This is done since each combination has a different code + * path. + */ +@BenchmarkMode(Mode.SingleShotTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Measurement(iterations = 20, batchSize = 100) +@Warmup(iterations = 10, batchSize = 10) +@Fork(1) +public class AsynchronousMetricStorageGarbageCollectionBenchmark { + + @State(value = Scope.Benchmark) + @SuppressWarnings("SystemOut") + public static class ThreadState { + private final int cardinality; + private final int countersCount; + @Param public AggregationTemporality aggregationTemporality; + @Param public MemoryMode memoryMode; + SdkMeterProvider sdkMeterProvider; + private final Random random = new Random(); + List attributesList; + + /** Creates a ThreadState. */ + @SuppressWarnings("unused") + public ThreadState() { + cardinality = 1000; + countersCount = 10; + } + + @SuppressWarnings("SpellCheckingInspection") + @Setup + public void setup() { + PeriodicMetricReader metricReader = + PeriodicMetricReader.builder( + // Configure an exporter that configures the temporality and aggregation + // for the test case, but otherwise drops the data on export + new NoopMetricExporter(aggregationTemporality, Aggregation.sum(), memoryMode)) + // Effectively disable periodic reading so reading is only done on #flush() + .setInterval(Duration.ofSeconds(Integer.MAX_VALUE)) + .build(); + SdkMeterProviderBuilder builder = SdkMeterProvider.builder(); + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector( + builder, metricReader, unused -> cardinality + 1); + + attributesList = AttributesGenerator.generate(cardinality); + + // Disable examplars + SdkMeterProviderUtil.setExemplarFilter(builder, ExemplarFilter.alwaysOff()); + + sdkMeterProvider = builder.build(); + for (int i = 0; i < countersCount; i++) { + sdkMeterProvider + .get("meter") + .counterBuilder("counter" + i) + .buildWithCallback( + observableLongMeasurement -> { + for (int j = 0; j < attributesList.size(); j++) { + Attributes attributes = attributesList.get(j); + observableLongMeasurement.record(random.nextInt(10_000), attributes); + } + }); + } + } + + @TearDown + public void tearDown() { + sdkMeterProvider.shutdown().join(10, TimeUnit.SECONDS); + } + } + + /** + * Collects all asynchronous instruments metric data. + * + * @param threadState thread-state + */ + @Benchmark + @Threads(value = 1) + public void recordAndCollect(ThreadState threadState) { + threadState.sdkMeterProvider.forceFlush().join(10, TimeUnit.SECONDS); + } +} diff --git a/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageGarbageCollectionBenchmarkTest.java b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageGarbageCollectionBenchmarkTest.java new file mode 100644 index 00000000000..a5b5f5cc5d2 --- /dev/null +++ b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageGarbageCollectionBenchmarkTest.java @@ -0,0 +1,106 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.resources.Resource; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.assertj.core.data.Offset; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.openjdk.jmh.infra.BenchmarkParams; +import org.openjdk.jmh.results.BenchmarkResult; +import org.openjdk.jmh.results.Result; +import org.openjdk.jmh.results.RunResult; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +public class AsynchronousMetricStorageGarbageCollectionBenchmarkTest { + + /** + * This test validates that in {@link MemoryMode#REUSABLE_DATA}, {@link + * AsynchronousMetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely + * allocates memory which is then subsequently garbage collected. It is done so comparatively to + * {@link MemoryMode#IMMUTABLE_DATA}, + * + *

It runs the JMH test {@link AsynchronousMetricStorageGarbageCollectionBenchmark} with GC + * profiler, and measures for each parameter combination the garbage collector normalized rate + * (bytes allocated per Operation). + * + *

Memory allocations can be hidden even at an innocent foreach loop on a collection, which + * under the hood allocates an internal object O(N) times. Someone can accidentally refactor such + * loop, resulting in 30% increase of garbage collected objects during a single collect() run. + */ + @SuppressWarnings("rawtypes") + @Test + public void normalizedAllocationRateTest() throws RunnerException { + // GitHub CI has an environment variable (CI=true). We can use it to skip + // this test since it's a lengthy one (roughly 10 seconds) and have it running + // only in GitHub CI + Assumptions.assumeTrue( + "true".equals(System.getenv("CI")), + "This test should only run in GitHub CI since it's long"); + + // Runs AsynchronousMetricStorageMemoryProfilingBenchmark + // with garbage collection profiler + Options opt = + new OptionsBuilder() + .include(AsynchronousMetricStorageGarbageCollectionBenchmark.class.getSimpleName()) + .addProfiler("gc") + .shouldFailOnError(true) + .jvmArgs("-Xmx1500m") + .build(); + Collection results = new Runner(opt).run(); + + // Collect the normalized GC allocation rate per parameters combination + Map> resultMap = new HashMap<>(); + for (RunResult result : results) { + for (BenchmarkResult benchmarkResult : result.getBenchmarkResults()) { + BenchmarkParams benchmarkParams = benchmarkResult.getParams(); + + String memoryMode = benchmarkParams.getParam("memoryMode"); + String aggregationTemporality = benchmarkParams.getParam("aggregationTemporality"); + assertThat(memoryMode).isNotNull(); + assertThat(aggregationTemporality).isNotNull(); + + Map secondaryResults = benchmarkResult.getSecondaryResults(); + Result allocRateNorm = secondaryResults.get("gc.alloc.rate.norm"); + assertThat(allocRateNorm) + .describedAs("Allocation rate in secondary results: %s", secondaryResults) + .isNotNull(); + + resultMap + .computeIfAbsent(aggregationTemporality, k -> new HashMap<>()) + .put(memoryMode, allocRateNorm.getScore()); + } + } + + assertThat(resultMap).hasSameSizeAs(AggregationTemporality.values()); + + // Asserts that reusable data GC allocation rate is a tiny fraction of immutable data + // GC allocation rate + resultMap.forEach( + (aggregationTemporality, memoryModeToAllocRateMap) -> { + Double immutableDataAllocRate = + memoryModeToAllocRateMap.get(MemoryMode.IMMUTABLE_DATA.toString()); + Double reusableDataAllocRate = + memoryModeToAllocRateMap.get(MemoryMode.REUSABLE_DATA.toString()); + + assertThat(immutableDataAllocRate).isNotNull().isNotZero(); + assertThat(reusableDataAllocRate).isNotNull().isNotZero(); + assertThat(100 - (reusableDataAllocRate / immutableDataAllocRate) * 100) + .isCloseTo(99.8, Offset.offset(2.0)); + }); + } +} diff --git a/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AttributesGenerator.java b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AttributesGenerator.java new file mode 100644 index 00000000000..3aea1dbd6d6 --- /dev/null +++ b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/AttributesGenerator.java @@ -0,0 +1,45 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import io.opentelemetry.api.common.Attributes; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; + +public class AttributesGenerator { + + private AttributesGenerator() {} + + /** + * Generates a list of unique attributes, with a single attribute key, and random value. + * + * @param uniqueAttributesCount The amount of unique attribute sets to generate + * @return The list of generates {@link Attributes} + */ + public static List generate(int uniqueAttributesCount) { + Random random = new Random(); + HashSet attributeSet = new HashSet<>(); + ArrayList attributesList = new ArrayList<>(uniqueAttributesCount); + String last = "aaaaaaaaaaaaaaaaaaaaaaaaaa"; + for (int i = 0; i < uniqueAttributesCount; i++) { + char[] chars = last.toCharArray(); + int attempts = 0; + do { + chars[random.nextInt(last.length())] = (char) (random.nextInt(26) + 'a'); + } while (attributeSet.contains(new String(chars)) && ++attempts < 1000); + if (attributeSet.contains(new String(chars))) { + throw new IllegalStateException("Couldn't create new random attributes"); + } + last = new String(chars); + attributesList.add(Attributes.builder().put("key", last).build()); + attributeSet.add(last); + } + + return attributesList; + } +} diff --git a/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/NoopMetricExporter.java b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/NoopMetricExporter.java new file mode 100644 index 00000000000..533a3cc34dc --- /dev/null +++ b/sdk/metrics/src/jmhBasedTest/java/io/opentelemetry/sdk/metrics/internal/state/NoopMetricExporter.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.util.Collection; + +public class NoopMetricExporter implements MetricExporter { + private final AggregationTemporality aggregationTemporality; + private final Aggregation aggregation; + private final MemoryMode memoryMode; + + /** + * Create a {@link NoopMetricExporter} with aggregationTemporality, aggregation and memory mode. + */ + public NoopMetricExporter( + AggregationTemporality aggregationTemporality, + Aggregation aggregation, + MemoryMode memoryMode) { + this.aggregationTemporality = aggregationTemporality; + this.aggregation = aggregation; + this.memoryMode = memoryMode; + } + + @Override + public CompletableResultCode export(Collection metrics) { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public Aggregation getDefaultAggregation(InstrumentType instrumentType) { + return aggregation; + } + + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + return aggregationTemporality; + } + + @Override + public MemoryMode getMemoryMode() { + return memoryMode; + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoublePointData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoublePointData.java index bcb90a3d96a..fefbd27f547 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoublePointData.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoublePointData.java @@ -6,14 +6,12 @@ package io.opentelemetry.sdk.metrics.data; import java.util.List; -import javax.annotation.concurrent.Immutable; /** * Point data with a {@code double} aggregation value. * * @since 1.14.0 */ -@Immutable public interface DoublePointData extends PointData { /** Returns the value of the data point. */ double getValue(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/LongPointData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/LongPointData.java index 8cf3129119b..18b42cb9d06 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/LongPointData.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/LongPointData.java @@ -6,14 +6,12 @@ package io.opentelemetry.sdk.metrics.data; import java.util.List; -import javax.annotation.concurrent.Immutable; /** * A point data with a {@code double} aggregation value. * * @since 1.14.0 */ -@Immutable public interface LongPointData extends PointData { /** Returns the value of the data point. */ long getValue(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/PointData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/PointData.java index bb4684cfa6e..4e19c603e99 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/PointData.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/PointData.java @@ -7,7 +7,6 @@ import io.opentelemetry.api.common.Attributes; import java.util.List; -import javax.annotation.concurrent.Immutable; /** * A point in the metric data model. @@ -17,7 +16,6 @@ * * @since 1.14.0 */ -@Immutable public interface PointData { /** Returns the start time of the aggregation in epoch nanos. */ long getStartEpochNanos(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java index af9f5f4747c..f4f8f569055 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java @@ -5,7 +5,10 @@ package io.opentelemetry.sdk.metrics.export; +import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; + import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.SdkMeterProvider; @@ -38,6 +41,16 @@ default Aggregation getDefaultAggregation(InstrumentType instrumentType) { return Aggregation.defaultAggregation(); } + /** + * Returns the memory mode used by this exporter's associated reader. + * + * @return The {@link MemoryMode} used by this exporter's associated reader + * @since 1.29.0 + */ + default MemoryMode getMemoryMode() { + return IMMUTABLE_DATA; + } + /** * Exports the {@code metrics}. The caller (i.e. {@link PeriodicMetricReader} will not call export * until the previous call completes. diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricReader.java index 3752ba2640c..0b0fb974d78 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricReader.java @@ -5,7 +5,10 @@ package io.opentelemetry.sdk.metrics.export; +import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; + import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.SdkMeterProvider; @@ -44,6 +47,16 @@ default Aggregation getDefaultAggregation(InstrumentType instrumentType) { return Aggregation.defaultAggregation(); } + /** + * Returns the memory mode used by this reader. + * + * @return The {@link MemoryMode} used by this instance + * @since 1.29.0 + */ + default MemoryMode getMemoryMode() { + return IMMUTABLE_DATA; + } + /** * Read and export the metrics. * diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index a03e10e35d2..d22133677fb 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.export; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.SdkMeterProvider; @@ -74,6 +75,11 @@ public Aggregation getDefaultAggregation(InstrumentType instrumentType) { return exporter.getDefaultAggregation(instrumentType); } + @Override + public MemoryMode getMemoryMode() { + return exporter.getMemoryMode(); + } + @Override public CompletableResultCode forceFlush() { return scheduled.doRun(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java index 863aff87300..e44eb2de33e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java @@ -53,6 +53,21 @@ default T diff(T previousCumulative, T currentCumulative) { throw new UnsupportedOperationException("This aggregator does not support diff."); } + /** + * Resets one reusable point to be a DELTA point by computing the difference between two + * cumulative points. + * + *

The delta between the two points is set on {@code previousCumulativeReusable} + * + *

Aggregators MUST implement diff if it can be used with asynchronous instruments. + * + * @param previousCumulativeReusable the previously captured point. + * @param currentCumulative the newly captured (cumulative) point. + */ + default void diffInPlace(T previousCumulativeReusable, T currentCumulative) { + throw new UnsupportedOperationException("This aggregator does not support diffInPlace."); + } + /** * Return a new point representing the measurement. * @@ -62,6 +77,26 @@ default T toPoint(Measurement measurement) { throw new UnsupportedOperationException("This aggregator does not support toPoint."); } + /** + * Resets {@code reusablePoint} to represent the {@code measurement}. + * + *

Aggregators MUST implement diff if it can be used with asynchronous instruments. + */ + default void toPoint(Measurement measurement, T reusablePoint) { + throw new UnsupportedOperationException("This aggregator does not support toPoint."); + } + + /** Creates a new reusable point. */ + default T createReusablePoint() { + throw new UnsupportedOperationException( + "This aggregator does not support createReusablePoint."); + } + + /** Copies {@code point} into {@code toReusablePoint}. */ + default void copyPoint(T point, T toReusablePoint) { + throw new UnsupportedOperationException("This aggregator does not support toPoint."); + } + /** * Returns the {@link MetricData} that this {@code Aggregation} will produce. * diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java index a56df5860ce..8f0a622e3af 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregator.java @@ -14,6 +14,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.MutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; import io.opentelemetry.sdk.metrics.internal.state.Measurement; @@ -57,6 +58,11 @@ public DoublePointData diff(DoublePointData previous, DoublePointData current) { return current; } + @Override + public void diffInPlace(DoublePointData previousReusable, DoublePointData current) { + ((MutableDoublePointData) previousReusable).set(current); + } + @Override public DoublePointData toPoint(Measurement measurement) { return ImmutableDoublePointData.create( @@ -66,6 +72,26 @@ public DoublePointData toPoint(Measurement measurement) { measurement.doubleValue()); } + @Override + public void toPoint(Measurement measurement, DoublePointData reusablePoint) { + ((MutableDoublePointData) reusablePoint) + .set( + measurement.startEpochNanos(), + measurement.epochNanos(), + measurement.attributes(), + measurement.doubleValue()); + } + + @Override + public DoublePointData createReusablePoint() { + return new MutableDoublePointData(); + } + + @Override + public void copyPoint(DoublePointData point, DoublePointData toReusablePoint) { + ((MutableDoublePointData) toReusablePoint).set(point); + } + @Override public MetricData toMetricData( Resource resource, diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java index 42189ac3d58..c88217114c7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregator.java @@ -16,6 +16,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.metrics.internal.data.MutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; @@ -64,6 +65,17 @@ public DoublePointData diff(DoublePointData previousPoint, DoublePointData curre currentPoint.getExemplars()); } + @Override + public void diffInPlace(DoublePointData previousReusablePoint, DoublePointData currentPoint) { + ((MutableDoublePointData) previousReusablePoint) + .set( + currentPoint.getStartEpochNanos(), + currentPoint.getEpochNanos(), + currentPoint.getAttributes(), + currentPoint.getValue() - previousReusablePoint.getValue(), + currentPoint.getExemplars()); + } + @Override public DoublePointData toPoint(Measurement measurement) { return ImmutableDoublePointData.create( @@ -73,6 +85,26 @@ public DoublePointData toPoint(Measurement measurement) { measurement.doubleValue()); } + @Override + public void toPoint(Measurement measurement, DoublePointData reusablePoint) { + ((MutableDoublePointData) reusablePoint) + .set( + measurement.startEpochNanos(), + measurement.epochNanos(), + measurement.attributes(), + measurement.doubleValue()); + } + + @Override + public DoublePointData createReusablePoint() { + return new MutableDoublePointData(); + } + + @Override + public void copyPoint(DoublePointData point, DoublePointData toReusablePoint) { + ((MutableDoublePointData) toReusablePoint).set(point); + } + @Override public MetricData toMetricData( Resource resource, diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java index e1c310207d6..5b3064822c9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregator.java @@ -14,6 +14,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.MutableLongPointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; import io.opentelemetry.sdk.metrics.internal.state.Measurement; @@ -53,6 +54,11 @@ public LongPointData diff(LongPointData previous, LongPointData current) { return current; } + @Override + public void diffInPlace(LongPointData previousReusablePoint, LongPointData currentPoint) { + ((MutableLongPointData) previousReusablePoint).set(currentPoint); + } + @Override public LongPointData toPoint(Measurement measurement) { return ImmutableLongPointData.create( @@ -62,6 +68,26 @@ public LongPointData toPoint(Measurement measurement) { measurement.longValue()); } + @Override + public void toPoint(Measurement measurement, LongPointData reusablePoint) { + ((MutableLongPointData) reusablePoint) + .set( + measurement.startEpochNanos(), + measurement.epochNanos(), + measurement.attributes(), + measurement.longValue()); + } + + @Override + public LongPointData createReusablePoint() { + return new MutableLongPointData(); + } + + @Override + public void copyPoint(LongPointData point, LongPointData toReusablePoint) { + ((MutableLongPointData) toReusablePoint).set(point); + } + @Override public MetricData toMetricData( Resource resource, diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java index 3899476591a..131c82ce3e9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregator.java @@ -16,6 +16,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.metrics.internal.data.MutableLongPointData; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; @@ -58,6 +59,17 @@ public LongPointData diff(LongPointData previousPoint, LongPointData currentPoin currentPoint.getExemplars()); } + @Override + public void diffInPlace(LongPointData previousReusablePoint, LongPointData currentPoint) { + ((MutableLongPointData) previousReusablePoint) + .set( + currentPoint.getStartEpochNanos(), + currentPoint.getEpochNanos(), + currentPoint.getAttributes(), + currentPoint.getValue() - previousReusablePoint.getValue(), + currentPoint.getExemplars()); + } + @Override public LongPointData toPoint(Measurement measurement) { return ImmutableLongPointData.create( @@ -67,6 +79,26 @@ public LongPointData toPoint(Measurement measurement) { measurement.longValue()); } + @Override + public void toPoint(Measurement measurement, LongPointData reusablePoint) { + ((MutableLongPointData) reusablePoint) + .set( + measurement.startEpochNanos(), + measurement.epochNanos(), + measurement.attributes(), + measurement.longValue()); + } + + @Override + public LongPointData createReusablePoint() { + return new MutableLongPointData(); + } + + @Override + public void copyPoint(LongPointData point, LongPointData toReusablePoint) { + ((MutableLongPointData) toReusablePoint).set(point); + } + @Override public MetricData toMetricData( Resource resource, diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/data/MutableDoublePointData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/data/MutableDoublePointData.java new file mode 100644 index 00000000000..fb412e6f2a8 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/data/MutableDoublePointData.java @@ -0,0 +1,138 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.data; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * A mutable {@link DoublePointData} + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + *

This class is not thread-safe. + */ +public class MutableDoublePointData implements DoublePointData { + + private long startEpochNanos; + private long epochNanos; + + private Attributes attributes = Attributes.empty(); + + private double value; + private List exemplars = Collections.emptyList(); + + @Override + public double getValue() { + return value; + } + + @Override + public long getStartEpochNanos() { + return startEpochNanos; + } + + @Override + public long getEpochNanos() { + return epochNanos; + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + @Override + public List getExemplars() { + return exemplars; + } + + /** + * Sets all {@link MutableDoublePointData} values based on {@code point}. + * + * @param point The point to take the values from + */ + public void set(DoublePointData point) { + set( + point.getStartEpochNanos(), + point.getEpochNanos(), + point.getAttributes(), + point.getValue(), + point.getExemplars()); + } + + /** Sets all {@link MutableDoublePointData} values , besides exemplars which are set to empty. */ + public void set(long startEpochNanos, long epochNanos, Attributes attributes, double value) { + set(startEpochNanos, epochNanos, attributes, value, Collections.emptyList()); + } + + /** Sets all {@link MutableDoublePointData} values. */ + public void set( + long startEpochNanos, + long epochNanos, + Attributes attributes, + double value, + List exemplars) { + this.startEpochNanos = startEpochNanos; + this.epochNanos = epochNanos; + this.attributes = attributes; + this.value = value; + this.exemplars = exemplars; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !(o instanceof MutableDoublePointData)) { + return false; + } + MutableDoublePointData pointData = (MutableDoublePointData) o; + return startEpochNanos == pointData.startEpochNanos + && epochNanos == pointData.epochNanos + && Double.doubleToLongBits(value) == Double.doubleToLongBits(pointData.value) + && Objects.equals(attributes, pointData.attributes) + && Objects.equals(exemplars, pointData.exemplars); + } + + @Override + public int hashCode() { + int hashcode = 1; + hashcode *= 1000003; + hashcode ^= (int) ((startEpochNanos >>> 32) ^ startEpochNanos); + hashcode *= 1000003; + hashcode ^= (int) ((epochNanos >>> 32) ^ epochNanos); + hashcode *= 1000003; + hashcode ^= attributes.hashCode(); + hashcode *= 1000003; + hashcode ^= (int) ((Double.doubleToLongBits(value) >>> 32) ^ Double.doubleToLongBits(value)); + hashcode *= 1000003; + hashcode ^= exemplars.hashCode(); + return hashcode; + } + + @Override + public String toString() { + return "MutableDoublePointData{" + + "startEpochNanos=" + + startEpochNanos + + ", epochNanos=" + + epochNanos + + ", attributes=" + + attributes + + ", value=" + + value + + ", exemplars=" + + exemplars + + '}'; + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/data/MutableLongPointData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/data/MutableLongPointData.java new file mode 100644 index 00000000000..179accaf486 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/data/MutableLongPointData.java @@ -0,0 +1,136 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.data; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.LongExemplarData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Mutable {@link LongPointData} + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + *

This class is not thread-safe. + */ +public class MutableLongPointData implements LongPointData { + + private long value; + private long startEpochNanos; + private long epochNanos; + private Attributes attributes = Attributes.empty(); + private List exemplars = Collections.emptyList(); + + @Override + public long getValue() { + return value; + } + + @Override + public long getStartEpochNanos() { + return startEpochNanos; + } + + @Override + public long getEpochNanos() { + return epochNanos; + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + @Override + public List getExemplars() { + return exemplars; + } + + /** + * Sets all {@link MutableDoublePointData} based on {@code point}. + * + * @param point The point to set values upon + */ + public void set(LongPointData point) { + set( + point.getStartEpochNanos(), + point.getEpochNanos(), + point.getAttributes(), + point.getValue(), + point.getExemplars()); + } + + /** Sets all {@link MutableDoublePointData} values besides exemplars which are set to be empty. */ + public void set(long startEpochNanos, long epochNanos, Attributes attributes, long value) { + set(startEpochNanos, epochNanos, attributes, value, Collections.emptyList()); + } + + /** Sets all {@link MutableDoublePointData} values. */ + public void set( + long startEpochNanos, + long epochNanos, + Attributes attributes, + long value, + List exemplars) { + this.startEpochNanos = startEpochNanos; + this.epochNanos = epochNanos; + this.attributes = attributes; + this.value = value; + this.exemplars = exemplars; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !(o instanceof MutableLongPointData)) { + return false; + } + MutableLongPointData that = (MutableLongPointData) o; + return value == that.value + && startEpochNanos == that.startEpochNanos + && epochNanos == that.epochNanos + && Objects.equals(attributes, that.attributes) + && Objects.equals(exemplars, that.exemplars); + } + + @Override + public int hashCode() { + int hashcode = 1; + hashcode *= 1000003; + hashcode ^= (int) ((startEpochNanos >>> 32) ^ startEpochNanos); + hashcode *= 1000003; + hashcode ^= (int) ((epochNanos >>> 32) ^ epochNanos); + hashcode *= 1000003; + hashcode ^= attributes.hashCode(); + hashcode *= 1000003; + hashcode ^= (int) ((value >>> 32) ^ value); + hashcode *= 1000003; + hashcode ^= exemplars.hashCode(); + return hashcode; + } + + @Override + public String toString() { + return "MutableLongPointData{" + + "value=" + + value + + ", startEpochNanos=" + + startEpochNanos + + ", epochNanos=" + + epochNanos + + ", attributes=" + + attributes + + ", exemplars=" + + exemplars + + '}'; + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/MetricProducer.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/MetricProducer.java index 32e036b9d10..63bb152f638 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/MetricProducer.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/MetricProducer.java @@ -5,6 +5,7 @@ package io.opentelemetry.sdk.metrics.internal.export; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.CollectionRegistration; import io.opentelemetry.sdk.metrics.export.MetricReader; @@ -43,6 +44,10 @@ static MetricProducer noop() { * Returns a collection of produced {@link MetricData}s to be exported. This will only be those * metrics that have been produced since the last time this method was called. * + *

If {@link MetricReader#getMemoryMode()} is configured to {@link MemoryMode#REUSABLE_DATA} do + * not keep the result or any of its contained objects as they are to be reused to return the + * result for the next call of {@code collectAllMetrics} + * * @return a collection of produced {@link MetricData}s to be exported. */ Collection collectAllMetrics(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ArrayBasedStack.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ArrayBasedStack.java new file mode 100644 index 00000000000..627955b3403 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ArrayBasedStack.java @@ -0,0 +1,79 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import javax.annotation.Nullable; + +/** + * Array-based Stack. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + *

This class is not thread-safe. + */ +public class ArrayBasedStack { + static final int DEFAULT_CAPACITY = 10; + + // NOTE (asafm): Using native array instead of ArrayList since I plan to add eviction + // if the initial portion of the stack is not used for several cycles of collection + private T[] array; + + private int size; + + @SuppressWarnings("unchecked") + public ArrayBasedStack() { + array = (T[]) new Object[DEFAULT_CAPACITY]; + size = 0; + } + + /** + * Add {@code element} to the top of the stack (LIFO). + * + * @param element The element to add + * @throws NullPointerException if {@code element} is null + */ + public void push(T element) { + if (element == null) { + throw new NullPointerException("Null is not permitted as element in the stack"); + } + if (size == array.length) { + resizeArray(array.length * 2); + } + array[size++] = element; + } + + /** + * Removes and returns an element from the top of the stack (LIFO). + * + * @return the top most element in the stack (last one added) + */ + @Nullable + public T pop() { + if (isEmpty()) { + return null; + } + T element = array[size - 1]; + array[size - 1] = null; + size--; + return element; + } + + public boolean isEmpty() { + return size == 0; + } + + public int size() { + return size; + } + + @SuppressWarnings("unchecked") + private void resizeArray(int newCapacity) { + T[] newArray = (T[]) new Object[newCapacity]; + System.arraycopy(array, 0, newArray, 0, size); + array = newArray; + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 2e088541d10..4da3d653207 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -5,11 +5,14 @@ package io.opentelemetry.sdk.metrics.internal.state; +import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; + import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.View; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; @@ -25,6 +28,8 @@ import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; import io.opentelemetry.sdk.metrics.internal.view.RegisteredView; import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.logging.Level; @@ -53,9 +58,18 @@ final class AsynchronousMetricStorage points = new HashMap<>(); - private Map lastPoints = - new HashMap<>(); // Only populated if aggregationTemporality == DELTA + private Map points; + + // Only populated if aggregationTemporality == DELTA + private Map lastPoints; + + // Only populated if memoryMode == REUSABLE_DATA + private final ObjectPool reusablePointsPool; + + // Only populated if memoryMode == REUSABLE_DATA + private final ArrayList reusableResultList = new ArrayList<>(); + + private final MemoryMode memoryMode; private AsynchronousMetricStorage( RegisteredReader registeredReader, @@ -69,9 +83,18 @@ private AsynchronousMetricStorage( registeredReader .getReader() .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType()); + this.memoryMode = registeredReader.getReader().getMemoryMode(); this.aggregator = aggregator; this.attributesProcessor = attributesProcessor; this.maxCardinality = maxCardinality - 1; + this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint); + if (memoryMode == REUSABLE_DATA) { + lastPoints = new PooledHashMap<>(); + points = new PooledHashMap<>(); + } else { + lastPoints = new HashMap<>(); + points = new HashMap<>(); + } } /** @@ -107,12 +130,9 @@ void record(Measurement measurement) { aggregationTemporality == AggregationTemporality.DELTA ? registeredReader.getLastCollectEpochNanos() : measurement.startEpochNanos(); - measurement = - measurement.hasDoubleValue() - ? Measurement.doubleMeasurement( - start, measurement.epochNanos(), measurement.doubleValue(), processedAttributes) - : Measurement.longMeasurement( - start, measurement.epochNanos(), measurement.longValue(), processedAttributes); + + measurement = measurement.withAttributes(processedAttributes).withStartEpochNanos(start); + recordPoint(processedAttributes, measurement); } @@ -126,18 +146,7 @@ private void recordPoint(Attributes attributes, Measurement measurement) { + maxCardinality + ")."); attributes = MetricStorage.CARDINALITY_OVERFLOW; - measurement = - measurement.hasDoubleValue() - ? Measurement.doubleMeasurement( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.doubleValue(), - attributes) - : Measurement.longMeasurement( - measurement.startEpochNanos(), - measurement.epochNanos(), - measurement.longValue(), - attributes); + measurement = measurement.withAttributes(attributes); } else if (points.containsKey( attributes)) { // Check there is not already a recording for the attributes throttlingLogger.log( @@ -149,7 +158,15 @@ private void recordPoint(Attributes attributes, Measurement measurement) { return; } - points.put(attributes, aggregator.toPoint(measurement)); + T dataPoint; + if (memoryMode == REUSABLE_DATA) { + dataPoint = reusablePointsPool.borrowObject(); + aggregator.toPoint(measurement, dataPoint); + } else { + dataPoint = aggregator.toPoint(measurement); + } + + points.put(attributes, dataPoint); } @Override @@ -168,25 +185,76 @@ public MetricData collect( InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { - Map result; + if (memoryMode == REUSABLE_DATA) { + // Collect can not run concurrently for same reader, hence we safely assume + // the previous collect result has been used and done with + reusableResultList.forEach(reusablePointsPool::returnObject); + reusableResultList.clear(); + } + + Collection result; if (aggregationTemporality == AggregationTemporality.DELTA) { Map points = this.points; Map lastPoints = this.lastPoints; - lastPoints.entrySet().removeIf(entry -> !points.containsKey(entry.getKey())); + + Collection deltaPoints; + if (memoryMode == REUSABLE_DATA) { + deltaPoints = reusableResultList; + } else { + deltaPoints = new ArrayList<>(); + } + points.forEach( - (k, v) -> lastPoints.compute(k, (k2, v2) -> v2 == null ? v : aggregator.diff(v2, v))); - result = lastPoints; + (k, v) -> { + T lastPoint = lastPoints.get(k); + + T deltaPoint; + if (lastPoint == null) { + if (memoryMode == REUSABLE_DATA) { + deltaPoint = reusablePointsPool.borrowObject(); + aggregator.copyPoint(v, deltaPoint); + } else { + deltaPoint = v; + } + } else { + if (memoryMode == REUSABLE_DATA) { + aggregator.diffInPlace(lastPoint, v); + deltaPoint = lastPoint; + + // Remaining last points are returned to reusablePointsPool, but + // this reusable point is still used, so don't return it to pool yet + lastPoints.remove(k); + } else { + deltaPoint = aggregator.diff(lastPoint, v); + } + } + + deltaPoints.add(deltaPoint); + }); + + if (memoryMode == REUSABLE_DATA) { + lastPoints.forEach((k, v) -> reusablePointsPool.returnObject(v)); + lastPoints.clear(); + this.points = lastPoints; + } else { + this.points = new HashMap<>(); + } + this.lastPoints = points; - } else { - result = points; + result = deltaPoints; + } else /* CUMULATIVE */ { + if (memoryMode == REUSABLE_DATA) { + points.forEach((k, v) -> reusableResultList.add(v)); + points.clear(); + result = reusableResultList; + } else { + result = points.values(); + points = new HashMap<>(); + } } - this.points = new HashMap<>(); + return aggregator.toMetricData( - resource, - instrumentationScopeInfo, - metricDescriptor, - result.values(), - aggregationTemporality); + resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java new file mode 100644 index 00000000000..9cac89e96e9 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ImmutableMeasurement.java @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; + +/** + * A long or double measurement recorded from {@link ObservableLongMeasurement} or {@link + * ObservableDoubleMeasurement}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +@AutoValue +public abstract class ImmutableMeasurement implements Measurement { + + static ImmutableMeasurement createDouble( + long startEpochNanos, long epochNanos, double value, Attributes attributes) { + return new AutoValue_ImmutableMeasurement( + startEpochNanos, + epochNanos, + /* hasLongValue= */ false, + 0L, + /* hasDoubleValue= */ true, + value, + attributes); + } + + static ImmutableMeasurement createLong( + long startEpochNanos, long epochNanos, long value, Attributes attributes) { + return new AutoValue_ImmutableMeasurement( + startEpochNanos, + epochNanos, + /* hasLongValue= */ true, + value, + /* hasDoubleValue= */ false, + 0.0, + attributes); + } + + @Override + public Measurement withAttributes(Attributes attributes) { + if (hasDoubleValue()) { + return createDouble(startEpochNanos(), epochNanos(), doubleValue(), attributes); + } else { + return createLong(startEpochNanos(), epochNanos(), longValue(), attributes); + } + } + + @Override + public Measurement withStartEpochNanos(long startEpochNanos) { + if (hasDoubleValue()) { + return createDouble(startEpochNanos, epochNanos(), doubleValue(), attributes()); + } else { + return createLong(startEpochNanos, epochNanos(), longValue(), attributes()); + } + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java index 43370bb571f..a5023995dad 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/Measurement.java @@ -5,7 +5,6 @@ package io.opentelemetry.sdk.metrics.internal.state; -import com.google.auto.value.AutoValue; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; @@ -13,45 +12,42 @@ /** * A long or double measurement recorded from {@link ObservableLongMeasurement} or {@link * ObservableDoubleMeasurement}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. */ -@AutoValue -public abstract class Measurement { - - static Measurement doubleMeasurement( - long startEpochNanos, long epochNanos, double value, Attributes attributes) { - return new AutoValue_Measurement( - startEpochNanos, - epochNanos, - /* hasLongValue= */ false, - 0L, - /* hasDoubleValue= */ true, - value, - attributes); - } - - static Measurement longMeasurement( - long startEpochNanos, long epochNanos, long value, Attributes attributes) { - return new AutoValue_Measurement( - startEpochNanos, - epochNanos, - /* hasLongValue= */ true, - value, - /* hasDoubleValue= */ false, - 0.0, - attributes); - } - - public abstract long startEpochNanos(); - - public abstract long epochNanos(); - - public abstract boolean hasLongValue(); - - public abstract long longValue(); - - public abstract boolean hasDoubleValue(); - - public abstract double doubleValue(); - - public abstract Attributes attributes(); +public interface Measurement { + long startEpochNanos(); + + long epochNanos(); + + boolean hasLongValue(); + + long longValue(); + + boolean hasDoubleValue(); + + double doubleValue(); + + Attributes attributes(); + + /** + * Updates the attributes. + * + * @param attributes The attributes to update + * @return The updated object. For {@link ImmutableMeasurement} it will be a new object with the + * updated attributes and for {@link MutableMeasurement} it will return itself with the + * attributes updated + */ + Measurement withAttributes(Attributes attributes); + + /** + * Updates the startEpochNanos. + * + * @param startEpochNanos start epoch nanosecond + * @return The updated object. For {@link ImmutableMeasurement} it will be a new object with the + * updated startEpochNanos and for {@link MutableMeasurement} it will return itself with the + * startEpochNanos updated + */ + Measurement withStartEpochNanos(long startEpochNanos); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java new file mode 100644 index 00000000000..9c88712d487 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MutableMeasurement.java @@ -0,0 +1,125 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import io.opentelemetry.api.common.Attributes; + +/** + * A mutable {@link Measurement} implementation + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + *

This class is not thread-safe. + */ +public class MutableMeasurement implements Measurement { + + static void setDoubleMeasurement( + MutableMeasurement mutableMeasurement, + long startEpochNanos, + long epochNanos, + double value, + Attributes attributes) { + mutableMeasurement.set( + startEpochNanos, + epochNanos, + /* hasLongValue= */ false, + 0L, + /* hasDoubleValue= */ true, + value, + attributes); + } + + static void setLongMeasurement( + MutableMeasurement mutableMeasurement, + long startEpochNanos, + long epochNanos, + long value, + Attributes attributes) { + mutableMeasurement.set( + startEpochNanos, + epochNanos, + /* hasLongValue= */ true, + value, + /* hasDoubleValue= */ false, + 0.0, + attributes); + } + + private long startEpochNanos; + private long epochNanos; + private boolean hasLongValue; + private long longValue; + private boolean hasDoubleValue; + private double doubleValue; + + private Attributes attributes = Attributes.empty(); + + /** Sets the values. */ + private void set( + long startEpochNanos, + long epochNanos, + boolean hasLongValue, + long longValue, + boolean hasDoubleValue, + double doubleValue, + Attributes attributes) { + this.startEpochNanos = startEpochNanos; + this.epochNanos = epochNanos; + this.hasLongValue = hasLongValue; + this.longValue = longValue; + this.hasDoubleValue = hasDoubleValue; + this.doubleValue = doubleValue; + this.attributes = attributes; + } + + @Override + public Measurement withStartEpochNanos(long startEpochNanos) { + this.startEpochNanos = startEpochNanos; + return this; + } + + @Override + public Measurement withAttributes(Attributes attributes) { + this.attributes = attributes; + return this; + } + + @Override + public long startEpochNanos() { + return startEpochNanos; + } + + @Override + public long epochNanos() { + return epochNanos; + } + + @Override + public boolean hasLongValue() { + return hasLongValue; + } + + @Override + public long longValue() { + return longValue; + } + + @Override + public boolean hasDoubleValue() { + return hasDoubleValue; + } + + @Override + public double doubleValue() { + return doubleValue; + } + + @Override + public Attributes attributes() { + return attributes; + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ObjectPool.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ObjectPool.java new file mode 100644 index 00000000000..3a129613d87 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/ObjectPool.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import java.util.function.Supplier; + +/** + * A pool of objects of type {@code T}. + * + *

When an object is borrowed from an empty pool, an object will be created by the supplied + * {@code objectCreator} and returned immediately. When the pool is not empty, an object is removed + * from the pool and returned. The user is expected to return the object to the pool when it is no + * longer used. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + *

This class is not thread-safe. + */ +public class ObjectPool { + private final ArrayBasedStack pool; + private final Supplier objectCreator; + + /** + * Constructs an object pool. + * + * @param objectCreator Supplier used to create an object when the pool is empty + */ + public ObjectPool(Supplier objectCreator) { + this.pool = new ArrayBasedStack<>(); + this.objectCreator = objectCreator; + } + + /** + * Gets an object from the pool. + * + * @return An object from the pool, or a new object if the pool is empty + */ + public T borrowObject() { + T object = pool.pop(); + if (object == null) { + object = objectCreator.get(); + } + return object; + } + + public void returnObject(T object) { + pool.push(object); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMap.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMap.java new file mode 100644 index 00000000000..452ed9d50ea --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMap.java @@ -0,0 +1,267 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import javax.annotation.Nullable; + +/** + * A bucket-based hash map with an internal re-usable map entry objects pool + * + *

The goal of this map is to minimize memory allocation, leading to reduced time spent in + * garbage collection. + * + *

This map avoids allocating a new map entry on each put operation by maintaining a pool of + * reusable (mutable) map entries and borrowing a map entry object from the pool to hold the given + * key-value of the put operation. The borrowed object is returned to the pool when the map entry + * key is removed from the map. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + *

This class is not thread-safe. + * + * @param The map key type + * @param The map value type + */ +@SuppressWarnings("ForLoopReplaceableByForEach") +public class PooledHashMap implements Map { + private static final int DEFAULT_CAPACITY = 16; + private static final float LOAD_FACTOR = 0.75f; + + private ArrayList>[] table; + private final ObjectPool> entryPool; + private int size; + + /** + * Creates a {@link PooledHashMap} with {@code capacity} buckets. + * + *

The hashmap contains an array of buckets, each is an array-list of items. The number of + * buckets expands over time to avoid having too many items in one bucket, otherwise accessing an + * item by key won't be a constant time complexity. + * + * @param capacity The initial number of buckets to start with + */ + @SuppressWarnings({"unchecked"}) + public PooledHashMap(int capacity) { + this.table = (ArrayList>[]) new ArrayList[capacity]; + this.entryPool = new ObjectPool<>(Entry::new); + this.size = 0; + } + + /** + * Creates a new {@link PooledHashMap} with a default amount of buckets (capacity). + * + * @see PooledHashMap#PooledHashMap(int) + */ + public PooledHashMap() { + this(DEFAULT_CAPACITY); + } + + /** + * Add a key, value pair to the map. + * + *

Internally it uses a MapEntry from a pool of entries, to store this mapping + * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + * @return Null if the was no previous mapping for this key, or the value of the previous mapping + * of this key + */ + @Override + @Nullable + public V put(K key, V value) { + requireNonNull(key, "This map does not support null keys"); + requireNonNull(value, "This map does not support null values"); + if (size > LOAD_FACTOR * table.length) { + rehash(); + } + + int bucket = getBucket(key); + ArrayList> entries = table[bucket]; + if (entries == null) { + entries = new ArrayList<>(); + table[bucket] = entries; + } else { + // Don't optimize to enhanced for-loop since implicit iterator used allocated memory in O(n) + for (int i = 0; i < entries.size(); i++) { + Entry entry = entries.get(i); + if (Objects.equals(entry.key, key)) { + V oldValue = entry.value; + entry.value = value; + return oldValue; + } + } + } + Entry entry = entryPool.borrowObject(); + entry.key = key; + entry.value = value; + entries.add(entry); + size++; + return null; + } + + @SuppressWarnings({"unchecked"}) + private void rehash() { + ArrayList>[] oldTable = table; + table = (ArrayList>[]) new ArrayList[2 * oldTable.length]; + + // put() to new table below will reset size back to correct number + size = 0; + + for (int i = 0; i < oldTable.length; i++) { + ArrayList> bucket = oldTable[i]; + if (bucket != null) { + for (Entry entry : bucket) { + put(requireNonNull(entry.key), requireNonNull(entry.value)); + entryPool.returnObject(entry); + } + bucket.clear(); + } + } + } + + /** + * Retrieves the mapped value for {@code key}. + * + * @param key the key whose associated value is to be returned + * @return The mapped value for {@code key} or null if there is no such mapping + */ + @Override + @Nullable + @SuppressWarnings("unchecked") + public V get(Object key) { + requireNonNull(key, "This map does not support null keys"); + + int bucket = getBucket((K) key); + ArrayList> entries = table[bucket]; + if (entries != null) { + for (int i = 0; i < entries.size(); i++) { + Entry entry = entries.get(i); + if (Objects.equals(entry.key, key)) { + return entry.value; + } + } + } + return null; + } + + /** + * Removes the mapping for the given {@code key}. + * + * @param key key whose mapping is to be removed from the map + * @return The value mapped to this key, if the mapping exists, or null otherwise + */ + @Override + @Nullable + @SuppressWarnings("unchecked") + public V remove(Object key) { + requireNonNull(key, "This map does not support null keys"); + + int bucket = getBucket((K) key); + ArrayList> entries = table[bucket]; + if (entries != null) { + for (int i = 0; i < entries.size(); i++) { + Entry entry = entries.get(i); + if (Objects.equals(entry.key, key)) { + V oldValue = entry.value; + entries.remove(i); + entryPool.returnObject(entry); + size--; + return oldValue; + } + } + } + return null; + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size == 0; + } + + @Override + public boolean containsKey(Object key) { + requireNonNull(key, "This map does not support null keys"); + + return get(key) != null; + } + + @Override + public boolean containsValue(Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + for (int i = 0; i < table.length; i++) { + ArrayList> bucket = table[i]; + if (bucket != null) { + for (int j = 0; j < bucket.size(); j++) { + Entry entry = bucket.get(j); + entryPool.returnObject(entry); + } + bucket.clear(); + } + } + size = 0; + } + + @Override + public void forEach(BiConsumer action) { + for (int j = 0; j < table.length; j++) { + ArrayList> bucket = table[j]; + if (bucket != null) { + for (int i = 0; i < bucket.size(); i++) { + Entry entry = bucket.get(i); + action.accept(entry.key, entry.value); + } + } + } + } + + private int getBucket(K key) { + return Math.abs(key.hashCode() % table.length); + } + + @Override + public Set> entrySet() { + throw new UnsupportedOperationException(); + } + + @Override + public Collection values() { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map m) { + throw new UnsupportedOperationException(); + } + + @Override + public Set keySet() { + throw new UnsupportedOperationException(); + } + + private static class Entry { + @Nullable K key; + + @Nullable V value; + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java index df2d2b6512c..6d0187b443b 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurement.java @@ -5,17 +5,19 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.metrics.internal.state.Measurement.doubleMeasurement; -import static io.opentelemetry.sdk.metrics.internal.state.Measurement.longMeasurement; +import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; +import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; import java.util.List; +import java.util.Objects; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -36,6 +38,9 @@ public final class SdkObservableMeasurement private final InstrumentDescriptor instrumentDescriptor; private final List> storages; + /** Only used when {@code activeReader}'s memoryMode is {@link MemoryMode#REUSABLE_DATA}. */ + private final MutableMeasurement mutableMeasurement = new MutableMeasurement(); + // These fields are set before invoking callbacks. They allow measurements to be recorded to the // storages for correct reader, and with the correct time. @Nullable private volatile RegisteredReader activeReader; @@ -104,7 +109,23 @@ public void record(long value) { @Override public void record(long value, Attributes attributes) { - doRecord(longMeasurement(startEpochNanos, epochNanos, value, attributes)); + if (activeReader == null) { + logNoActiveReader(); + return; + } + + Measurement measurement; + + MemoryMode memoryMode = activeReader.getReader().getMemoryMode(); + if (Objects.requireNonNull(memoryMode) == MemoryMode.IMMUTABLE_DATA) { + measurement = createLong(startEpochNanos, epochNanos, value, attributes); + } else { + MutableMeasurement.setLongMeasurement( + mutableMeasurement, startEpochNanos, epochNanos, value, attributes); + measurement = mutableMeasurement; + } + + doRecord(measurement); } @Override @@ -114,23 +135,38 @@ public void record(double value) { @Override public void record(double value, Attributes attributes) { - doRecord(doubleMeasurement(startEpochNanos, epochNanos, value, attributes)); + if (activeReader == null) { + logNoActiveReader(); + return; + } + + Measurement measurement; + MemoryMode memoryMode = activeReader.getReader().getMemoryMode(); + if (Objects.requireNonNull(memoryMode) == MemoryMode.IMMUTABLE_DATA) { + measurement = createDouble(startEpochNanos, epochNanos, value, attributes); + } else { + MutableMeasurement.setDoubleMeasurement( + mutableMeasurement, startEpochNanos, epochNanos, value, attributes); + measurement = mutableMeasurement; + } + + doRecord(measurement); } private void doRecord(Measurement measurement) { RegisteredReader activeReader = this.activeReader; - if (activeReader == null) { - throttlingLogger.log( - Level.FINE, - "Measurement recorded for instrument " - + instrumentDescriptor.getName() - + " outside callback registered to instrument. Dropping measurement."); - return; - } for (AsynchronousMetricStorage storage : storages) { if (storage.getRegisteredReader().equals(activeReader)) { storage.record(measurement); } } } + + private void logNoActiveReader() { + throttlingLogger.log( + Level.FINE, + "Measurement recorded for instrument " + + instrumentDescriptor.getName() + + " outside callback registered to instrument. Dropping measurement."); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java index 2decaf97821..fa0dd75caa4 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleLastValueAggregatorTest.java @@ -7,6 +7,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -18,11 +19,13 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoubleExemplarData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.MutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; import io.opentelemetry.sdk.resources.Resource; import java.util.Collections; import java.util.List; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; /** Unit tests for {@link AggregatorHandle}. */ @@ -113,6 +116,92 @@ void diff() { .isEqualTo(ImmutableDoublePointData.create(0, 1, Attributes.empty(), 2, exemplars)); } + @Test + void diffInPlace() { + Attributes attributes = Attributes.builder().put("test", "value").build(); + DoubleExemplarData exemplar = + ImmutableDoubleExemplarData.create( + attributes, + 2L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 1); + List exemplars = Collections.singletonList(exemplar); + List previousExemplars = + Collections.singletonList( + ImmutableDoubleExemplarData.create( + attributes, + 1L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 2)); + + MutableDoublePointData previous = new MutableDoublePointData(); + MutableDoublePointData current = new MutableDoublePointData(); + + previous.set(0, 1, Attributes.empty(), 1, previousExemplars); + current.set(0, 1, Attributes.empty(), 2, exemplars); + + aggregator.diffInPlace(previous, current); + + /* Assert that latest measurement is kept and set on {@code previous} */ + assertThat(previous.getStartEpochNanos()).isEqualTo(0); + assertThat(previous.getEpochNanos()).isEqualTo(1); + assertThat(previous.getAttributes()).isEqualTo(Attributes.empty()); + assertThat(previous.getValue()).isEqualTo(2); + assertThat(previous.getExemplars()).isEqualTo(exemplars); + } + + @Test + void copyPoint() { + MutableDoublePointData pointData = (MutableDoublePointData) aggregator.createReusablePoint(); + + Attributes attributes = Attributes.of(AttributeKey.longKey("test"), 100L); + List examplarsFrom = + Collections.singletonList( + ImmutableDoubleExemplarData.create( + attributes, + 2L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 1)); + pointData.set(0, 1, attributes, 2000, examplarsFrom); + + MutableDoublePointData toPointData = (MutableDoublePointData) aggregator.createReusablePoint(); + + Attributes toAttributes = Attributes.of(AttributeKey.longKey("test"), 100L); + List examplarsTo = + Collections.singletonList( + ImmutableDoubleExemplarData.create( + attributes, + 4L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 2)); + toPointData.set(0, 2, toAttributes, 4000, examplarsTo); + + aggregator.copyPoint(pointData, toPointData); + + Assertions.assertThat(toPointData.getStartEpochNanos()) + .isEqualTo(pointData.getStartEpochNanos()); + Assertions.assertThat(toPointData.getEpochNanos()).isEqualTo(pointData.getEpochNanos()); + assertThat(toPointData.getAttributes()).isEqualTo(pointData.getAttributes()); + Assertions.assertThat(toPointData.getValue()).isEqualTo(pointData.getValue()); + Assertions.assertThat(toPointData.getExemplars()).isEqualTo(pointData.getExemplars()); + } + @Test void toMetricData() { AggregatorHandle aggregatorHandle = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java index 15377186150..06651798bf2 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleSumAggregatorTest.java @@ -7,6 +7,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -21,6 +22,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoubleExemplarData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.MutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.descriptor.Advice; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; @@ -28,6 +30,7 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.Collections; import java.util.List; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -190,6 +193,92 @@ void mergeAndDiff() { } } + @Test + void diffInPlace() { + Attributes attributes = Attributes.builder().put("test", "value").build(); + DoubleExemplarData exemplar = + ImmutableDoubleExemplarData.create( + attributes, + 2L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 1); + List exemplars = Collections.singletonList(exemplar); + List previousExemplars = + Collections.singletonList( + ImmutableDoubleExemplarData.create( + attributes, + 1L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 2)); + + MutableDoublePointData previous = new MutableDoublePointData(); + MutableDoublePointData current = new MutableDoublePointData(); + + previous.set(0, 1, Attributes.empty(), 1, previousExemplars); + current.set(0, 1, Attributes.empty(), 3, exemplars); + + aggregator.diffInPlace(previous, current); + + /* Assert that latest measurement is kept and set on {@code previous} */ + Assertions.assertThat(previous.getStartEpochNanos()).isEqualTo(current.getStartEpochNanos()); + Assertions.assertThat(previous.getEpochNanos()).isEqualTo(current.getEpochNanos()); + assertThat(previous.getAttributes()).isEqualTo(current.getAttributes()); + Assertions.assertThat(previous.getValue()).isEqualTo(2); + Assertions.assertThat(previous.getExemplars()).isEqualTo(exemplars); + } + + @Test + void copyPoint() { + MutableDoublePointData pointData = (MutableDoublePointData) aggregator.createReusablePoint(); + + Attributes attributes = Attributes.of(AttributeKey.longKey("test"), 100L); + List examplarsFrom = + Collections.singletonList( + ImmutableDoubleExemplarData.create( + attributes, + 2L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 1)); + pointData.set(0, 1, attributes, 2000, examplarsFrom); + + MutableDoublePointData toPointData = (MutableDoublePointData) aggregator.createReusablePoint(); + + Attributes toAttributes = Attributes.of(AttributeKey.longKey("test"), 100L); + List examplarsTo = + Collections.singletonList( + ImmutableDoubleExemplarData.create( + attributes, + 4L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 2)); + toPointData.set(0, 2, toAttributes, 4000, examplarsTo); + + aggregator.copyPoint(pointData, toPointData); + + Assertions.assertThat(toPointData.getStartEpochNanos()) + .isEqualTo(pointData.getStartEpochNanos()); + Assertions.assertThat(toPointData.getEpochNanos()).isEqualTo(pointData.getEpochNanos()); + assertThat(toPointData.getAttributes()).isEqualTo(pointData.getAttributes()); + Assertions.assertThat(toPointData.getValue()).isEqualTo(pointData.getValue()); + Assertions.assertThat(toPointData.getExemplars()).isEqualTo(pointData.getExemplars()); + } + @Test void toMetricData() { AggregatorHandle aggregatorHandle = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java index 7eda2d345c4..e7d4140309f 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongLastValueAggregatorTest.java @@ -7,19 +7,28 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.MutableLongPointData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import java.util.Collections; +import java.util.List; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; /** Unit tests for {@link LongLastValueAggregator}. */ @@ -74,6 +83,93 @@ void aggregateThenMaybeReset() { .isEqualTo(12L); } + @Test + void diffInPlace() { + Attributes attributes = Attributes.builder().put("test", "value").build(); + LongExemplarData exemplar = + ImmutableLongExemplarData.create( + attributes, + 2L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 1); + List exemplars = Collections.singletonList(exemplar); + List previousExemplars = + Collections.singletonList( + ImmutableLongExemplarData.create( + attributes, + 1L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 2)); + + MutableLongPointData previous = new MutableLongPointData(); + MutableLongPointData current = new MutableLongPointData(); + + previous.set(0, 1, Attributes.empty(), 1, previousExemplars); + current.set(0, 1, Attributes.empty(), 2, exemplars); + + aggregator.diffInPlace(previous, current); + + /* Assert that latest measurement is kept and set on {@code previous} */ + assertThat(previous.getStartEpochNanos()).isEqualTo(0); + assertThat(previous.getEpochNanos()).isEqualTo(1); + OpenTelemetryAssertions.assertThat(previous.getAttributes()).isEqualTo(Attributes.empty()); + assertThat(previous.getValue()).isEqualTo(2); + assertThat(previous.getExemplars()).isEqualTo(exemplars); + } + + @Test + void copyPoint() { + MutableLongPointData pointData = (MutableLongPointData) aggregator.createReusablePoint(); + + Attributes attributes = Attributes.of(AttributeKey.longKey("test"), 100L); + List examplarsFrom = + Collections.singletonList( + ImmutableLongExemplarData.create( + attributes, + 2L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 1)); + pointData.set(0, 1, attributes, 2000, examplarsFrom); + + MutableLongPointData toPointData = (MutableLongPointData) aggregator.createReusablePoint(); + + Attributes toAttributes = Attributes.of(AttributeKey.longKey("test"), 100L); + List examplarsTo = + Collections.singletonList( + ImmutableLongExemplarData.create( + attributes, + 4L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 2)); + toPointData.set(0, 2, toAttributes, 4000, examplarsTo); + + aggregator.copyPoint(pointData, toPointData); + + Assertions.assertThat(toPointData.getStartEpochNanos()) + .isEqualTo(pointData.getStartEpochNanos()); + Assertions.assertThat(toPointData.getEpochNanos()).isEqualTo(pointData.getEpochNanos()); + OpenTelemetryAssertions.assertThat(toPointData.getAttributes()) + .isEqualTo(pointData.getAttributes()); + Assertions.assertThat(toPointData.getValue()).isEqualTo(pointData.getValue()); + Assertions.assertThat(toPointData.getExemplars()).isEqualTo(pointData.getExemplars()); + } + @Test void toMetricData() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java index d6f0f189834..81e6f40a8b7 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/LongSumAggregatorTest.java @@ -7,6 +7,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -21,13 +22,16 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.MutableLongPointData; import io.opentelemetry.sdk.metrics.internal.descriptor.Advice; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import java.util.Collections; import java.util.List; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -184,6 +188,93 @@ void mergeAndDiff() { } } + @Test + void diffInPlace() { + Attributes attributes = Attributes.builder().put("test", "value").build(); + LongExemplarData exemplar = + ImmutableLongExemplarData.create( + attributes, + 2L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 1); + List exemplars = Collections.singletonList(exemplar); + List previousExemplars = + Collections.singletonList( + ImmutableLongExemplarData.create( + attributes, + 1L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 2)); + + MutableLongPointData previous = new MutableLongPointData(); + MutableLongPointData current = new MutableLongPointData(); + + previous.set(0, 1, Attributes.empty(), 1, previousExemplars); + current.set(0, 1, Attributes.empty(), 3, exemplars); + + aggregator.diffInPlace(previous, current); + + /* Assert that latest measurement is kept and set on {@code previous} */ + Assertions.assertThat(previous.getStartEpochNanos()).isEqualTo(current.getStartEpochNanos()); + Assertions.assertThat(previous.getEpochNanos()).isEqualTo(current.getEpochNanos()); + OpenTelemetryAssertions.assertThat(previous.getAttributes()).isEqualTo(current.getAttributes()); + Assertions.assertThat(previous.getValue()).isEqualTo(2); + Assertions.assertThat(previous.getExemplars()).isEqualTo(current.getExemplars()); + } + + @Test + void copyPoint() { + MutableLongPointData pointData = (MutableLongPointData) aggregator.createReusablePoint(); + + Attributes attributes = Attributes.of(AttributeKey.longKey("test"), 100L); + List examplarsFrom = + Collections.singletonList( + ImmutableLongExemplarData.create( + attributes, + 2L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 1)); + pointData.set(0, 1, attributes, 2000, examplarsFrom); + + MutableLongPointData toPointData = (MutableLongPointData) aggregator.createReusablePoint(); + + Attributes toAttributes = Attributes.of(AttributeKey.longKey("test"), 100L); + List examplarsTo = + Collections.singletonList( + ImmutableLongExemplarData.create( + attributes, + 4L, + SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getDefault(), + TraceState.getDefault()), + 2)); + toPointData.set(0, 2, toAttributes, 4000, examplarsTo); + + aggregator.copyPoint(pointData, toPointData); + + Assertions.assertThat(toPointData.getStartEpochNanos()) + .isEqualTo(pointData.getStartEpochNanos()); + Assertions.assertThat(toPointData.getEpochNanos()).isEqualTo(pointData.getEpochNanos()); + OpenTelemetryAssertions.assertThat(toPointData.getAttributes()) + .isEqualTo(pointData.getAttributes()); + Assertions.assertThat(toPointData.getValue()).isEqualTo(pointData.getValue()); + Assertions.assertThat(toPointData.getExemplars()).isEqualTo(pointData.getExemplars()); + } + @Test @SuppressWarnings("unchecked") void toMetricData() { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/ArrayBasedStackTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/ArrayBasedStackTest.java new file mode 100644 index 00000000000..e8cbda1764c --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/ArrayBasedStackTest.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +class ArrayBasedStackTest { + + @Test + void testPushAndPop() { + ArrayBasedStack stack = new ArrayBasedStack<>(); + stack.push(1); + stack.push(2); + assertThat(stack.pop()).isEqualTo(2); + assertThat(stack.pop()).isEqualTo(1); + } + + @Test + void testIsEmpty() { + ArrayBasedStack stack = new ArrayBasedStack<>(); + assertThat(stack.isEmpty()).isTrue(); + stack.push(1); + assertThat(stack.isEmpty()).isFalse(); + } + + @Test + void testSize() { + ArrayBasedStack stack = new ArrayBasedStack<>(); + assertThat(stack.size()).isEqualTo(0); + stack.push(1); + assertThat(stack.size()).isEqualTo(1); + } + + @Test + void testPushBeyondInitialCapacity() { + ArrayBasedStack stack = new ArrayBasedStack<>(); + for (int i = 0; i < ArrayBasedStack.DEFAULT_CAPACITY + 5; i++) { + stack.push(i); + } + assertThat(stack.size()).isEqualTo(ArrayBasedStack.DEFAULT_CAPACITY + 5); + for (int i = ArrayBasedStack.DEFAULT_CAPACITY + 4; i >= 0; i--) { + assertThat(stack.pop()).isEqualTo(i); + } + } + + @Test + void testPopOnEmptyStack() { + ArrayBasedStack stack = new ArrayBasedStack<>(); + assertThat(stack.pop()).isNull(); + } + + @Test + void testPushNullElement() { + ArrayBasedStack stack = new ArrayBasedStack<>(); + assertThatThrownBy(() -> stack.push(null)).isInstanceOf(NullPointerException.class); + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index fb1d147e4c5..109b8eef7a4 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -5,8 +5,9 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.metrics.internal.state.Measurement.doubleMeasurement; -import static io.opentelemetry.sdk.metrics.internal.state.Measurement.longMeasurement; +import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; +import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; +import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; import static org.mockito.ArgumentMatchers.any; @@ -16,12 +17,16 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.InstrumentSelector; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; import io.opentelemetry.sdk.metrics.View; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.data.MutableLongPointData; import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo; import io.opentelemetry.sdk.metrics.internal.descriptor.Advice; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; @@ -31,10 +36,12 @@ import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.testing.time.TestClock; -import org.junit.jupiter.api.BeforeEach; +import java.util.Collection; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -65,9 +72,10 @@ class AsynchronousMetricStorageTest { private AsynchronousMetricStorage longCounterStorage; private AsynchronousMetricStorage doubleCounterStorage; - @BeforeEach - void setup() { + // Not using @BeforeEach since many methods require executing them for each MemoryMode + void setup(MemoryMode memoryMode) { when(reader.getAggregationTemporality(any())).thenReturn(AggregationTemporality.CUMULATIVE); + when(reader.getMemoryMode()).thenReturn(memoryMode); registeredReader = RegisteredReader.create(reader, ViewRegistry.create()); longCounterStorage = @@ -94,14 +102,14 @@ void setup() { Advice.empty())); } - @Test - void recordLong() { - longCounterStorage.record( - longMeasurement(0, 1, 1, Attributes.builder().put("key", "a").build())); - longCounterStorage.record( - longMeasurement(0, 1, 2, Attributes.builder().put("key", "b").build())); - longCounterStorage.record( - longMeasurement(0, 1, 3, Attributes.builder().put("key", "c").build())); + @ParameterizedTest + @EnumSource(MemoryMode.class) + void recordLong(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key", "a").build())); + longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key", "b").build())); + longCounterStorage.record(createLong(0, 1, 3, Attributes.builder().put("key", "c").build())); assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -119,14 +127,17 @@ void recordLong() { assertThat(logs.size()).isEqualTo(0); } - @Test - void recordDouble() { + @ParameterizedTest + @EnumSource(MemoryMode.class) + void recordDouble(MemoryMode memoryMode) { + setup(memoryMode); + doubleCounterStorage.record( - doubleMeasurement(0, 1, 1.1, Attributes.builder().put("key", "a").build())); + createDouble(0, 1, 1.1, Attributes.builder().put("key", "a").build())); doubleCounterStorage.record( - doubleMeasurement(0, 1, 2.2, Attributes.builder().put("key", "b").build())); + createDouble(0, 1, 2.2, Attributes.builder().put("key", "b").build())); doubleCounterStorage.record( - doubleMeasurement(0, 1, 3.3, Attributes.builder().put("key", "c").build())); + createDouble(0, 1, 3.3, Attributes.builder().put("key", "c").build())); assertThat(doubleCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -146,8 +157,11 @@ void recordDouble() { assertThat(logs.size()).isEqualTo(0); } - @Test - void record_ProcessesAttributes() { + @ParameterizedTest + @EnumSource(MemoryMode.class) + void record_ProcessesAttributes(MemoryMode memoryMode) { + setup(memoryMode); + AsynchronousMetricStorage storage = AsynchronousMetricStorage.create( registeredReader, @@ -166,7 +180,7 @@ void record_ProcessesAttributes() { Advice.empty())); storage.record( - longMeasurement(0, 1, 1, Attributes.builder().put("key1", "a").put("key2", "b").build())); + createLong(0, 1, 1, Attributes.builder().put("key1", "a").put("key2", "b").build())); assertThat(storage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -180,11 +194,14 @@ void record_ProcessesAttributes() { assertThat(logs.size()).isEqualTo(0); } - @Test - void record_MaxCardinality() { + @ParameterizedTest + @EnumSource(MemoryMode.class) + void record_MaxCardinality(MemoryMode memoryMode) { + setup(memoryMode); + for (int i = 0; i <= CARDINALITY_LIMIT + 1; i++) { longCounterStorage.record( - longMeasurement(0, 1, 1, Attributes.builder().put("key" + i, "val").build())); + createLong(0, 1, 1, Attributes.builder().put("key" + i, "val").build())); } assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) @@ -194,12 +211,13 @@ void record_MaxCardinality() { logs.assertContains("Instrument long-counter has exceeded the maximum allowed cardinality"); } - @Test - void record_DuplicateAttributes() { - longCounterStorage.record( - longMeasurement(0, 1, 1, Attributes.builder().put("key1", "a").build())); - longCounterStorage.record( - longMeasurement(0, 1, 2, Attributes.builder().put("key1", "a").build())); + @ParameterizedTest + @EnumSource(MemoryMode.class) + void record_DuplicateAttributes(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key1", "a").build())); + longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key1", "a").build())); assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( @@ -214,10 +232,13 @@ void record_DuplicateAttributes() { "Instrument long-counter has recorded multiple values for the same attributes: {key1=\"a\"}"); } - @Test - void collect_CumulativeReportsCumulativeObservations() { + @ParameterizedTest + @EnumSource(MemoryMode.class) + void collect_CumulativeReportsCumulativeObservations(MemoryMode memoryMode) { + setup(memoryMode); + // Record measurement and collect at time 10 - longCounterStorage.record(longMeasurement(0, 10, 3, Attributes.empty())); + longCounterStorage.record(createLong(0, 10, 3, Attributes.empty())); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -232,9 +253,9 @@ void collect_CumulativeReportsCumulativeObservations() { registeredReader.setLastCollectEpochNanos(10); // Record measurements and collect at time 30 - longCounterStorage.record(longMeasurement(0, 30, 3, Attributes.empty())); + longCounterStorage.record(createLong(0, 30, 3, Attributes.empty())); longCounterStorage.record( - longMeasurement(0, 30, 6, Attributes.builder().put("key", "value1").build())); + createLong(0, 30, 6, Attributes.builder().put("key", "value1").build())); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -255,9 +276,9 @@ void collect_CumulativeReportsCumulativeObservations() { registeredReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 - longCounterStorage.record(longMeasurement(0, 35, 4, Attributes.empty())); + longCounterStorage.record(createLong(0, 35, 4, Attributes.empty())); longCounterStorage.record( - longMeasurement(0, 35, 5, Attributes.builder().put("key", "value2").build())); + createLong(0, 35, 5, Attributes.builder().put("key", "value2").build())); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -277,8 +298,11 @@ void collect_CumulativeReportsCumulativeObservations() { .hasAttributes(Attributes.builder().put("key", "value2").build()))); } - @Test - void collect_DeltaComputesDiff() { + @ParameterizedTest + @EnumSource(MemoryMode.class) + void collect_DeltaComputesDiff(MemoryMode memoryMode) { + setup(memoryMode); + when(reader.getAggregationTemporality(any())).thenReturn(AggregationTemporality.DELTA); longCounterStorage = AsynchronousMetricStorage.create( @@ -293,7 +317,7 @@ void collect_DeltaComputesDiff() { Advice.empty())); // Record measurement and collect at time 10 - longCounterStorage.record(longMeasurement(0, 10, 3, Attributes.empty())); + longCounterStorage.record(createLong(0, 10, 3, Attributes.empty())); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -308,9 +332,9 @@ void collect_DeltaComputesDiff() { registeredReader.setLastCollectEpochNanos(10); // Record measurement and collect at time 30 - longCounterStorage.record(longMeasurement(0, 30, 3, Attributes.empty())); + longCounterStorage.record(createLong(0, 30, 3, Attributes.empty())); longCounterStorage.record( - longMeasurement(0, 30, 6, Attributes.builder().put("key", "value1").build())); + createLong(0, 30, 6, Attributes.builder().put("key", "value1").build())); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -331,9 +355,9 @@ void collect_DeltaComputesDiff() { registeredReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 - longCounterStorage.record(longMeasurement(0, 35, 4, Attributes.empty())); + longCounterStorage.record(createLong(0, 35, 4, Attributes.empty())); longCounterStorage.record( - longMeasurement(0, 35, 5, Attributes.builder().put("key", "value2").build())); + createLong(0, 35, 5, Attributes.builder().put("key", "value2").build())); assertThat(longCounterStorage.collect(resource, scope, 0, 0)) .hasLongSumSatisfying( sum -> @@ -352,4 +376,54 @@ void collect_DeltaComputesDiff() { .hasValue(5) .hasAttributes(Attributes.builder().put("key", "value2").build()))); } + + @Test + public void collect_reusableData_reusedObjectsAreReturnedOnSecondCall() { + setup(REUSABLE_DATA); + + longCounterStorage.record(createLong(0, 1, 1, Attributes.builder().put("key", "a").build())); + longCounterStorage.record(createLong(0, 1, 2, Attributes.builder().put("key", "b").build())); + longCounterStorage.record(createLong(0, 1, 3, Attributes.builder().put("key", "c").build())); + + MetricData firstCollectMetricData = + longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()); + assertThat(firstCollectMetricData) + .satisfies( + metricData -> + assertThat(metricData) + .hasLongSumSatisfying( + sum -> + sum.hasPointsSatisfying( + point -> + point + .hasValue(1) + .hasAttributes(attributeEntry("key", "a")) + .isInstanceOf(MutableLongPointData.class), + point -> + point + .hasValue(2) + .hasAttributes(attributeEntry("key", "b")) + .isInstanceOf(MutableLongPointData.class), + point -> + point + .hasValue(3) + .hasAttributes(attributeEntry("key", "c")) + .isInstanceOf(MutableLongPointData.class)))); + + MetricData secondCollectMetricData = + longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()); + + Collection secondCollectPoints = + secondCollectMetricData.getData().getPoints(); + Collection firstCollectionPoints = + firstCollectMetricData.getData().getPoints(); + assertThat(secondCollectPoints).hasSameSizeAs(firstCollectionPoints); + + // Show that second returned objects have been used in first collect response as well + // which proves there is reuse. + for (PointData firstCollectionPoint : firstCollectionPoints) { + assertThat(secondCollectPoints) + .anySatisfy(point -> assertThat(point).isSameAs(firstCollectionPoint)); + } + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java index a15f8e4affd..a8b424dfcd5 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/CallbackRegistrationTest.java @@ -5,8 +5,8 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.metrics.internal.state.Measurement.doubleMeasurement; -import static io.opentelemetry.sdk.metrics.internal.state.Measurement.longMeasurement; +import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createDouble; +import static io.opentelemetry.sdk.metrics.internal.state.ImmutableMeasurement.createLong; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -19,6 +19,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; import io.opentelemetry.sdk.metrics.export.MetricReader; @@ -76,6 +77,7 @@ class CallbackRegistrationTest { @BeforeEach void setup() { + when(reader.getMemoryMode()).thenReturn(MemoryMode.IMMUTABLE_DATA); registeredReader = RegisteredReader.create(reader, ViewRegistry.create()); when(storage1.getRegisteredReader()).thenReturn(registeredReader); when(storage2.getRegisteredReader()).thenReturn(registeredReader); @@ -145,7 +147,7 @@ void invokeCallback_Double() { assertThat(counter.get()).isEqualTo(1.1); verify(storage1) - .record(doubleMeasurement(0, 1, 1.1, Attributes.builder().put("key", "val").build())); + .record(createDouble(0, 1, 1.1, Attributes.builder().put("key", "val").build())); verify(storage2, never()).record(any()); verify(storage3, never()).record(any()); } @@ -164,10 +166,8 @@ void invokeCallback_Long() { assertThat(counter.get()).isEqualTo(1); verify(storage1, never()).record(any()); - verify(storage2) - .record(longMeasurement(0, 1, 1, Attributes.builder().put("key", "val").build())); - verify(storage3) - .record(longMeasurement(0, 1, 1, Attributes.builder().put("key", "val").build())); + verify(storage2).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); + verify(storage3).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); } @Test @@ -189,11 +189,9 @@ void invokeCallback_MultipleMeasurements() { assertThat(doubleCounter.get()).isEqualTo(1.1); assertThat(longCounter.get()).isEqualTo(1); verify(storage1) - .record(doubleMeasurement(0, 1, 1.1, Attributes.builder().put("key", "val").build())); - verify(storage2) - .record(longMeasurement(0, 1, 1, Attributes.builder().put("key", "val").build())); - verify(storage3) - .record(longMeasurement(0, 1, 1, Attributes.builder().put("key", "val").build())); + .record(createDouble(0, 1, 1.1, Attributes.builder().put("key", "val").build())); + verify(storage2).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); + verify(storage3).record(createLong(0, 1, 1, Attributes.builder().put("key", "val").build())); } @Test diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/ObjectPoolTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/ObjectPoolTest.java new file mode 100644 index 00000000000..ee90d303783 --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/ObjectPoolTest.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ObjectPoolTest { + private ObjectPool objectPool; + + @BeforeEach + void setUp() { + Supplier supplier = StringBuilder::new; + objectPool = new ObjectPool<>(supplier); + } + + @Test + void testBorrowObjectWhenPoolIsEmpty() { + StringBuilder result = objectPool.borrowObject(); + assertThat(result.toString()).isEmpty(); + } + + @Test + void testReturnAndBorrowMultipleObjects() { + // Borrow three objects + StringBuilder borrowed1 = objectPool.borrowObject(); + StringBuilder borrowed2 = objectPool.borrowObject(); + StringBuilder borrowed3 = objectPool.borrowObject(); + + // Modify and return the borrowed objects + borrowed1.append("pooledObject1"); + objectPool.returnObject(borrowed1); + borrowed2.append("pooledObject2"); + objectPool.returnObject(borrowed2); + borrowed3.append("pooledObject3"); + objectPool.returnObject(borrowed3); + + // Borrow three objects, which should be the same ones we just returned + StringBuilder result1 = objectPool.borrowObject(); + StringBuilder result2 = objectPool.borrowObject(); + StringBuilder result3 = objectPool.borrowObject(); + + // Verify the results using AssertJ assertions and reference comparison + List originalObjects = Arrays.asList(borrowed1, borrowed2, borrowed3); + List borrowedObjects = Arrays.asList(result1, result2, result3); + + assertThat(originalObjects).hasSize(3); + assertThat(borrowedObjects).hasSize(3); + + for (StringBuilder original : originalObjects) { + assertThat(borrowedObjects).anySatisfy(borrowed -> assertThat(borrowed).isSameAs(original)); + } + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMapTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMapTest.java new file mode 100644 index 00000000000..f53aec03f90 --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMapTest.java @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import java.util.HashMap; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class PooledHashMapTest { + + private PooledHashMap map; + + @BeforeEach + public void setup() { + map = new PooledHashMap<>(); + } + + @Test + public void putAndGetTest() { + map.put("One", 1); + Assertions.assertThat(map.get("One")).isEqualTo(1); + } + + @Test + public void removeTest() { + map.put("One", 1); + map.remove("One"); + Assertions.assertThat(map.get("One")).isNull(); + } + + @Test + public void sizeTest() { + map.put("One", 1); + map.put("Two", 2); + Assertions.assertThat(map.size()).isEqualTo(2); + } + + @Test + public void isEmptyTest() { + Assertions.assertThat(map.isEmpty()).isTrue(); + map.put("One", 1); + Assertions.assertThat(map.isEmpty()).isFalse(); + } + + @Test + public void containsKeyTest() { + map.put("One", 1); + Assertions.assertThat(map.containsKey("One")).isTrue(); + Assertions.assertThat(map.containsKey("Two")).isFalse(); + } + + @Test + public void clearTest() { + map.put("One", 1); + map.put("Two", 2); + map.clear(); + Assertions.assertThat(map.isEmpty()).isTrue(); + } + + @Test + public void forEachTest() { + map.put("One", 1); + map.put("Two", 2); + + Map actualMap = new HashMap<>(); + map.forEach(actualMap::put); + + Assertions.assertThat(actualMap).containsOnlyKeys("One", "Two").containsValues(1, 2); + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java new file mode 100644 index 00000000000..ca3590f1fd4 --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SdkObservableMeasurementTest.java @@ -0,0 +1,175 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.state; + +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.InstrumentValueType; +import io.opentelemetry.sdk.metrics.internal.descriptor.Advice; +import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; +import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +@SuppressWarnings("rawtypes") +public class SdkObservableMeasurementTest { + + private AsynchronousMetricStorage mockAsyncStorage1; + private RegisteredReader registeredReader1; + private SdkObservableMeasurement sdkObservableMeasurement; + private ArgumentCaptor measurementArgumentCaptor; + + @SuppressWarnings("unchecked") + private void setup(MemoryMode memoryMode) { + InstrumentationScopeInfo instrumentationScopeInfo = + InstrumentationScopeInfo.builder("test-scope").build(); + InstrumentDescriptor instrumentDescriptor = + InstrumentDescriptor.create( + "testCounter", + "an instrument for testing purposes", + "ms", + InstrumentType.COUNTER, + InstrumentValueType.LONG, + Advice.empty()); + + InMemoryMetricReader reader1 = + InMemoryMetricReader.builder() + .setAggregationTemporalitySelector(instrumentType -> CUMULATIVE) + .setMemoryMode(memoryMode) + .build(); + registeredReader1 = RegisteredReader.create(reader1, ViewRegistry.create()); + + InMemoryMetricReader reader2 = InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(); + RegisteredReader registeredReader2 = RegisteredReader.create(reader2, ViewRegistry.create()); + + measurementArgumentCaptor = ArgumentCaptor.forClass(Measurement.class); + mockAsyncStorage1 = mock(AsynchronousMetricStorage.class); + when(mockAsyncStorage1.getRegisteredReader()).thenReturn(registeredReader1); + AsynchronousMetricStorage mockAsyncStorage2 = mock(AsynchronousMetricStorage.class); + when(mockAsyncStorage2.getRegisteredReader()).thenReturn(registeredReader2); + + sdkObservableMeasurement = + SdkObservableMeasurement.create( + instrumentationScopeInfo, + instrumentDescriptor, + Lists.newArrayList(mockAsyncStorage1, mockAsyncStorage2)); + } + + @Test + public void testRecordLongImmutableData() { + setup(MemoryMode.IMMUTABLE_DATA); + + sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + + try { + sdkObservableMeasurement.record(5); + + verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); + Measurement passedMeasurement = measurementArgumentCaptor.getValue(); + assertThat(passedMeasurement).isInstanceOf(ImmutableMeasurement.class); + assertThat(passedMeasurement.longValue()).isEqualTo(5); + assertThat(passedMeasurement.startEpochNanos()).isEqualTo(0); + assertThat(passedMeasurement.epochNanos()).isEqualTo(10); + } finally { + sdkObservableMeasurement.unsetActiveReader(); + } + } + + @Test + public void testRecordDoubleReturnImmutableData() { + setup(MemoryMode.IMMUTABLE_DATA); + + sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + + try { + sdkObservableMeasurement.record(4.3); + + verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); + Measurement passedMeasurement = measurementArgumentCaptor.getValue(); + assertThat(passedMeasurement).isInstanceOf(ImmutableMeasurement.class); + assertThat(passedMeasurement.doubleValue()).isEqualTo(4.3); + assertThat(passedMeasurement.startEpochNanos()).isEqualTo(0); + assertThat(passedMeasurement.epochNanos()).isEqualTo(10); + } finally { + sdkObservableMeasurement.unsetActiveReader(); + } + } + + @Test + public void testRecordDoubleReturnReusableData() { + setup(MemoryMode.REUSABLE_DATA); + + sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + + try { + sdkObservableMeasurement.record(4.3); + + verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); + Measurement firstMeasurement = measurementArgumentCaptor.getValue(); + assertThat(firstMeasurement).isInstanceOf(MutableMeasurement.class); + assertThat(firstMeasurement.doubleValue()).isEqualTo(4.3); + assertThat(firstMeasurement.startEpochNanos()).isEqualTo(0); + assertThat(firstMeasurement.epochNanos()).isEqualTo(10); + + sdkObservableMeasurement.record(5.3); + + verify(mockAsyncStorage1, times(2)).record(measurementArgumentCaptor.capture()); + Measurement secondMeasurement = measurementArgumentCaptor.getValue(); + assertThat(secondMeasurement).isInstanceOf(MutableMeasurement.class); + assertThat(secondMeasurement.doubleValue()).isEqualTo(5.3); + assertThat(secondMeasurement.startEpochNanos()).isEqualTo(0); + assertThat(secondMeasurement.epochNanos()).isEqualTo(10); + + // LeasedMeasurement should be re-used + assertThat(secondMeasurement).isSameAs(firstMeasurement); + } finally { + sdkObservableMeasurement.unsetActiveReader(); + } + } + + @Test + public void testRecordLongReturnReusableData() { + setup(MemoryMode.REUSABLE_DATA); + + sdkObservableMeasurement.setActiveReader(registeredReader1, 0, 10); + + try { + sdkObservableMeasurement.record(2); + + verify(mockAsyncStorage1).record(measurementArgumentCaptor.capture()); + Measurement firstMeasurement = measurementArgumentCaptor.getValue(); + assertThat(firstMeasurement).isInstanceOf(MutableMeasurement.class); + assertThat(firstMeasurement.longValue()).isEqualTo(2); + assertThat(firstMeasurement.startEpochNanos()).isEqualTo(0); + assertThat(firstMeasurement.epochNanos()).isEqualTo(10); + + sdkObservableMeasurement.record(6); + + verify(mockAsyncStorage1, times(2)).record(measurementArgumentCaptor.capture()); + Measurement secondMeasurement = measurementArgumentCaptor.getValue(); + assertThat(secondMeasurement).isInstanceOf(MutableMeasurement.class); + assertThat(secondMeasurement.longValue()).isEqualTo(6); + assertThat(secondMeasurement.startEpochNanos()).isEqualTo(0); + assertThat(secondMeasurement.epochNanos()).isEqualTo(10); + + // LeasedMeasurement should be re-used + assertThat(secondMeasurement).isSameAs(firstMeasurement); + } finally { + sdkObservableMeasurement.unsetActiveReader(); + } + } +} diff --git a/sdk/metrics/testing-internal/build.gradle.kts b/sdk/metrics/testing-internal/build.gradle.kts new file mode 100644 index 00000000000..df76bebe8fb --- /dev/null +++ b/sdk/metrics/testing-internal/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + id("otel.java-conventions") +} + +description = "OpenTelemetry Metrics Testing (Internal)" +otelJava.moduleName.set("io.opentelemetry.metrics.internal.testing") + +dependencies { + api("org.junit.jupiter:junit-jupiter-api") + implementation("org.slf4j:jul-to-slf4j") + api(project(":sdk:common")) + api(project(":sdk:metrics")) +} diff --git a/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/assertj/LongSumAssert.java b/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/assertj/LongSumAssert.java index 16ec2109e9a..128a95ebce8 100644 --- a/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/assertj/LongSumAssert.java +++ b/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/assertj/LongSumAssert.java @@ -73,14 +73,14 @@ public LongSumAssert isDelta() { return myself; } - /** Asserts the sum has points matching all of the given assertions and no more, in any order. */ + /** Asserts the sum has points matching all the given assertions and no more, in any order. */ @SafeVarargs @SuppressWarnings("varargs") public final LongSumAssert hasPointsSatisfying(Consumer... assertions) { return hasPointsSatisfying(Arrays.asList(assertions)); } - /** Asserts the sum has points matching all of the given assertions and no more, in any order. */ + /** Asserts the sum has points matching all the given assertions and no more, in any order. */ public LongSumAssert hasPointsSatisfying( Iterable> assertions) { assertThat(actual.getPoints()) diff --git a/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/exporter/InMemoryMetricReader.java b/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/exporter/InMemoryMetricReader.java index 7957035b2ed..d9aa8bfb3c5 100644 --- a/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/exporter/InMemoryMetricReader.java +++ b/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/exporter/InMemoryMetricReader.java @@ -5,7 +5,10 @@ package io.opentelemetry.sdk.testing.exporter; +import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; + import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; @@ -52,6 +55,18 @@ public class InMemoryMetricReader implements MetricReader { private final DefaultAggregationSelector defaultAggregationSelector; private final AtomicBoolean isShutdown = new AtomicBoolean(false); private volatile MetricProducer metricProducer = MetricProducer.noop(); + private final MemoryMode memoryMode; + + /** + * Creates an {@link InMemoryMetricReaderBuilder} with defaults. + * + * @return a builder with always-cumulative {@link AggregationTemporalitySelector}, default {@link + * DefaultAggregationSelector} and {@link MemoryMode#IMMUTABLE_DATA} {@link MemoryMode} + * @since 1.29.0 + */ + public static InMemoryMetricReaderBuilder builder() { + return new InMemoryMetricReaderBuilder(); + } /** Returns a new {@link InMemoryMetricReader}. */ public static InMemoryMetricReader create() { @@ -79,8 +94,16 @@ public static InMemoryMetricReader createDelta() { private InMemoryMetricReader( AggregationTemporalitySelector aggregationTemporalitySelector, DefaultAggregationSelector defaultAggregationSelector) { + this(aggregationTemporalitySelector, defaultAggregationSelector, IMMUTABLE_DATA); + } + + InMemoryMetricReader( + AggregationTemporalitySelector aggregationTemporalitySelector, + DefaultAggregationSelector defaultAggregationSelector, + MemoryMode memoryMode) { this.aggregationTemporalitySelector = aggregationTemporalitySelector; this.defaultAggregationSelector = defaultAggregationSelector; + this.memoryMode = memoryMode; } /** Returns all metrics accumulated since the last call. */ @@ -118,6 +141,11 @@ public CompletableResultCode shutdown() { return CompletableResultCode.ofSuccess(); } + @Override + public MemoryMode getMemoryMode() { + return memoryMode; + } + @Override public String toString() { return "InMemoryMetricReader{}"; diff --git a/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/exporter/InMemoryMetricReaderBuilder.java b/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/exporter/InMemoryMetricReaderBuilder.java new file mode 100644 index 00000000000..e3824e6cae7 --- /dev/null +++ b/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/exporter/InMemoryMetricReaderBuilder.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.testing.exporter; + +import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; + +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; +import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; + +public final class InMemoryMetricReaderBuilder { + private AggregationTemporalitySelector aggregationTemporalitySelector = + AggregationTemporalitySelector.alwaysCumulative(); + private DefaultAggregationSelector defaultAggregationSelector = + DefaultAggregationSelector.getDefault(); + private MemoryMode memoryMode = IMMUTABLE_DATA; + + /** + * Creates an {@link InMemoryMetricReaderBuilder} with defaults. + * + *

Creates a builder with always-cumulative {@link AggregationTemporalitySelector}, default + * {@link DefaultAggregationSelector} and {@link MemoryMode#IMMUTABLE_DATA} {@link MemoryMode} + */ + InMemoryMetricReaderBuilder() {} + + public InMemoryMetricReaderBuilder setAggregationTemporalitySelector( + AggregationTemporalitySelector aggregationTemporalitySelector) { + this.aggregationTemporalitySelector = aggregationTemporalitySelector; + return this; + } + + /** + * Sets the {@link DefaultAggregationSelector}. + * + * @param defaultAggregationSelector the {@link DefaultAggregationSelector} to set + * @return this {@link InMemoryMetricReaderBuilder} + */ + @SuppressWarnings("unused") + public InMemoryMetricReaderBuilder setDefaultAggregationSelector( + DefaultAggregationSelector defaultAggregationSelector) { + this.defaultAggregationSelector = defaultAggregationSelector; + return this; + } + + /** + * Sets the {@link MemoryMode}. + * + * @param memoryMode the {@link MemoryMode} to set + * @return this {@link InMemoryMetricReaderBuilder} + */ + public InMemoryMetricReaderBuilder setMemoryMode(MemoryMode memoryMode) { + this.memoryMode = memoryMode; + return this; + } + + public InMemoryMetricReader build() { + return new InMemoryMetricReader( + aggregationTemporalitySelector, defaultAggregationSelector, memoryMode); + } +}