From 8f914a420bcb5b180b65a9c0bb51cea8efea34c5 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 22 Mar 2022 20:05:27 +0530 Subject: [PATCH] Enables/disables thread contention monitoring based on setting Signed-off-by: Surya Sashank Nistala --- null/data/batch_metrics_enabled.conf | 1 + .../thread_contention_monitoring_enabled.conf | 1 + .../PerformanceAnalyzerApp.java | 3 + .../performanceanalyzer/jvm/ThreadList.java | 14 +- .../rca/framework/metrics/ReaderMetrics.java | 4 + ...eadContentionMonitoringEnabledSampler.java | 57 +++++++ .../reader/ReaderMetricsProcessor.java | 38 +++++ ...ontentionMonitoringEnabledSamplerTest.java | 139 ++++++++++++++++++ .../reader/ReaderMetricsProcessorTests.java | 40 +++++ 9 files changed, 291 insertions(+), 6 deletions(-) create mode 100644 null/data/batch_metrics_enabled.conf create mode 100644 null/data/thread_contention_monitoring_enabled.conf create mode 100644 src/main/java/org/opensearch/performanceanalyzer/rca/samplers/ThreadContentionMonitoringEnabledSampler.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/rca/samplers/ThreadContentionMonitoringEnabledSamplerTest.java diff --git a/null/data/batch_metrics_enabled.conf b/null/data/batch_metrics_enabled.conf new file mode 100644 index 000000000..f32a5804e --- /dev/null +++ b/null/data/batch_metrics_enabled.conf @@ -0,0 +1 @@ +true \ No newline at end of file diff --git a/null/data/thread_contention_monitoring_enabled.conf b/null/data/thread_contention_monitoring_enabled.conf new file mode 100644 index 000000000..f32a5804e --- /dev/null +++ b/null/data/thread_contention_monitoring_enabled.conf @@ -0,0 +1 @@ +true \ No newline at end of file diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerApp.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerApp.java index 6bfde79ca..b422e8147 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerApp.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerApp.java @@ -67,6 +67,7 @@ import org.opensearch.performanceanalyzer.rca.samplers.BatchMetricsEnabledSampler; import org.opensearch.performanceanalyzer.rca.samplers.MetricsDBFileSampler; import org.opensearch.performanceanalyzer.rca.samplers.RcaStateSamplers; +import org.opensearch.performanceanalyzer.rca.samplers.ThreadContentionMonitoringEnabledSampler; import org.opensearch.performanceanalyzer.rca.stats.RcaStatsReporter; import org.opensearch.performanceanalyzer.rca.stats.collectors.SampleAggregator; import org.opensearch.performanceanalyzer.rca.stats.emitters.ISampler; @@ -364,6 +365,7 @@ public static List getAllSamplers(final AppContext appContext) { allSamplers.add(RcaStateSamplers.getRcaEnabledSampler(appContext)); allSamplers.add(new BatchMetricsEnabledSampler(appContext)); allSamplers.add(new MetricsDBFileSampler(appContext)); + allSamplers.add(new ThreadContentionMonitoringEnabledSampler(appContext)); return allSamplers; } @@ -373,6 +375,7 @@ private static MeasurementSet[] getPeriodicMeasurementSets() { measurementSets.addAll(Arrays.asList(JvmMetrics.values())); measurementSets.add(RcaRuntimeMetrics.RCA_ENABLED); measurementSets.add(ReaderMetrics.BATCH_METRICS_ENABLED); + measurementSets.add(ReaderMetrics.THREAD_CONTENTION_MONITORING_ENABLED); measurementSets.add(ReaderMetrics.METRICSDB_NUM_FILES); measurementSets.add(ReaderMetrics.METRICSDB_SIZE_FILES); measurementSets.add(ReaderMetrics.METRICSDB_NUM_UNCOMPRESSED_FILES); diff --git a/src/main/java/org/opensearch/performanceanalyzer/jvm/ThreadList.java b/src/main/java/org/opensearch/performanceanalyzer/jvm/ThreadList.java index c89dac9f7..6743d8027 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/jvm/ThreadList.java +++ b/src/main/java/org/opensearch/performanceanalyzer/jvm/ThreadList.java @@ -51,6 +51,7 @@ import org.opensearch.performanceanalyzer.core.Util; import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration; import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; +import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor; import sun.tools.attach.HotSpotVirtualMachine; /** Traverses and prints the stack traces for all Java threads in the remote VM */ @@ -73,12 +74,6 @@ public class ThreadList { private static Lock vmAttachLock = new ReentrantLock(); - static { - if (threadBean.isThreadContentionMonitoringSupported()) { - threadBean.setThreadContentionMonitoringEnabled(true); - } - } - public static class ThreadState { public long javaTid; public long nativeTid; @@ -146,6 +141,13 @@ public String toString() { * @return A hashmap of threadId to threadState. */ public static Map getNativeTidMap() { + if (threadBean.isThreadContentionMonitoringSupported()) { + if (ReaderMetricsProcessor.getInstance().getThreadContentionMonitoringEnabled()) { + threadBean.setThreadContentionMonitoringEnabled(true); + } else { + threadBean.setThreadContentionMonitoringEnabled(false); + } + } if (vmAttachLock.tryLock()) { try { // Thread dumps are expensive and therefore we make sure that at least diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java index 92e3938b2..1fcb2ad67 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java @@ -124,6 +124,10 @@ public enum ReaderMetrics implements MeasurementSet { "millis", Arrays.asList(Statistics.MAX, Statistics.MEAN, Statistics.SUM)), + /** Whether or not thread contention monitoring is enabled (0 for enabled, 1 for disabled). */ + THREAD_CONTENTION_MONITORING_ENABLED( + "ThreadContentionMonitoringEnabled", "count", Statistics.SAMPLE), + /** Amount of time taken to emit Shard State metrics. */ SHARD_STATE_EMITTER_EXECUTION_TIME( "ShardStateEmitterExecutionTime", diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/samplers/ThreadContentionMonitoringEnabledSampler.java b/src/main/java/org/opensearch/performanceanalyzer/rca/samplers/ThreadContentionMonitoringEnabledSampler.java new file mode 100644 index 000000000..5d42f2845 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/samplers/ThreadContentionMonitoringEnabledSampler.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.performanceanalyzer.rca.samplers; + + +import java.util.Objects; +import org.opensearch.performanceanalyzer.AppContext; +import org.opensearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics; +import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails; +import org.opensearch.performanceanalyzer.rca.stats.collectors.SampleAggregator; +import org.opensearch.performanceanalyzer.rca.stats.emitters.ISampler; +import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor; + +public class ThreadContentionMonitoringEnabledSampler implements ISampler { + private final AppContext appContext; + + public ThreadContentionMonitoringEnabledSampler(final AppContext appContext) { + Objects.requireNonNull(appContext); + this.appContext = appContext; + } + + @Override + public void sample(SampleAggregator sampleCollector) { + sampleCollector.updateStat( + ReaderMetrics.THREAD_CONTENTION_MONITORING_ENABLED, + "", + isThreadContentionMonitoringEnabled() ? 1 : 0); + } + + boolean isThreadContentionMonitoringEnabled() { + return ReaderMetricsProcessor.getInstance().getThreadContentionMonitoringEnabled(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java index dab42c36a..1c22797d2 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java @@ -103,8 +103,12 @@ public class ReaderMetricsProcessor implements Runnable { private final ConfigOverridesApplier configOverridesApplier; public static final String BATCH_METRICS_ENABLED_CONF_FILE = "batch_metrics_enabled.conf"; + public static final String THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE = + "thread_contention_monitoring_enabled.conf"; private boolean batchMetricsEnabled; public static final boolean defaultBatchMetricsEnabled = false; + private boolean threadContentionMonitoringEnabled; + public static final boolean defaultThreadContentionMonitoringEnabled = false; // This needs to be concurrent since it may be concurrently accessed by the metrics processor // thread and the query handler thread. private ConcurrentSkipListSet batchMetricsDBSet; @@ -156,6 +160,7 @@ public ReaderMetricsProcessor( batchMetricsDBSet = new ConcurrentSkipListSet<>(); readBatchMetricsEnabledFromConf(); restoreBatchMetricsState(); + threadContentionMonitoringEnabled = defaultBatchMetricsEnabled; } @Override @@ -1011,6 +1016,34 @@ public boolean getBatchMetricsEnabled() { return batchMetricsEnabled; } + private void readThreadContentionMonitoringEnabledFromConf() { + Path filePath = Paths.get(Util.DATA_DIR, THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE); + + Util.invokePrivileged( + () -> { + try (Scanner sc = new Scanner(filePath)) { + String nextLine = sc.nextLine(); + boolean oldValue = threadContentionMonitoringEnabled; + boolean newValue = Boolean.parseBoolean(nextLine); + if (oldValue != newValue) { + threadContentionMonitoringEnabled = newValue; + LOG.info( + "Thread contention monitoring enabled changed from {} to {}", + oldValue, + newValue); + } + } catch (IOException e) { + LOG.error("Error reading file '{}': {}", filePath.toString(), e); + threadContentionMonitoringEnabled = + defaultThreadContentionMonitoringEnabled; + } + }); + } + + public boolean getThreadContentionMonitoringEnabled() { + return threadContentionMonitoringEnabled; + } + /** * An example value is this: current_time:1566413987194 StartTime:1566413987194 ItemCount:359 * IndexName:nyc_taxis ShardID:25 Primary:true Each pair is separated by new line and the key @@ -1089,4 +1122,9 @@ NavigableMap getMetricsDBMap() { public void readBatchMetricsEnabledFromConfShim() { readBatchMetricsEnabledFromConf(); } + + @VisibleForTesting + public void readThreadContentionMonitoringEnabledFromConfShim() { + readThreadContentionMonitoringEnabledFromConf(); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/rca/samplers/ThreadContentionMonitoringEnabledSamplerTest.java b/src/test/java/org/opensearch/performanceanalyzer/rca/samplers/ThreadContentionMonitoringEnabledSamplerTest.java new file mode 100644 index 000000000..c89b80c0e --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/rca/samplers/ThreadContentionMonitoringEnabledSamplerTest.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.performanceanalyzer.rca.samplers; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.performanceanalyzer.AppContext; +import org.opensearch.performanceanalyzer.core.Util; +import org.opensearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics; +import org.opensearch.performanceanalyzer.rca.stats.collectors.SampleAggregator; +import org.opensearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; +import org.opensearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; +import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor; + +public class ThreadContentionMonitoringEnabledSamplerTest { + private static Path threadContentionMonitoringEnabledConfFile; + private static String rootLocation; + private static ReaderMetricsProcessor mp; + private static AppContext appContext; + private static ThreadContentionMonitoringEnabledSampler uut; + + @Mock private SampleAggregator sampleAggregator; + + @BeforeClass + public static void setUpClass() throws Exception { + Files.createDirectories(Paths.get(Util.DATA_DIR)); + threadContentionMonitoringEnabledConfFile = + Paths.get( + Util.DATA_DIR, + ReaderMetricsProcessor.THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE); + Files.deleteIfExists(threadContentionMonitoringEnabledConfFile); + + rootLocation = "build/resources/test/reader/"; + mp = new ReaderMetricsProcessor(rootLocation); + ReaderMetricsProcessor.setCurrentInstance(mp); + + appContext = new AppContext(); + uut = new ThreadContentionMonitoringEnabledSampler(appContext); + } + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + private void writeThreadContentionMonitoringEnabled(boolean enabled) throws IOException { + Files.write( + threadContentionMonitoringEnabledConfFile, Boolean.toString(enabled).getBytes()); + } + + private void clearThreadContentionMonitoringEnabled() throws IOException { + Files.deleteIfExists(threadContentionMonitoringEnabledConfFile); + } + + @Test + public void testIsThreadContentionMonitoringEnabled_notMaster() { + appContext.setClusterDetailsEventProcessor(null); + assertFalse(uut.isThreadContentionMonitoringEnabled()); + } + + @Test + public void testIsThreadContentionMonitoringEnabled() throws IOException { + ClusterDetailsEventProcessor clusterDetailsEventProcessor = + new ClusterDetailsEventProcessor(); + ClusterDetailsEventProcessor.NodeDetails details = + ClusterDetailsEventProcessorTestHelper.newNodeDetails("nodex", "127.0.0.1", true); + clusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(details)); + appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor); + + // No thread contention monitoring enabled file + clearThreadContentionMonitoringEnabled(); + mp.readThreadContentionMonitoringEnabledFromConfShim(); + assertFalse(uut.isThreadContentionMonitoringEnabled()); + + // thread contention monitoring disabled + writeThreadContentionMonitoringEnabled(false); + mp.readThreadContentionMonitoringEnabledFromConfShim(); + assertFalse(uut.isThreadContentionMonitoringEnabled()); + + // thread contention monitoring disabled + writeThreadContentionMonitoringEnabled(true); + mp.readThreadContentionMonitoringEnabledFromConfShim(); + assertTrue(uut.isThreadContentionMonitoringEnabled()); + } + + @Test + public void testSample() { + ClusterDetailsEventProcessor clusterDetailsEventProcessor = + new ClusterDetailsEventProcessor(); + ClusterDetailsEventProcessor.NodeDetails details = + ClusterDetailsEventProcessorTestHelper.newNodeDetails("nodex", "127.0.0.1", true); + clusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(details)); + appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor); + + uut.sample(sampleAggregator); + verify(sampleAggregator, times(1)) + .updateStat( + ReaderMetrics.THREAD_CONTENTION_MONITORING_ENABLED, + "", + mp.getThreadContentionMonitoringEnabled() ? 1 : 0); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessorTests.java b/src/test/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessorTests.java index e18f18fdb..045566da3 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessorTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessorTests.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor.BATCH_METRICS_ENABLED_CONF_FILE; +import static org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor.THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE; import java.io.File; import java.io.FilenameFilter; @@ -409,6 +410,45 @@ public void testReadBatchMetricsEnabledFromConf() throws Exception { mp.getBatchMetricsEnabled() == ReaderMetricsProcessor.defaultBatchMetricsEnabled); } + @Test + public void testReadThreadContentionMonitoringEnabledFromConf() throws Exception { + Files.createDirectories(Paths.get(Util.DATA_DIR)); + Path threadContentionMonitoringEnabledConfFile = + Paths.get(Util.DATA_DIR, THREAD_CONTENTION_MONITORING_ENABLED_CONF_FILE); + Files.deleteIfExists(threadContentionMonitoringEnabledConfFile); + ReaderMetricsProcessor mp = new ReaderMetricsProcessor(rootLocation); + ReaderMetricsProcessor.setCurrentInstance(mp); + + // Test default + assertTrue( + mp.getThreadContentionMonitoringEnabled() + == ReaderMetricsProcessor.defaultThreadContentionMonitoringEnabled); + + // Test disabled + Files.write(threadContentionMonitoringEnabledConfFile, Boolean.toString(false).getBytes()); + mp.readThreadContentionMonitoringEnabledFromConfShim(); + assertFalse(mp.getThreadContentionMonitoringEnabled()); + + // Test reverts back to default when file is deleted + Files.delete(threadContentionMonitoringEnabledConfFile); + mp.readThreadContentionMonitoringEnabledFromConfShim(); + assertTrue( + mp.getThreadContentionMonitoringEnabled() + == ReaderMetricsProcessor.defaultThreadContentionMonitoringEnabled); + + // Test enabled + Files.write(threadContentionMonitoringEnabledConfFile, Boolean.toString(true).getBytes()); + mp.readThreadContentionMonitoringEnabledFromConfShim(); + assertTrue(mp.getThreadContentionMonitoringEnabled()); + + // Test reverts back to default when file is deleted + Files.delete(threadContentionMonitoringEnabledConfFile); + mp.readThreadContentionMonitoringEnabledFromConfShim(); + assertTrue( + mp.getThreadContentionMonitoringEnabled() + == ReaderMetricsProcessor.defaultThreadContentionMonitoringEnabled); + } + @Test public void testGetBatchMetrics() throws Exception { deleteAll();