Skip to content

Commit

Permalink
Support duplicate context duplication.
Browse files Browse the repository at this point in the history
Motivation:

Duplicating a duplicated context is supported but the duplication semantic is not defined.

Changes:

This update the duplicated context duplication by doing a copy of each local in the duplicated duplicate. This introduce a duplicator for each local that is responsible for copying the object when it is not null.
  • Loading branch information
vietj committed Jan 14, 2025
1 parent e7a0f6d commit 3030490
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/ContextBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
class ContextBase extends AtomicReferenceArray<Object> {

private final int localsLength;
final int localsLength;

ContextBase(int localsLength) {
super(localsLength);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static <T> void setResultHandler(ContextInternal ctx, Future<T> fut, Handler<Asy
static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME);

private final ThreadingModel threadingModel;
private final VertxInternal owner;
private final VertxImpl owner;
private final JsonObject config;
private final Deployment deployment;
private final CloseFuture closeFuture;
Expand All @@ -58,7 +58,7 @@ static <T> void setResultHandler(ContextInternal ctx, Future<T> fut, Handler<Asy
final WorkerPool workerPool;
final WorkerTaskQueue executeBlockingTasks;

public ContextImpl(VertxInternal vertx,
public ContextImpl(VertxImpl vertx,
int localsLength,
ThreadingModel threadingModel,
EventLoop eventLoop,
Expand Down Expand Up @@ -116,7 +116,7 @@ public EventLoop nettyEventLoop() {
return eventLoop;
}

public VertxInternal owner() {
public VertxImpl owner() {
return owner;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/ContextInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public interface ContextInternal extends Context {

ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0);
ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0, ConcurrentHashMap::new);

/**
* @return the current context
Expand Down
19 changes: 14 additions & 5 deletions src/main/java/io/vertx/core/impl/ContextLocalImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,27 @@

import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.function.Function;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class ContextLocalImpl<T> implements ContextLocal<T> {

public static <T> ContextLocal<T> create(Class<T> type, Function<T, T> duplicator) {
synchronized (LocalSeq.class) {
int idx = LocalSeq.locals.size();
ContextLocal<T> local = new ContextLocalImpl<>(idx, duplicator);
LocalSeq.locals.add(local);
return local;
}
}

final int index;
final Function<T, T> duplicator;

public ContextLocalImpl(int index) {
public ContextLocalImpl(int index, Function<T, T> duplicator) {
this.index = index;
}

public ContextLocalImpl() {
this.index = LocalSeq.next();
this.duplicator = duplicator;
}
}
4 changes: 3 additions & 1 deletion src/main/java/io/vertx/core/impl/DuplicatedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ public boolean isWorkerContext() {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(delegate);
DuplicatedContext duplicate = new DuplicatedContext(delegate);
delegate.owner().duplicate(this, duplicate);
return duplicate;
}

@Override
Expand Down
29 changes: 18 additions & 11 deletions src/main/java/io/vertx/core/impl/LocalSeq.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,35 @@
*/
package io.vertx.core.impl;

import java.util.concurrent.atomic.AtomicInteger;
import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class LocalSeq {

// 0 : reserved slot for local context map
private static final AtomicInteger seq = new AtomicInteger(1);
static final List<ContextLocal<?>> locals = new ArrayList<>();

static {
reset();
}

/**
* Hook for testing purposes
*/
static void reset() {
seq.set((1));
}

static int get() {
return seq.get();
synchronized static void reset() {
// 0 : reserved slot for local context map
locals.clear();
locals.add(ContextInternal.LOCAL_MAP);
}

static int next() {
return seq.getAndIncrement();
synchronized static ContextLocal<?>[] get() {
return locals.toArray(new ContextLocal[0]);
}
}
19 changes: 16 additions & 3 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.vertx.core.impl.btc.BlockedThreadChecker;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.impl.transports.JDKTransport;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.file.FileResolver;
import io.vertx.core.file.impl.FileSystemImpl;
import io.vertx.core.file.impl.WindowsFileSystem;
Expand Down Expand Up @@ -77,6 +78,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -134,7 +136,7 @@ private static ThreadFactory virtualThreadFactory() {
private final FileResolver fileResolver;
private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
private final int contextLocalsLength;
private final ContextLocal<?>[] contextLocals;
final WorkerPool workerPool;
final WorkerPool internalWorkerPool;
final WorkerPool virtualThreaWorkerPool;
Expand Down Expand Up @@ -191,7 +193,7 @@ private static ThreadFactory virtualThreadFactory() {

ThreadFactory virtualThreadFactory = virtualThreadFactory();

contextLocalsLength = LocalSeq.get();
contextLocals = LocalSeq.get();
closeFuture = new CloseFuture(log);
maxEventLoopExecTime = maxEventLoopExecuteTime;
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
Expand Down Expand Up @@ -580,7 +582,7 @@ private ContextImpl createVirtualThreadContext(EventLoop eventLoop, CloseFuture


private ContextImpl createContext(ThreadingModel threadingModel, EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl, EventExecutor eventExecutor, WorkerPool wp) {
return new ContextImpl(this, contextLocalsLength, threadingModel, eventLoop, eventExecutor, internalWorkerPool, wp, deployment, closeFuture, disableTCCL ? null : tccl);
return new ContextImpl(this, contextLocals.length, threadingModel, eventLoop, eventExecutor, internalWorkerPool, wp, deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
Expand Down Expand Up @@ -790,6 +792,17 @@ public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
});
}

void duplicate(ContextBase src, ContextBase dst) {
for (int i = 0;i < contextLocals.length;i++) {
ContextLocalImpl<?> contextLocal = (ContextLocalImpl<?>) contextLocals[i];
Object local = src.get(i);
if (local != null) {
local = ((Function)contextLocal.duplicator).apply(local);
}
dst.set(i, local);
}
}

@Override
public Future<String> deployVerticle(String name, DeploymentOptions options) {
if (options.isHa() && haManager() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.ContextLocalImpl;

import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand All @@ -35,7 +36,16 @@ public interface ContextLocal<T> {
* @return the context local storage
*/
static <T> ContextLocal<T> registerLocal(Class<T> type) {
return new ContextLocalImpl<>();
return ContextLocalImpl.create(type, Function.identity());
}

/**
* Registers a context local storage.
*
* @return the context local storage
*/
static <T> ContextLocal<T> registerLocal(Class<T> type, Function<T, T> duplicator) {
return ContextLocalImpl.create(type, duplicator);
}

/**
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/io/vertx/core/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1238,4 +1238,19 @@ public void testInterruptTask(ContextInternal context, Consumer<Runnable> actor)
assertTrue((System.currentTimeMillis() - now) < 2000);
assertTrue(interrupted.get());
}

@Test
public void testNestedDuplicate() {
ContextInternal ctx = ((ContextInternal) vertx.getOrCreateContext()).duplicate();
ctx.putLocal("foo", "bar");
Object expected = new Object();
ctx.putLocal(contextLocal, AccessMode.CONCURRENT, expected);
ContextInternal duplicate = ctx.duplicate();
assertEquals("bar", duplicate.getLocal("foo"));
assertEquals(expected, duplicate.getLocal(contextLocal));
ctx.removeLocal("foo");
ctx.removeLocal(contextLocal, AccessMode.CONCURRENT);
assertEquals("bar", duplicate.getLocal("foo"));
assertEquals(expected, duplicate.getLocal(contextLocal));
}
}

0 comments on commit 3030490

Please sign in to comment.