Skip to content

Commit

Permalink
Inference autoscaling telemetry (#110630)
Browse files Browse the repository at this point in the history
* Wire MeterRegistry

* Allow for collections of values in async APM measurements

* Adaptive allocations scaler metrics

* Update docs/changelog/110630.yaml

* Update 110630.yaml
  • Loading branch information
jan-elastic authored Jul 29, 2024
1 parent f1671c9 commit 26623f1
Show file tree
Hide file tree
Showing 15 changed files with 381 additions and 43 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/110630.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 110630
summary: Telemetry for inference adaptive allocations
area: Machine Learning
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -96,11 +97,11 @@ public DoubleCounter getDoubleCounter(String name) {
}

@Override
public DoubleAsyncCounter registerDoubleAsyncCounter(
public DoubleAsyncCounter registerDoublesAsyncCounter(
String name,
String description,
String unit,
Supplier<DoubleWithAttributes> observer
Supplier<Collection<DoubleWithAttributes>> observer
) {
try (ReleasableLock lock = registerLock.acquire()) {
return register(doubleAsynchronousCounters, new DoubleAsyncCounterAdapter(meter, name, description, unit, observer));
Expand All @@ -125,7 +126,12 @@ public DoubleUpDownCounter getDoubleUpDownCounter(String name) {
}

@Override
public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
public DoubleGauge registerDoublesGauge(
String name,
String description,
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
try (ReleasableLock lock = registerLock.acquire()) {
return register(doubleGauges, new DoubleGaugeAdapter(meter, name, description, unit, observer));
}
Expand Down Expand Up @@ -156,7 +162,12 @@ public LongCounter registerLongCounter(String name, String description, String u
}

@Override
public LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
public LongAsyncCounter registerLongsAsyncCounter(
String name,
String description,
String unit,
Supplier<Collection<LongWithAttributes>> observer
) {
try (ReleasableLock lock = registerLock.acquire()) {
return register(longAsynchronousCounters, new LongAsyncCounterAdapter(meter, name, description, unit, observer));
}
Expand Down Expand Up @@ -185,7 +196,7 @@ public LongUpDownCounter getLongUpDownCounter(String name) {
}

@Override
public LongGauge registerLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
public LongGauge registerLongsGauge(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
try (ReleasableLock lock = registerLock.acquire()) {
return register(longGauges, new LongGaugeAdapter(meter, name, description, unit, observer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
import org.elasticsearch.telemetry.metric.DoubleAsyncCounter;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;

public class DoubleAsyncCounterAdapter extends AbstractInstrument<ObservableDoubleCounter> implements DoubleAsyncCounter {

public DoubleAsyncCounterAdapter(Meter meter, String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
public DoubleAsyncCounterAdapter(
Meter meter,
String name,
String description,
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
super(meter, new Builder(name, description, unit, observer));
}

Expand All @@ -30,9 +37,9 @@ public void close() throws Exception {
}

private static class Builder extends AbstractInstrument.Builder<ObservableDoubleCounter> {
private final Supplier<DoubleWithAttributes> observer;
private final Supplier<Collection<DoubleWithAttributes>> observer;

private Builder(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
private Builder(String name, String description, String unit, Supplier<Collection<DoubleWithAttributes>> observer) {
super(name, description, unit);
this.observer = Objects.requireNonNull(observer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.telemetry.apm.AbstractInstrument;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;

Expand All @@ -24,7 +25,13 @@ public class DoubleGaugeAdapter extends AbstractInstrument<ObservableDoubleGauge
implements
org.elasticsearch.telemetry.metric.DoubleGauge {

public DoubleGaugeAdapter(Meter meter, String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
public DoubleGaugeAdapter(
Meter meter,
String name,
String description,
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
super(meter, new Builder(name, description, unit, observer));
}

Expand All @@ -34,9 +41,9 @@ public void close() throws Exception {
}

private static class Builder extends AbstractInstrument.Builder<ObservableDoubleGauge> {
private final Supplier<DoubleWithAttributes> observer;
private final Supplier<Collection<DoubleWithAttributes>> observer;

private Builder(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
private Builder(String name, String description, String unit, Supplier<Collection<DoubleWithAttributes>> observer) {
super(name, description, unit);
this.observer = Objects.requireNonNull(observer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
import org.elasticsearch.telemetry.metric.LongAsyncCounter;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;

public class LongAsyncCounterAdapter extends AbstractInstrument<ObservableLongCounter> implements LongAsyncCounter {

public LongAsyncCounterAdapter(Meter meter, String name, String description, String unit, Supplier<LongWithAttributes> observer) {
public LongAsyncCounterAdapter(
Meter meter,
String name,
String description,
String unit,
Supplier<Collection<LongWithAttributes>> observer
) {
super(meter, new Builder(name, description, unit, observer));
}

Expand All @@ -30,9 +37,9 @@ public void close() throws Exception {
}

private static class Builder extends AbstractInstrument.Builder<ObservableLongCounter> {
private final Supplier<LongWithAttributes> observer;
private final Supplier<Collection<LongWithAttributes>> observer;

private Builder(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
private Builder(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
super(name, description, unit);
this.observer = Objects.requireNonNull(observer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
import org.elasticsearch.telemetry.apm.AbstractInstrument;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;

/**
* LongGaugeAdapter wraps an otel ObservableLongGauge
*/
public class LongGaugeAdapter extends AbstractInstrument<ObservableLongGauge> implements org.elasticsearch.telemetry.metric.LongGauge {
public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier<LongWithAttributes> observer) {
public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
super(meter, new Builder(name, description, unit, observer));
}

Expand All @@ -31,11 +32,11 @@ public void close() throws Exception {
}

private static class Builder extends AbstractInstrument.Builder<ObservableLongGauge> {
private final Supplier<LongWithAttributes> observer;
private final Supplier<Collection<LongWithAttributes>> observer;

private Builder(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
private Builder(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
super(name, description, unit);
this.observer = Objects.requireNonNull(observer);
this.observer = observer;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -53,37 +54,45 @@ static Attributes fromMap(Map<String, Object> attributes) {
return builder.build();
}

static Consumer<ObservableDoubleMeasurement> doubleMeasurementCallback(Supplier<DoubleWithAttributes> observer) {
static Consumer<ObservableDoubleMeasurement> doubleMeasurementCallback(Supplier<Collection<DoubleWithAttributes>> observer) {
return measurement -> {
DoubleWithAttributes observation;
Collection<DoubleWithAttributes> observations;
try {
observation = observer.get();
observations = observer.get();
} catch (RuntimeException err) {
assert false : "observer must not throw [" + err.getMessage() + "]";
logger.error("doubleMeasurementCallback observer unexpected error", err);
return;
}
if (observation == null) {
if (observations == null) {
return;
}
measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
for (DoubleWithAttributes observation : observations) {
if (observation != null) {
measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
}
}
};
}

static Consumer<ObservableLongMeasurement> longMeasurementCallback(Supplier<LongWithAttributes> observer) {
static Consumer<ObservableLongMeasurement> longMeasurementCallback(Supplier<Collection<LongWithAttributes>> observer) {
return measurement -> {
LongWithAttributes observation;
Collection<LongWithAttributes> observations;
try {
observation = observer.get();
observations = observer.get();
} catch (RuntimeException err) {
assert false : "observer must not throw [" + err.getMessage() + "]";
logger.error("longMeasurementCallback observer unexpected error", err);
return;
}
if (observation == null) {
if (observations == null) {
return;
}
measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
for (LongWithAttributes observation : observations) {
if (observation != null) {
measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
}
}
};
}
}
Loading

0 comments on commit 26623f1

Please sign in to comment.