Skip to content

Commit

Permalink
AdmissionControl Split RCA
Browse files Browse the repository at this point in the history
Signed-off-by: Mital Awachat <awachatm@amazon.com>
  • Loading branch information
mitalawachat committed Sep 16, 2021
1 parent cdcae41 commit ef38fe2
Show file tree
Hide file tree
Showing 17 changed files with 515 additions and 164 deletions.
21 changes: 3 additions & 18 deletions pa_config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,33 +71,18 @@
"heap-range": [
{
"lower-bound": 0,
"upper-bound": 75,
"threshold": 15.0
},
{
"lower-bound": 76,
"upper-bound": 80,
"threshold": 12.5
"threshold": 15.0
},
{
"lower-bound": 81,
"upper-bound": 85,
"threshold": 10.0
},
{
"lower-bound": 86,
"upper-bound": 90,
"threshold": 7.5
"threshold": 10.0
},
{
"lower-bound": 91,
"upper-bound": 95,
"threshold": 5.0
},
{
"lower-bound": 96,
"upper-bound": 100,
"threshold": 2.5
"threshold": 5.0
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public Map<NodeKey, ImpactVector> impact() {
impactVector.increasesPressure(ADMISSION_CONTROL);
} else if (desiredValue < currentValue) {
impactVector.decreasesPressure(ADMISSION_CONTROL);
} else {
impactVector.noImpact(ADMISSION_CONTROL);
}
return Collections.singletonMap(esNode, impactVector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.performanceanalyzer.decisionmaker.actions.AdmissionControlAction;
import org.opensearch.performanceanalyzer.grpc.ResourceEnum;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.AdmissionControlClusterRca;
Expand All @@ -45,7 +46,7 @@ public class AdmissionControlDecider extends Decider {

private int counter = 0;
private static final String NAME = "admissionControlDecider";
private AdmissionControlClusterRca admissionControlClusterRca;
private final AdmissionControlClusterRca admissionControlClusterRca;

public AdmissionControlDecider(
long evalIntervalSeconds,
Expand Down Expand Up @@ -79,21 +80,29 @@ public Decision operate() {

private List<AdmissionControlAction> getHeapBasedActions() {
List<AdmissionControlAction> heapBasedActions = new ArrayList<>();
admissionControlClusterRca.getFlowUnits().stream()
.filter(ResourceFlowUnit::hasResourceSummary)
.flatMap(
clusterRcaFlowUnits ->
clusterRcaFlowUnits.getSummary().getHotNodeSummaryList().stream())

if (admissionControlClusterRca.getFlowUnits().isEmpty()) {
return heapBasedActions;
}

ResourceFlowUnit<HotClusterSummary> flowUnit =
admissionControlClusterRca.getFlowUnits().get(0);
if (!flowUnit.hasResourceSummary()) {
return heapBasedActions;
}

HotClusterSummary clusterSummary = flowUnit.getSummary();
clusterSummary
.getHotNodeSummaryList()
.forEach(
hotNodeSummary -> {
hotNodeSummary.getHotResourceSummaryList().stream()
nodeSummary -> {
nodeSummary.getHotResourceSummaryList().stream()
.filter(this::isHeapResource)
.map(
hotResourceSummary ->
getAction(hotNodeSummary, hotResourceSummary))
.map(resourceSummary -> getAction(nodeSummary, resourceSummary))
.filter(Objects::nonNull)
.forEach(heapBasedActions::add);
});

return heapBasedActions;
}

Expand All @@ -103,12 +112,12 @@ private boolean isHeapResource(HotResourceSummary hotResourceSummary) {

private AdmissionControlAction getAction(
HotNodeSummary hotNodeSummary, HotResourceSummary hotResourceSummary) {
double currentHeapPercent = hotResourceSummary.getValue();
double currentThreshold = hotResourceSummary.getValue();
double desiredThreshold = hotResourceSummary.getThreshold();
NodeKey esNode = new NodeKey(hotNodeSummary.getNodeID(), hotNodeSummary.getHostAddress());
AdmissionControlAction action =
AdmissionControlAction.newBuilder(esNode, REQUEST_SIZE, getAppContext(), rcaConf)
.currentValue(currentHeapPercent)
.currentValue(currentThreshold)
.desiredValue(desiredThreshold)
.build();
return action.isActionable() ? action : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,9 @@
* "heap-range": [
* {
* "lower-bound": 0,
* "upper-bound": 75,
* "threshold": 15
* },
* {
* "lower-bound": 76,
* "upper-bound": 80,
* "threshold": 12.5
* }
* "threshold": 15.0
* },
* ...
* ]
* }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,13 @@ private AdmissionControlDecider buildAdmissionControlDecider(Metric heapUsed, Me
admissionControlClusterRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
admissionControlClusterRca.addAllUpstreams(Collections.singletonList(admissionControlRca));
admissionControlClusterRca.addTag(
RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);

AdmissionControlDecider admissionControlDecider =
new AdmissionControlDecider(
EVALUATION_INTERVAL_SECONDS, 12, admissionControlClusterRca);
EVALUATION_INTERVAL_SECONDS, RCA_PERIOD, admissionControlClusterRca);
admissionControlDecider.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
admissionControlDecider.addAllUpstreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,13 @@

package org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol;

import static org.opensearch.performanceanalyzer.PerformanceAnalyzerApp.RCA_VERTICES_METRICS_AGGREGATOR;
import static org.opensearch.performanceanalyzer.metrics.AllMetrics.GCType.HEAP;
import static org.opensearch.performanceanalyzer.metrics.AllMetrics.HeapDimension.MEM_TYPE;
import static org.opensearch.performanceanalyzer.rca.framework.api.Resources.State.HEALTHY;
import static org.opensearch.performanceanalyzer.rca.framework.api.Resources.State.UNHEALTHY;
import static org.opensearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil.readDataFromSqlResult;
import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.HEAP_MAX_SIZE;
import static org.opensearch.performanceanalyzer.rca.framework.metrics.RcaVerticesMetrics.ADMISSION_CONTROL_RCA_TRIGGERED;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Field;
Expand All @@ -48,19 +42,22 @@
import org.opensearch.performanceanalyzer.rca.framework.api.Metric;
import org.opensearch.performanceanalyzer.rca.framework.api.Rca;
import org.opensearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.heap.HeapRca;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.heap.HeapRcaFactory;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.model.HeapMetric;
import org.opensearch.performanceanalyzer.util.range.Range;
import org.opensearch.performanceanalyzer.util.range.RangeConfiguration;
import org.opensearch.performanceanalyzer.util.range.RequestSizeHeapRangeConfiguration;

public class AdmissionControlRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
private static final Logger LOG = LogManager.getLogger(AdmissionControlRca.class);
private static final double BYTES_TO_MEGABYTES = Math.pow(1024, 2);
private static final double BYTES_TO_GIGABYTES = Math.pow(1024, 3);

// Global JVM Memory Pressure Metric
public static final String GLOBAL_JVMMP = "Global_JVMMP";
Expand All @@ -73,50 +70,37 @@ public class AdmissionControlRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
private final RangeConfiguration requestSizeHeapRange;
private final int rcaPeriod;
private int counter;
private double previousHeapPercent;

public <M extends Metric> AdmissionControlRca(
final int rcaPeriodInSeconds, final M heapUsedValue, final M heapMaxValue) {
super(rcaPeriodInSeconds);
this.counter = 0;
this.previousHeapPercent = 0.0;
this.rcaPeriod = rcaPeriodInSeconds;
this.heapUsedValue = heapUsedValue;
this.heapMaxValue = heapMaxValue;
this.requestSizeHeapRange = new RequestSizeHeapRangeConfiguration();
}

private <M extends Metric> double getMetric(
M metric, Field<String> field, String fieldName, String dataField) {
AtomicReference<Double> metricValue = new AtomicReference<>((double) 0);
metric.getFlowUnits().stream()
.filter(flowUnit -> !flowUnit.isEmpty() && !flowUnit.getData().isEmpty())
.mapToDouble(
flowUnit ->
readDataFromSqlResult(
flowUnit.getData(), field, fieldName, dataField))
.forEach(
metricResponse -> {
if (Double.isNaN(metricResponse)) {
LOG.debug(
"[AdmissionControl] Failed to parse metric from {}",
metric.name());
} else {
metricValue.set(metricResponse);
}
});
return metricValue.get();
private <M extends Metric> double getMetric(M metric, Field<String> field, String fieldName) {
double response = 0;
for (MetricFlowUnit flowUnit : metric.getFlowUnits()) {
if (!flowUnit.isEmpty()) {
double metricResponse =
readDataFromSqlResult(flowUnit.getData(), field, fieldName, MetricsDB.MAX);
if (!Double.isNaN(metricResponse) && metricResponse > 0) {
response = metricResponse;
}
}
}
return response;
}

private HeapMetrics getHeapMetric() {
HeapMetrics heapMetrics = new HeapMetrics();
heapMetrics.usedHeap =
getMetric(heapUsedValue, MEM_TYPE.getField(), HEAP.toString(), MetricsDB.MAX)
/ BYTES_TO_MEGABYTES;
heapMetrics.maxHeap =
getMetric(heapMaxValue, MEM_TYPE.getField(), HEAP.toString(), MetricsDB.MAX)
/ BYTES_TO_MEGABYTES;
return heapMetrics;
private HeapMetric getHeapMetric() {
double usedHeap =
getMetric(heapUsedValue, MEM_TYPE.getField(), HEAP.toString()) / BYTES_TO_GIGABYTES;
double maxHeap =
getMetric(heapMaxValue, MEM_TYPE.getField(), HEAP.toString()) / BYTES_TO_GIGABYTES;
return new HeapMetric(usedHeap, maxHeap);
}

/**
Expand All @@ -138,7 +122,7 @@ public void readRcaConf(RcaConf conf) {
}

@Override
public ResourceFlowUnit operate() {
public ResourceFlowUnit<HotNodeSummary> operate() {
long currentTimeMillis = System.currentTimeMillis();

counter++;
Expand All @@ -152,50 +136,18 @@ public ResourceFlowUnit operate() {
new HotNodeSummary(
instanceDetails.getInstanceId(), instanceDetails.getInstanceIp());

HeapMetrics heapMetrics = getHeapMetric();
if (heapMetrics.usedHeap == 0 || heapMetrics.maxHeap == 0) {
ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}

double currentHeapPercent = (heapMetrics.usedHeap / heapMetrics.maxHeap) * 100;

// If we observe heap percent range change then we tune request-size controller threshold
// by marking resource as unhealthy and setting desired value as configured
if (requestSizeHeapRange.hasRangeChanged(previousHeapPercent, currentHeapPercent)) {
double desiredThreshold = getHeapBasedThreshold(currentHeapPercent);
if (desiredThreshold == 0) {
// AdmissionControl rejects all requests if threshold is set to 0, thus ignoring
ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}
LOG.debug(
"[AdmissionControl] Observed range change. previousHeapPercent={} currentHeapPercent={} desiredThreshold={}",
previousHeapPercent,
currentHeapPercent,
desiredThreshold);

previousHeapPercent = currentHeapPercent;

HotResourceSummary resourceSummary =
new HotResourceSummary(HEAP_MAX_SIZE, desiredThreshold, currentHeapPercent, 0);
nodeSummary.appendNestedSummary(resourceSummary);

RCA_VERTICES_METRICS_AGGREGATOR.updateStat(
ADMISSION_CONTROL_RCA_TRIGGERED, instanceDetails.getInstanceId().toString(), 1);

ResourceContext context = new ResourceContext(UNHEALTHY);
HeapMetric heapMetric = getHeapMetric();
if (!heapMetric.hasValues()) {
return new ResourceFlowUnit<>(
currentTimeMillis, context, nodeSummary, !instanceDetails.getIsMaster());
currentTimeMillis,
new ResourceContext(HEALTHY),
nodeSummary,
!instanceDetails.getIsMaster());
}

ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}

private double getHeapBasedThreshold(double currentHeapPercent) {
Range range = requestSizeHeapRange.getRange(currentHeapPercent);
return Objects.isNull(range) ? 0 : range.getThreshold();
HeapRca heapRca = HeapRcaFactory.getHeapRca(heapMetric.getMaxHeap());
heapRca.init(instanceDetails, requestSizeHeapRange);
return heapRca.operate(heapMetric);
}

public RangeConfiguration getRequestSizeHeapRange() {
Expand All @@ -206,16 +158,11 @@ public RangeConfiguration getRequestSizeHeapRange() {
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
final List<FlowUnitMessage> flowUnitMessages =
args.getWireHopper().readFromWire(args.getNode());
List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
final List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName());
for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
}
setFlowUnits(flowUnitList);
}

private static class HeapMetrics {
private double usedHeap;
private double maxHeap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.heap;


import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.model.HeapMetric;
import org.opensearch.performanceanalyzer.util.range.RangeConfiguration;

/**
 * Strategy interface for computing the admission-control {@link ResourceFlowUnit} of a node from
 * its heap metrics. Different implementations can be provided for different heap sizes.
 *
 * <p>In the visible caller ({@code AdmissionControlRca.operate()}), an implementation is obtained
 * from {@code HeapRcaFactory.getHeapRca(maxHeap)}, then {@link #init} is invoked, and finally
 * {@link #operate} is invoked and its result is returned as the RCA's flow unit.
 */
public interface HeapRca {
/**
 * Supplies the context the implementation needs before {@link #operate} is called.
 *
 * <p>NOTE(review): the visible caller invokes this on every RCA evaluation, immediately before
 * {@link #operate}; whether implementations tolerate repeated calls cannot be confirmed from
 * here.
 *
 * @param instanceDetails details of the node the RCA is running on
 * @param rangeConfiguration heap-range configuration; the visible caller passes its
 *     request-size heap range configuration
 */
void init(InstanceDetails instanceDetails, RangeConfiguration rangeConfiguration);

/**
 * Evaluates the given heap metrics and produces the flow unit for this node.
 *
 * @param heapMetric used/max heap values for the node; the visible caller only reaches this
 *     call after {@code heapMetric.hasValues()} returned true (units are presumably gigabytes,
 *     as the caller divides by {@code BYTES_TO_GIGABYTES} - TODO confirm)
 * @return the resource flow unit carrying the node summary for this evaluation
 */
ResourceFlowUnit<HotNodeSummary> operate(HeapMetric heapMetric);
}
Loading

0 comments on commit ef38fe2

Please sign in to comment.