diff --git a/docs/changelog/110630.yaml b/docs/changelog/110630.yaml new file mode 100644 index 0000000000000..9bf78e1209753 --- /dev/null +++ b/docs/changelog/110630.yaml @@ -0,0 +1,5 @@ +pr: 110630 +summary: Telemetry for inference adaptive allocations +area: Machine Learning +type: feature +issues: [] diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java index 382fc9417eac0..831e2f19e0126 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java @@ -38,6 +38,7 @@ import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.telemetry.metric.MeterRegistry; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; @@ -96,11 +97,11 @@ public DoubleCounter getDoubleCounter(String name) { } @Override - public DoubleAsyncCounter registerDoubleAsyncCounter( + public DoubleAsyncCounter registerDoublesAsyncCounter( String name, String description, String unit, - Supplier observer + Supplier> observer ) { try (ReleasableLock lock = registerLock.acquire()) { return register(doubleAsynchronousCounters, new DoubleAsyncCounterAdapter(meter, name, description, unit, observer)); @@ -125,7 +126,12 @@ public DoubleUpDownCounter getDoubleUpDownCounter(String name) { } @Override - public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer) { + public DoubleGauge registerDoublesGauge( + String name, + String description, + String unit, + Supplier> observer + ) { try (ReleasableLock lock = registerLock.acquire()) { return register(doubleGauges, new DoubleGaugeAdapter(meter, name, description, unit, observer)); } @@ -156,7 +162,12 @@ public LongCounter registerLongCounter(String name, String description, String u } @Override - public LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier observer) { + public LongAsyncCounter registerLongsAsyncCounter( + String name, + String description, + String unit, + Supplier> observer + ) { try (ReleasableLock lock = registerLock.acquire()) { return register(longAsynchronousCounters, new LongAsyncCounterAdapter(meter, name, description, unit, observer)); } @@ -185,7 +196,7 @@ public LongUpDownCounter getLongUpDownCounter(String name) { } @Override - public LongGauge registerLongGauge(String name, String description, String unit, Supplier observer) { + public LongGauge registerLongsGauge(String name, String description, String unit, Supplier> observer) { try (ReleasableLock lock = registerLock.acquire()) { return register(longGauges, new LongGaugeAdapter(meter, name, description, unit, observer)); } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleAsyncCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleAsyncCounterAdapter.java index 6b17a83619ef7..ab735c41ca890 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleAsyncCounterAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleAsyncCounterAdapter.java @@ -15,12 +15,19 @@ import org.elasticsearch.telemetry.metric.DoubleAsyncCounter; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; +import java.util.Collection; import java.util.Objects; import java.util.function.Supplier; public class DoubleAsyncCounterAdapter extends AbstractInstrument implements DoubleAsyncCounter { - public DoubleAsyncCounterAdapter(Meter meter, String name, String description, String unit, Supplier observer) { + public DoubleAsyncCounterAdapter( + Meter meter, + String name, + String description, + String unit, + Supplier> observer + ) { super(meter, new Builder(name, description, unit, observer)); } @@ -30,9 +37,9 @@ public void close() throws Exception { } private static class Builder extends AbstractInstrument.Builder { - private final Supplier observer; + private final Supplier> observer; - private Builder(String name, String description, String unit, Supplier observer) { + private Builder(String name, String description, String unit, Supplier> observer) { super(name, description, unit); this.observer = Objects.requireNonNull(observer); } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java index ed6ecee66d696..2a9c2d45981ed 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java @@ -14,6 +14,7 @@ import org.elasticsearch.telemetry.apm.AbstractInstrument; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; +import java.util.Collection; import java.util.Objects; import java.util.function.Supplier; @@ -24,7 +25,13 @@ public class DoubleGaugeAdapter extends AbstractInstrument observer) { + public DoubleGaugeAdapter( + Meter meter, + String name, + String description, + String unit, + Supplier> observer + ) { super(meter, new Builder(name, description, unit, observer)); } @@ -34,9 +41,9 @@ public void close() throws Exception { } private static class Builder extends AbstractInstrument.Builder { - private final Supplier observer; + private final Supplier> observer; - private Builder(String name, String description, String unit, Supplier observer) { + private Builder(String name, String description, String unit, Supplier> observer) { super(name, description, unit); this.observer = Objects.requireNonNull(observer); } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongAsyncCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongAsyncCounterAdapter.java index 14c58139d03e1..1bc21ef2c831c 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongAsyncCounterAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongAsyncCounterAdapter.java @@ -15,12 +15,19 @@ import org.elasticsearch.telemetry.metric.LongAsyncCounter; import org.elasticsearch.telemetry.metric.LongWithAttributes; +import java.util.Collection; import java.util.Objects; import java.util.function.Supplier; public class LongAsyncCounterAdapter extends AbstractInstrument implements LongAsyncCounter { - public LongAsyncCounterAdapter(Meter meter, String name, String description, String unit, Supplier observer) { + public LongAsyncCounterAdapter( + Meter meter, + String name, + String description, + String unit, + Supplier> observer + ) { super(meter, new Builder(name, description, unit, observer)); } @@ -30,9 +37,9 @@ public void close() throws Exception { } private static class Builder extends AbstractInstrument.Builder { - private final Supplier observer; + private final Supplier> observer; - private Builder(String name, String description, String unit, Supplier observer) { + private Builder(String name, String description, String unit, Supplier> observer) { super(name, description, unit); this.observer = Objects.requireNonNull(observer); } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java index 52c19c80c284f..eab9ed2eb5278 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java @@ -14,6 +14,7 @@ import org.elasticsearch.telemetry.apm.AbstractInstrument; import org.elasticsearch.telemetry.metric.LongWithAttributes; +import java.util.Collection; import java.util.Objects; import java.util.function.Supplier; @@ -21,7 +22,7 @@ * LongGaugeAdapter wraps an otel ObservableLongGauge */ public class LongGaugeAdapter extends AbstractInstrument implements org.elasticsearch.telemetry.metric.LongGauge { - public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier observer) { + public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier> observer) { super(meter, new Builder(name, description, unit, observer)); } @@ -31,11 +32,11 @@ public void close() throws Exception { } private static class Builder extends AbstractInstrument.Builder { - private final Supplier observer; + private final Supplier> observer; - private Builder(String name, String description, String unit, Supplier observer) { + private Builder(String name, String description, String unit, Supplier> observer) { super(name, description, unit); - this.observer = Objects.requireNonNull(observer); + this.observer = observer; } @Override diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java index 3e8ab415bd25e..1d760c8c12791 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java @@ -17,6 +17,7 @@ import org.elasticsearch.telemetry.metric.DoubleWithAttributes; import org.elasticsearch.telemetry.metric.LongWithAttributes; +import java.util.Collection; import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; @@ -53,37 +54,45 @@ static Attributes fromMap(Map attributes) { return builder.build(); } - static Consumer doubleMeasurementCallback(Supplier observer) { + static Consumer doubleMeasurementCallback(Supplier> observer) { return measurement -> { - DoubleWithAttributes observation; + Collection observations; try { - observation = observer.get(); + observations = observer.get(); } catch (RuntimeException err) { assert false : "observer must not throw [" + err.getMessage() + "]"; logger.error("doubleMeasurementCallback observer unexpected error", err); return; } - if (observation == null) { + if (observations == null) { return; } - measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes())); + for (DoubleWithAttributes observation : observations) { + if (observation != null) { + measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes())); + } + } }; } - static Consumer longMeasurementCallback(Supplier observer) { + static Consumer longMeasurementCallback(Supplier> observer) { return measurement -> { - LongWithAttributes observation; + Collection observations; try { - observation = observer.get(); + observations = observer.get(); } catch (RuntimeException err) { assert false : "observer must not throw [" + err.getMessage() + "]"; logger.error("longMeasurementCallback observer unexpected error", err); return; } - if (observation == null) { + if (observations == null) { return; } - measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes())); + for (LongWithAttributes observation : observations) { + if (observation != null) { + measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes())); + } + } }; } } diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java b/server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java index 0f690558361e4..12c62859fd372 100644 --- a/server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java @@ -8,6 +8,8 @@ package org.elasticsearch.telemetry.metric; +import java.util.Collection; +import java.util.Collections; import java.util.function.Supplier; /** @@ -15,6 +17,7 @@ * only be registered once. * TODO(stu): describe name, unit and description */ + public interface MeterRegistry { /** * Register a {@link DoubleCounter}. The returned object may be reused. @@ -57,7 +60,20 @@ public interface MeterRegistry { * Must not throw an exception and must be safe to call from different threads. * @return the registered meter. */ - DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer); + default DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer) { + return registerDoublesGauge(name, description, unit, () -> Collections.singleton(observer.get())); + } + + /** + * Register a {@link DoubleGauge}. The returned object may be reused. + * @param name name of the gauge + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @param observer callback to use. This is called once during reporting period. + * Must not throw an exception and must be safe to call from different threads. + * @return the registered meter. + */ + DoubleGauge registerDoublesGauge(String name, String description, String unit, Supplier> observer); /** * Retrieved a previously registered {@link DoubleGauge}. @@ -98,7 +114,23 @@ public interface MeterRegistry { * @param unit the unit (bytes, sec, hour) * @param observer a callback to provide a metric value upon observation (metric interval) */ - LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier observer); + default LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier observer) { + return registerLongsAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get())); + } + + /** + * Register a {@link LongAsyncCounter} with an asynchronous callback. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @param observer a callback to provide a metric values upon observation (metric interval) + */ + LongAsyncCounter registerLongsAsyncCounter( + String name, + String description, + String unit, + Supplier> observer + ); /** * Retrieved a previously registered {@link LongAsyncCounter}. @@ -114,7 +146,28 @@ public interface MeterRegistry { * @param unit the unit (bytes, sec, hour) * @param observer a callback to provide a metric value upon observation (metric interval) */ - DoubleAsyncCounter registerDoubleAsyncCounter(String name, String description, String unit, Supplier observer); + default DoubleAsyncCounter registerDoubleAsyncCounter( + String name, + String description, + String unit, + Supplier observer + ) { + return registerDoublesAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get())); + } + + /** + * Register a {@link DoubleAsyncCounter} with an asynchronous callback. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @param observer a callback to provide a metric values upon observation (metric interval) + */ + DoubleAsyncCounter registerDoublesAsyncCounter( + String name, + String description, + String unit, + Supplier> observer + ); /** * Retrieved a previously registered {@link DoubleAsyncCounter}. @@ -155,7 +208,20 @@ public interface MeterRegistry { * Must not throw an exception and must be safe to call from different threads. * @return the registered meter. */ - LongGauge registerLongGauge(String name, String description, String unit, Supplier observer); + default LongGauge registerLongGauge(String name, String description, String unit, Supplier observer) { + return registerLongsGauge(name, description, unit, () -> Collections.singleton(observer.get())); + } + + /** + * Register a {@link LongGauge}. The returned object may be reused. + * @param name name of the gauge + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @param observer callback to use. This is called once during reporting period. + * Must not throw an exception and must be safe to call from different threads. + * @return the registered meter. + */ + LongGauge registerLongsGauge(String name, String description, String unit, Supplier> observer); /** * Retrieved a previously registered {@link LongGauge}. @@ -204,7 +270,12 @@ public DoubleUpDownCounter getDoubleUpDownCounter(String name) { } @Override - public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer) { + public DoubleGauge registerDoublesGauge( + String name, + String description, + String unit, + Supplier> observer + ) { return DoubleGauge.NOOP; } @@ -229,11 +300,11 @@ public LongCounter registerLongCounter(String name, String description, String u } @Override - public LongAsyncCounter registerLongAsyncCounter( + public LongAsyncCounter registerLongsAsyncCounter( String name, String description, String unit, - Supplier observer + Supplier> observer ) { return LongAsyncCounter.NOOP; } @@ -244,11 +315,11 @@ public LongAsyncCounter getLongAsyncCounter(String name) { } @Override - public DoubleAsyncCounter registerDoubleAsyncCounter( + public DoubleAsyncCounter registerDoublesAsyncCounter( String name, String description, String unit, - Supplier observer + Supplier> observer ) { return DoubleAsyncCounter.NOOP; } @@ -274,7 +345,12 @@ public LongUpDownCounter getLongUpDownCounter(String name) { } @Override - public LongGauge registerLongGauge(String name, String description, String unit, Supplier observer) { + public LongGauge registerLongsGauge( + String name, + String description, + String unit, + Supplier> observer + ) { return LongGauge.NOOP; } diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java index 33693c297f166..97fe0ad1370ef 100644 --- a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java @@ -23,6 +23,7 @@ import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.telemetry.metric.MeterRegistry; +import java.util.Collection; import java.util.function.Supplier; /** @@ -76,6 +77,16 @@ public DoubleGauge registerDoubleGauge(String name, String description, String u return instrument; } + @Override + public DoubleGauge registerDoublesGauge( + String name, + String description, + String unit, + Supplier> observer + ) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public DoubleGauge getDoubleGauge(String name) { return (DoubleGauge) recorder.getInstrument(InstrumentType.DOUBLE_GAUGE, name); @@ -115,6 +126,16 @@ public LongAsyncCounter registerLongAsyncCounter(String name, String description return instrument; } + @Override + public LongAsyncCounter registerLongsAsyncCounter( + String name, + String description, + String unit, + Supplier> observer + ) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public LongAsyncCounter getLongAsyncCounter(String name) { return (LongAsyncCounter) recorder.getInstrument(InstrumentType.LONG_ASYNC_COUNTER, name); @@ -132,6 +153,16 @@ public DoubleAsyncCounter registerDoubleAsyncCounter( return instrument; } + @Override + public DoubleAsyncCounter registerDoublesAsyncCounter( + String name, + String description, + String unit, + Supplier> observer + ) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public DoubleAsyncCounter getDoubleAsyncCounter(String name) { return (DoubleAsyncCounter) recorder.getInstrument(InstrumentType.DOUBLE_ASYNC_COUNTER, name); @@ -170,6 +201,11 @@ public LongGauge registerLongGauge(String name, String description, String unit, return instrument; } + @Override + public LongGauge registerLongsGauge(String name, String description, String unit, Supplier> observer) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public LongGauge getLongGauge(String name) { return (LongGauge) recorder.getInstrument(InstrumentType.LONG_GAUGE, name); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 22a9c2dbcc281..c4bf92401be9d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1283,6 +1283,7 @@ public Collection createComponents(PluginServices services) { clusterService, client, inferenceAuditor, + telemetryProvider.getMeterRegistry(), mlAssignmentNotifier, machineLearningExtension.get().isAnomalyDetectionEnabled(), machineLearningExtension.get().isDataFrameAnalyticsEnabled(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index a1664b7023fc0..2b3ed3f7a656c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsFeatureFlag; @@ -68,6 +69,7 @@ public final class MlInitializationService implements ClusterStateListener { ClusterService clusterService, Client client, InferenceAuditor inferenceAuditor, + MeterRegistry meterRegistry, MlAssignmentNotifier mlAssignmentNotifier, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, @@ -87,7 +89,7 @@ public final class MlInitializationService implements ClusterStateListener { isDataFrameAnalyticsEnabled, isNlpEnabled ), - new AdaptiveAllocationsScalerService(threadPool, clusterService, client, inferenceAuditor, isNlpEnabled), + new AdaptiveAllocationsScalerService(threadPool, clusterService, client, inferenceAuditor, meterRegistry, isNlpEnabled), clusterService ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java index 15f647bc76697..044556d1b30ac 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java @@ -35,10 +35,15 @@ public class AdaptiveAllocationsScaler { private final KalmanFilter1d inferenceTimeEstimator; private int numberOfAllocations; + private int neededNumberOfAllocations; private Integer minNumberOfAllocations; private Integer maxNumberOfAllocations; private boolean dynamicsChanged; + private Double lastMeasuredRequestRate; + private Double lastMeasuredInferenceTime; + private Long lastMeasuredQueueSize; + AdaptiveAllocationsScaler(String deploymentId, int numberOfAllocations) { this.deploymentId = deploymentId; // A smoothing factor of 100 roughly means the last 100 measurements have an effect @@ -51,9 +56,14 @@ public class AdaptiveAllocationsScaler { requestRateEstimator = new KalmanFilter1d(deploymentId + ":rate", 100, true); inferenceTimeEstimator = new KalmanFilter1d(deploymentId + ":time", 100, false); this.numberOfAllocations = numberOfAllocations; - this.minNumberOfAllocations = null; - this.maxNumberOfAllocations = null; - this.dynamicsChanged = false; + neededNumberOfAllocations = numberOfAllocations; + minNumberOfAllocations = null; + maxNumberOfAllocations = null; + dynamicsChanged = false; + + lastMeasuredRequestRate = null; + lastMeasuredInferenceTime = null; + lastMeasuredQueueSize = null; } void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNumberOfAllocations) { @@ -62,6 +72,8 @@ void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNum } void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSeconds, int numberOfAllocations) { + lastMeasuredQueueSize = stats.pendingCount(); + // The request rate (per second) is the request count divided by the time. // Assuming a Poisson process for the requests, the variance in the request // count equals the mean request count, and the variance in the request rate @@ -74,6 +86,7 @@ void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSe double requestRateEstimate = requestRateEstimator.hasValue() ? requestRateEstimator.estimate() : requestRate; double requestRateVariance = Math.max(1.0, requestRateEstimate * timeIntervalSeconds) / Math.pow(timeIntervalSeconds, 2); requestRateEstimator.add(requestRate, requestRateVariance, false); + lastMeasuredRequestRate = requestRate; if (stats.requestCount() > 0 && Double.isNaN(stats.inferenceTime()) == false) { // The inference time distribution is unknown. For simplicity, we assume @@ -86,6 +99,9 @@ void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSe double inferenceTimeEstimate = inferenceTimeEstimator.hasValue() ? inferenceTimeEstimator.estimate() : inferenceTime; double inferenceTimeVariance = Math.pow(inferenceTimeEstimate, 2) / stats.requestCount(); inferenceTimeEstimator.add(inferenceTime, inferenceTimeVariance, dynamicsChanged); + lastMeasuredInferenceTime = inferenceTime; + } else { + lastMeasuredInferenceTime = null; } this.numberOfAllocations = numberOfAllocations; @@ -104,6 +120,14 @@ void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSe return requestRateUpper * inferenceTimeUpper; } + Double getRequestRateEstimate() { + return requestRateEstimator.hasValue() ? requestRateEstimator.estimate() : null; + } + + Double getInferenceTimeEstimate() { + return inferenceTimeEstimator.hasValue() ? inferenceTimeEstimator.estimate() : null; + } + Integer scale() { if (requestRateEstimator.hasValue() == false) { return null; @@ -121,6 +145,8 @@ Integer scale() { numberOfAllocations--; } + this.neededNumberOfAllocations = numberOfAllocations; + if (maxNumberOfAllocations == null) { numberOfAllocations = Math.min(numberOfAllocations, MAX_NUMBER_OF_ALLOCATIONS_SAFEGUARD); } @@ -161,4 +187,28 @@ Integer scale() { return null; } } + + public String getDeploymentId() { + return deploymentId; + } + + public long getNumberOfAllocations() { + return numberOfAllocations; + } + + public long getNeededNumberOfAllocations() { + return neededNumberOfAllocations; + } + + public Double getLastMeasuredRequestRate() { + return lastMeasuredRequestRate; + } + + public Double getLastMeasuredInferenceTime() { + return lastMeasuredInferenceTime; + } + + public Long getLastMeasuredQueueSize() { + return lastMeasuredQueueSize; + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java index 30e3871ad5ad0..063ecae3726b1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java @@ -19,6 +19,9 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.telemetry.metric.DoubleWithAttributes; +import org.elasticsearch.telemetry.metric.LongWithAttributes; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -30,11 +33,15 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; /** * Periodically schedules adaptive allocations scaling. This process consists @@ -75,6 +82,108 @@ Stats sub(Stats value) { } } + private class Metrics { + + private final List metrics = new ArrayList<>(); + + Metrics() {} + + void init() { + if (metrics.isEmpty() == false) { + return; + } + metrics.add( + meterRegistry.registerLongsGauge( + "es.ml.trained_models.adaptive_allocations.actual_number_of_allocations.current", + "the actual number of allocations", + "", + () -> observeLong(AdaptiveAllocationsScaler::getNumberOfAllocations) + ) + ); + metrics.add( + meterRegistry.registerLongsGauge( + "es.ml.trained_models.adaptive_allocations.needed_number_of_allocations.current", + "the number of allocations needed according to the adaptive allocations scaler", + "", + () -> observeLong(AdaptiveAllocationsScaler::getNeededNumberOfAllocations) + ) + ); + metrics.add( + meterRegistry.registerDoublesGauge( + "es.ml.trained_models.adaptive_allocations.measured_request_rate.current", + "the request rate reported by the stats API", + "1/s", + () -> observeDouble(AdaptiveAllocationsScaler::getLastMeasuredRequestRate) + ) + ); + metrics.add( + meterRegistry.registerDoublesGauge( + "es.ml.trained_models.adaptive_allocations.estimated_request_rate.current", + "the request rate estimated by the adaptive allocations scaler", + "1/s", + () -> observeDouble(AdaptiveAllocationsScaler::getRequestRateEstimate) + ) + ); + metrics.add( + meterRegistry.registerDoublesGauge( + "es.ml.trained_models.adaptive_allocations.measured_inference_time.current", + "the inference time reported by the stats API", + "s", + () -> observeDouble(AdaptiveAllocationsScaler::getLastMeasuredInferenceTime) + ) + ); + metrics.add( + meterRegistry.registerDoublesGauge( + "es.ml.trained_models.adaptive_allocations.estimated_inference_time.current", + "the inference time estimated by the adaptive allocations scaler", + "s", + () -> observeDouble(AdaptiveAllocationsScaler::getInferenceTimeEstimate) + ) + ); + metrics.add( + meterRegistry.registerLongsGauge( + "es.ml.trained_models.adaptive_allocations.queue_size.current", + "the queue size reported by the stats API", + "s", + () -> observeLong(AdaptiveAllocationsScaler::getLastMeasuredQueueSize) + ) + ); + } + + Collection observeLong(Function getValue) { + List observations = new ArrayList<>(); + for (AdaptiveAllocationsScaler scaler : scalers.values()) { + Long value = getValue.apply(scaler); + if (value != null) { + observations.add(new LongWithAttributes(value, Map.of("deployment_id", scaler.getDeploymentId()))); + } + } + return observations; + } + + Collection observeDouble(Function getValue) { + List observations = new ArrayList<>(); + for (AdaptiveAllocationsScaler scaler : scalers.values()) { + Double value = getValue.apply(scaler); + if (value != null) { + observations.add(new DoubleWithAttributes(value, Map.of("deployment_id", scaler.getDeploymentId()))); + } + } + return observations; + } + + void close() { + for (AutoCloseable metric : metrics) { + try { + metric.close(); + } catch (Exception e) { + // do nothing + } + } + metrics.clear(); + } + } + /** * The time interval between the adaptive allocations triggers. */ @@ -92,6 +201,8 @@ Stats sub(Stats value) { private final ClusterService clusterService; private final Client client; private final InferenceAuditor inferenceAuditor; + private final MeterRegistry meterRegistry; + private final Metrics metrics; private final boolean isNlpEnabled; private final Map> lastInferenceStatsByDeploymentAndNode; private Long lastInferenceStatsTimestampMillis; @@ -106,9 +217,10 @@ public AdaptiveAllocationsScalerService( ClusterService clusterService, Client client, InferenceAuditor inferenceAuditor, + MeterRegistry meterRegistry, boolean isNlpEnabled ) { - this(threadPool, clusterService, client, inferenceAuditor, isNlpEnabled, DEFAULT_TIME_INTERVAL_SECONDS); + this(threadPool, clusterService, client, inferenceAuditor, meterRegistry, isNlpEnabled, DEFAULT_TIME_INTERVAL_SECONDS); } // visible for testing @@ -117,6 +229,7 @@ public AdaptiveAllocationsScalerService( ClusterService clusterService, Client client, InferenceAuditor inferenceAuditor, + MeterRegistry meterRegistry, boolean isNlpEnabled, int timeIntervalSeconds ) { @@ -124,6 +237,7 @@ public AdaptiveAllocationsScalerService( this.clusterService = clusterService; this.client = client; this.inferenceAuditor = inferenceAuditor; + this.meterRegistry = meterRegistry; this.isNlpEnabled = isNlpEnabled; this.timeIntervalSeconds = timeIntervalSeconds; @@ -131,11 +245,13 @@ public AdaptiveAllocationsScalerService( lastInferenceStatsTimestampMillis = null; lastScaleUpTimesMillis = new HashMap<>(); scalers = new HashMap<>(); + metrics = new Metrics(); busy = new AtomicBoolean(false); } public synchronized void start() { updateAutoscalers(clusterService.state()); + metrics.init(); clusterService.addListener(this); if (scalers.isEmpty() == false) { startScheduling(); @@ -144,6 +260,7 @@ public synchronized void start() { public synchronized void stop() { stopScheduling(); + metrics.close(); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 2f251e3b0aee6..a5b9597886e15 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService; @@ -40,6 +41,7 @@ public class MlInitializationServiceTests extends ESTestCase { private ClusterService clusterService; private Client client; private InferenceAuditor inferenceAuditor; + private MeterRegistry meterRegistry; private MlAssignmentNotifier mlAssignmentNotifier; @Before @@ -49,6 +51,7 @@ public void setUpMocks() { clusterService = mock(ClusterService.class); client = mock(Client.class); inferenceAuditor = mock(InferenceAuditor.class); + meterRegistry = mock(MeterRegistry.class); mlAssignmentNotifier = mock(MlAssignmentNotifier.class); when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); @@ -75,6 +78,7 @@ public void testInitialize() { clusterService, client, inferenceAuditor, + meterRegistry, mlAssignmentNotifier, true, true, @@ -91,6 +95,7 @@ public void testInitialize_noMasterNode() { clusterService, client, inferenceAuditor, + meterRegistry, mlAssignmentNotifier, true, true, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java index 3ad44f256dc66..4aaddc91231f3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; @@ -55,6 +56,7 @@ public class AdaptiveAllocationsScalerServiceTests extends ESTestCase { private ClusterService clusterService; private Client client; private InferenceAuditor inferenceAuditor; + private MeterRegistry meterRegistry; @Override @Before @@ -66,6 +68,7 @@ public void setUp() throws Exception { clusterService = mock(ClusterService.class); client = mock(Client.class); inferenceAuditor = mock(InferenceAuditor.class); + meterRegistry = mock(MeterRegistry.class); } @Override @@ -156,6 +159,7 @@ public void test() throws IOException { clusterService, client, inferenceAuditor, + meterRegistry, true, 1 );