Skip to content

Commit

Permalink
Merge pull request #681 from mattrjacobs/add-cache-hit-hook
Browse files Browse the repository at this point in the history
Add execution hook for cache hit
  • Loading branch information
mattrjacobs committed Feb 14, 2015
2 parents deacde2 + 261f695 commit 2070141
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ final public Observable<R> toObservable() {
if (fromCache != null) {
/* mark that we received this response from cache */
metrics.markResponseFromCache();
isExecutionComplete.set(true);
executionHook.onCacheHit(this);
return new CachedObservableResponse<>((CachedObservableOriginal<R>) fromCache, this);
}
}
Expand Down Expand Up @@ -1879,6 +1881,11 @@ public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
actual.onThreadComplete(commandInstance);
}

@Override
public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
actual.onCacheHit(commandInstance);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <T> HystrixCommand<T> getHystrixCommandFromAbstractIfApplicable(HystrixInvokable<T> commandInstance) {
if (commandInstance instanceof HystrixCommand) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,11 @@ public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
// do nothing by default
}

/**
* Invoked when the command response is found in the {@link com.netflix.hystrix.HystrixRequestCache}.
* @param commandInstance The executing HystrixCommand
*/
public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
// do nothing by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2405,8 +2405,8 @@ public void testDynamicKey() {
@Test
public void testRequestCache1() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SuccessfulCacheableCommand command1 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command1 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");

assertTrue(command1.isCommandRunningInThread());

Expand Down Expand Up @@ -2462,8 +2462,8 @@ public void testRequestCache1() {
@Test
public void testRequestCache2() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SuccessfulCacheableCommand command1 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, true, "B");
SuccessfulCacheableCommand<String> command1 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "B");

assertTrue(command1.isCommandRunningInThread());

Expand Down Expand Up @@ -2516,9 +2516,9 @@ public void testRequestCache2() {
@Test
public void testRequestCache3() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SuccessfulCacheableCommand command1 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, true, "B");
SuccessfulCacheableCommand command3 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command1 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "B");
SuccessfulCacheableCommand<String> command3 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");

assertTrue(command1.isCommandRunningInThread());

Expand Down Expand Up @@ -2650,9 +2650,9 @@ public void testRequestCacheWithSlowExecution() {
@Test
public void testNoRequestCache3() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SuccessfulCacheableCommand command1 = new SuccessfulCacheableCommand(circuitBreaker, false, "A");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, false, "B");
SuccessfulCacheableCommand command3 = new SuccessfulCacheableCommand(circuitBreaker, false, "A");
SuccessfulCacheableCommand<String> command1 = new SuccessfulCacheableCommand<>(circuitBreaker, false, "A");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, false, "B");
SuccessfulCacheableCommand<String> command3 = new SuccessfulCacheableCommand<>(circuitBreaker, false, "A");

assertTrue(command1.isCommandRunningInThread());

Expand Down Expand Up @@ -3245,10 +3245,10 @@ public void testCacheKeyExecutionRequiresRequestVariable() {

TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();

SuccessfulCacheableCommand command = new SuccessfulCacheableCommand(circuitBreaker, true, "one");
SuccessfulCacheableCommand command = new SuccessfulCacheableCommand<>(circuitBreaker, true, "one");
assertEquals("one", command.execute());

SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, true, "two");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "two");
assertEquals("two", command2.queue().get());

fail("We expect an exception because cacheKey requires RequestVariable.");
Expand Down Expand Up @@ -4757,6 +4757,44 @@ public void call(TestHystrixCommand<Boolean> command) {
});
}

/**
* Short-circuit? : NO
* Request-cache? : YES
*/
@Test
public void testExecutionHookResponseFromCache() {
final TestCircuitBreaker cb = new TestCircuitBreaker();
HystrixCommand<Boolean> fillCache = new SuccessfulCacheableCommand<>(cb, true, true);
assertEquals(true, fillCache.execute());

assertHooksOnSuccess(
new Func0<TestHystrixCommand<Boolean>>() {
@Override
public TestHystrixCommand<Boolean> call() {
return new SuccessfulCacheableCommand<>(cb, true, true);
}
},
new Action1<TestHystrixCommand<Boolean>>() {
@Override
public void call(TestHystrixCommand<Boolean> command) {
assertEquals(0, command.builder.executionHook.startExecute.get());
assertNull(command.builder.executionHook.endExecuteSuccessResponse);
assertNull(command.builder.executionHook.endExecuteFailureException);
assertNull(command.builder.executionHook.endExecuteFailureType);
assertEquals(0, command.builder.executionHook.startRun.get());
assertNull(command.builder.executionHook.runSuccessResponse);
assertNull(command.builder.executionHook.runFailureException);
assertEquals(0, command.builder.executionHook.startFallback.get());
assertNull(command.builder.executionHook.fallbackSuccessResponse);
assertNull(command.builder.executionHook.fallbackFailureException);
assertEquals(0, command.builder.executionHook.threadStart.get());
assertEquals(0, command.builder.executionHook.threadComplete.get());
assertEquals(1, command.builder.executionHook.cacheHit.get());
assertEquals("onCacheHit - ", command.builder.executionHook.executionSequence.toString());
}
});
}

/**
*********************** END THREAD-ISOLATED Execution Hook Tests **************************************
*/
Expand Down Expand Up @@ -5806,20 +5844,20 @@ protected Boolean getFallback() {
/**
* A Command implementation that supports caching.
*/
private static class SuccessfulCacheableCommand extends TestHystrixCommand<String> {
private static class SuccessfulCacheableCommand<T> extends TestHystrixCommand<T> {

private final boolean cacheEnabled;
private volatile boolean executed = false;
private final String value;
private final T value;

public SuccessfulCacheableCommand(TestCircuitBreaker circuitBreaker, boolean cacheEnabled, String value) {
public SuccessfulCacheableCommand(TestCircuitBreaker circuitBreaker, boolean cacheEnabled, T value) {
super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics));
this.value = value;
this.cacheEnabled = cacheEnabled;
}

@Override
protected String run() {
protected T run() {
executed = true;
System.out.println("successfully executed");
return value;
Expand All @@ -5832,7 +5870,7 @@ public boolean isCommandRunningInThread() {
@Override
public String getCacheKey() {
if (cacheEnabled)
return value;
return value.toString();
else
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1804,8 +1804,8 @@ public void testDynamicKey() {
@Test
public void testRequestCache1UsingThreadIsolation() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SuccessfulCacheableCommand command1 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command1 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");

assertTrue(command1.isCommandRunningInThread());

Expand Down Expand Up @@ -1862,8 +1862,8 @@ public void testRequestCache1UsingThreadIsolation() {
@Test
public void testRequestCache2UsingThreadIsolation() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SuccessfulCacheableCommand command1 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, true, "B");
SuccessfulCacheableCommand<String> command1 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "B");

assertTrue(command1.isCommandRunningInThread());

Expand Down Expand Up @@ -1917,9 +1917,9 @@ public void testRequestCache2UsingThreadIsolation() {
@Test
public void testRequestCache3UsingThreadIsolation() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SuccessfulCacheableCommand command1 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, true, "B");
SuccessfulCacheableCommand command3 = new SuccessfulCacheableCommand(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command1 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "B");
SuccessfulCacheableCommand<String> command3 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "A");

assertTrue(command1.isCommandRunningInThread());

Expand Down Expand Up @@ -2060,9 +2060,9 @@ public void testRequestCacheWithSlowExecution() {
@Test
public void testNoRequestCache3UsingThreadIsolation() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SuccessfulCacheableCommand command1 = new SuccessfulCacheableCommand(circuitBreaker, false, "A");
SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, false, "B");
SuccessfulCacheableCommand command3 = new SuccessfulCacheableCommand(circuitBreaker, false, "A");
SuccessfulCacheableCommand<String> command1 = new SuccessfulCacheableCommand<>(circuitBreaker, false, "A");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, false, "B");
SuccessfulCacheableCommand<String> command3 = new SuccessfulCacheableCommand<>(circuitBreaker, false, "A");

assertTrue(command1.isCommandRunningInThread());

Expand Down Expand Up @@ -2695,10 +2695,10 @@ public void testCacheKeyExecutionRequiresRequestVariable() {

TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();

SuccessfulCacheableCommand command = new SuccessfulCacheableCommand(circuitBreaker, true, "one");
SuccessfulCacheableCommand<String> command = new SuccessfulCacheableCommand<>(circuitBreaker, true, "one");
assertEquals("one", command.observe().toBlocking().single());

SuccessfulCacheableCommand command2 = new SuccessfulCacheableCommand(circuitBreaker, true, "two");
SuccessfulCacheableCommand<String> command2 = new SuccessfulCacheableCommand<>(circuitBreaker, true, "two");
assertEquals("two", command2.observe().toBlocking().toFuture().get());

fail("We expect an exception because cacheKey requires RequestVariable.");
Expand Down Expand Up @@ -4289,6 +4289,44 @@ public void call(TestHystrixCommand<Boolean> command) {
});
}

/**
* Short-circuit? : NO
* Request-cache? : YES
*/
@Test
public void testExecutionHookResponseFromCache() {
final TestCircuitBreaker cb = new TestCircuitBreaker();
HystrixObservableCommand<Boolean> fillCache = new SuccessfulCacheableCommand<>(cb, true, true);
fillCache.observe();

assertHooksOnSuccess(
new Func0<TestHystrixCommand<Boolean>>() {
@Override
public TestHystrixCommand<Boolean> call() {
return new SuccessfulCacheableCommand<>(cb, true, true);
}
},
new Action1<TestHystrixCommand<Boolean>>() {
@Override
public void call(TestHystrixCommand<Boolean> command) {
assertEquals(0, command.builder.executionHook.startExecute.get());
assertNull(command.builder.executionHook.endExecuteSuccessResponse);
assertNull(command.builder.executionHook.endExecuteFailureException);
assertNull(command.builder.executionHook.endExecuteFailureType);
assertEquals(0, command.builder.executionHook.startRun.get());
assertNull(command.builder.executionHook.runSuccessResponse);
assertNull(command.builder.executionHook.runFailureException);
assertEquals(0, command.builder.executionHook.startFallback.get());
assertNull(command.builder.executionHook.fallbackSuccessResponse);
assertNull(command.builder.executionHook.fallbackFailureException);
assertEquals(0, command.builder.executionHook.threadStart.get());
assertEquals(0, command.builder.executionHook.threadComplete.get());
assertEquals(1, command.builder.executionHook.cacheHit.get());
assertEquals("onCacheHit - ", command.builder.executionHook.executionSequence.toString());
}
});
}

/**
*********************** END THREAD-ISOLATED Execution Hook Tests **************************************
*/
Expand Down Expand Up @@ -7513,21 +7551,21 @@ protected Observable<Boolean> resumeWithFallback() {
/**
* A Command implementation that supports caching.
*/
private static class SuccessfulCacheableCommand extends TestHystrixCommand<String> {
private static class SuccessfulCacheableCommand<T> extends TestHystrixCommand<T> {

private final boolean cacheEnabled;
private volatile boolean executed = false;
private final String value;
private final T value;

public SuccessfulCacheableCommand(TestCircuitBreaker circuitBreaker, boolean cacheEnabled, String value) {
public SuccessfulCacheableCommand(TestCircuitBreaker circuitBreaker, boolean cacheEnabled, T value) {
super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)));
this.value = value;
this.cacheEnabled = cacheEnabled;
}

@Override
protected Observable<String> construct() {
protected Observable<T> construct() {
executed = true;
System.out.println("successfully executed");
return Observable.just(value).subscribeOn(Schedulers.computation());
Expand All @@ -7540,7 +7578,7 @@ public boolean isCommandRunningInThread() {
@Override
public String getCacheKey() {
if (cacheEnabled)
return value;
return value.toString();
else
return null;
}
Expand Down
Loading

0 comments on commit 2070141

Please sign in to comment.