Skip to content

Commit

Permalink
Merge pull request #687 from mattrjacobs/fallback-rejection
Browse files Browse the repository at this point in the history
Added unit test for fallback rejection
  • Loading branch information
mattrjacobs committed Feb 19, 2015
2 parents af63e13 + f6c57d1 commit d60c7d0
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 85 deletions.
78 changes: 36 additions & 42 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -675,51 +675,12 @@ public void call() {
* <p>
* If something in the <code>getFallback()</code> implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially
* all threads to pile up and block.
*
*
* @return K
* @throws UnsupportedOperationException
* if getFallback() not implemented
* @throws HystrixException
* if getFallback() fails (throws an Exception) or is rejected by the semaphore
*/
private Observable<R> getFallbackWithProtection() {
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();

// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
executionHook.onFallbackStart(this);
final AbstractCommand<R> _cmd = this;

Observable<R> fallback;
try {
fallback = getFallbackObservable();
} catch (Throwable t) {
// getFallback() is user provided and can throw so we catch it and turn it into Observable.error
fallback = Observable.error(t);
}

return fallback
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnTerminate(new Action0() {

@Override
public void call() {
fallbackSemaphore.release();
}

});
} else {
metrics.markFallbackRejection();

logger.debug("HystrixCommand Fallback Rejection."); // debug only since we're throwing the exception and someone higher will do something with it
// if we couldn't acquire a permit, we "fail fast" by throwing an exception
return Observable.error(new HystrixRuntimeException(FailureType.REJECTED_SEMAPHORE_FALLBACK, this.getClass(), getLogMessagePrefix() + " fallback execution rejected.", null, null));
}
}

/**
* @throws HystrixRuntimeException
* if getFallback() fails (throws an Exception) or is rejected by the semaphore
*/
private Observable<R> getFallbackOrThrowException(final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
Expand All @@ -731,7 +692,40 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy
executionResult = executionResult.addEvents(eventType);
final AbstractCommand<R> _cmd = this;

return getFallbackWithProtection().doOnNext(new Action1<R>() {
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();

Observable<R> fallbackExecutionChain;

// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
executionHook.onFallbackStart(this);

try {
fallbackExecutionChain = getFallbackObservable();
} catch (Throwable t) {
// getFallback() is user provided and can throw so we catch it and turn it into Observable.error
fallbackExecutionChain = Observable.error(t);
}

fallbackExecutionChain = fallbackExecutionChain
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnTerminate(new Action0() {

@Override
public void call() {
fallbackSemaphore.release();
}
});
} else {
metrics.markFallbackRejection();

logger.debug("HystrixCommand Fallback Rejection."); // debug only since we're throwing the exception and someone higher will do something with it
// if we couldn't acquire a permit, we "fail fast" by throwing an exception
return Observable.error(new HystrixRuntimeException(FailureType.REJECTED_SEMAPHORE_FALLBACK, this.getClass(), getLogMessagePrefix() + " fallback execution rejected.", null, null));
}

return fallbackExecutionChain.doOnNext(new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1009,13 +1009,13 @@ public void call(C command) {
*/
@Test
public void testExecutionHookResponseFromCache() {
getCommand(ExecutionIsolationStrategy.THREAD, ExecutionResult.SUCCESS, 0, FallbackResult.UNIMPLEMENTED, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.YES, 42, 10).observe();
getCommand(ExecutionIsolationStrategy.THREAD, ExecutionResult.SUCCESS, 0, FallbackResult.UNIMPLEMENTED, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.YES, 42, 10, 10).observe();

assertHooksOnSuccess(
new Func0<C>() {
@Override
public C call() {
return getCommand(ExecutionIsolationStrategy.THREAD, ExecutionResult.SUCCESS, 0, FallbackResult.UNIMPLEMENTED, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.YES, 42, 10);
return getCommand(ExecutionIsolationStrategy.THREAD, ExecutionResult.SUCCESS, 0, FallbackResult.UNIMPLEMENTED, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.YES, 42, 10, 10);
}
},
new Action1<C>() {
Expand Down Expand Up @@ -1452,50 +1452,61 @@ C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult execu
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", 10, 10);
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int timeout) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, timeout, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, timeout, CacheEnabled.NO, "foo", 10, 10);
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int semaphoreCount) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, circuitBreaker, threadPool, timeout, cacheEnabled, value, semaphoreCount, false);
C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int executionSemaphoreCount, int fallbackSemaphoreCount) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, fallbackLatency, circuitBreaker, threadPool, timeout, cacheEnabled, value, executionSemaphoreCount, fallbackSemaphoreCount, false);
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int semaphoreCount, boolean circuitBreakerDisabled) {
AbstractCommand.TryableSemaphoreActual semaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(semaphoreCount));
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, circuitBreaker, threadPool, timeout, cacheEnabled, value, semaphore, circuitBreakerDisabled);
C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int executionSemaphoreCount, int fallbackSemaphoreCount, boolean circuitBreakerDisabled) {
AbstractCommand.TryableSemaphoreActual executionSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(executionSemaphoreCount));
AbstractCommand.TryableSemaphoreActual fallbackSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(fallbackSemaphoreCount));
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, fallbackLatency, circuitBreaker, threadPool, timeout, cacheEnabled, value, executionSemaphore, fallbackSemaphore, circuitBreakerDisabled);
}

abstract C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, AbstractCommand.TryableSemaphore semaphore, boolean circuitBreakerDisabled);
C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, int executionSemaphoreCount, AbstractCommand.TryableSemaphore fallbackSemaphore, boolean circuitBreakerDisabled) {
AbstractCommand.TryableSemaphoreActual executionSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(executionSemaphoreCount));
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, fallbackLatency, circuitBreaker, threadPool, timeout, cacheEnabled, value, executionSemaphore, fallbackSemaphore, circuitBreakerDisabled);
}

abstract C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout, CacheEnabled cacheEnabled, Object value, AbstractCommand.TryableSemaphore executionSemaphore, AbstractCommand.TryableSemaphore fallbackSemaphore, boolean circuitBreakerDisabled);

C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, int timeout) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, timeout, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, timeout, CacheEnabled.NO, "foo", 10, 10);
}

C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, int timeout) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, circuitBreaker, threadPool, timeout, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, circuitBreaker, threadPool, timeout, CacheEnabled.NO, "foo", 10, 10);
}

C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, AbstractCommand.TryableSemaphore executionSemaphore) {
AbstractCommand.TryableSemaphoreActual fallbackSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(10));
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", executionSemaphore, fallbackSemaphore, false);
}

C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency, FallbackResult fallbackResult, AbstractCommand.TryableSemaphore semaphore) {
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", semaphore, false);
C getFallbackLatentCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, AbstractCommand.TryableSemaphore fallbackSemaphore) {
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, fallbackResult, fallbackLatency, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10, fallbackSemaphore, false);
}

C getCircuitOpenCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult) {
HystrixCircuitBreakerTest.TestCircuitBreaker openCircuit = new HystrixCircuitBreakerTest.TestCircuitBreaker().setForceShortCircuit(true);
return getCommand(isolationStrategy, ExecutionResult.SUCCESS, 0, fallbackResult, openCircuit, null, 100, CacheEnabled.NO, "foo", 10, false);
return getCommand(isolationStrategy, ExecutionResult.SUCCESS, 0, fallbackResult, 0, openCircuit, null, 100, CacheEnabled.NO, "foo", 10, 10, false);
}

C getSharedCircuitBreakerCommand(ExecutionIsolationStrategy isolationStrategy, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker) {
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, FallbackResult.SUCCESS, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, FallbackResult.SUCCESS, 0, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10, 10);
}

C getSharedCircuitBreakerCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker) {
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, fallbackResult, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10);
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, fallbackResult, 0, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10, 10);
}

C getCircuitBreakerDisabledCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult) {
return getCommand(isolationStrategy, executionResult, 0, FallbackResult.UNIMPLEMENTED, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.NO, "foo", 10, true);
return getCommand(isolationStrategy, executionResult, 0, FallbackResult.UNIMPLEMENTED, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, 100, CacheEnabled.NO, "foo", 10, 10, true);
}
}
Loading

0 comments on commit d60c7d0

Please sign in to comment.