Skip to content

Commit

Permalink
Enables/disables thread contention monitoring based on setting
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed Mar 22, 2022
1 parent bbc6813 commit 8f914a4
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 6 deletions.
1 change: 1 addition & 0 deletions null/data/batch_metrics_enabled.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
true
1 change: 1 addition & 0 deletions null/data/thread_contention_monitoring_enabled.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
true
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.performanceanalyzer.rca.samplers.BatchMetricsEnabledSampler;
import org.opensearch.performanceanalyzer.rca.samplers.MetricsDBFileSampler;
import org.opensearch.performanceanalyzer.rca.samplers.RcaStateSamplers;
import org.opensearch.performanceanalyzer.rca.samplers.ThreadContentionMonitoringEnabledSampler;
import org.opensearch.performanceanalyzer.rca.stats.RcaStatsReporter;
import org.opensearch.performanceanalyzer.rca.stats.collectors.SampleAggregator;
import org.opensearch.performanceanalyzer.rca.stats.emitters.ISampler;
Expand Down Expand Up @@ -364,6 +365,7 @@ public static List<ISampler> getAllSamplers(final AppContext appContext) {
allSamplers.add(RcaStateSamplers.getRcaEnabledSampler(appContext));
allSamplers.add(new BatchMetricsEnabledSampler(appContext));
allSamplers.add(new MetricsDBFileSampler(appContext));
allSamplers.add(new ThreadContentionMonitoringEnabledSampler(appContext));

return allSamplers;
}
Expand All @@ -373,6 +375,7 @@ private static MeasurementSet[] getPeriodicMeasurementSets() {
measurementSets.addAll(Arrays.asList(JvmMetrics.values()));
measurementSets.add(RcaRuntimeMetrics.RCA_ENABLED);
measurementSets.add(ReaderMetrics.BATCH_METRICS_ENABLED);
measurementSets.add(ReaderMetrics.THREAD_CONTENTION_MONITORING_ENABLED);
measurementSets.add(ReaderMetrics.METRICSDB_NUM_FILES);
measurementSets.add(ReaderMetrics.METRICSDB_SIZE_FILES);
measurementSets.add(ReaderMetrics.METRICSDB_NUM_UNCOMPRESSED_FILES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.performanceanalyzer.core.Util;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor;
import sun.tools.attach.HotSpotVirtualMachine;

/** Traverses and prints the stack traces for all Java threads in the remote VM */
Expand All @@ -73,12 +74,6 @@ public class ThreadList {

private static Lock vmAttachLock = new ReentrantLock();

static {
if (threadBean.isThreadContentionMonitoringSupported()) {
threadBean.setThreadContentionMonitoringEnabled(true);
}
}

public static class ThreadState {
public long javaTid;
public long nativeTid;
Expand Down Expand Up @@ -146,6 +141,13 @@ public String toString() {
* @return A hashmap of threadId to threadState.
*/
public static Map<Long, ThreadState> getNativeTidMap() {
if (threadBean.isThreadContentionMonitoringSupported()) {
if (ReaderMetricsProcessor.getInstance().getThreadContentionMonitoringEnabled()) {
threadBean.setThreadContentionMonitoringEnabled(true);
} else {
threadBean.setThreadContentionMonitoringEnabled(false);
}
}
if (vmAttachLock.tryLock()) {
try {
// Thread dumps are expensive and therefore we make sure that at least
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ public enum ReaderMetrics implements MeasurementSet {
"millis",
Arrays.asList(Statistics.MAX, Statistics.MEAN, Statistics.SUM)),

/** Whether or not thread contention monitoring is enabled (1 for enabled, 0 for disabled). */
THREAD_CONTENTION_MONITORING_ENABLED(
"ThreadContentionMonitoringEnabled", "count", Statistics.SAMPLE),

/** Amount of time taken to emit Shard State metrics. */
SHARD_STATE_EMITTER_EXECUTION_TIME(
"ShardStateEmitterExecutionTime",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.performanceanalyzer.rca.samplers;


import java.util.Objects;
import org.opensearch.performanceanalyzer.AppContext;
import org.opensearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.stats.collectors.SampleAggregator;
import org.opensearch.performanceanalyzer.rca.stats.emitters.ISampler;
import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor;

/**
 * Periodic sampler that publishes the current thread contention monitoring setting as the
 * {@link ReaderMetrics#THREAD_CONTENTION_MONITORING_ENABLED} stat: 1 when the setting is on, 0
 * when it is off.
 */
public class ThreadContentionMonitoringEnabledSampler implements ISampler {
    // Retained from construction; not consulted by any method of this class.
    private final AppContext appContext;

    /**
     * @param appContext application context; must not be {@code null}
     * @throws NullPointerException if {@code appContext} is {@code null}
     */
    public ThreadContentionMonitoringEnabledSampler(final AppContext appContext) {
        this.appContext = Objects.requireNonNull(appContext);
    }

    /**
     * Records one sample of the setting under an empty key.
     *
     * @param sampleCollector aggregator that receives the sample
     */
    @Override
    public void sample(SampleAggregator sampleCollector) {
        final int enabledFlag;
        if (isThreadContentionMonitoringEnabled()) {
            enabledFlag = 1;
        } else {
            enabledFlag = 0;
        }
        sampleCollector.updateStat(
                ReaderMetrics.THREAD_CONTENTION_MONITORING_ENABLED, "", enabledFlag);
    }

    /**
     * @return the flag currently held by the {@link ReaderMetricsProcessor} instance returned by
     *     {@link ReaderMetricsProcessor#getInstance()}
     */
    boolean isThreadContentionMonitoringEnabled() {
        final ReaderMetricsProcessor processor = ReaderMetricsProcessor.getInstance();
        return processor.getThreadContentionMonitoringEnabled();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,12 @@ public class ReaderMetricsProcessor implements Runnable {
private final ConfigOverridesApplier configOverridesApplier;

public static final String BATCH_METRICS_ENABLED_CONF_FILE = "batch_metrics_enabled.conf";
public static final String THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE =
"thread_contention_monitoring_enabled.conf";
private boolean batchMetricsEnabled;
public static final boolean defaultBatchMetricsEnabled = false;
private boolean threadContentionMonitoringEnabled;
public static final boolean defaultThreadContentionMonitoringEnabled = false;
// This needs to be concurrent since it may be concurrently accessed by the metrics processor
// thread and the query handler thread.
private ConcurrentSkipListSet<Long> batchMetricsDBSet;
Expand Down Expand Up @@ -156,6 +160,7 @@ public ReaderMetricsProcessor(
batchMetricsDBSet = new ConcurrentSkipListSet<>();
readBatchMetricsEnabledFromConf();
restoreBatchMetricsState();
// Start from the thread-contention default, not the (unrelated) batch-metrics default.
threadContentionMonitoringEnabled = defaultThreadContentionMonitoringEnabled;
}

@Override
Expand Down Expand Up @@ -1011,6 +1016,34 @@ public boolean getBatchMetricsEnabled() {
return batchMetricsEnabled;
}

/**
 * Reloads the thread contention monitoring flag from {@link
 * #THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE} inside {@code Util.DATA_DIR}.
 *
 * <p>The first line of the file is trimmed and parsed with {@link Boolean#parseBoolean(String)}.
 * When the file is empty or cannot be read, the flag falls back to {@link
 * #defaultThreadContentionMonitoringEnabled}. A change of value is logged at info level.
 */
private void readThreadContentionMonitoringEnabledFromConf() {
    Path filePath = Paths.get(Util.DATA_DIR, THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE);

    Util.invokePrivileged(
            () -> {
                try (Scanner sc = new Scanner(filePath)) {
                    boolean oldValue = threadContentionMonitoringEnabled;
                    boolean newValue;
                    if (sc.hasNextLine()) {
                        // Trim so that stray whitespace around "true" is not read as false.
                        newValue = Boolean.parseBoolean(sc.nextLine().trim());
                    } else {
                        // nextLine() on an empty file throws an unchecked
                        // NoSuchElementException, which the IOException handler below
                        // would not catch; treat an empty file like an unreadable one.
                        LOG.warn(
                                "File '{}' is empty; using default value {}",
                                filePath.toString(),
                                defaultThreadContentionMonitoringEnabled);
                        newValue = defaultThreadContentionMonitoringEnabled;
                    }
                    if (oldValue != newValue) {
                        threadContentionMonitoringEnabled = newValue;
                        LOG.info(
                                "Thread contention monitoring enabled changed from {} to {}",
                                oldValue,
                                newValue);
                    }
                } catch (IOException e) {
                    LOG.error("Error reading file '{}': {}", filePath.toString(), e);
                    threadContentionMonitoringEnabled =
                            defaultThreadContentionMonitoringEnabled;
                }
            });
}

/**
 * @return the thread contention monitoring flag currently held by this processor
 */
public boolean getThreadContentionMonitoringEnabled() {
    return this.threadContentionMonitoringEnabled;
}

/**
* An example value is this: current_time:1566413987194 StartTime:1566413987194 ItemCount:359
* IndexName:nyc_taxis ShardID:25 Primary:true Each pair is separated by new line and the key
Expand Down Expand Up @@ -1089,4 +1122,9 @@ NavigableMap<Long, MetricsDB> getMetricsDBMap() {
public void readBatchMetricsEnabledFromConfShim() {
readBatchMetricsEnabledFromConf();
}

/**
 * Test-only entry point that delegates to the private
 * {@code readThreadContentionMonitoringEnabledFromConf()}.
 */
@VisibleForTesting
public void readThreadContentionMonitoringEnabledFromConfShim() {
    this.readThreadContentionMonitoringEnabledFromConf();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.performanceanalyzer.rca.samplers;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.performanceanalyzer.AppContext;
import org.opensearch.performanceanalyzer.core.Util;
import org.opensearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics;
import org.opensearch.performanceanalyzer.rca.stats.collectors.SampleAggregator;
import org.opensearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import org.opensearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper;
import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor;

/**
 * Unit tests for {@link ThreadContentionMonitoringEnabledSampler}.
 *
 * <p>All tests share one static {@link ReaderMetricsProcessor} and one conf file on disk, so the
 * flag is reset to its file-less default before every test. Without that reset the outcome would
 * depend on the order in which the test methods run.
 */
public class ThreadContentionMonitoringEnabledSamplerTest {
    private static Path threadContentionMonitoringEnabledConfFile;
    private static String rootLocation;
    private static ReaderMetricsProcessor mp;
    private static AppContext appContext;
    private static ThreadContentionMonitoringEnabledSampler uut;

    @Mock private SampleAggregator sampleAggregator;

    @BeforeClass
    public static void setUpClass() throws Exception {
        Files.createDirectories(Paths.get(Util.DATA_DIR));
        threadContentionMonitoringEnabledConfFile =
                Paths.get(
                        Util.DATA_DIR,
                        ReaderMetricsProcessor.THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE);
        Files.deleteIfExists(threadContentionMonitoringEnabledConfFile);

        rootLocation = "build/resources/test/reader/";
        mp = new ReaderMetricsProcessor(rootLocation);
        ReaderMetricsProcessor.setCurrentInstance(mp);

        appContext = new AppContext();
        uut = new ThreadContentionMonitoringEnabledSampler(appContext);
    }

    @Before
    public void setUp() throws IOException {
        MockitoAnnotations.initMocks(this);
        // Start every test from the same state: no conf file, flag re-read from disk.
        clearThreadContentionMonitoringEnabled();
        mp.readThreadContentionMonitoringEnabledFromConfShim();
    }

    /** Writes {@code enabled} as the sole content of the conf file. */
    private void writeThreadContentionMonitoringEnabled(boolean enabled) throws IOException {
        Files.write(
                threadContentionMonitoringEnabledConfFile, Boolean.toString(enabled).getBytes());
    }

    /** Removes the conf file if it exists. */
    private void clearThreadContentionMonitoringEnabled() throws IOException {
        Files.deleteIfExists(threadContentionMonitoringEnabledConfFile);
    }

    @Test
    public void testIsThreadContentionMonitoringEnabled_notMaster() {
        // With no conf file present the flag is expected to be off.
        appContext.setClusterDetailsEventProcessor(null);
        assertFalse(uut.isThreadContentionMonitoringEnabled());
    }

    @Test
    public void testIsThreadContentionMonitoringEnabled() throws IOException {
        ClusterDetailsEventProcessor clusterDetailsEventProcessor =
                new ClusterDetailsEventProcessor();
        ClusterDetailsEventProcessor.NodeDetails details =
                ClusterDetailsEventProcessorTestHelper.newNodeDetails("nodex", "127.0.0.1", true);
        clusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(details));
        appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor);

        try {
            // No thread contention monitoring enabled file
            clearThreadContentionMonitoringEnabled();
            mp.readThreadContentionMonitoringEnabledFromConfShim();
            assertFalse(uut.isThreadContentionMonitoringEnabled());

            // thread contention monitoring disabled
            writeThreadContentionMonitoringEnabled(false);
            mp.readThreadContentionMonitoringEnabledFromConfShim();
            assertFalse(uut.isThreadContentionMonitoringEnabled());

            // thread contention monitoring enabled
            writeThreadContentionMonitoringEnabled(true);
            mp.readThreadContentionMonitoringEnabledFromConfShim();
            assertTrue(uut.isThreadContentionMonitoringEnabled());
        } finally {
            // Do not leave the conf file behind in the data directory after the run.
            clearThreadContentionMonitoringEnabled();
        }
    }

    @Test
    public void testSample() {
        ClusterDetailsEventProcessor clusterDetailsEventProcessor =
                new ClusterDetailsEventProcessor();
        ClusterDetailsEventProcessor.NodeDetails details =
                ClusterDetailsEventProcessorTestHelper.newNodeDetails("nodex", "127.0.0.1", true);
        clusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(details));
        appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor);

        uut.sample(sampleAggregator);
        verify(sampleAggregator, times(1))
                .updateStat(
                        ReaderMetrics.THREAD_CONTENTION_MONITORING_ENABLED,
                        "",
                        mp.getThreadContentionMonitoringEnabled() ? 1 : 0);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor.BATCH_METRICS_ENABLED_CONF_FILE;
import static org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor.THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE;

import java.io.File;
import java.io.FilenameFilter;
Expand Down Expand Up @@ -409,6 +410,45 @@ public void testReadBatchMetricsEnabledFromConf() throws Exception {
mp.getBatchMetricsEnabled() == ReaderMetricsProcessor.defaultBatchMetricsEnabled);
}

/**
 * Walks the flag through default, disabled, default, enabled and default again by rewriting or
 * deleting the conf file and re-reading it through the test shim after each step.
 */
@Test
public void testReadThreadContentionMonitoringEnabledFromConf() throws Exception {
    final boolean expectedDefault =
            ReaderMetricsProcessor.defaultThreadContentionMonitoringEnabled;

    Files.createDirectories(Paths.get(Util.DATA_DIR));
    final Path confFile =
            Paths.get(Util.DATA_DIR, THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE);
    Files.deleteIfExists(confFile);
    final ReaderMetricsProcessor processor = new ReaderMetricsProcessor(rootLocation);
    ReaderMetricsProcessor.setCurrentInstance(processor);

    // A freshly constructed processor carries the default.
    assertTrue(processor.getThreadContentionMonitoringEnabled() == expectedDefault);

    // Explicitly disabled through the file.
    Files.write(confFile, "false".getBytes());
    processor.readThreadContentionMonitoringEnabledFromConfShim();
    assertFalse(processor.getThreadContentionMonitoringEnabled());

    // Removing the file restores the default.
    Files.delete(confFile);
    processor.readThreadContentionMonitoringEnabledFromConfShim();
    assertTrue(processor.getThreadContentionMonitoringEnabled() == expectedDefault);

    // Explicitly enabled through the file.
    Files.write(confFile, "true".getBytes());
    processor.readThreadContentionMonitoringEnabledFromConfShim();
    assertTrue(processor.getThreadContentionMonitoringEnabled());

    // Removing the file once more restores the default.
    Files.delete(confFile);
    processor.readThreadContentionMonitoringEnabledFromConfShim();
    assertTrue(processor.getThreadContentionMonitoringEnabled() == expectedDefault);
}

@Test
public void testGetBatchMetrics() throws Exception {
deleteAll();
Expand Down

0 comments on commit 8f914a4

Please sign in to comment.