Skip to content

Commit

Permalink
Use the new Vertx local context storage
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed May 20, 2024
1 parent bc4d062 commit 1616ea5
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.vertx.core.runtime;

import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;

import io.vertx.core.spi.context.storage.AccessMode;

final class QuarkusAccessModes {

public static final AccessMode ACQUIRE_RELEASE = new AccessMode() {
@Override
public Object get(AtomicReferenceArray<Object> locals, int idx) {
return locals.get(idx);
}

@Override
public void put(AtomicReferenceArray<Object> locals, int idx, Object value) {
locals.lazySet(idx, value);
}

@Override
public Object getOrCreate(AtomicReferenceArray<Object> locals, int idx, Supplier<Object> initialValueSupplier) {
Object value = locals.get(idx);
if (value == null) {
value = initialValueSupplier.get();
locals.lazySet(idx, value);
}
return value;
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -46,7 +44,6 @@
import io.quarkus.vertx.core.runtime.config.ClusterConfiguration;
import io.quarkus.vertx.core.runtime.config.EventBusConfiguration;
import io.quarkus.vertx.core.runtime.config.VertxConfiguration;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.vertx.mdc.provider.LateBoundMDCProvider;
import io.quarkus.vertx.runtime.VertxCurrentContextFactory;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -584,15 +581,7 @@ public void runWith(Runnable task, Object context) {
// The CDI contexts must not be propagated
// First test if VertxCurrentContextFactory is actually used
if (currentContextFactory != null) {
List<String> keys = currentContextFactory.keys();
ConcurrentMap<Object, Object> local = vertxContext.localContextData();
if (containsScopeKey(keys, local)) {
// Duplicate the context, copy the data, remove the request context
vertxContext = vertxContext.duplicate();
vertxContext.localContextData().putAll(local);
keys.forEach(vertxContext.localContextData()::remove);
VertxContextSafetyToggle.setContextSafe(vertxContext, true);
}
vertxContext = currentContextFactory.duplicateContext(vertxContext);
}
vertxContext.beginDispatch();
try {
Expand All @@ -604,23 +593,6 @@ public void runWith(Runnable task, Object context) {
task.run();
}
}

private boolean containsScopeKey(List<String> keys, Map<Object, Object> localContextData) {
if (keys.isEmpty()) {
return false;
}
if (keys.size() == 1) {
// Very often there will be only one key used
return localContextData.containsKey(keys.get(0));
} else {
for (String key : keys) {
if (localContextData.containsKey(key)) {
return true;
}
}
}
return false;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.ContextLocalImpl;
import io.vertx.core.spi.context.storage.ContextLocal;

public class VertxLocalsHelper {

Expand All @@ -22,6 +24,10 @@ public static void throwOnRootContextAccess() {
public static <T> T getLocal(ContextInternal context, Object key) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
if (key instanceof ContextLocalImpl<?> || key instanceof ContextLocal<?>) {
var localKey = (ContextLocal<T>) key;
return (T) context.getLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE);
}
return (T) context.localContextData().get(key);
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
Expand All @@ -31,7 +37,10 @@ public static <T> T getLocal(ContextInternal context, Object key) {
public static void putLocal(ContextInternal context, Object key, Object value) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
context.localContextData().put(key, value);
if (key instanceof ContextLocalImpl<?> || key instanceof ContextLocal<?>) {
var localKey = (ContextLocal<Object>) key;
context.putLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE, value);
}
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
}
Expand All @@ -40,6 +49,14 @@ public static void putLocal(ContextInternal context, Object key, Object value) {
public static boolean removeLocal(ContextInternal context, Object key) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
if (key instanceof ContextLocalImpl<?> || key instanceof ContextLocal<?>) {
var localKey = (ContextLocal<Object>) key;
if (localKey == null) {
return false;
}
context.removeLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE);
return true;
}
return context.localContextData().remove(key) != null;
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextLocalImpl;

/**
* This is meant for other extensions to integrate with, to help
Expand Down Expand Up @@ -39,7 +40,6 @@
*/
public final class VertxContextSafetyToggle {

private static final Object ACCESS_TOGGLE_KEY = new Object();
public static final String UNRESTRICTED_BY_DEFAULT_PROPERTY = "io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.UNRESTRICTED_BY_DEFAULT";

/**
Expand All @@ -49,6 +49,9 @@ public final class VertxContextSafetyToggle {
public static final String FULLY_DISABLE_PROPERTY = "io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.I_HAVE_CHECKED_EVERYTHING";
private static final boolean UNRESTRICTED_BY_DEFAULT = Boolean.getBoolean(UNRESTRICTED_BY_DEFAULT_PROPERTY);
private static final boolean FULLY_DISABLED = Boolean.getBoolean(FULLY_DISABLE_PROPERTY);
// TODO VertxImpl should be allocated AFTER getting here, to make it work!
// ContextLocalImpl permanently assign a globally unique key: if the check is disabled, there's no point in creating it
private static final ContextLocalImpl<Boolean> ACCESS_TOGGLE_KEY = FULLY_DISABLED ? new ContextLocalImpl<>() : null;

/**
* Verifies if the current Vert.x context was flagged as safe
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package io.quarkus.vertx.runtime;

import java.lang.annotation.Annotation;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.enterprise.context.RequestScoped;

import io.netty.util.concurrent.FastThreadLocal;
import io.quarkus.arc.CurrentContext;
Expand All @@ -14,23 +18,34 @@
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.ContextLocalImpl;

public class VertxCurrentContextFactory implements CurrentContextFactory {

private static final String LOCAL_KEY_PREFIX = "io.quarkus.vertx.cdi-current-context";
// TODO VertxImpl should be allocated AFTER getting here, to make it work!
private static final ContextLocalImpl<? extends ContextState> REQUEST_SCOPED_LOCAL_KEY = new ContextLocalImpl<>();

private final List<String> keys;
private final List<String> unmodifiableKeys;
private final AtomicBoolean requestScopedKeyCreated;

public VertxCurrentContextFactory() {
// There will be only a few mutative operations max
this.keys = new CopyOnWriteArrayList<>();
// We do not want to allocate a new object for each VertxCurrentContextFactory#keys() invocation
this.unmodifiableKeys = Collections.unmodifiableList(keys);
this.requestScopedKeyCreated = new AtomicBoolean();
}

@Override
public <T extends InjectableContext.ContextState> CurrentContext<T> create(Class<? extends Annotation> scope) {
if (scope == RequestScoped.class) {
if (!requestScopedKeyCreated.compareAndExchange(false, true)) {
throw new IllegalStateException(
"Multiple current contexts for the same scope are not supported. Current context for "
+ scope + " already exists!");
}
return new VertxCurrentContext<T>(REQUEST_SCOPED_LOCAL_KEY);
}
String key = LOCAL_KEY_PREFIX + scope.getName();
if (keys.contains(key)) {
throw new IllegalStateException(
Expand All @@ -41,20 +56,47 @@ public <T extends InjectableContext.ContextState> CurrentContext<T> create(Class
return new VertxCurrentContext<>(key);
}

/**
*
* @return an unmodifiable list of used keys
*/
public List<String> keys() {
return unmodifiableKeys;
public ContextInternal duplicateContext(ContextInternal vertxContext) {
List<String> keys = this.keys;
// TODO optimize it to do it right!
ConcurrentMap<Object, Object> local = vertxContext.localContextData();
if (containsScopeKey(keys, local)) {
// Duplicate the context, copy the data, remove the request context
vertxContext = vertxContext.duplicate();
vertxContext.localContextData().putAll(local);
keys.forEach(vertxContext.localContextData()::remove);
if (requestScopedKeyCreated.get()) {
vertxContext.removeLocal(REQUEST_SCOPED_LOCAL_KEY);
}
VertxContextSafetyToggle.setContextSafe(vertxContext, true);
}
return vertxContext;
}

private static boolean containsScopeKey(List<String> keys, Map<Object, Object> localContextData) {
if (keys.isEmpty()) {
return false;
}
if (keys.size() == 1) {
// Very often there will be only one key used
return localContextData.containsKey(keys.get(0));
} else {
for (String key : keys) {
if (localContextData.containsKey(key)) {
return true;
}
}
}
return false;
}

private static final class VertxCurrentContext<T extends ContextState> implements CurrentContext<T> {

private final String key;
private final FastThreadLocal<T> fallback = new FastThreadLocal<>();
// It allows to use both ContextLocalImpl and String keys
private final Object key;
private volatile FastThreadLocal<T> fallback;

private VertxCurrentContext(String key) {
private VertxCurrentContext(Object key) {
this.key = key;
}

Expand All @@ -64,7 +106,24 @@ public T get() {
if (context != null && VertxContext.isDuplicatedContext(context)) {
return context.getLocal(key);
}
return fallback.get();
return fallback().get();
}

private FastThreadLocal<T> fallback() {
var fallback = this.fallback;
if (fallback == null) {
fallback = getOrCreateFallback();
}
return fallback;
}

private synchronized FastThreadLocal<T> getOrCreateFallback() {
var fallback = this.fallback;
if (fallback == null) {
fallback = new FastThreadLocal<>();
this.fallback = fallback;
}
return fallback;
}

@Override
Expand All @@ -80,7 +139,7 @@ public void set(T state) {
}

} else {
fallback.set(state);
fallback().set(state);
}
}

Expand All @@ -91,7 +150,7 @@ public void remove() {
// NOOP - the DC should not be shared.
// context.removeLocal(key);
} else {
fallback.remove();
fallback().remove();
}
}

Expand Down

0 comments on commit 1616ea5

Please sign in to comment.