Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AdmissionControl Split RCA #69

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions pa_config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,33 +71,18 @@
"heap-range": [
{
"lower-bound": 0,
"upper-bound": 75,
"threshold": 15.0
},
{
"lower-bound": 76,
"upper-bound": 80,
"threshold": 12.5
"threshold": 15.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have some comments or more information in the PR's description on how these values are arrived at or more appropriate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated description, and also added in code comments.

},
{
"lower-bound": 81,
"upper-bound": 85,
"threshold": 10.0
},
{
"lower-bound": 86,
"upper-bound": 90,
"threshold": 7.5
"threshold": 10.0
},
{
"lower-bound": 91,
"upper-bound": 95,
"threshold": 5.0
},
{
"lower-bound": 96,
"upper-bound": 100,
"threshold": 2.5
"threshold": 5.0
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public Map<NodeKey, ImpactVector> impact() {
impactVector.increasesPressure(ADMISSION_CONTROL);
} else if (desiredValue < currentValue) {
impactVector.decreasesPressure(ADMISSION_CONTROL);
} else {
impactVector.noImpact(ADMISSION_CONTROL);
}
return Collections.singletonMap(esNode, impactVector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.performanceanalyzer.decisionmaker.actions.AdmissionControlAction;
import org.opensearch.performanceanalyzer.grpc.ResourceEnum;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.AdmissionControlClusterRca;
Expand All @@ -45,7 +46,7 @@ public class AdmissionControlDecider extends Decider {

private int counter = 0;
private static final String NAME = "admissionControlDecider";
private AdmissionControlClusterRca admissionControlClusterRca;
private final AdmissionControlClusterRca admissionControlClusterRca;

public AdmissionControlDecider(
long evalIntervalSeconds,
Expand Down Expand Up @@ -79,21 +80,29 @@ public Decision operate() {

private List<AdmissionControlAction> getHeapBasedActions() {
List<AdmissionControlAction> heapBasedActions = new ArrayList<>();
admissionControlClusterRca.getFlowUnits().stream()
.filter(ResourceFlowUnit::hasResourceSummary)
.flatMap(
clusterRcaFlowUnits ->
clusterRcaFlowUnits.getSummary().getHotNodeSummaryList().stream())

if (admissionControlClusterRca.getFlowUnits().isEmpty()) {
return heapBasedActions;
}

ResourceFlowUnit<HotClusterSummary> flowUnit =
admissionControlClusterRca.getFlowUnits().get(0);
if (!flowUnit.hasResourceSummary()) {
return heapBasedActions;
}

HotClusterSummary clusterSummary = flowUnit.getSummary();
clusterSummary
.getHotNodeSummaryList()
.forEach(
hotNodeSummary -> {
hotNodeSummary.getHotResourceSummaryList().stream()
nodeSummary -> {
nodeSummary.getHotResourceSummaryList().stream()
.filter(this::isHeapResource)
.map(
hotResourceSummary ->
getAction(hotNodeSummary, hotResourceSummary))
.map(resourceSummary -> getAction(nodeSummary, resourceSummary))
.filter(Objects::nonNull)
.forEach(heapBasedActions::add);
});

return heapBasedActions;
}

Expand All @@ -103,12 +112,12 @@ private boolean isHeapResource(HotResourceSummary hotResourceSummary) {

private AdmissionControlAction getAction(
HotNodeSummary hotNodeSummary, HotResourceSummary hotResourceSummary) {
double currentHeapPercent = hotResourceSummary.getValue();
double currentThreshold = hotResourceSummary.getValue();
double desiredThreshold = hotResourceSummary.getThreshold();
NodeKey esNode = new NodeKey(hotNodeSummary.getNodeID(), hotNodeSummary.getHostAddress());
AdmissionControlAction action =
AdmissionControlAction.newBuilder(esNode, REQUEST_SIZE, getAppContext(), rcaConf)
.currentValue(currentHeapPercent)
.currentValue(currentThreshold)
.desiredValue(desiredThreshold)
.build();
return action.isActionable() ? action : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,9 @@
* "heap-range": [
* {
* "lower-bound": 0,
* "upper-bound": 75,
* "threshold": 15
* },
* {
* "lower-bound": 76,
* "upper-bound": 80,
* "threshold": 12.5
* }
* "threshold": 15.0
* },
* ...
* ]
* }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,13 @@ private AdmissionControlDecider buildAdmissionControlDecider(Metric heapUsed, Me
admissionControlClusterRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
admissionControlClusterRca.addAllUpstreams(Collections.singletonList(admissionControlRca));
admissionControlClusterRca.addTag(
RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);

AdmissionControlDecider admissionControlDecider =
new AdmissionControlDecider(
EVALUATION_INTERVAL_SECONDS, 12, admissionControlClusterRca);
EVALUATION_INTERVAL_SECONDS, RCA_PERIOD, admissionControlClusterRca);
admissionControlDecider.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
admissionControlDecider.addAllUpstreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,13 @@

package org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol;

import static org.opensearch.performanceanalyzer.PerformanceAnalyzerApp.RCA_VERTICES_METRICS_AGGREGATOR;
import static org.opensearch.performanceanalyzer.metrics.AllMetrics.GCType.HEAP;
import static org.opensearch.performanceanalyzer.metrics.AllMetrics.HeapDimension.MEM_TYPE;
import static org.opensearch.performanceanalyzer.rca.framework.api.Resources.State.HEALTHY;
import static org.opensearch.performanceanalyzer.rca.framework.api.Resources.State.UNHEALTHY;
import static org.opensearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil.readDataFromSqlResult;
import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.HEAP_MAX_SIZE;
import static org.opensearch.performanceanalyzer.rca.framework.metrics.RcaVerticesMetrics.ADMISSION_CONTROL_RCA_TRIGGERED;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Field;
Expand All @@ -48,19 +42,22 @@
import org.opensearch.performanceanalyzer.rca.framework.api.Metric;
import org.opensearch.performanceanalyzer.rca.framework.api.Rca;
import org.opensearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.heap.AdmissionControlByHeap;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.heap.AdmissionControlByHeapFactory;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.model.HeapMetric;
import org.opensearch.performanceanalyzer.util.range.Range;
import org.opensearch.performanceanalyzer.util.range.RangeConfiguration;
import org.opensearch.performanceanalyzer.util.range.RequestSizeHeapRangeConfiguration;

public class AdmissionControlRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
private static final Logger LOG = LogManager.getLogger(AdmissionControlRca.class);
private static final double BYTES_TO_MEGABYTES = Math.pow(1024, 2);
private static final double BYTES_TO_GIGABYTES = Math.pow(1024, 3);

// Global JVM Memory Pressure Metric
public static final String GLOBAL_JVMMP = "Global_JVMMP";
Expand All @@ -73,50 +70,37 @@ public class AdmissionControlRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
private final RangeConfiguration requestSizeHeapRange;
private final int rcaPeriod;
private int counter;
private double previousHeapPercent;

public <M extends Metric> AdmissionControlRca(
final int rcaPeriodInSeconds, final M heapUsedValue, final M heapMaxValue) {
super(rcaPeriodInSeconds);
this.counter = 0;
this.previousHeapPercent = 0.0;
this.rcaPeriod = rcaPeriodInSeconds;
this.heapUsedValue = heapUsedValue;
this.heapMaxValue = heapMaxValue;
this.requestSizeHeapRange = new RequestSizeHeapRangeConfiguration();
}

private <M extends Metric> double getMetric(
M metric, Field<String> field, String fieldName, String dataField) {
AtomicReference<Double> metricValue = new AtomicReference<>((double) 0);
metric.getFlowUnits().stream()
.filter(flowUnit -> !flowUnit.isEmpty() && !flowUnit.getData().isEmpty())
.mapToDouble(
flowUnit ->
readDataFromSqlResult(
flowUnit.getData(), field, fieldName, dataField))
.forEach(
metricResponse -> {
if (Double.isNaN(metricResponse)) {
LOG.debug(
"[AdmissionControl] Failed to parse metric from {}",
metric.name());
} else {
metricValue.set(metricResponse);
}
});
return metricValue.get();
private <M extends Metric> double getMetric(M metric, Field<String> field, String fieldName) {
double response = 0;
for (MetricFlowUnit flowUnit : metric.getFlowUnits()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we run as ./gradlew build as it will fix spotless bugs if any.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (!flowUnit.isEmpty()) {
double metricResponse =
readDataFromSqlResult(flowUnit.getData(), field, fieldName, MetricsDB.MAX);
if (!Double.isNaN(metricResponse) && metricResponse > 0) {
response = metricResponse;
}
}
}
return response;
}

private HeapMetrics getHeapMetric() {
HeapMetrics heapMetrics = new HeapMetrics();
heapMetrics.usedHeap =
getMetric(heapUsedValue, MEM_TYPE.getField(), HEAP.toString(), MetricsDB.MAX)
/ BYTES_TO_MEGABYTES;
heapMetrics.maxHeap =
getMetric(heapMaxValue, MEM_TYPE.getField(), HEAP.toString(), MetricsDB.MAX)
/ BYTES_TO_MEGABYTES;
return heapMetrics;
private HeapMetric getHeapMetric() {
double usedHeapInGb =
getMetric(heapUsedValue, MEM_TYPE.getField(), HEAP.toString()) / BYTES_TO_GIGABYTES;
double maxHeapInGb =
getMetric(heapMaxValue, MEM_TYPE.getField(), HEAP.toString()) / BYTES_TO_GIGABYTES;
return new HeapMetric(usedHeapInGb, maxHeapInGb);
}

/**
Expand All @@ -138,7 +122,7 @@ public void readRcaConf(RcaConf conf) {
}

@Override
public ResourceFlowUnit operate() {
public ResourceFlowUnit<HotNodeSummary> operate() {
long currentTimeMillis = System.currentTimeMillis();

counter++;
Expand All @@ -152,50 +136,19 @@ public ResourceFlowUnit operate() {
new HotNodeSummary(
instanceDetails.getInstanceId(), instanceDetails.getInstanceIp());

HeapMetrics heapMetrics = getHeapMetric();
if (heapMetrics.usedHeap == 0 || heapMetrics.maxHeap == 0) {
ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}

double currentHeapPercent = (heapMetrics.usedHeap / heapMetrics.maxHeap) * 100;

// If we observe heap percent range change then we tune request-size controller threshold
// by marking resource as unhealthy and setting desired value as configured
if (requestSizeHeapRange.hasRangeChanged(previousHeapPercent, currentHeapPercent)) {
double desiredThreshold = getHeapBasedThreshold(currentHeapPercent);
if (desiredThreshold == 0) {
// AdmissionControl rejects all requests if threshold is set to 0, thus ignoring
ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}
LOG.debug(
"[AdmissionControl] Observed range change. previousHeapPercent={} currentHeapPercent={} desiredThreshold={}",
previousHeapPercent,
currentHeapPercent,
desiredThreshold);

previousHeapPercent = currentHeapPercent;

HotResourceSummary resourceSummary =
new HotResourceSummary(HEAP_MAX_SIZE, desiredThreshold, currentHeapPercent, 0);
nodeSummary.appendNestedSummary(resourceSummary);

RCA_VERTICES_METRICS_AGGREGATOR.updateStat(
ADMISSION_CONTROL_RCA_TRIGGERED, instanceDetails.getInstanceId().toString(), 1);

ResourceContext context = new ResourceContext(UNHEALTHY);
HeapMetric heapMetric = getHeapMetric();
if (!heapMetric.hasValues()) {
return new ResourceFlowUnit<>(
currentTimeMillis, context, nodeSummary, !instanceDetails.getIsMaster());
currentTimeMillis,
new ResourceContext(HEALTHY),
nodeSummary,
!instanceDetails.getIsMaster());
}

ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}

private double getHeapBasedThreshold(double currentHeapPercent) {
Range range = requestSizeHeapRange.getRange(currentHeapPercent);
return Objects.isNull(range) ? 0 : range.getThreshold();
AdmissionControlByHeap admissionControlByHeap =
AdmissionControlByHeapFactory.getByMaxHeap(heapMetric.getMaxHeap());
admissionControlByHeap.init(instanceDetails, requestSizeHeapRange);
return admissionControlByHeap.generateFlowUnits(heapMetric);
}

public RangeConfiguration getRequestSizeHeapRange() {
Expand All @@ -206,16 +159,11 @@ public RangeConfiguration getRequestSizeHeapRange() {
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
final List<FlowUnitMessage> flowUnitMessages =
args.getWireHopper().readFromWire(args.getNode());
List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
final List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName());
for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
}
setFlowUnits(flowUnitList);
}

private static class HeapMetrics {
private double usedHeap;
private double maxHeap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.heap;


import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.model.HeapMetric;
import org.opensearch.performanceanalyzer.util.range.RangeConfiguration;

/** Interface that can be implemented to calculate ResourceFlowUnit for various heap size */
public interface AdmissionControlByHeap {
void init(InstanceDetails instanceDetails, RangeConfiguration rangeConfiguration);

ResourceFlowUnit<HotNodeSummary> generateFlowUnits(HeapMetric heapMetric);
}
Loading