Skip to content

Commit

Permalink
AsyncInvocationinverceptors not properly created for each request (ec…
Browse files Browse the repository at this point in the history
…lipse-ee4j#4272)

* Fixes eclipse-ee4j#4264 - AsyncInvocationinverceptors not properly created for each request

Signed-off-by: David Kral <david.k.kral@oracle.com>
  • Loading branch information
Verdent authored and pdudits committed Oct 14, 2019
1 parent 1dfb5af commit 7b7726f
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@
*/
class ExecutorServiceWrapper implements ExecutorService {

static final ThreadLocal<List<AsyncInvocationInterceptor>> asyncInterceptors = new ThreadLocal<>();

private final ExecutorService wrapped;
private final List<AsyncInvocationInterceptor> asyncInterceptors;

ExecutorServiceWrapper(ExecutorService wrapped,
List<AsyncInvocationInterceptor> asyncInterceptors) {
ExecutorServiceWrapper(ExecutorService wrapped) {
this.wrapped = wrapped;
this.asyncInterceptors = asyncInterceptors;
}

@Override
Expand Down Expand Up @@ -111,24 +110,36 @@ public void execute(Runnable command) {
wrapped.execute(wrap(command));
}

private <T> Callable<T> wrap(Callable<T> task) {
private static <T> Callable<T> wrap(Callable<T> task) {
List<AsyncInvocationInterceptor> asyncInvocationInterceptors = asyncInterceptors.get();
asyncInterceptors.remove();
return () -> {
asyncInterceptors.forEach(AsyncInvocationInterceptor::applyContext);
applyContextOnInterceptors(asyncInvocationInterceptors);
return task.call();
};
}

private Runnable wrap(Runnable task) {
private static Runnable wrap(Runnable task) {
List<AsyncInvocationInterceptor> asyncInvocationInterceptors = asyncInterceptors.get();
asyncInterceptors.remove();
return () -> {
asyncInterceptors.forEach(AsyncInvocationInterceptor::applyContext);
applyContextOnInterceptors(asyncInvocationInterceptors);
task.run();
};
}

private static void applyContextOnInterceptors(List<AsyncInvocationInterceptor> asyncInvocationInterceptors) {
if (asyncInvocationInterceptors != null) {
//applyContext methods need to be called in reverse ordering of priority
for (int i = asyncInvocationInterceptors.size(); i-- > 0; ) {
asyncInvocationInterceptors.get(i).applyContext();
}
}
}

private <T> Collection<? extends Callable<T>> wrap(Collection<? extends Callable<T>> tasks) {
private static <T> Collection<? extends Callable<T>> wrap(Collection<? extends Callable<T>> tasks) {
return tasks.stream()
.map(this::wrap)
.map(ExecutorServiceWrapper::wrap)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.microprofile.rest.client.annotation.ClientHeaderParam;
import org.eclipse.microprofile.rest.client.annotation.RegisterClientHeaders;
import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor;
import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptorFactory;
import org.eclipse.microprofile.rest.client.ext.ClientHeadersFactory;
import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper;
import org.glassfish.jersey.client.inject.ParameterUpdater;
Expand All @@ -64,7 +65,7 @@ class InterfaceModel {
private final CreationalContext<?> creationalContext;

private final List<ClientHeaderParamModel> clientHeaders;
private final List<AsyncInvocationInterceptor> asyncInterceptors;
private final List<AsyncInvocationInterceptorFactory> asyncInterceptorFactories;
private final Set<ResponseExceptionMapper> responseExceptionMappers;
private final Set<ParamConverterProvider> paramConverterProviders;
private final Set<Annotation> interceptorAnnotations;
Expand All @@ -73,23 +74,23 @@ class InterfaceModel {
/**
* Creates new model based on interface class. Interface is parsed according to specific annotations.
*
* @param restClientClass interface class
* @param responseExceptionMappers registered exception mappers
* @param paramConverterProviders registered parameter providers
* @param asyncInterceptors async interceptors
* @param restClientClass interface class
* @param responseExceptionMappers registered exception mappers
* @param paramConverterProviders registered parameter providers
* @param asyncInterceptorFactories async interceptor factories
* @param injectionManager
* @return new model instance
*/
static InterfaceModel from(Class<?> restClientClass,
Set<ResponseExceptionMapper> responseExceptionMappers,
Set<ParamConverterProvider> paramConverterProviders,
List<AsyncInvocationInterceptor> asyncInterceptors,
List<AsyncInvocationInterceptorFactory> asyncInterceptorFactories,
InjectionManager injectionManager,
BeanManager beanManager) {
return new Builder(restClientClass,
responseExceptionMappers,
paramConverterProviders,
asyncInterceptors,
asyncInterceptorFactories,
injectionManager,
beanManager).build();
}
Expand All @@ -106,7 +107,7 @@ private InterfaceModel(Builder builder) {
this.paramConverterProviders = builder.paramConverterProviders;
this.interceptorAnnotations = builder.interceptorAnnotations;
this.creationalContext = builder.creationalContext;
this.asyncInterceptors = builder.asyncInterceptors;
this.asyncInterceptorFactories = builder.asyncInterceptorFactories;
this.beanManager = builder.beanManager;
}

Expand Down Expand Up @@ -169,8 +170,8 @@ List<ClientHeaderParamModel> getClientHeaders() {
*
* @return registered async interceptors
*/
List<AsyncInvocationInterceptor> getAsyncInterceptors() {
return asyncInterceptors;
List<AsyncInvocationInterceptorFactory> getAsyncInterceptorFactories() {
return asyncInterceptorFactories;
}

/**
Expand Down Expand Up @@ -251,22 +252,22 @@ private static class Builder {
private ClientHeadersFactory clientHeadersFactory;
private CreationalContext<?> creationalContext;
private List<ClientHeaderParamModel> clientHeaders;
private List<AsyncInvocationInterceptor> asyncInterceptors;
private List<AsyncInvocationInterceptorFactory> asyncInterceptorFactories;
private Set<ResponseExceptionMapper> responseExceptionMappers;
private Set<ParamConverterProvider> paramConverterProviders;
private Set<Annotation> interceptorAnnotations;

private Builder(Class<?> restClientClass,
Set<ResponseExceptionMapper> responseExceptionMappers,
Set<ParamConverterProvider> paramConverterProviders,
List<AsyncInvocationInterceptor> asyncInterceptors,
List<AsyncInvocationInterceptorFactory> asyncInterceptorFactories,
InjectionManager injectionManager,
BeanManager beanManager) {
this.injectionManager = injectionManager;
this.restClientClass = restClientClass;
this.responseExceptionMappers = responseExceptionMappers;
this.paramConverterProviders = paramConverterProviders;
this.asyncInterceptors = asyncInterceptors;
this.asyncInterceptorFactories = asyncInterceptorFactories;
this.beanManager = beanManager;
filterAllInterceptorAnnotations();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.eclipse.microprofile.rest.client.RestClientDefinitionException;
import org.eclipse.microprofile.rest.client.annotation.ClientHeaderParam;
import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor;
import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptorFactory;
import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper;

/**
Expand Down Expand Up @@ -121,7 +122,7 @@ private MethodModel(Builder builder) {
subResourceModel = RestClientModel.from(returnType.getRawType(),
interfaceModel.getResponseExceptionMappers(),
interfaceModel.getParamConverterProviders(),
interfaceModel.getAsyncInterceptors(),
interfaceModel.getAsyncInterceptorFactories(),
interfaceModel.getInjectionManager(),
interfaceModel.getBeanManager());
} else {
Expand Down Expand Up @@ -243,6 +244,14 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder,
Object entity,
Method method,
MultivaluedMap<String, Object> customHeaders) {

//AsyncInterceptors initialization
List<AsyncInvocationInterceptor> asyncInterceptors = interfaceModel.getAsyncInterceptorFactories().stream()
.map(AsyncInvocationInterceptorFactory::newInterceptor)
.collect(Collectors.toList());
asyncInterceptors.forEach(AsyncInvocationInterceptor::prepareContext);
ExecutorServiceWrapper.asyncInterceptors.set(asyncInterceptors);

CompletableFuture<Object> result = new CompletableFuture<>();
Future<Response> theFuture;
if (entity != null
Expand All @@ -255,7 +264,7 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder,

CompletableFuture<Response> completableFuture = (CompletableFuture<Response>) theFuture;
completableFuture.thenAccept(response -> {
interfaceModel.getAsyncInterceptors().forEach(AsyncInvocationInterceptor::removeContext);
asyncInterceptors.forEach(AsyncInvocationInterceptor::removeContext);
try {
evaluateResponse(response, method);
if (returnType.getType().equals(Void.class)) {
Expand All @@ -269,7 +278,7 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder,
result.completeExceptionally(e);
}
}).exceptionally(throwable -> {
interfaceModel.getAsyncInterceptors().forEach(AsyncInvocationInterceptor::removeContext);
asyncInterceptors.forEach(AsyncInvocationInterceptor::removeContext);
result.completeExceptionally(throwable);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.security.AccessController;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -36,7 +37,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.annotation.Priority;
import javax.net.ssl.HostnameVerifier;
Expand Down Expand Up @@ -81,7 +81,7 @@ class RestClientBuilderImpl implements RestClientBuilder {

private final Set<ResponseExceptionMapper> responseExceptionMappers;
private final Set<ParamConverterProvider> paramConverterProviders;
private final List<AsyncInvocationInterceptorFactory> asyncInterceptorFactories;
private final List<AsyncInvocationInterceptorFactoryPriorityWrapper> asyncInterceptorFactories;
private final Config config;
private final ConfigWrapper configWrapper;
private URI uri;
Expand Down Expand Up @@ -153,14 +153,10 @@ public <T> T build(Class<T> interfaceClass) throws IllegalStateException, RestCl

//We need to check first if default exception mapper was not disabled by property on builder.
registerExceptionMapper();
//sort all AsyncInvocationInterceptorFactory by priority
asyncInterceptorFactories.sort(Comparator.comparingInt(AsyncInvocationInterceptorFactoryPriorityWrapper::getPriority));

//AsyncInterceptors initialization
List<AsyncInvocationInterceptor> asyncInterceptors = asyncInterceptorFactories.stream()
.map(AsyncInvocationInterceptorFactory::newInterceptor)
.collect(Collectors.toList());
asyncInterceptors.forEach(AsyncInvocationInterceptor::prepareContext);

clientBuilder.executorService(new ExecutorServiceWrapper(executorService.get(), asyncInterceptors));
clientBuilder.executorService(new ExecutorServiceWrapper(executorService.get()));

if (null != sslContext) {
clientBuilder.sslContext(sslContext);
Expand All @@ -187,7 +183,7 @@ public <T> T build(Class<T> interfaceClass) throws IllegalStateException, RestCl
RestClientModel restClientModel = RestClientModel.from(interfaceClass,
responseExceptionMappers,
paramConverterProviders,
asyncInterceptors,
new ArrayList<>(asyncInterceptorFactories),
injectionManagerExposer.injectionManager,
CdiUtil.getBeanManager());

Expand Down Expand Up @@ -338,11 +334,11 @@ public RestClientBuilder register(Class<?> componentClass, Map<Class<?>, Integer
public RestClientBuilder register(Object component) {
if (component instanceof ResponseExceptionMapper) {
ResponseExceptionMapper mapper = (ResponseExceptionMapper) component;
registerCustomProvider(component, -1);
registerCustomProvider(component, mapper.getPriority());
clientBuilder.register(mapper, mapper.getPriority());
} else {
clientBuilder.register(component);
registerCustomProvider(component, -1);
registerCustomProvider(component, null);
}
return this;
}
Expand Down Expand Up @@ -384,7 +380,7 @@ private boolean isSupportedCustomProvider(Class<?> providerClass) {
|| AsyncInvocationInterceptorFactory.class.isAssignableFrom(providerClass);
}

private void registerCustomProvider(Object instance, int priority) {
private void registerCustomProvider(Object instance, Integer priority) {
if (!isSupportedCustomProvider(instance.getClass())) {
return;
}
Expand All @@ -399,7 +395,9 @@ private void registerCustomProvider(Object instance, int priority) {
paramConverterProviders.add((ParamConverterProvider) instance);
}
if (instance instanceof AsyncInvocationInterceptorFactory) {
asyncInterceptorFactories.add((AsyncInvocationInterceptorFactory) instance);
asyncInterceptorFactories
.add(new AsyncInvocationInterceptorFactoryPriorityWrapper((AsyncInvocationInterceptorFactory) instance,
priority));
}
}

Expand All @@ -417,4 +415,30 @@ public boolean configure(FeatureContext context) {
}
}

private static class AsyncInvocationInterceptorFactoryPriorityWrapper
implements AsyncInvocationInterceptorFactory {

private AsyncInvocationInterceptorFactory factory;
private Integer priority;

AsyncInvocationInterceptorFactoryPriorityWrapper(AsyncInvocationInterceptorFactory factory, Integer priority) {
this.factory = factory;
this.priority = priority;
}

@Override
public AsyncInvocationInterceptor newInterceptor() {
return factory.newInterceptor();
}

Integer getPriority() {
if (priority == null) {
priority = Optional.ofNullable(factory.getClass().getAnnotation(Priority.class))
.map(Priority::value)
.orElse(Priorities.USER);
}
return priority;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.ext.ParamConverterProvider;

import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor;
import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptorFactory;
import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper;
import org.glassfish.jersey.internal.inject.InjectionManager;

Expand Down Expand Up @@ -56,13 +56,13 @@ class RestClientModel {
static RestClientModel from(Class<?> restClientClass,
Set<ResponseExceptionMapper> responseExceptionMappers,
Set<ParamConverterProvider> paramConverterProviders,
List<AsyncInvocationInterceptor> asyncInterceptors,
List<AsyncInvocationInterceptorFactory> asyncInterceptorFactories,
InjectionManager injectionManager,
BeanManager beanManager) {
InterfaceModel interfaceModel = InterfaceModel.from(restClientClass,
responseExceptionMappers,
paramConverterProviders,
asyncInterceptors,
asyncInterceptorFactories,
injectionManager,
beanManager);
return new Builder()
Expand Down

0 comments on commit 7b7726f

Please sign in to comment.