Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics v2 first steel thread with a LongCounter #2121

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions all/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def subprojects = [
project(':opentelemetry-opentracing-shim'),
project(':opentelemetry-sdk-common'),
project(':opentelemetry-sdk-metrics'),
project(':opentelemetry-sdk-metrics-v2'),
project(':opentelemetry-sdk-testing'),
project(':opentelemetry-sdk-trace'),
project(':opentelemetry-sdk'),
Expand Down
50 changes: 50 additions & 0 deletions sdk/metrics-v2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
plugins {
id "java"
id "maven-publish"

id "me.champeau.gradle.jmh"
id "ru.vyarus.animalsniffer"
}

description = 'OpenTelemetry SDK Metrics V2'
ext.moduleName = "io.opentelemetry.sdk.metrics-v2"
ext.propertiesDir = "build/generated/properties/io/opentelemetry/sdk/metrics-v2"

dependencies {
api project(':opentelemetry-api'),
project(':opentelemetry-sdk-common')

annotationProcessor libraries.auto_value

testAnnotationProcessor libraries.auto_value
testCompileOnly libraries.auto_value_annotation

testCompile project(path: ':opentelemetry-sdk-common', configuration: 'testClasses')

// testImplementation project(':opentelemetry-sdk-testing')
testImplementation libraries.junit_pioneer

signature libraries.android_signature
}

sourceSets {
main {
output.dir("build/generated/properties", builtBy: 'generateVersionResource')
}
}

animalsniffer {
// Don't check sourceSets.jmh and sourceSets.test
sourceSets = [
sourceSets.main
]
}

task generateVersionResource {
doLast {
def folder = file(propertiesDir)
folder.mkdirs()
def propertiesFile = new File(folder.getAbsolutePath(), "version.properties")
propertiesFile.write("sdk.version=${project.version}")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.metricsv2.data.MetricData;

interface Accumulation {
long getStartTime();

MetricData.Point convertToPoint(long epochNanos, Labels labels);

MetricData.Type getMetricDataType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.common.Clock;
import java.util.Collection;
import java.util.Map;

@SuppressWarnings("rawtypes")
class Accumulator {
private final Clock clock;
private final AggregatorLookup lookup = new AggregatorLookup();

Accumulator(Clock clock) {
this.clock = clock;
}

void recordLongAdd(InstrumentKey instrumentKey, Labels labels, long increment) {
LongAggregator<?> longAggregator =
lookup.getOrCreate(instrumentKey, labels, () -> lookupLongAggregator(instrumentKey));

longAggregator.record(increment);
}

@SuppressWarnings("unused")
private LongAggregator lookupLongAggregator(InstrumentKey instrumentKey) {
// todo: look up from a ViewRegistry-like thingee. Note: this lookup needs to be identical
// to the one done in the Processor. The difference is that in here, all the aggregators *must*
// be configured as delta-aggregators (or we need to remove them from the map and re-create
// them at every collection cycle...or maybe manually reset them from the code in here?),
// and the ones in the processor are view-like and configurable, depending on your exporter
// needs.
return new LongSumAggregator(/* startTime=*/ clock.now(), /* keepCumulativeSums=*/ false);
}

// called by the Controller, either on an interval, or when a pull-based exporter needs it.
void collectAndSendTo(Processor processor) {
// this method will take all the accumulations and pass them on to the Processor
// it will do this for each of the Aggregators that have been created this collection cycle,
// doing a snapshot ("synchronized move") on each of them to get the appropriate Accumulation
// data. note: I assume that the snapshotting will be something that is done internal to the
// aggregator implementations, but I could see it going either way.
// todo: finish implementing me

// things to do:
// - if an aggregator is unused, in a cycle, deal with that case. does the aggregator signal
// that
// to this with a method call? like aggregator.hasRecordings() ?
// - when to remove the keys from the map? maybe if it has no recording in the interval?
// - maybe, since the Accumulator can't be long-term stateful, we can just remove them all,
// always?
// - double typed aggregators

Collection<InstrumentKey> instrumentKeys = lookup.getActiveInstrumentKeys();
for (InstrumentKey instrumentKey : instrumentKeys) {
Map<Labels, LongAggregator<?>> aggregators =
lookup.getAggregatorsForInstrument(instrumentKey, /* clean=*/ true);
// todo: change the processor to also talk InstrumentKeys.
for (Map.Entry<Labels, LongAggregator<?>> entry : aggregators.entrySet()) {
processor.process(
instrumentKey,
AggregatorKey.create(
instrumentKey.libraryInfo(), instrumentKey.instrumentDescriptor(), entry.getKey()),
entry.getValue().collect(clock));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import javax.annotation.concurrent.Immutable;

@AutoValue
@Immutable
abstract class AggregatorKey {
static AggregatorKey create(
InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor instrumentDescriptor,
Labels labels) {
return new AutoValue_AggregatorKey(instrumentationLibraryInfo, instrumentDescriptor, labels);
}

abstract InstrumentationLibraryInfo getInstrumentationLibraryInfo();

abstract InstrumentDescriptor getInstrumentDescriptor();

abstract Labels getLabels();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

import io.opentelemetry.api.common.Labels;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// todo: there is a race condition here that needs to be fixed. `getOrCreate` will be being called
// for every recording. However, when a collection happens, we need to coordinate the cleaning of
// the map (if it's requested in the `getAggregatorsForInstrument` call). Otherwise, recordings
// made on the resulting aggregator could be missed by the accumulator.
class AggregatorLookup {
private final Map<InstrumentKey, Map<Labels, LongAggregator<?>>> data = new ConcurrentHashMap<>();

LongAggregator<?> getOrCreate(
InstrumentKey instrumentKey, Labels labels, AggregatorMaker creator) {
Map<Labels, LongAggregator<?>> registeredLabels =
data.computeIfAbsent(instrumentKey, k -> new ConcurrentHashMap<>());
return registeredLabels.computeIfAbsent(labels, l -> creator.make());
}

Collection<InstrumentKey> getActiveInstrumentKeys() {
return data.keySet();
}

Map<Labels, LongAggregator<?>> getAggregatorsForInstrument(
InstrumentKey instrumentKey, boolean clean) {
if (clean) {
return data.put(instrumentKey, new ConcurrentHashMap<>());
}
return data.get(instrumentKey);
}

interface AggregatorMaker {
LongAggregator<?> make();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

import io.opentelemetry.api.common.Labels;
import io.opentelemetry.api.metrics.LongCounter.BoundLongCounter;

class BoundLongCounterImpl implements BoundLongCounter {

private final Accumulator accumulator;
private final InstrumentKey instrumentKey;
private final Labels labels;

BoundLongCounterImpl(Accumulator accumulator, InstrumentKey instrumentKey, Labels labels) {
this.accumulator = accumulator;
this.instrumentKey = instrumentKey;
this.labels = labels;
}

@Override
public void add(long increment) {
accumulator.recordLongAdd(instrumentKey, labels, increment);
}

@Override
public void unbind() {
// todo worry about this later
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metricsv2.data.MetricData;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// todo: we need some sort of pull-based controller, as well.
class Controller {
private final ScheduledExecutorService scheduler;
private final Accumulator accumulator;
private final Processor processor;
private final MetricExporter exporter;
private ScheduledFuture<?> scheduledFuture;

Controller(Accumulator accumulator, Processor processor, MetricExporter exporter) {
this.accumulator = accumulator;
this.processor = processor;
this.exporter = exporter;
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}

void start() {
if (scheduledFuture != null) {
return;
}
scheduledFuture = scheduler.scheduleAtFixedRate(this::runOneCycle, 10, 10, TimeUnit.SECONDS);
}

CompletableResultCode shutdown() {
// todo implement me for real, flush exporter, etc.
scheduledFuture.cancel(false);
scheduler.shutdown();
return new CompletableResultCode().succeed();
}

// visible for testing only!
void runOneCycle() {
processor.start();
accumulator.collectAndSendTo(processor);
Collection<MetricData> dataToExport = processor.finish();
exporter.export(dataToExport);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import javax.annotation.concurrent.Immutable;

@AutoValue
@Immutable
abstract class InstrumentDescriptor {
static InstrumentDescriptor create(
String name,
String description,
String unit,
InstrumentType type,
InstrumentValueType valueType) {
return new AutoValue_InstrumentDescriptor(name, description, unit, type, valueType);
}

abstract String getName();

abstract String getDescription();

abstract String getUnit();

abstract InstrumentType getType();

abstract InstrumentValueType getValueType();

@Memoized
@Override
public abstract int hashCode();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;

@AutoValue
abstract class InstrumentKey {
static InstrumentKey create(
InstrumentDescriptor descriptor, InstrumentationLibraryInfo libraryInfo) {
return new AutoValue_InstrumentKey(descriptor, libraryInfo);
}

abstract InstrumentDescriptor instrumentDescriptor();

abstract InstrumentationLibraryInfo libraryInfo();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

/** All instrument types available in the metric package. */
public enum InstrumentType {
COUNTER,
UP_DOWN_COUNTER,
VALUE_RECORDER,
SUM_OBSERVER,
UP_DOWN_SUM_OBSERVER,
VALUE_OBSERVER,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metricsv2;

/** All possible types for the values recorded via the instruments. */
public enum InstrumentValueType {
LONG,
DOUBLE,
}
Loading