Skip to content

Commit

Permalink
Merge pull request Netflix#1320 from zsoltm/1.5.4.collapse-observable
Browse files Browse the repository at this point in the history
Javanica: basic Observable Collapser support
  • Loading branch information
mattrjacobs authored Sep 7, 2016
2 parents b4d129e + e9cf47b commit fe83e49
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 44 deletions.
32 changes: 25 additions & 7 deletions hystrix-contrib/hystrix-javanica/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,18 @@ Suppose you have some command which calls should be collapsed in one backend cal

Example:
```java

/** Asynchronous Execution */
@HystrixCollapser(batchMethod = "getUserByIds")
public Future<User> getUserById(String id) {
public Future<User> getUserByIdAsync(String id) {
return null;
}

/** Reactive Execution */
@HystrixCollapser(batchMethod = "getUserByIds")
public Observable<User> getUserByIdReact(String id) {
return null;
}

@HystrixCommand
public List<User> getUserByIds(List<String> ids) {
Expand All @@ -593,13 +601,23 @@ Example:
}
return users;
}


Future<User> f1 = userService.getUserById("1");
Future<User> f2 = userService.getUserById("2");
Future<User> f3 = userService.getUserById("3");
Future<User> f4 = userService.getUserById("4");
Future<User> f5 = userService.getUserById("5");
// Async
Future<User> f1 = userService.getUserByIdAsync("1");
Future<User> f2 = userService.getUserByIdAsync("2");
Future<User> f3 = userService.getUserByIdAsync("3");
Future<User> f4 = userService.getUserByIdAsync("4");
Future<User> f5 = userService.getUserByIdAsync("5");

// Reactive
Observable<User> u1 = getUserByIdReact("1");
Observable<User> u2 = getUserByIdReact("2");
Observable<User> u3 = getUserByIdReact("3");
Observable<User> u4 = getUserByIdReact("4");
Observable<User> u5 = getUserByIdReact("5");

// Materialize reactive commands
Iterable<User> users = Observables.merge(u1, u2, u3, u4, u5).toBlocking().toIterable();
```
A method annotated with ```@HystrixCollapser``` annotation can return any value with compatible type, it does not affect the result of collapser execution, collapser method can even return ```null``` or another stub.
There are several rules applied for methods signatures.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.netflix.hystrix.contrib.javanica.aop.aspectj;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.netflix.hystrix.HystrixInvokable;
import com.netflix.hystrix.contrib.javanica.annotation.DefaultProperties;
Expand Down Expand Up @@ -168,33 +167,33 @@ public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Objec
}

Method batchCommandMethod = getDeclaredMethod(obj.getClass(), hystrixCollapser.batchMethod(), List.class);
if (batchCommandMethod == null || !batchCommandMethod.getReturnType().equals(List.class)) {
throw new IllegalStateException("required batch method for collapser is absent: "
+ "(java.util.List) " + obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List)");
}

if (batchCommandMethod == null)
throw new IllegalStateException("batch method is absent: " + hystrixCollapser.batchMethod());

Class<?> batchReturnType = batchCommandMethod.getReturnType();
Class<?> collapserReturnType = collapserMethod.getReturnType();
boolean observable = collapserReturnType.equals(Observable.class);

if (!collapserMethod.getParameterTypes()[0]
.equals(getGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected"
.equals(getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected "
+ obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
getGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
}

Class<?> collapserMethodReturnType;
if (Future.class.isAssignableFrom(collapserMethod.getReturnType())) {
collapserMethodReturnType = getGenericParameter(collapserMethod.getGenericReturnType());
} else {
collapserMethodReturnType = collapserMethod.getReturnType();
}
final Class<?> collapserMethodReturnType = getFirstGenericParameter(
collapserMethod.getGenericReturnType(),
Future.class.isAssignableFrom(collapserReturnType) || Observable.class.isAssignableFrom(collapserReturnType) ? 1 : 0);

Class<?> batchCommandActualReturnType = getFirstGenericParameter(batchCommandMethod.getGenericReturnType());
if (!collapserMethodReturnType
.equals(getGenericParameter(batchCommandMethod.getGenericReturnType()))) {
.equals(batchCommandActualReturnType)) {
throw new IllegalStateException("Return type of batch method must be java.util.List parametrized with corresponding type: expected " +
"(java.util.List<" + collapserMethodReturnType + ">)" + obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
getGenericParameter(batchCommandMethod.getGenericReturnType()));
batchCommandActualReturnType);
}

HystrixCommand hystrixCommand = batchCommandMethod.getAnnotation(HystrixCommand.class);
Expand All @@ -212,11 +211,12 @@ public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Objec

builder.hystrixCollapser(hystrixCollapser);
builder.defaultCollapserKey(collapserMethod.getName());
builder.collapserExecutionType(ExecutionType.getExecutionType(collapserMethod.getReturnType()));
builder.collapserExecutionType(ExecutionType.getExecutionType(collapserReturnType));

builder.defaultCommandKey(batchCommandMethod.getName());
builder.hystrixCommand(hystrixCommand);
builder.executionType(ExecutionType.getExecutionType(batchCommandMethod.getReturnType()));
builder.executionType(ExecutionType.getExecutionType(batchReturnType));
builder.observable(observable);
FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(obj.getClass(), batchCommandMethod);
if (fallbackMethod.isPresent()) {
fallbackMethod.validateReturnType(batchCommandMethod);
Expand Down Expand Up @@ -260,14 +260,26 @@ private static Method getAjcMethodFromTarget(JoinPoint joinPoint) {
}


private static Class<?> getGenericParameter(Type type) {
Type tType = ((ParameterizedType) type).getActualTypeArguments()[0];
String className = tType.toString().split(" ")[1];
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
throw Throwables.propagate(e);
private static Class<?> getFirstGenericParameter(Type type) {
return getFirstGenericParameter(type, 1);
}

private static Class<?> getFirstGenericParameter(final Type type, final int nestedDepth) {
int cDepth = 0;
Type tType = type;

for (int cDept = 0; cDept < nestedDepth; cDept++) {
if (!(tType instanceof ParameterizedType))
throw new IllegalStateException(String.format("Sub type at nesting level %d of %s is expected to be generic", cDepth, type));
tType = ((ParameterizedType) tType).getActualTypeArguments()[cDept];
}

if (tType instanceof ParameterizedType)
return (Class<?>) ((ParameterizedType) tType).getRawType();
else if (tType instanceof Class)
return (Class<?>) tType;

throw new UnsupportedOperationException("Unsupported type " + tType);
}

private static MetaHolder.Builder setDefaultProperties(MetaHolder.Builder builder, Class<?> declaringClass, final ProceedingJoinPoint joinPoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.netflix.hystrix.contrib.javanica.command;


import com.google.common.base.Throwables;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.cache.CacheInvocationContext;
import com.netflix.hystrix.contrib.javanica.cache.HystrixCacheKeyGenerator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public HystrixCommandBuilder(Builder builder) {
this.executionType = builder.executionType;
}

public static Builder builder() {
return new Builder();
public static <ResponseType> Builder builder() {
return new Builder<ResponseType>();
}

public GenericSetterBuilder getSetterBuilder() {
Expand Down Expand Up @@ -85,12 +85,12 @@ public ExecutionType getExecutionType() {
}


public static class Builder {
public static class Builder<ResponseType> {
private GenericSetterBuilder setterBuilder;
private CommandActions commandActions;
private CacheInvocationContext<CacheResult> cacheResultInvocationContext;
private CacheInvocationContext<CacheRemove> cacheRemoveInvocationContext;
private Collection<HystrixCollapser.CollapsedRequest<Object, Object>> collapsedRequests = Collections.emptyList();
private Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests = Collections.emptyList();
private List<Class<? extends Throwable>> ignoreExceptions = Collections.emptyList();
private ExecutionType executionType = ExecutionType.SYNCHRONOUS;

Expand Down Expand Up @@ -144,7 +144,7 @@ public Builder cacheRemoveInvocationContext(CacheInvocationContext<CacheRemove>
* @param pCollapsedRequests the collapsed requests
* @return this {@link HystrixCommandBuilder.Builder}
*/
public Builder collapsedRequests(Collection<HystrixCollapser.CollapsedRequest<Object, Object>> pCollapsedRequests) {
public Builder collapsedRequests(Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> pCollapsedRequests) {
this.collapsedRequests = pCollapsedRequests;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public HystrixCommandBuilder create(MetaHolder metaHolder) {
return create(metaHolder, Collections.<HystrixCollapser.CollapsedRequest<Object, Object>>emptyList());
}

public HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<Object, Object>> collapsedRequests) {
public <ResponseType> HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests) {
validateMetaHolder(metaHolder);

return HystrixCommandBuilder.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.netflix.hystrix.contrib.javanica.command;

import com.netflix.hystrix.HystrixExecutable;
import com.netflix.hystrix.HystrixInvokable;
import com.netflix.hystrix.contrib.javanica.collapser.CommandCollapser;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.hystrix.contrib.javanica.test.common.collapser;

import com.google.common.collect.Sets;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.HystrixRequestLog;
Expand All @@ -25,9 +26,13 @@
import com.netflix.hystrix.contrib.javanica.test.common.domain.User;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -77,6 +82,33 @@ public void testGetUserById() throws ExecutionException, InterruptedException {
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testReactive() throws Exception {

final Observable<User> u1 = userService.getUserByIdReactive("1");
final Observable<User> u2 = userService.getUserByIdReactive("2");
final Observable<User> u3 = userService.getUserByIdReactive("3");
final Observable<User> u4 = userService.getUserByIdReactive("4");
final Observable<User> u5 = userService.getUserByIdReactive("5");

final Iterable<User> users = Observable.merge(u1, u2, u3, u4, u5).toBlocking().toIterable();

Set<String> expectedIds = Sets.newHashSet("1", "2", "3", "4", "5");
for (User cUser : users) {
assertEquals(expectedIds.remove(cUser.getId()), true);
}
assertEquals(expectedIds.isEmpty(), true);
assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
HystrixInvokableInfo<?> command = HystrixRequestLog.getCurrentRequest()
.getAllExecutedCommands().iterator().next();
// assert the command is the one we're expecting
assertEquals("getUserByIds", command.getCommandKey().name());
// confirm that it was a COLLAPSED command execution
assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
// and that it was successful
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetUserByIdWithFallback() throws ExecutionException, InterruptedException {
Future<User> f1 = userService.getUserByIdWithFallback("1");
Expand Down Expand Up @@ -158,6 +190,7 @@ public void testGetUserByIdWrongCollapserNoArgs() {

public static class UserService {

public static final Logger log = LoggerFactory.getLogger(UserService.class);
public static final User DEFAULT_USER = new User("def", "def");


Expand All @@ -173,6 +206,11 @@ public Future<User> getUserByIdWithFallback(String id) {
return null;
}

@HystrixCollapser(batchMethod = "getUserByIds",
collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "200")})
public Observable<User> getUserByIdReactive(String id) {
return null;
}

@HystrixCollapser(batchMethod = "getUserByIdsThrowsException",
collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "200")})
Expand Down Expand Up @@ -226,14 +264,14 @@ public List<User> getUserByIds(List<String> ids) {
for (String id : ids) {
users.add(new User(id, "name: " + id));
}
log.debug("executing on thread id: {}", Thread.currentThread().getId());
return users;
}

@HystrixCommand(fallbackMethod = "getUserByIdsFallback",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000")// for debug
})

public List<User> getUserByIdsWithFallback(List<String> ids) {
throw new RuntimeException("not found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ log4j.rootLogger = ERROR, CONSOLE

# Define the console appender
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.File=${log}/log.out

# Define the layout for console appender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.conversionPattern=%m%n
log4j.appender.CONSOLE.layout.conversionPattern=%m%n

log4j.logger.com.netflix.hystrix.contrib.javanica=DEBUG

0 comments on commit fe83e49

Please sign in to comment.