From 643d68089b7ee3ccf6ddca883e4f80362cf16c9d Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Thu, 20 Oct 2016 12:42:52 -0700 Subject: [PATCH] Added flag to force users to opt-in to allowing core and maximum thread pool sizes to diverge --- .../netflix/hystrix/HystrixThreadPool.java | 32 +++++++++---- .../hystrix/HystrixThreadPoolProperties.java | 48 +++++++++++++++---- .../HystrixConcurrencyStrategy.java | 16 ++++++- .../HystrixThreadPoolPropertiesTest.java | 15 +----- .../hystrix/HystrixThreadPoolTest.java | 1 + 5 files changed, 78 insertions(+), 34 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java index 1e0d399da..e812b46ee 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java @@ -173,11 +173,18 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); this.queue = concurrencyStrategy.getBlockingQueue(queueSize); - this.metrics = HystrixThreadPoolMetrics.getInstance( - threadPoolKey, - concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), - properties); - this.threadPool = metrics.getThreadPool(); + + if (properties.getAllowMaximumSizeToDivergeFromCoreSize()) { + this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, + concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), + properties); + this.threadPool = this.metrics.getThreadPool(); + } else { + this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, + concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), + properties); + this.threadPool = this.metrics.getThreadPool(); + } /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); @@ -210,16 +217,21 @@ public Scheduler getScheduler(Func0 shouldInterruptThread) { private void touchConfig() { final int dynamicCoreSize = properties.coreSize().get(); int dynamicMaximumSize = properties.maximumSize().get(); + final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize(); + boolean maxTooLow = false; - if (dynamicMaximumSize < dynamicCoreSize) { - logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is using coreSize = " + - dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + - dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); + if (allowSizesToDiverge && dynamicMaximumSize < dynamicCoreSize) { dynamicMaximumSize = dynamicCoreSize; + maxTooLow = true; } // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed. - if (threadPool.getCorePoolSize() != dynamicCoreSize || threadPool.getMaximumPoolSize() != dynamicMaximumSize) { + if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) { + if (maxTooLow) { + logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " + + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); + } threadPool.setCorePoolSize(dynamicCoreSize); threadPool.setMaximumPoolSize(dynamicMaximumSize); } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolProperties.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolProperties.java index 3a054033b..369564f93 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolProperties.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolProperties.java @@ -15,6 +15,7 @@ */ package com.netflix.hystrix; +import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forBoolean; import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forInteger; import java.util.concurrent.BlockingQueue; @@ -44,13 +45,15 @@ */ public abstract class HystrixThreadPoolProperties { - - /* defaults */ - static int default_coreSize = 10; // core size of thread pool + static int default_coreSize = 10; // core size of thread pool + static int default_maximumSize = 10; // maximum size of thread pool static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive - static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject) - // -1 turns if off and makes us use SynchronousQueue + static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject) + // -1 turns if off and makes us use SynchronousQueue + static boolean default_allow_maximum_size_to_diverge_from_core_size = false; //should the maximumSize config value get read and used in configuring the threadPool + //turning this on should be a conscious decision by the user, so we default it to false + static int default_queueSizeRejectionThreshold = 5; // number of items in queue static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets) @@ -60,6 +63,8 @@ public abstract class HystrixThreadPoolProperties { private final HystrixProperty keepAliveTime; private final HystrixProperty maxQueueSize; private final HystrixProperty queueSizeRejectionThreshold; + private final boolean allowMaximumSizeToDivergeFromCoreSize; + private final HystrixProperty threadPoolRollingNumberStatisticalWindowInMilliseconds; private final HystrixProperty threadPoolRollingNumberStatisticalWindowBuckets; @@ -72,12 +77,13 @@ protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder) } protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder, String propertyPrefix) { - //we allow maximum pool size to be configured lower than core size here - //however, at runtime, if this configuration gets applied, we will always ensure that maximumSize >= coreSize - this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize); + this.allowMaximumSizeToDivergeFromCoreSize = getValueOnce(propertyPrefix, key, "allowMaximumSizeToDivergeFromCoreSize", + builder.getAllowMaximumSizeToDivergeFromCoreSize(), default_allow_maximum_size_to_diverge_from_core_size); - //if left unset, maxiumumSize will default to coreSize - this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), corePoolSize.get()); + this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize); + //this object always contains a reference to the configuration value for the maximumSize of the threadpool + //it only gets applied if .threadpool + this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), default_maximumSize); this.keepAliveTime = getProperty(propertyPrefix, key, "keepAliveTimeMinutes", builder.getKeepAliveTimeMinutes(), default_keepAliveTimeMinutes); this.maxQueueSize = getProperty(propertyPrefix, key, "maxQueueSize", builder.getMaxQueueSize(), default_maxQueueSize); @@ -93,6 +99,14 @@ private static HystrixProperty getProperty(String propertyPrefix, Hystr .build(); } + private static boolean getValueOnce(String propertyPrefix, HystrixThreadPoolKey key, String instanceProperty, boolean builderOverrideValue, boolean defaultValue) { + return forBoolean() + .add(propertyPrefix + ".threadpool." + key.name() + "." + instanceProperty, builderOverrideValue) + .add(propertyPrefix + ".threadpool.default." + instanceProperty, defaultValue) + .build() + .get(); + } + /** * Core thread-pool size that gets passed to {@link ThreadPoolExecutor#setCorePoolSize(int)} * @@ -144,6 +158,10 @@ public HystrixProperty queueSizeRejectionThreshold() { return queueSizeRejectionThreshold; } + public boolean getAllowMaximumSizeToDivergeFromCoreSize() { + return allowMaximumSizeToDivergeFromCoreSize; + } + /** * Duration of statistical rolling window in milliseconds. This is passed into {@link HystrixRollingNumber} inside each {@link HystrixThreadPoolMetrics} instance. * @@ -200,6 +218,7 @@ public static class Setter { private Integer keepAliveTimeMinutes = null; private Integer maxQueueSize = null; private Integer queueSizeRejectionThreshold = null; + private boolean allowMaximumSizeToDivergeFromCoreSize = false; private Integer rollingStatisticalWindowInMilliseconds = null; private Integer rollingStatisticalWindowBuckets = null; @@ -226,6 +245,10 @@ public Integer getQueueSizeRejectionThreshold() { return queueSizeRejectionThreshold; } + public boolean getAllowMaximumSizeToDivergeFromCoreSize() { + return allowMaximumSizeToDivergeFromCoreSize; + } + public Integer getMetricsRollingStatisticalWindowInMilliseconds() { return rollingStatisticalWindowInMilliseconds; } @@ -259,6 +282,11 @@ public Setter withQueueSizeRejectionThreshold(int value) { return this; } + public Setter withAllowMaximumSizeToDivergeFromCoreSize(boolean value) { + this.allowMaximumSizeToDivergeFromCoreSize = value; + return this; + } + public Setter withMetricsRollingStatisticalWindowInMilliseconds(int value) { this.rollingStatisticalWindowInMilliseconds = value; return this; diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java index 257559b40..3405cec68 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java @@ -21,6 +21,8 @@ import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.properties.HystrixProperty; import com.netflix.hystrix.util.PlatformSpecific; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -46,6 +48,8 @@ */ public abstract class HystrixConcurrencyStrategy { + private final static Logger logger = LoggerFactory.getLogger(HystrixConcurrencyStrategy.class); + /** * Factory method to provide {@link ThreadPoolExecutor} instances as desired. *

@@ -88,7 +92,17 @@ public Thread newThread(Runnable r) { threadFactory = PlatformSpecific.getAppEngineThreadFactory(); } - return new ThreadPoolExecutor(corePoolSize.get(), maximumPoolSize.get(), keepAliveTime.get(), unit, workQueue, threadFactory); + final int dynamicCoreSize = corePoolSize.get(); + final int dynamicMaximumSize = maximumPoolSize.get(); + + if (dynamicCoreSize > dynamicMaximumSize) { + logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); + return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); + } else { + return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); + } } /** diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolPropertiesTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolPropertiesTest.java index 5fc6dc63a..912caf4b2 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolPropertiesTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolPropertiesTest.java @@ -104,7 +104,7 @@ public void testSetNeitherCoreNorMaximumSize() { }; assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue()); - assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.maximumSize().get().intValue()); + assertEquals(HystrixThreadPoolProperties.default_maximumSize, properties.maximumSize().get().intValue()); } @Test @@ -115,7 +115,7 @@ public void testSetCoreSizeOnly() { }; assertEquals(14, properties.coreSize().get().intValue()); - assertEquals(14, properties.maximumSize().get().intValue()); + assertEquals(HystrixThreadPoolProperties.default_maximumSize, properties.maximumSize().get().intValue()); } @Test @@ -129,17 +129,6 @@ public void testSetMaximumSizeOnlyLowerThanDefaultCoreSize() { assertEquals(3, properties.maximumSize().get().intValue()); } - @Test - public void testSetMaximumSizeOnlyEqualToDefaultCoreSize() { - HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST, - HystrixThreadPoolProperties.Setter().withMaximumSize(HystrixThreadPoolProperties.default_coreSize)) { - - }; - - assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue()); - assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.maximumSize().get().intValue()); - } - @Test public void testSetMaximumSizeOnlyGreaterThanDefaultCoreSize() { HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST, diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java index c8cf60455..faf22f9be 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java @@ -122,6 +122,7 @@ public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(Hystri //Now the HystrixThreadPool ALWAYS has the same reference to the ThreadPoolExecutor so that it no longer matters which //wins to be inserted into the HystrixThreadPool.Factory.threadPools cache. } + @Test(timeout = 2500) public void testUnsubscribeHystrixThreadPool() throws InterruptedException { // methods are package-private so can't test it somewhere else