Skip to content

Commit

Permalink
PAYARA-3674: Preserve Thread Context Class Loader in Weld executor se…
Browse files Browse the repository at this point in the history
…rvice
  • Loading branch information
pdudits committed Apr 3, 2019
1 parent 6546e1a commit fe42d61
Showing 1 changed file with 153 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) [2016-2018] Payara Foundation and/or its affiliates. All rights reserved.
* Copyright (c) [2016-2019] Payara Foundation and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -45,8 +45,14 @@
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.jboss.weld.executor.AbstractExecutorServices;
import org.jboss.weld.manager.api.ExecutorServices;

Expand All @@ -56,21 +62,25 @@
* @author steve
*/
public class ExecutorServicesImpl extends AbstractExecutorServices implements ExecutorServices {


private final ExecutorService taskExecutor;
private final ContextualTimerExecutor timerExecutor;
private PayaraExecutorService executor;

public ExecutorServicesImpl(PayaraExecutorService service) {
executor = service;
taskExecutor = new ContextualTaskExecutor();
timerExecutor = new ContextualTimerExecutor();
}

@Override
public ExecutorService getTaskExecutor() {
return executor.getUnderlyingExecutorService();
return taskExecutor;
}

@Override
public ScheduledExecutorService getTimerExecutor() {
return executor.getUnderlyingScheduledExecutorService();
return timerExecutor;
}

@Override
Expand All @@ -88,22 +98,148 @@ public <T> Collection<? extends Callable<T>> wrap(Collection<? extends Callable<
ClassLoader TCCL = Thread.currentThread().getContextClassLoader();
List<Callable<T>> wrapped = new ArrayList<>(tasks.size());
for (Callable<T> task : tasks) {
wrapped.add(new Callable() {
@Override
public Object call() throws Exception {
ClassLoader old = Utility.setContextClassLoader(TCCL);
try {
return task.call();
} finally {
Utility.setContextClassLoader(old);
}
}

});
wrapped.add(inContextClassloader(TCCL, task));
}
return wrapped;
}



private static <V> Callable<V> inContextClassloader(ClassLoader contextClassLoader, Callable<V> task) {
return () -> {
ClassLoader old = Utility.setContextClassLoader(contextClassLoader);
try {
return task.call();
} finally {
Utility.setContextClassLoader(old);
}
};
}

private static Runnable inContextClassloader(ClassLoader contextClassLoader, Runnable task) {
return () -> {
ClassLoader old = Utility.setContextClassLoader(contextClassLoader);
try {
task.run();
} finally {
Utility.setContextClassLoader(old);
}
};
}

private static <V> Callable<V> inCurrentContextClassloader(Callable<V> task) {
return inContextClassloader(Thread.currentThread().getContextClassLoader(), task);
}

private static Runnable inCurrentContextClassloader(Runnable task) {
return inContextClassloader(Thread.currentThread().getContextClassLoader(), task);
}

class ContextualTaskExecutor implements ExecutorService {
private ExecutorService delegate;

ContextualTaskExecutor() {
this.delegate = executor.getUnderlyingExecutorService();
}

@Override
public void shutdown() {
throw new IllegalStateException("Downstream service cannot request shutdown");
}

@Override
public List<Runnable> shutdownNow() {
throw new IllegalStateException("Downstream service cannot request shutdown");
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate().awaitTermination(timeout, unit);
}

@Override
public boolean isShutdown() {
return delegate().isShutdown();
}

@Override
public boolean isTerminated() {
return delegate().isTerminated();
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate().submit(inCurrentContextClassloader(task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate().submit(inCurrentContextClassloader(task), result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate().submit(inCurrentContextClassloader(task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate().invokeAll(wrap(tasks));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return delegate().invokeAll(wrap(tasks), timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate().invokeAny(wrap(tasks));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate().invokeAny(wrap(tasks), timeout, unit);
}

@Override
public void execute(Runnable command) {
delegate().execute(inCurrentContextClassloader(command));
}

protected ExecutorService delegate() {
return delegate;
}
}

class ContextualTimerExecutor extends ContextualTaskExecutor implements ScheduledExecutorService {
private final ScheduledExecutorService delegate;

ContextualTimerExecutor() {
this.delegate = executor.getUnderlyingScheduledExecutorService();
}

protected ScheduledExecutorService delegate() {
return this.delegate;
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate().schedule(inCurrentContextClassloader(command), delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate().schedule(inCurrentContextClassloader(callable), delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate().scheduleAtFixedRate(inCurrentContextClassloader(command), initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate().scheduleWithFixedDelay(inCurrentContextClassloader(command), initialDelay, delay, unit);
}
}
}

0 comments on commit fe42d61

Please sign in to comment.