diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/QuarkusAccessModes.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/QuarkusAccessModes.java new file mode 100644 index 00000000000000..7f4ae5cbe8b9fc --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/QuarkusAccessModes.java @@ -0,0 +1,32 @@ +package io.quarkus.vertx.core.runtime; + +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; + +import io.vertx.core.spi.context.storage.AccessMode; + +final class QuarkusAccessModes { + + public static final AccessMode ACQUIRE_RELEASE = new AccessMode() { + @Override + public Object get(AtomicReferenceArray<Object> locals, int idx) { + return locals.get(idx); + } + + @Override + public void put(AtomicReferenceArray<Object> locals, int idx, Object value) { + locals.lazySet(idx, value); + } + + @Override + public Object getOrCreate(AtomicReferenceArray<Object> locals, int idx, Supplier<Object> initialValueSupplier) { + Object value = locals.get(idx); + if (value == null) { + value = initialValueSupplier.get(); + locals.lazySet(idx, value); + } + return value; + } + }; + +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java index 0731c69605e0df..f03eb623207fea 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java @@ -13,12 +13,10 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; @@ -46,7 +44,6 @@ import io.quarkus.vertx.core.runtime.config.ClusterConfiguration; import io.quarkus.vertx.core.runtime.config.EventBusConfiguration; import io.quarkus.vertx.core.runtime.config.VertxConfiguration; -import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.quarkus.vertx.mdc.provider.LateBoundMDCProvider; import io.quarkus.vertx.runtime.VertxCurrentContextFactory; import io.vertx.core.AsyncResult; @@ -584,15 +581,7 @@ public void runWith(Runnable task, Object context) { // The CDI contexts must not be propagated // First test if VertxCurrentContextFactory is actually used if (currentContextFactory != null) { - List<String> keys = currentContextFactory.keys(); - ConcurrentMap<Object, Object> local = vertxContext.localContextData(); - if (containsScopeKey(keys, local)) { - // Duplicate the context, copy the data, remove the request context - vertxContext = vertxContext.duplicate(); - vertxContext.localContextData().putAll(local); - keys.forEach(vertxContext.localContextData()::remove); - VertxContextSafetyToggle.setContextSafe(vertxContext, true); - } + vertxContext = currentContextFactory.duplicateContext(vertxContext); } vertxContext.beginDispatch(); try { @@ -604,23 +593,6 @@ public void runWith(Runnable task, Object context) { task.run(); } } - - private boolean containsScopeKey(List<String> keys, Map<Object, Object> localContextData) { - if (keys.isEmpty()) { - return false; - } - if (keys.size() == 1) { - // Very often there will be only one key used - return localContextData.containsKey(keys.get(0)); - } else { - for (String key : keys) { - if (localContextData.containsKey(key)) { - return true; - } - } - } - return false; - } }; } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java index 6642f6bd1b80ff..ad260e607bf72b 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java @@ -2,6 +2,8 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.ContextLocalImpl; +import io.vertx.core.spi.context.storage.ContextLocal; public class VertxLocalsHelper { @@ -22,6 +24,10 @@ public static void throwOnRootContextAccess() { public static <T> T getLocal(ContextInternal context, Object key) { if (VertxContext.isDuplicatedContext(context)) { // We are on a duplicated context, allow accessing the locals + if (key instanceof ContextLocalImpl<?> || key instanceof ContextLocal<?>) { + var localKey = (ContextLocal<T>) key; + return (T) context.getLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE); + } return (T) context.localContextData().get(key); } else { throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT); @@ -31,7 +37,10 @@ public static <T> T getLocal(ContextInternal context, Object key) { public static void putLocal(ContextInternal context, Object key, Object value) { if (VertxContext.isDuplicatedContext(context)) { // We are on a duplicated context, allow accessing the locals - context.localContextData().put(key, value); + if (key instanceof ContextLocalImpl<?> || key instanceof ContextLocal<?>) { + var localKey = (ContextLocal<Object>) key; + context.putLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE, value); + } } else { throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT); } @@ -40,6 +49,14 @@ public static void putLocal(ContextInternal context, Object key, Object value) { public static boolean removeLocal(ContextInternal context, Object key) { if (VertxContext.isDuplicatedContext(context)) { // We are on a duplicated context, allow accessing the locals + if (key instanceof ContextLocalImpl<?> || key instanceof ContextLocal<?>) { + var localKey = (ContextLocal<Object>) key; + if (localKey == null) { + return false; + } + context.removeLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE); + return true; + } return context.localContextData().remove(key) != null; } else { throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT); diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/context/VertxContextSafetyToggle.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/context/VertxContextSafetyToggle.java index 0ba222aee0d4e9..59d04078026112 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/context/VertxContextSafetyToggle.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/context/VertxContextSafetyToggle.java @@ -3,6 +3,7 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextLocalImpl; /** * This is meant for other extensions to integrate with, to help @@ -39,7 +40,6 @@ */ public final class VertxContextSafetyToggle { - private static final Object ACCESS_TOGGLE_KEY = new Object(); public static final String UNRESTRICTED_BY_DEFAULT_PROPERTY = "io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.UNRESTRICTED_BY_DEFAULT"; /** @@ -49,6 +49,9 @@ public final class VertxContextSafetyToggle { public static final String FULLY_DISABLE_PROPERTY = "io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.I_HAVE_CHECKED_EVERYTHING"; private static final boolean UNRESTRICTED_BY_DEFAULT = Boolean.getBoolean(UNRESTRICTED_BY_DEFAULT_PROPERTY); private static final boolean FULLY_DISABLED = Boolean.getBoolean(FULLY_DISABLE_PROPERTY); + // TODO VertxImpl should be allocated AFTER getting here, to make it work! + // ContextLocalImpl permanently assign a globally unique key: if the check is disabled, there's no point in creating it + private static final ContextLocalImpl<Boolean> ACCESS_TOGGLE_KEY = FULLY_DISABLED ? new ContextLocalImpl<>() : null; /** * Verifies if the current Vert.x context was flagged as safe diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java index 4934f6caafcbb8..b21dba5d074548 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxCurrentContextFactory.java @@ -1,9 +1,13 @@ package io.quarkus.vertx.runtime; import java.lang.annotation.Annotation; -import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +import jakarta.enterprise.context.RequestScoped; import io.netty.util.concurrent.FastThreadLocal; import io.quarkus.arc.CurrentContext; @@ -14,23 +18,34 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.ContextLocalImpl; public class VertxCurrentContextFactory implements CurrentContextFactory { private static final String LOCAL_KEY_PREFIX = "io.quarkus.vertx.cdi-current-context"; + // TODO VertxImpl should be allocated AFTER getting here, to make it work! + private static final ContextLocalImpl<? extends ContextState> REQUEST_SCOPED_LOCAL_KEY = new ContextLocalImpl<>(); private final List<String> keys; - private final List<String> unmodifiableKeys; + private final AtomicBoolean requestScopedKeyCreated; public VertxCurrentContextFactory() { // There will be only a few mutative operations max this.keys = new CopyOnWriteArrayList<>(); - // We do not want to allocate a new object for each VertxCurrentContextFactory#keys() invocation - this.unmodifiableKeys = Collections.unmodifiableList(keys); + this.requestScopedKeyCreated = new AtomicBoolean(); } @Override public <T extends InjectableContext.ContextState> CurrentContext<T> create(Class<? extends Annotation> scope) { + if (scope == RequestScoped.class) { + if (!requestScopedKeyCreated.compareAndExchange(false, true)) { + throw new IllegalStateException( + "Multiple current contexts for the same scope are not supported. Current context for " + + scope + " already exists!"); + } + return new VertxCurrentContext<T>(REQUEST_SCOPED_LOCAL_KEY); + } String key = LOCAL_KEY_PREFIX + scope.getName(); if (keys.contains(key)) { throw new IllegalStateException( @@ -41,20 +56,47 @@ public <T extends InjectableContext.ContextState> CurrentContext<T> create(Class return new VertxCurrentContext<>(key); } - /** - * - * @return an unmodifiable list of used keys - */ - public List<String> keys() { - return unmodifiableKeys; + public ContextInternal duplicateContext(ContextInternal vertxContext) { + List<String> keys = this.keys; + // TODO optimize it to do it right! + ConcurrentMap<Object, Object> local = vertxContext.localContextData(); + if (containsScopeKey(keys, local)) { + // Duplicate the context, copy the data, remove the request context + vertxContext = vertxContext.duplicate(); + vertxContext.localContextData().putAll(local); + keys.forEach(vertxContext.localContextData()::remove); + if (requestScopedKeyCreated.get()) { + vertxContext.removeLocal(REQUEST_SCOPED_LOCAL_KEY); + } + VertxContextSafetyToggle.setContextSafe(vertxContext, true); + } + return vertxContext; + } + + private static boolean containsScopeKey(List<String> keys, Map<Object, Object> localContextData) { + if (keys.isEmpty()) { + return false; + } + if (keys.size() == 1) { + // Very often there will be only one key used + return localContextData.containsKey(keys.get(0)); + } else { + for (String key : keys) { + if (localContextData.containsKey(key)) { + return true; + } + } + } + return false; } private static final class VertxCurrentContext<T extends ContextState> implements CurrentContext<T> { - private final String key; - private final FastThreadLocal<T> fallback = new FastThreadLocal<>(); + // It allows to use both ContextLocalImpl and String keys + private final Object key; + private volatile FastThreadLocal<T> fallback; - private VertxCurrentContext(String key) { + private VertxCurrentContext(Object key) { this.key = key; } @@ -64,7 +106,24 @@ public T get() { if (context != null && VertxContext.isDuplicatedContext(context)) { return context.getLocal(key); } - return fallback.get(); + return fallback().get(); + } + + private FastThreadLocal<T> fallback() { + var fallback = this.fallback; + if (fallback == null) { + fallback = getOrCreateFallback(); + } + return fallback; + } + + private synchronized FastThreadLocal<T> getOrCreateFallback() { + var fallback = this.fallback; + if (fallback == null) { + fallback = new FastThreadLocal<>(); + this.fallback = fallback; + } + return fallback; } @Override @@ -80,7 +139,7 @@ public void set(T state) { } } else { - fallback.set(state); + fallback().set(state); } } @@ -91,7 +150,7 @@ public void remove() { // NOOP - the DC should not be shared. // context.removeLocal(key); } else { - fallback.remove(); + fallback().remove(); } }