Skip to content

Commit

Permalink
Add support for io_uring #1522
Browse files Browse the repository at this point in the history
Lettuce now integrates with netty's experimental io_uring transport when netty-incubator-transport-native-io_uring is on the classpath.
  • Loading branch information
mp911de committed Nov 30, 2020
1 parent df6ccd7 commit 52c705b
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 26 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>0.0.1.Final</version>
<classifier>linux-x86_64</classifier>
<optional>true</optional>
</dependency>

<!-- Metrics/Tracing -->

<dependency>
Expand Down
1 change: 1 addition & 0 deletions src/main/asciidoc/new-features.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Kotlin Coroutines support for `SCAN`/`HSCAN`/`SSCAN`/`ZSCAN` through `ScanFlow`.
* Command Listener API through `RedisClient.addListener(CommandListener)`.
* <<command.latency.metrics.micrometer,Micrometer support>> through `MicrometerCommandLatencyRecorder`.
* <<native.transport,Experimental support for `io_uring`>>.

[[new-features.6-0-0]]
== What's new in Lettuce 6.0
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint
connectionBuilder.bootstrap().group(getEventLoopGroup(connectionPoint));

if (connectionPoint.getSocket() != null) {
NativeTransports.assertAvailable();
NativeTransports.assertDomainSocketAvailable();
connectionBuilder.bootstrap().channel(NativeTransports.domainSocketChannelClass());
} else {
connectionBuilder.bootstrap().channel(Transports.socketChannelClass());
Expand Down Expand Up @@ -321,7 +321,7 @@ private EventLoopGroup doGetEventExecutor(ConnectionPoint connectionPoint) {

if (connectionPoint.getSocket() != null) {

NativeTransports.assertAvailable();
NativeTransports.assertDomainSocketAvailable();

Class<? extends EventLoopGroup> eventLoopGroupClass = NativeTransports.eventLoopGroupClass();

Expand All @@ -336,7 +336,7 @@ private EventLoopGroup doGetEventExecutor(ConnectionPoint connectionPoint) {
}

if (connectionPoint.getSocket() != null) {
NativeTransports.assertAvailable();
NativeTransports.assertDomainSocketAvailable();
return eventLoopGroups.get(NativeTransports.eventLoopGroupClass());
}

Expand Down
27 changes: 18 additions & 9 deletions src/main/java/io/lettuce/core/Transports.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.EpollProvider;
import io.lettuce.core.resource.EventLoopResources;
import io.lettuce.core.resource.IOUringProvider;
import io.lettuce.core.resource.KqueueProvider;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
* Transport infrastructure utility class. This class provides {@link EventLoopGroup} and {@link Channel} classes for socket and
* native socket transports.
* Transport infrastructure utility class. This class provides {@link EventLoopGroup} and {@link Channel} classes for TCP socket
* and domain socket transports.
*
* @author Mark Paluch
* @since 4.4
Expand All @@ -38,7 +39,7 @@ class Transports {
*/
static Class<? extends EventLoopGroup> eventLoopGroupClass() {

if (NativeTransports.isSocketSupported()) {
if (NativeTransports.isAvailable()) {
return NativeTransports.eventLoopGroupClass();
}

Expand All @@ -50,7 +51,7 @@ static Class<? extends EventLoopGroup> eventLoopGroupClass() {
*/
static Class<? extends Channel> socketChannelClass() {

if (NativeTransports.isSocketSupported()) {
if (NativeTransports.isAvailable()) {
return NativeTransports.socketChannelClass();
}

Expand All @@ -63,12 +64,19 @@ static Class<? extends Channel> socketChannelClass() {
static class NativeTransports {

static EventLoopResources RESOURCES = KqueueProvider.isAvailable() ? KqueueProvider.getResources()
: EpollProvider.getResources();
: IOUringProvider.isAvailable() ? IOUringProvider.getResources() : EpollProvider.getResources();

/**
* @return {@code true} if a native transport is available.
*/
static boolean isSocketSupported() {
static boolean isAvailable() {
return EpollProvider.isAvailable() || KqueueProvider.isAvailable() || IOUringProvider.isAvailable();
}

/**
* @return {@code true} if a native transport for domain sockets is available.
*/
static boolean isDomainSocketSupported() {
return EpollProvider.isAvailable() || KqueueProvider.isAvailable();
}

Expand All @@ -83,6 +91,7 @@ static Class<? extends Channel> socketChannelClass() {
* @return the native transport domain socket {@link Channel} class.
*/
static Class<? extends Channel> domainSocketChannelClass() {
assertDomainSocketAvailable();
return RESOURCES.domainSocketChannelClass();
}

Expand All @@ -93,10 +102,10 @@ static Class<? extends EventLoopGroup> eventLoopGroupClass() {
return RESOURCES.eventLoopGroupClass();
}

static void assertAvailable() {
static void assertDomainSocketAvailable() {

LettuceAssert.assertState(NativeTransports.isSocketSupported(),
"A unix domain socket connections requires epoll or kqueue and neither is available");
LettuceAssert.assertState(NativeTransports.isDomainSocketSupported(),
"A unix domain socket connection requires epoll or kqueue and neither is available");
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.lettuce.core.resource;

import static io.lettuce.core.resource.PromiseAdapter.toBooleanPromise;
import static io.lettuce.core.resource.PromiseAdapter.*;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -29,7 +29,15 @@
import io.lettuce.core.internal.LettuceAssert;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.*;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.concurrent.SucceededFuture;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand Down Expand Up @@ -234,6 +242,16 @@ private static <T extends EventExecutorGroup> EventExecutorGroup createEventLoop
}
}

if (IOUringProvider.isAvailable()) {

EventLoopResources resources = IOUringProvider.getResources();

if (resources.matches(type)) {
return resources.newEventLoopGroup(numberOfThreads,
factoryProvider.getThreadFactory("lettuce-io_uringEventLoop"));
}
}

throw new IllegalArgumentException(String.format("Type %s not supported", type.getName()));
}

Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/lettuce/core/resource/EventLoopResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public interface EventLoopResources {
*/
boolean matches(Class<? extends EventExecutorGroup> type);

/**
* @return the {@link EventLoopGroup} class.
*/
Class<? extends EventLoopGroup> eventLoopGroupClass();

/**
* Create a new {@link EpollEventLoopGroup}.
*
Expand All @@ -48,20 +53,15 @@ public interface EventLoopResources {
*/
EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory);

/**
* @return the Domain Socket {@link Channel} class.
*/
Class<? extends Channel> domainSocketChannelClass();

/**
* @return the {@link Channel} class.
*/
Class<? extends Channel> socketChannelClass();

/**
* @return the {@link EventLoopGroup} class.
* @return the Domain Socket {@link Channel} class.
*/
Class<? extends EventLoopGroup> eventLoopGroupClass();
Class<? extends Channel> domainSocketChannelClass();

/**
* @param socketPath the socket file path.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.resource;

import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutorGroup;

/**
* Wrapper for {@link EventLoopResources} that applies a {@link Runnable verification} before calling the delegate method.
*
* @author Mark Paluch
*/
class EventLoopResourcesWrapper implements EventLoopResources {

private final EventLoopResources delegate;

private final Runnable verifier;

EventLoopResourcesWrapper(EventLoopResources delegate, Runnable verifier) {
this.delegate = delegate;
this.verifier = verifier;
}

@Override
public boolean matches(Class<? extends EventExecutorGroup> type) {
verifier.run();
return delegate.matches(type);
}

@Override
public Class<? extends EventLoopGroup> eventLoopGroupClass() {
verifier.run();
return delegate.eventLoopGroupClass();
}

@Override
public EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
verifier.run();
return delegate.newEventLoopGroup(nThreads, threadFactory);
}

@Override
public Class<? extends Channel> socketChannelClass() {
verifier.run();
return delegate.socketChannelClass();
}

@Override
public Class<? extends Channel> domainSocketChannelClass() {
verifier.run();
return delegate.domainSocketChannelClass();
}

@Override
public SocketAddress newSocketAddress(String socketPath) {
verifier.run();
return delegate.newSocketAddress(socketPath);
}

}
Loading

0 comments on commit 52c705b

Please sign in to comment.