diff --git a/src/main/java/io/lettuce/core/resource/ClientResources.java b/src/main/java/io/lettuce/core/resource/ClientResources.java index 17f97e8f4a..0c5f02cd3d 100644 --- a/src/main/java/io/lettuce/core/resource/ClientResources.java +++ b/src/main/java/io/lettuce/core/resource/ClientResources.java @@ -60,12 +60,23 @@ public interface ClientResources { /** * Create a new {@link ClientResources} using default settings. * - * @return a new instance of a default client resources. + * @return a new instance of default client resources. */ static ClientResources create() { return DefaultClientResources.create(); } + /** + * Create a new {@link ClientResources} using default settings. + * + * @param threadFactoryProvider provides a {@link java.util.concurrent.ThreadFactory} to create threads. + * @return a new instance of default client resources. + * @since 6.1.1 + */ + static ClientResources create(ThreadFactoryProvider threadFactoryProvider) { + return DefaultClientResources.builder().threadFactoryProvider(threadFactoryProvider).build(); + } + /** * Create a new {@link ClientResources} using default settings. * @@ -237,6 +248,23 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo */ Builder socketAddressResolver(SocketAddressResolver socketAddressResolver); + /** + * Provide a default {@link ThreadFactoryProvider} to obtain {@link java.util.concurrent.ThreadFactory} for a + * {@code poolName} to create threads. + *

+ * Applies only to threading resources created by {@link ClientResources} when not configuring {@link #timer()}, + * {@link #eventExecutorGroup()}, or {@link #eventLoopGroupProvider()}. + * + * @param threadFactoryProvider a provider to obtain a {@link java.util.concurrent.ThreadFactory} for a + * {@code poolName}, must not be {@code null}. + * @return {@code this} {@link Builder}. + * @since 6.1.1 + * @see #eventExecutorGroup(EventExecutorGroup) + * @see #eventLoopGroupProvider(EventLoopGroupProvider) + * @see #timer(Timer) + */ + Builder threadFactoryProvider(ThreadFactoryProvider threadFactoryProvider); + /** * Sets a shared {@link Timer} that can be used across different instances of {@link io.lettuce.core.RedisClient} and * {@link io.lettuce.core.cluster.RedisClusterClient} The provided {@link Timer} instance will not be shut down when diff --git a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java index a8625fe2f0..1e01f631b6 100644 --- a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java +++ b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java @@ -40,7 +40,6 @@ import io.netty.util.Timer; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultPromise; -import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -68,6 +67,8 @@ * {@code computationThreadPoolSize}. *

  • a {@code nettyCustomizer} that is a provided instance of {@link NettyCustomizer}.
  • *
  • a {@code socketAddressResolver} which is a provided instance of {@link SocketAddressResolver}.
  • + *
  • a {@code threadFactoryProvider} to provide a {@link java.util.concurrent.ThreadFactory} for default timer, event loop and + * event executor instances.
  • *
  • a {@code timer} that is a provided instance of {@link io.netty.util.HashedWheelTimer}.
  • *
  • a {@code tracing} that is a provided instance of {@link Tracing}.
  • * @@ -150,6 +151,8 @@ public class DefaultClientResources implements ClientResources { private final SocketAddressResolver socketAddressResolver; + private final ThreadFactoryProvider threadFactoryProvider; + private final Timer timer; private final boolean sharedTimer; @@ -161,6 +164,7 @@ public class DefaultClientResources implements ClientResources { protected DefaultClientResources(Builder builder) { addressResolverGroup = builder.addressResolverGroup; + threadFactoryProvider = builder.threadFactoryProvider; if (builder.eventLoopGroupProvider == null) { int ioThreadPoolSize = builder.ioThreadPoolSize; @@ -172,7 +176,7 @@ protected DefaultClientResources(Builder builder) { } this.sharedEventLoopGroupProvider = false; - this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize); + this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize, threadFactoryProvider); } else { this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider; @@ -189,7 +193,7 @@ protected DefaultClientResources(Builder builder) { } eventExecutorGroup = DefaultEventLoopGroupProvider.createEventLoopGroup(DefaultEventExecutorGroup.class, - computationThreadPoolSize); + computationThreadPoolSize, threadFactoryProvider); sharedEventExecutor = false; } else { sharedEventExecutor = builder.sharedEventExecutor; @@ -197,7 +201,7 @@ protected DefaultClientResources(Builder builder) { } if (builder.timer == null) { - timer = new HashedWheelTimer(new DefaultThreadFactory("lettuce-timer")); + timer = new HashedWheelTimer(threadFactoryProvider.getThreadFactory("lettuce-timer")); sharedTimer = false; } else { timer = builder.timer; @@ -315,6 +319,8 @@ public static class Builder implements ClientResources.Builder { private boolean sharedTimer; + private ThreadFactoryProvider threadFactoryProvider = DefaultThreadFactoryProvider.INSTANCE; + private Timer timer; private Tracing tracing = Tracing.disabled(); @@ -565,6 +571,30 @@ public ClientResources.Builder socketAddressResolver(SocketAddressResolver socke return this; } + /** + * Provide a default {@link ThreadFactoryProvider} to obtain {@link java.util.concurrent.ThreadFactory} for a + * {@code poolName}. + *

    + * Applies only to threading resources created by {@link DefaultClientResources} when not configuring {@link #timer()}, + * {@link #eventExecutorGroup()}, or {@link #eventLoopGroupProvider()}. + * + * @param threadFactoryProvider a provider to obtain a {@link java.util.concurrent.ThreadFactory} for a + * {@code poolName}, must not be {@code null}. + * @return {@code this} {@link ClientResources.Builder}. + * @since 6.1.1 + * @see #eventExecutorGroup(EventExecutorGroup) + * @see #eventLoopGroupProvider(EventLoopGroupProvider) + * @see #timer(Timer) + */ + @Override + public ClientResources.Builder threadFactoryProvider(ThreadFactoryProvider threadFactoryProvider) { + + LettuceAssert.notNull(threadFactoryProvider, "ThreadFactoryProvider must not be null"); + + this.threadFactoryProvider = threadFactoryProvider; + return this; + } + /** * Sets a shared {@link Timer} that can be used across different instances of {@link io.lettuce.core.RedisClient} and * {@link io.lettuce.core.cluster.RedisClusterClient} The provided {@link Timer} instance will not be shut down when @@ -634,7 +664,8 @@ public DefaultClientResources.Builder mutate() { builder.commandLatencyRecorder(commandLatencyRecorder()) .commandLatencyPublisherOptions(commandLatencyPublisherOptions()).dnsResolver(dnsResolver()) .eventBus(eventBus()).eventExecutorGroup(eventExecutorGroup()).reconnectDelay(reconnectDelay) - .socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer()).timer(timer()) + .socketAddressResolver(socketAddressResolver()).nettyCustomizer(nettyCustomizer()) + .threadFactoryProvider(threadFactoryProvider).timer(timer()) .tracing(tracing()).addressResolverGroup(addressResolverGroup()); builder.sharedCommandLatencyCollector = sharedEventLoopGroupProvider; diff --git a/src/main/java/io/lettuce/core/resource/DefaultEventLoopGroupProvider.java b/src/main/java/io/lettuce/core/resource/DefaultEventLoopGroupProvider.java index a5c81c1981..f1a75cc208 100644 --- a/src/main/java/io/lettuce/core/resource/DefaultEventLoopGroupProvider.java +++ b/src/main/java/io/lettuce/core/resource/DefaultEventLoopGroupProvider.java @@ -31,7 +31,6 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultPromise; -import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -59,7 +58,7 @@ public class DefaultEventLoopGroupProvider implements EventLoopGroupProvider { private final int numberOfThreads; - private final ThreadFactoryProvider threadFactoryProvider; + private final io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider; private volatile boolean shutdownCalled = false; @@ -88,6 +87,23 @@ public DefaultEventLoopGroupProvider(int numberOfThreads, ThreadFactoryProvider this.threadFactoryProvider = threadFactoryProvider; } + /** + * Creates a new instance of {@link DefaultEventLoopGroupProvider}. + * + * @param numberOfThreads number of threads (pool size) + * @param threadFactoryProvider provides access to {@link io.lettuce.core.resource.ThreadFactoryProvider}. + * @since 6.1.1 + */ + public DefaultEventLoopGroupProvider(int numberOfThreads, + io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider) { + + LettuceAssert.isTrue(numberOfThreads > 0, "Number of threads must be greater than zero"); + LettuceAssert.notNull(threadFactoryProvider, "ThreadFactoryProvider must not be null"); + + this.numberOfThreads = numberOfThreads; + this.threadFactoryProvider = threadFactoryProvider; + } + @Override public T allocate(Class type) { @@ -170,7 +186,7 @@ private T getOrCreate(Class type) { * @since 6.0 */ protected EventExecutorGroup doCreateEventLoopGroup(Class type, int numberOfThreads, - ThreadFactoryProvider threadFactoryProvider) { + io.lettuce.core.resource.ThreadFactoryProvider threadFactoryProvider) { return createEventLoopGroup(type, numberOfThreads, threadFactoryProvider); } @@ -209,8 +225,8 @@ public static EventExecutorGroup createEventLoopG * @throws IllegalArgumentException if the {@code type} is not supported. * @since 5.3 */ - private static EventExecutorGroup createEventLoopGroup(Class type, int numberOfThreads, - ThreadFactoryProvider factoryProvider) { + static EventExecutorGroup createEventLoopGroup(Class type, int numberOfThreads, + io.lettuce.core.resource.ThreadFactoryProvider factoryProvider) { logger.debug("Creating executor {}", type.getName()); @@ -322,7 +338,7 @@ public Future shutdown(long quietPeriod, long timeout, TimeUnit timeUni * * @since 6.0 */ - public interface ThreadFactoryProvider { + public interface ThreadFactoryProvider extends io.lettuce.core.resource.ThreadFactoryProvider { /** * Return a {@link ThreadFactory} for the given {@code poolName}. @@ -330,18 +346,8 @@ public interface ThreadFactoryProvider { * @param poolName a descriptive pool name. Typically used as prefix for thread names. * @return the {@link ThreadFactory}. */ - ThreadFactory getThreadFactory(String poolName); - - } - - enum DefaultThreadFactoryProvider implements ThreadFactoryProvider { - - INSTANCE; - @Override - public ThreadFactory getThreadFactory(String poolName) { - return new DefaultThreadFactory(poolName, true); - } + ThreadFactory getThreadFactory(String poolName); } diff --git a/src/main/java/io/lettuce/core/resource/DefaultThreadFactoryProvider.java b/src/main/java/io/lettuce/core/resource/DefaultThreadFactoryProvider.java new file mode 100644 index 0000000000..fdb39011f7 --- /dev/null +++ b/src/main/java/io/lettuce/core/resource/DefaultThreadFactoryProvider.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.resource; + +import java.util.concurrent.ThreadFactory; + +import io.netty.util.concurrent.DefaultThreadFactory; + +/** + * Default {@link ThreadFactoryProvider} implementation. + * + * @author Mark Paluch + */ +enum DefaultThreadFactoryProvider implements ThreadFactoryProvider { + + INSTANCE; + + @Override + public ThreadFactory getThreadFactory(String poolName) { + return new DefaultThreadFactory(poolName, true); + } + +} diff --git a/src/main/java/io/lettuce/core/resource/ThreadFactoryProvider.java b/src/main/java/io/lettuce/core/resource/ThreadFactoryProvider.java new file mode 100644 index 0000000000..7fc2dd6f9e --- /dev/null +++ b/src/main/java/io/lettuce/core/resource/ThreadFactoryProvider.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.resource; + +import java.util.concurrent.ThreadFactory; + +/** + * Interface to provide a custom {@link java.util.concurrent.ThreadFactory}. Implementations are asked through + * {@link #getThreadFactory(String)} to provide a thread factory for a given pool name. + * + * @since 6.1.1 + */ +@FunctionalInterface +public interface ThreadFactoryProvider { + + /** + * Return a {@link ThreadFactory} for the given {@code poolName}. + * + * @param poolName a descriptive pool name. Typically used as prefix for thread names. + * @return the {@link ThreadFactory}. + */ + ThreadFactory getThreadFactory(String poolName); + +} diff --git a/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java b/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java index 5da4e55757..373279c4c7 100644 --- a/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java +++ b/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java @@ -19,7 +19,10 @@ import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import io.lettuce.test.Wait; import org.junit.jupiter.api.Test; import reactor.test.StepVerifier; @@ -33,6 +36,7 @@ import io.netty.resolver.AddressResolverGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; @@ -262,4 +266,55 @@ void considersDecoupledSharedStateFromMutation() { copyTimer.stop(); timer.stop(); } + + @Test + void shouldApplyThreadFactory() { + + ClientResources clientResources = ClientResources.builder().threadFactoryProvider(name -> runnable -> { + return new MyThread(runnable, name); + }).ioThreadPoolSize(2).computationThreadPoolSize(2).build(); + + HashedWheelTimer hwt = (HashedWheelTimer) clientResources.timer(); + assertThat(hwt).extracting("workerThread").isInstanceOf(MyThread.class); + + AtomicReference eventExecutorThread = new AtomicReference<>(); + EventExecutor eventExecutor = clientResources.eventExecutorGroup().next(); + eventExecutor.submit(() -> eventExecutorThread.set(Thread.currentThread())).awaitUninterruptibly(); + + AtomicReference eventLoopThread = new AtomicReference<>(); + NioEventLoopGroup eventLoopGroup = clientResources.eventLoopGroupProvider().allocate(NioEventLoopGroup.class); + eventLoopGroup.next().submit(() -> eventLoopThread.set(Thread.currentThread())).awaitUninterruptibly(); + + clientResources.eventLoopGroupProvider().release(eventLoopGroup, 0, 0, TimeUnit.SECONDS); + + clientResources.shutdown(0, 0, TimeUnit.SECONDS); + + assertThat(MyThread.started).hasValue(5); + Wait.untilEquals(5, () -> MyThread.finished).waitOrTimeout(); + } + + static class MyThread extends Thread { + + public static AtomicInteger started = new AtomicInteger(); + + public static AtomicInteger finished = new AtomicInteger(); + + public MyThread(Runnable target, String name) { + super(target, name); + } + + @Override + public synchronized void start() { + started.incrementAndGet(); + super.start(); + } + + @Override + public void run() { + super.run(); + finished.incrementAndGet(); + } + + } + }