diff --git a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java index f7d6c0627..1046a0d4c 100644 --- a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java +++ b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java @@ -41,7 +41,9 @@ import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.functions.Func1; import java.lang.reflect.Method; @@ -109,8 +111,8 @@ public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinP return result; } - private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) { - return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder)) + private Object executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) { + return mapObservable(((Observable) CommandExecutor.execute(invokable, executionType, metaHolder)) .onErrorResumeNext(new Func1() { @Override public Observable call(Throwable throwable) { @@ -122,7 +124,16 @@ public Observable call(Throwable throwable) { } return Observable.error(throwable); } - }); + }), metaHolder); + } + + private Object mapObservable(Observable observable, final MetaHolder metaHolder) { + if (Completable.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) { + return observable.toCompletable(); + } else if (Single.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) { + return observable.toSingle(); + } + return observable; } private Throwable hystrixRuntimeExceptionToThrowable(MetaHolder metaHolder, HystrixRuntimeException e) { diff --git a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java index ea3e285d1..55f4f26c8 100644 --- a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java +++ b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java @@ -15,8 +15,12 @@ */ package com.netflix.hystrix.contrib.javanica.command; +import com.google.common.collect.ImmutableSet; +import rx.Completable; import rx.Observable; +import rx.Single; +import java.util.Set; import java.util.concurrent.Future; /** @@ -39,6 +43,9 @@ public enum ExecutionType { */ OBSERVABLE; + // RX types + private static final Set RX_TYPES = ImmutableSet.of(Observable.class, Single.class, Completable.class); + /** * Gets execution type for specified class type. * @param type the type @@ -47,11 +54,19 @@ public enum ExecutionType { public static ExecutionType getExecutionType(Class type) { if (Future.class.isAssignableFrom(type)) { return ExecutionType.ASYNCHRONOUS; - } else if (Observable.class.isAssignableFrom(type)) { + } else if (isRxType(type)) { return ExecutionType.OBSERVABLE; } else { return ExecutionType.SYNCHRONOUS; } } + private static boolean isRxType(Class cl) { + for (Class rxType : RX_TYPES) { + if (rxType.isAssignableFrom(cl)) { + return true; + } + } + return false; + } } diff --git a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java index 120e68f51..c5623aba3 100644 --- a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java +++ b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java @@ -28,7 +28,9 @@ import com.netflix.hystrix.exception.HystrixBadRequestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.functions.Func1; import javax.annotation.concurrent.ThreadSafe; @@ -67,7 +69,8 @@ public GenericObservableCommand(HystrixCommandBuilder builder) { protected Observable construct() { Observable result; try { - result = ((Observable) commandActions.getCommandAction().execute(executionType)) + Observable observable = toObservable(commandActions.getCommandAction().execute(executionType)); + result = observable .onErrorResumeNext(new Func1() { @Override public Observable call(Throwable throwable) { @@ -157,4 +160,16 @@ boolean isIgnorable(Throwable throwable) { } return false; } + + private Observable toObservable(Object obj) { + if (Observable.class.isAssignableFrom(obj.getClass())) { + return (Observable) obj; + } else if (Completable.class.isAssignableFrom(obj.getClass())) { + return ((Completable) obj).toObservable(); + } else if (Single.class.isAssignableFrom(obj.getClass())) { + return ((Single) obj).toObservable(); + } else { + throw new IllegalStateException("unsupported rx type: " + obj.getClass()); + } + } } diff --git a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java index 9125b6fb9..262c24bb6 100644 --- a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java +++ b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java @@ -24,8 +24,10 @@ import org.apache.commons.lang3.StringUtils; import org.junit.Before; import org.junit.Test; +import rx.Completable; import rx.Observable; import rx.Observer; +import rx.Single; import rx.Subscriber; import rx.functions.Action1; import rx.subjects.ReplaySubject; @@ -90,6 +92,19 @@ public void call(User user) { assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } + @Test + public void testCompletable(){ + userService.getUserCompletable("1", "name: "); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getUserCompletable"); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS)); + } + + @Test + public void testSingle(){ + userService.getUserSingle("1", "name: "); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getUserSingle"); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS)); + } @Test public void testGetUserWithRegularFallback() { @@ -163,6 +178,18 @@ public Observable getUser(final String id, final String name) { return createObservable(id, name); } + @HystrixCommand + public Completable getUserCompletable(final String id, final String name) { + validate(id, name, "getUser has failed"); + return createObservable(id, name).toCompletable(); + } + + @HystrixCommand + public Single getUserSingle(final String id, final String name) { + validate(id, name, "getUser has failed"); + return createObservable(id, name).toSingle(); + } + @HystrixCommand(fallbackMethod = "regularFallback", observableExecutionMode = ObservableExecutionMode.LAZY) public Observable getUserRegularFallback(final String id, final String name) { validate(id, name, "getUser has failed");