Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

main to 1.1 #76

Merged
merged 3 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ dependencies {
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.9'
compile group: 'commons-io', name: 'commons-io', version: '2.3'
compile group: 'commons-io', name: 'commons-io', version: '2.7'
implementation 'io.grpc:grpc-netty-shaded:1.28.0'
implementation 'io.grpc:grpc-protobuf:1.28.0'
implementation 'io.grpc:grpc-stub:1.28.0'
Expand Down
21 changes: 3 additions & 18 deletions pa_config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,33 +71,18 @@
"heap-range": [
{
"lower-bound": 0,
"upper-bound": 75,
"threshold": 15.0
},
{
"lower-bound": 76,
"upper-bound": 80,
"threshold": 12.5
"threshold": 15.0
},
{
"lower-bound": 81,
"upper-bound": 85,
"threshold": 10.0
},
{
"lower-bound": 86,
"upper-bound": 90,
"threshold": 7.5
"threshold": 10.0
},
{
"lower-bound": 91,
"upper-bound": 95,
"threshold": 5.0
},
{
"lower-bound": 96,
"upper-bound": 100,
"threshold": 2.5
"threshold": 5.0
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public Map<NodeKey, ImpactVector> impact() {
impactVector.increasesPressure(ADMISSION_CONTROL);
} else if (desiredValue < currentValue) {
impactVector.decreasesPressure(ADMISSION_CONTROL);
} else {
impactVector.noImpact(ADMISSION_CONTROL);
}
return Collections.singletonMap(esNode, impactVector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.performanceanalyzer.decisionmaker.actions.AdmissionControlAction;
import org.opensearch.performanceanalyzer.grpc.ResourceEnum;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.AdmissionControlClusterRca;
Expand All @@ -45,7 +46,7 @@ public class AdmissionControlDecider extends Decider {

private int counter = 0;
private static final String NAME = "admissionControlDecider";
private AdmissionControlClusterRca admissionControlClusterRca;
private final AdmissionControlClusterRca admissionControlClusterRca;

public AdmissionControlDecider(
long evalIntervalSeconds,
Expand Down Expand Up @@ -79,21 +80,29 @@ public Decision operate() {

private List<AdmissionControlAction> getHeapBasedActions() {
List<AdmissionControlAction> heapBasedActions = new ArrayList<>();
admissionControlClusterRca.getFlowUnits().stream()
.filter(ResourceFlowUnit::hasResourceSummary)
.flatMap(
clusterRcaFlowUnits ->
clusterRcaFlowUnits.getSummary().getHotNodeSummaryList().stream())

if (admissionControlClusterRca.getFlowUnits().isEmpty()) {
return heapBasedActions;
}

ResourceFlowUnit<HotClusterSummary> flowUnit =
admissionControlClusterRca.getFlowUnits().get(0);
if (!flowUnit.hasResourceSummary()) {
return heapBasedActions;
}

HotClusterSummary clusterSummary = flowUnit.getSummary();
clusterSummary
.getHotNodeSummaryList()
.forEach(
hotNodeSummary -> {
hotNodeSummary.getHotResourceSummaryList().stream()
nodeSummary -> {
nodeSummary.getHotResourceSummaryList().stream()
.filter(this::isHeapResource)
.map(
hotResourceSummary ->
getAction(hotNodeSummary, hotResourceSummary))
.map(resourceSummary -> getAction(nodeSummary, resourceSummary))
.filter(Objects::nonNull)
.forEach(heapBasedActions::add);
});

return heapBasedActions;
}

Expand All @@ -103,12 +112,12 @@ private boolean isHeapResource(HotResourceSummary hotResourceSummary) {

private AdmissionControlAction getAction(
HotNodeSummary hotNodeSummary, HotResourceSummary hotResourceSummary) {
double currentHeapPercent = hotResourceSummary.getValue();
double currentThreshold = hotResourceSummary.getValue();
double desiredThreshold = hotResourceSummary.getThreshold();
NodeKey esNode = new NodeKey(hotNodeSummary.getNodeID(), hotNodeSummary.getHostAddress());
AdmissionControlAction action =
AdmissionControlAction.newBuilder(esNode, REQUEST_SIZE, getAppContext(), rcaConf)
.currentValue(currentHeapPercent)
.currentValue(currentThreshold)
.desiredValue(desiredThreshold)
.build();
return action.isActionable() ? action : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public static void addMetricEntry(StringBuilder value, String metricKey, long me
private static void emitMetric(BlockingQueue<Event> q, Event entry) {
if (!q.offer(entry)) {
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.METRICS_WRITE_ERROR, entry.key, 1);
WriterMetrics.METRICS_WRITE_ERROR, "", 1);
LOG.debug("Could not enter metric {}", entry);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,9 @@
* "heap-range": [
* {
* "lower-bound": 0,
* "upper-bound": 75,
* "threshold": 15
* },
* {
* "lower-bound": 76,
* "upper-bound": 80,
* "threshold": 12.5
* }
* "threshold": 15.0
* },
* ...
* ]
* }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,13 @@ private AdmissionControlDecider buildAdmissionControlDecider(Metric heapUsed, Me
admissionControlClusterRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
admissionControlClusterRca.addAllUpstreams(Collections.singletonList(admissionControlRca));
admissionControlClusterRca.addTag(
RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);

AdmissionControlDecider admissionControlDecider =
new AdmissionControlDecider(
EVALUATION_INTERVAL_SECONDS, 12, admissionControlClusterRca);
EVALUATION_INTERVAL_SECONDS, RCA_PERIOD, admissionControlClusterRca);
admissionControlDecider.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
admissionControlDecider.addAllUpstreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,13 @@

package org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol;

import static org.opensearch.performanceanalyzer.PerformanceAnalyzerApp.RCA_VERTICES_METRICS_AGGREGATOR;
import static org.opensearch.performanceanalyzer.metrics.AllMetrics.GCType.HEAP;
import static org.opensearch.performanceanalyzer.metrics.AllMetrics.HeapDimension.MEM_TYPE;
import static org.opensearch.performanceanalyzer.rca.framework.api.Resources.State.HEALTHY;
import static org.opensearch.performanceanalyzer.rca.framework.api.Resources.State.UNHEALTHY;
import static org.opensearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil.readDataFromSqlResult;
import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.HEAP_MAX_SIZE;
import static org.opensearch.performanceanalyzer.rca.framework.metrics.RcaVerticesMetrics.ADMISSION_CONTROL_RCA_TRIGGERED;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Field;
Expand All @@ -48,19 +42,22 @@
import org.opensearch.performanceanalyzer.rca.framework.api.Metric;
import org.opensearch.performanceanalyzer.rca.framework.api.Rca;
import org.opensearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.heap.AdmissionControlByHeap;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.heap.AdmissionControlByHeapFactory;
import org.opensearch.performanceanalyzer.rca.store.rca.admissioncontrol.model.HeapMetric;
import org.opensearch.performanceanalyzer.util.range.Range;
import org.opensearch.performanceanalyzer.util.range.RangeConfiguration;
import org.opensearch.performanceanalyzer.util.range.RequestSizeHeapRangeConfiguration;

public class AdmissionControlRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
private static final Logger LOG = LogManager.getLogger(AdmissionControlRca.class);
private static final double BYTES_TO_MEGABYTES = Math.pow(1024, 2);
private static final double BYTES_TO_GIGABYTES = Math.pow(1024, 3);

// Global JVM Memory Pressure Metric
public static final String GLOBAL_JVMMP = "Global_JVMMP";
Expand All @@ -73,50 +70,37 @@ public class AdmissionControlRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
private final RangeConfiguration requestSizeHeapRange;
private final int rcaPeriod;
private int counter;
private double previousHeapPercent;

public <M extends Metric> AdmissionControlRca(
final int rcaPeriodInSeconds, final M heapUsedValue, final M heapMaxValue) {
super(rcaPeriodInSeconds);
this.counter = 0;
this.previousHeapPercent = 0.0;
this.rcaPeriod = rcaPeriodInSeconds;
this.heapUsedValue = heapUsedValue;
this.heapMaxValue = heapMaxValue;
this.requestSizeHeapRange = new RequestSizeHeapRangeConfiguration();
}

private <M extends Metric> double getMetric(
M metric, Field<String> field, String fieldName, String dataField) {
AtomicReference<Double> metricValue = new AtomicReference<>((double) 0);
metric.getFlowUnits().stream()
.filter(flowUnit -> !flowUnit.isEmpty() && !flowUnit.getData().isEmpty())
.mapToDouble(
flowUnit ->
readDataFromSqlResult(
flowUnit.getData(), field, fieldName, dataField))
.forEach(
metricResponse -> {
if (Double.isNaN(metricResponse)) {
LOG.debug(
"[AdmissionControl] Failed to parse metric from {}",
metric.name());
} else {
metricValue.set(metricResponse);
}
});
return metricValue.get();
/**
 * Scans every flow unit of {@code metric} and returns the most recently seen usable reading.
 *
 * <p>Each non-empty flow unit is queried through {@code readDataFromSqlResult}, passing
 * {@code MetricsDB.MAX} as the data field. Readings that are NaN or not strictly positive are
 * ignored; a later usable reading overwrites an earlier one.
 *
 * @param metric the metric whose flow units are inspected
 * @param field the field handed to {@code readDataFromSqlResult} to select the row
 * @param fieldName the value of {@code field} to look for
 * @return the last usable reading, or {@code 0} when no flow unit yields one
 */
private <M extends Metric> double getMetric(M metric, Field<String> field, String fieldName) {
    double latestReading = 0;
    for (MetricFlowUnit unit : metric.getFlowUnits()) {
        if (unit.isEmpty()) {
            continue;
        }
        final double candidate =
                readDataFromSqlResult(unit.getData(), field, fieldName, MetricsDB.MAX);
        // Skip unparsable (NaN) and non-positive readings so they never replace a good value.
        if (Double.isNaN(candidate) || candidate <= 0) {
            continue;
        }
        latestReading = candidate;
    }
    return latestReading;
}

private HeapMetrics getHeapMetric() {
HeapMetrics heapMetrics = new HeapMetrics();
heapMetrics.usedHeap =
getMetric(heapUsedValue, MEM_TYPE.getField(), HEAP.toString(), MetricsDB.MAX)
/ BYTES_TO_MEGABYTES;
heapMetrics.maxHeap =
getMetric(heapMaxValue, MEM_TYPE.getField(), HEAP.toString(), MetricsDB.MAX)
/ BYTES_TO_MEGABYTES;
return heapMetrics;
/**
 * Builds a {@link HeapMetric} from the used-heap and max-heap metrics.
 *
 * <p>Both readings are obtained via {@code getMetric} for the {@code HEAP} memory type and
 * divided by {@code BYTES_TO_GIGABYTES} (1024^3) before being wrapped. The raw readings are
 * presumably in bytes, as the constant's name suggests; confirm against the metric producer.
 *
 * @return a {@code HeapMetric} built from the scaled used-heap and max-heap readings, in that
 *     order
 */
private HeapMetric getHeapMetric() {
    final Field<String> memTypeField = MEM_TYPE.getField();
    final String heapType = HEAP.toString();

    final double usedHeapRaw = getMetric(heapUsedValue, memTypeField, heapType);
    final double maxHeapRaw = getMetric(heapMaxValue, memTypeField, heapType);

    return new HeapMetric(usedHeapRaw / BYTES_TO_GIGABYTES, maxHeapRaw / BYTES_TO_GIGABYTES);
}

/**
Expand All @@ -138,7 +122,7 @@ public void readRcaConf(RcaConf conf) {
}

@Override
public ResourceFlowUnit operate() {
public ResourceFlowUnit<HotNodeSummary> operate() {
long currentTimeMillis = System.currentTimeMillis();

counter++;
Expand All @@ -152,50 +136,19 @@ public ResourceFlowUnit operate() {
new HotNodeSummary(
instanceDetails.getInstanceId(), instanceDetails.getInstanceIp());

HeapMetrics heapMetrics = getHeapMetric();
if (heapMetrics.usedHeap == 0 || heapMetrics.maxHeap == 0) {
ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}

double currentHeapPercent = (heapMetrics.usedHeap / heapMetrics.maxHeap) * 100;

// If we observe heap percent range change then we tune request-size controller threshold
// by marking resource as unhealthy and setting desired value as configured
if (requestSizeHeapRange.hasRangeChanged(previousHeapPercent, currentHeapPercent)) {
double desiredThreshold = getHeapBasedThreshold(currentHeapPercent);
if (desiredThreshold == 0) {
// AdmissionControl rejects all requests if threshold is set to 0, thus ignoring
ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}
LOG.debug(
"[AdmissionControl] Observed range change. previousHeapPercent={} currentHeapPercent={} desiredThreshold={}",
previousHeapPercent,
currentHeapPercent,
desiredThreshold);

previousHeapPercent = currentHeapPercent;

HotResourceSummary resourceSummary =
new HotResourceSummary(HEAP_MAX_SIZE, desiredThreshold, currentHeapPercent, 0);
nodeSummary.appendNestedSummary(resourceSummary);

RCA_VERTICES_METRICS_AGGREGATOR.updateStat(
ADMISSION_CONTROL_RCA_TRIGGERED, instanceDetails.getInstanceId().toString(), 1);

ResourceContext context = new ResourceContext(UNHEALTHY);
HeapMetric heapMetric = getHeapMetric();
if (!heapMetric.hasValues()) {
return new ResourceFlowUnit<>(
currentTimeMillis, context, nodeSummary, !instanceDetails.getIsMaster());
currentTimeMillis,
new ResourceContext(HEALTHY),
nodeSummary,
!instanceDetails.getIsMaster());
}

ResourceContext context = new ResourceContext(HEALTHY);
return new ResourceFlowUnit<>(currentTimeMillis, context, nodeSummary);
}

private double getHeapBasedThreshold(double currentHeapPercent) {
Range range = requestSizeHeapRange.getRange(currentHeapPercent);
return Objects.isNull(range) ? 0 : range.getThreshold();
AdmissionControlByHeap admissionControlByHeap =
AdmissionControlByHeapFactory.getByMaxHeap(heapMetric.getMaxHeap());
admissionControlByHeap.init(instanceDetails, requestSizeHeapRange);
return admissionControlByHeap.generateFlowUnits(heapMetric);
}

public RangeConfiguration getRequestSizeHeapRange() {
Expand All @@ -206,16 +159,11 @@ public RangeConfiguration getRequestSizeHeapRange() {
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
final List<FlowUnitMessage> flowUnitMessages =
args.getWireHopper().readFromWire(args.getNode());
List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
final List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName());
for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
}
setFlowUnits(flowUnitList);
}

private static class HeapMetrics {
private double usedHeap;
private double maxHeap;
}
}
Loading