diff --git a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/spring/fallback/CommandFallbackTest.java b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/spring/fallback/CommandFallbackTest.java index e57ebbf43..c7fb2f442 100644 --- a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/spring/fallback/CommandFallbackTest.java +++ b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/spring/fallback/CommandFallbackTest.java @@ -101,56 +101,56 @@ public void testGetUserSyncWithFallback() { */ -// @Test -// public void testGetUserAsyncWithFallbackCommand() throws ExecutionException, InterruptedException { -// HystrixRequestContext context = HystrixRequestContext.initializeContext(); -// try { -// Future f1 = userService.getUserAsyncFallbackCommand(" ", "name: "); -// -// assertEquals("def", f1.get().getName()); -// -// assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); -// HystrixInvokableInfo getUserAsyncFallbackCommand = getHystrixCommandByKey( -// "getUserAsyncFallbackCommand"); -// com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand"); -// com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand"); -// -// assertEquals("getUserAsyncFallbackCommand", getUserAsyncFallbackCommand.getCommandKey().name()); -// // confirm that command has failed -// assertTrue(getUserAsyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); -// // confirm that first fallback has failed -// assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); -// // and that second fallback was successful -// assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS)); -// } finally { -// context.shutdown(); -// } -// } -// -// @Test -// public void testGetUserSyncWithFallbackCommand() { -// HystrixRequestContext context = HystrixRequestContext.initializeContext(); -// try { -// User u1 = userService.getUserSyncFallbackCommand(" ", "name: "); -// -// assertEquals("def", u1.getName()); -// assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); -// HystrixInvokableInfo getUserSyncFallbackCommand = getHystrixCommandByKey( -// "getUserSyncFallbackCommand"); -// com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand"); -// com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand"); -// -// assertEquals("getUserSyncFallbackCommand", getUserSyncFallbackCommand.getCommandKey().name()); -// // confirm that command has failed -// assertTrue(getUserSyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); -// // confirm that first fallback has failed -// assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); -// // and that second fallback was successful -// assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS)); -// } finally { -// context.shutdown(); -// } -// } + @Test + public void testGetUserAsyncWithFallbackCommand() throws ExecutionException, InterruptedException { + HystrixRequestContext context = HystrixRequestContext.initializeContext(); + try { + Future f1 = userService.getUserAsyncFallbackCommand(" ", "name: "); + + assertEquals("def", f1.get().getName()); + + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + HystrixInvokableInfo getUserAsyncFallbackCommand = getHystrixCommandByKey( + "getUserAsyncFallbackCommand"); + com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand"); + com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand"); + + assertEquals("getUserAsyncFallbackCommand", getUserAsyncFallbackCommand.getCommandKey().name()); + // confirm that command has failed + assertTrue(getUserAsyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); + // confirm that first fallback has failed + assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); + // and that second fallback was successful + assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS)); + } finally { + context.shutdown(); + } + } + + @Test + public void testGetUserSyncWithFallbackCommand() { + HystrixRequestContext context = HystrixRequestContext.initializeContext(); + try { + User u1 = userService.getUserSyncFallbackCommand(" ", "name: "); + + assertEquals("def", u1.getName()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + HystrixInvokableInfo getUserSyncFallbackCommand = getHystrixCommandByKey( + "getUserSyncFallbackCommand"); + com.netflix.hystrix.HystrixInvokableInfo firstFallbackCommand = getHystrixCommandByKey("firstFallbackCommand"); + com.netflix.hystrix.HystrixInvokableInfo secondFallbackCommand = getHystrixCommandByKey("secondFallbackCommand"); + + assertEquals("getUserSyncFallbackCommand", getUserSyncFallbackCommand.getCommandKey().name()); + // confirm that command has failed + assertTrue(getUserSyncFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); + // confirm that first fallback has failed + assertTrue(firstFallbackCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); + // and that second fallback was successful + assertTrue(secondFallbackCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS)); + } finally { + context.shutdown(); + } + } public static class UserService { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index 8de109fa2..a1c63c0f2 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -40,6 +40,7 @@ import rx.Subscriber; import rx.functions.Action0; import rx.functions.Action1; +import rx.functions.Func0; import rx.functions.Func1; import rx.subjects.ReplaySubject; import rx.subscriptions.CompositeSubscription; @@ -524,7 +525,13 @@ public void call(Subscriber s) { getExecutionObservableWithLifecycle().unsafeSubscribe(s); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here } } - }).subscribeOn(threadPool.getScheduler(properties.executionIsolationThreadInterruptOnTimeout().get())); + }).subscribeOn(threadPool.getScheduler(new Func0() { + + @Override + public Boolean call() { + return properties.executionIsolationThreadInterruptOnTimeout().get() && _self.isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT); + } + })); } else { // semaphore isolated executionHook.onRunStart(_self); @@ -814,7 +821,7 @@ public Observable call(Throwable t) { return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe)); } else { - logger.debug("HystrixCommand execution " + failureType.name() + " and fallback retrieval failed.", fe); + logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe); metrics.markFallbackFailure(); // record the executionResult executionResult = executionResult.addEvents(HystrixEventType.FALLBACK_FAILURE); @@ -826,7 +833,7 @@ public Observable call(Throwable t) { logger.warn("Error calling ExecutionHook.onError", hookException); } - return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and failed retrieving fallback.", e, fe)); + return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe)); } } @@ -938,7 +945,6 @@ public void tick() { // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath // otherwise it means we lost a race and the run() execution completed if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { - // do fallback logic // report timeout failure originalCommand.metrics.markTimeout(System.currentTimeMillis() - originalCommand.invocationStartTime); diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java index 4059ec2eb..c02fa1051 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java @@ -27,6 +27,7 @@ import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory; import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory; +import rx.functions.Func0; /** * ThreadPool used to executed {@link HystrixCommand#run()} on separate threads when configured to do so with {@link HystrixCommandProperties#executionIsolationStrategy()}. @@ -52,7 +53,7 @@ public interface HystrixThreadPool { public Scheduler getScheduler(); - public Scheduler getScheduler(boolean shouldInterruptThread); + public Scheduler getScheduler(Func0 shouldInterruptThread); /** * Mark when a thread begins executing a command. @@ -155,8 +156,6 @@ public interface HystrixThreadPool { private final BlockingQueue queue; private final ThreadPoolExecutor threadPool; private final HystrixThreadPoolMetrics metrics; - private final Scheduler nonInterruptingScheduler; - private final Scheduler interruptingScheduler; public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); @@ -167,8 +166,6 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), properties); this.threadPool = metrics.getThreadPool(); - this.nonInterruptingScheduler = new HystrixContextScheduler(concurrencyStrategy, this, false); - this.interruptingScheduler = new HystrixContextScheduler(concurrencyStrategy, this, true); /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); @@ -183,17 +180,18 @@ public ThreadPoolExecutor getExecutor() { @Override public Scheduler getScheduler() { //by default, interrupt underlying threads on timeout - return getScheduler(true); + return getScheduler(new Func0() { + @Override + public Boolean call() { + return true; + } + }); } @Override - public Scheduler getScheduler(boolean shouldInterruptThread) { + public Scheduler getScheduler(Func0 shouldInterruptThread) { touchConfig(); - if (shouldInterruptThread) { - return interruptingScheduler; - } else { - return nonInterruptingScheduler; - } + return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } // allow us to change things via fast-properties by setting it each time diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java index 7bb1d9043..5c870c63a 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java @@ -19,6 +19,7 @@ import rx.*; import rx.functions.Action0; +import rx.functions.Func0; import rx.internal.schedulers.ScheduledAction; import rx.subscriptions.*; @@ -48,11 +49,15 @@ public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, S } public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool) { - this(concurrencyStrategy, threadPool, true); + this(concurrencyStrategy, threadPool, new Func0() { + @Override + public Boolean call() { + return true; + } + }); } - - public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, boolean shouldInterruptThread) { + public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0 shouldInterruptThread) { this.concurrencyStrategy = concurrencyStrategy; this.threadPool = threadPool; this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread); @@ -106,9 +111,9 @@ public Subscription schedule(Action0 action) { private static class ThreadPoolScheduler extends Scheduler { private final HystrixThreadPool threadPool; - private final boolean shouldInterruptThread; + private final Func0 shouldInterruptThread; - public ThreadPoolScheduler(HystrixThreadPool threadPool, boolean shouldInterruptThread) { + public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0 shouldInterruptThread) { this.threadPool = threadPool; this.shouldInterruptThread = shouldInterruptThread; } @@ -133,9 +138,9 @@ private static class ThreadPoolWorker extends Worker { private final HystrixThreadPool threadPool; private final CompositeSubscription subscription = new CompositeSubscription(); - private final boolean shouldInterruptThread; + private final Func0 shouldInterruptThread; - public ThreadPoolWorker(HystrixThreadPool threadPool, boolean shouldInterruptThread) { + public ThreadPoolWorker(HystrixThreadPool threadPool, Func0 shouldInterruptThread) { this.threadPool = threadPool; this.shouldInterruptThread = shouldInterruptThread; } @@ -167,7 +172,6 @@ public Subscription schedule(final Action0 action) { sa.addParent(subscription); Future f = threadPool.getExecutor().submit(sa); - sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread)); return sa; @@ -185,17 +189,22 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { */ private static class FutureCompleterWithConfigurableInterrupt implements Subscription { private final Future f; - private final boolean shouldInterruptThread; + private final Func0 shouldInterruptThread; - private FutureCompleterWithConfigurableInterrupt(Future f, boolean shouldInterruptThread) { + private FutureCompleterWithConfigurableInterrupt(Future f, Func0 shouldInterruptThread) { this.f = f; this.shouldInterruptThread = shouldInterruptThread; } @Override public void unsubscribe() { - f.cancel(shouldInterruptThread); + if (shouldInterruptThread.call()) { + f.cancel(true); + } else { + f.cancel(false); + } } + @Override public boolean isUnsubscribed() { return f.isCancelled(); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index a39838460..c8e56a545 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -5387,6 +5387,67 @@ public void testDoNotInterruptToObservableOnTimeoutIfPropertySaysNotTo() throws assertFalse(cmd.hasBeenInterrupted()); } + @Test + public void testChainedCommand() { + class SubCommand extends TestHystrixCommand { + + public SubCommand(TestCircuitBreaker circuitBreaker) { + super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)); + } + + @Override + protected Integer run() throws Exception { + return 2; + } + } + + class PrimaryCommand extends TestHystrixCommand { + public PrimaryCommand(TestCircuitBreaker circuitBreaker) { + super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)); + } + + @Override + protected Integer run() throws Exception { + throw new RuntimeException("primary failure"); + } + + @Override + protected Integer getFallback() { + SubCommand subCmd = new SubCommand(new TestCircuitBreaker()); + return subCmd.execute(); + } + } + + assertTrue(2 == new PrimaryCommand(new TestCircuitBreaker()).execute()); + } + + @Test + public void testSlowFallback() { + class PrimaryCommand extends TestHystrixCommand { + public PrimaryCommand(TestCircuitBreaker circuitBreaker) { + super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)); + } + + @Override + protected Integer run() throws Exception { + throw new RuntimeException("primary failure"); + } + + @Override + protected Integer getFallback() { + try { + Thread.sleep(1500); + return 1; + } catch (InterruptedException ie) { + System.out.println("Caught Interrupted Exception"); + ie.printStackTrace(); + } + return -1; + } + } + + assertTrue(1 == new PrimaryCommand(new TestCircuitBreaker()).execute()); + } /* ******************************************************************************** */ /* ******************************************************************************** */ @@ -5861,7 +5922,7 @@ public Scheduler getScheduler() { } @Override - public Scheduler getScheduler(boolean shouldInterruptThread) { + public Scheduler getScheduler(Func0 shouldInterruptThread) { return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } @@ -5906,7 +5967,7 @@ public Scheduler getScheduler() { } @Override - public Scheduler getScheduler(boolean shouldInterruptThread) { + public Scheduler getScheduler(Func0 shouldInterruptThread) { return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } @@ -6287,7 +6348,7 @@ public Scheduler getScheduler() { } @Override - public Scheduler getScheduler(boolean shouldInterruptThread) { + public Scheduler getScheduler(Func0 shouldInterruptThread) { return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } @@ -6406,7 +6467,7 @@ public InterruptibleCommand(TestCircuitBreaker circuitBreaker, boolean shouldInt .setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics) .setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter() .withExecutionIsolationThreadInterruptOnTimeout(shouldInterrupt) - .withExecutionIsolationThreadTimeoutInMilliseconds(100))); + .withExecutionTimeoutInMilliseconds(100))); } private volatile boolean hasBeenInterrupted; diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java index a9e5f1795..e33b6cefd 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java @@ -5582,7 +5582,7 @@ public Scheduler getScheduler() { } @Override - public Scheduler getScheduler(boolean shouldInterruptThread) { + public Scheduler getScheduler(Func0 shouldInterruptThread) { return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } @@ -7599,7 +7599,7 @@ public Scheduler getScheduler() { } @Override - public Scheduler getScheduler(boolean shouldInterruptThread) { + public Scheduler getScheduler(Func0 shouldInterruptThread) { return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } @@ -7644,7 +7644,7 @@ public Scheduler getScheduler() { } @Override - public Scheduler getScheduler(boolean shouldInterruptThread) { + public Scheduler getScheduler(Func0 shouldInterruptThread) { return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } @@ -8248,7 +8248,7 @@ public Scheduler getScheduler() { } @Override - public Scheduler getScheduler(boolean shouldInterruptThread) { + public Scheduler getScheduler(Func0 shouldInterruptThread) { return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); }