Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inference autoscaling telemetry #110630

Merged
merged 6 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/110630.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 110630
summary: Telemetry for inference adaptive allocations
area: Machine Learning
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -96,11 +97,11 @@ public DoubleCounter getDoubleCounter(String name) {
}

@Override
public DoubleAsyncCounter registerDoubleAsyncCounter(
public DoubleAsyncCounter registerDoublesAsyncCounter(
String name,
String description,
String unit,
Supplier<DoubleWithAttributes> observer
Supplier<Collection<DoubleWithAttributes>> observer
) {
try (ReleasableLock lock = registerLock.acquire()) {
return register(doubleAsynchronousCounters, new DoubleAsyncCounterAdapter(meter, name, description, unit, observer));
Expand All @@ -125,7 +126,12 @@ public DoubleUpDownCounter getDoubleUpDownCounter(String name) {
}

@Override
public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
public DoubleGauge registerDoublesGauge(
String name,
String description,
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
try (ReleasableLock lock = registerLock.acquire()) {
return register(doubleGauges, new DoubleGaugeAdapter(meter, name, description, unit, observer));
}
Expand Down Expand Up @@ -156,7 +162,12 @@ public LongCounter registerLongCounter(String name, String description, String u
}

@Override
public LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
public LongAsyncCounter registerLongsAsyncCounter(
String name,
String description,
String unit,
Supplier<Collection<LongWithAttributes>> observer
) {
try (ReleasableLock lock = registerLock.acquire()) {
return register(longAsynchronousCounters, new LongAsyncCounterAdapter(meter, name, description, unit, observer));
}
Expand Down Expand Up @@ -185,7 +196,7 @@ public LongUpDownCounter getLongUpDownCounter(String name) {
}

@Override
public LongGauge registerLongGauge(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
public LongGauge registerLongsGauge(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
try (ReleasableLock lock = registerLock.acquire()) {
return register(longGauges, new LongGaugeAdapter(meter, name, description, unit, observer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
import org.elasticsearch.telemetry.metric.DoubleAsyncCounter;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;

public class DoubleAsyncCounterAdapter extends AbstractInstrument<ObservableDoubleCounter> implements DoubleAsyncCounter {

public DoubleAsyncCounterAdapter(Meter meter, String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
public DoubleAsyncCounterAdapter(
Meter meter,
String name,
String description,
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
super(meter, new Builder(name, description, unit, observer));
}

Expand All @@ -30,9 +37,9 @@ public void close() throws Exception {
}

private static class Builder extends AbstractInstrument.Builder<ObservableDoubleCounter> {
private final Supplier<DoubleWithAttributes> observer;
private final Supplier<Collection<DoubleWithAttributes>> observer;

private Builder(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
private Builder(String name, String description, String unit, Supplier<Collection<DoubleWithAttributes>> observer) {
super(name, description, unit);
this.observer = Objects.requireNonNull(observer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.telemetry.apm.AbstractInstrument;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;

Expand All @@ -24,7 +25,13 @@ public class DoubleGaugeAdapter extends AbstractInstrument<ObservableDoubleGauge
implements
org.elasticsearch.telemetry.metric.DoubleGauge {

public DoubleGaugeAdapter(Meter meter, String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
public DoubleGaugeAdapter(
Meter meter,
String name,
String description,
String unit,
Supplier<Collection<DoubleWithAttributes>> observer
) {
super(meter, new Builder(name, description, unit, observer));
}

Expand All @@ -34,9 +41,9 @@ public void close() throws Exception {
}

private static class Builder extends AbstractInstrument.Builder<ObservableDoubleGauge> {
private final Supplier<DoubleWithAttributes> observer;
private final Supplier<Collection<DoubleWithAttributes>> observer;

private Builder(String name, String description, String unit, Supplier<DoubleWithAttributes> observer) {
private Builder(String name, String description, String unit, Supplier<Collection<DoubleWithAttributes>> observer) {
super(name, description, unit);
this.observer = Objects.requireNonNull(observer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
import org.elasticsearch.telemetry.metric.LongAsyncCounter;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;

public class LongAsyncCounterAdapter extends AbstractInstrument<ObservableLongCounter> implements LongAsyncCounter {

public LongAsyncCounterAdapter(Meter meter, String name, String description, String unit, Supplier<LongWithAttributes> observer) {
public LongAsyncCounterAdapter(
Meter meter,
String name,
String description,
String unit,
Supplier<Collection<LongWithAttributes>> observer
) {
super(meter, new Builder(name, description, unit, observer));
}

Expand All @@ -30,9 +37,9 @@ public void close() throws Exception {
}

private static class Builder extends AbstractInstrument.Builder<ObservableLongCounter> {
private final Supplier<LongWithAttributes> observer;
private final Supplier<Collection<LongWithAttributes>> observer;

private Builder(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
private Builder(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
super(name, description, unit);
this.observer = Objects.requireNonNull(observer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
import org.elasticsearch.telemetry.apm.AbstractInstrument;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;

/**
* LongGaugeAdapter wraps an otel ObservableLongGauge
*/
public class LongGaugeAdapter extends AbstractInstrument<ObservableLongGauge> implements org.elasticsearch.telemetry.metric.LongGauge {
public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier<LongWithAttributes> observer) {
public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
super(meter, new Builder(name, description, unit, observer));
}

Expand All @@ -31,11 +32,11 @@ public void close() throws Exception {
}

private static class Builder extends AbstractInstrument.Builder<ObservableLongGauge> {
private final Supplier<LongWithAttributes> observer;
private final Supplier<Collection<LongWithAttributes>> observer;

private Builder(String name, String description, String unit, Supplier<LongWithAttributes> observer) {
private Builder(String name, String description, String unit, Supplier<Collection<LongWithAttributes>> observer) {
super(name, description, unit);
this.observer = Objects.requireNonNull(observer);
this.observer = observer;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.LongWithAttributes;

import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -53,37 +54,45 @@ static Attributes fromMap(Map<String, Object> attributes) {
return builder.build();
}

static Consumer<ObservableDoubleMeasurement> doubleMeasurementCallback(Supplier<DoubleWithAttributes> observer) {
static Consumer<ObservableDoubleMeasurement> doubleMeasurementCallback(Supplier<Collection<DoubleWithAttributes>> observer) {
return measurement -> {
DoubleWithAttributes observation;
Collection<DoubleWithAttributes> observations;
try {
observation = observer.get();
observations = observer.get();
} catch (RuntimeException err) {
assert false : "observer must not throw [" + err.getMessage() + "]";
logger.error("doubleMeasurementCallback observer unexpected error", err);
return;
}
if (observation == null) {
if (observations == null) {
return;
}
measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
for (DoubleWithAttributes observation : observations) {
if (observation != null) {
measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
}
}
};
}

static Consumer<ObservableLongMeasurement> longMeasurementCallback(Supplier<LongWithAttributes> observer) {
static Consumer<ObservableLongMeasurement> longMeasurementCallback(Supplier<Collection<LongWithAttributes>> observer) {
return measurement -> {
LongWithAttributes observation;
Collection<LongWithAttributes> observations;
try {
observation = observer.get();
observations = observer.get();
} catch (RuntimeException err) {
assert false : "observer must not throw [" + err.getMessage() + "]";
logger.error("longMeasurementCallback observer unexpected error", err);
return;
}
if (observation == null) {
if (observations == null) {
return;
}
measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
for (LongWithAttributes observation : observations) {
if (observation != null) {
measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes()));
}
}
};
}
}
Loading
Loading