Skip to content

Commit

Permalink
[Wisp] Separate IO poller
Browse files Browse the repository at this point in the history
Summary: Supports different coroutines waiting on the same socket's read and write events. Solve a wisp bug triggered by okhttp.

Test Plan: TestIssue311

Reviewed-by: zhengxiaolinX, D-D-H

Issue: dragonwell-project#311
  • Loading branch information
jia-wei-tang committed Mar 31, 2023
1 parent 9dca7b6 commit 494a743
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class WispConfiguration {
static final int WISP_SCHEDULE_HELP_STEAL_RETRY;
static final WispScheduler.SchedulingPolicy SCHEDULING_POLICY;
static final boolean USE_DIRECT_SELECTOR_WAKEUP;
static final boolean SEPARATE_IO_POLLER;
static final boolean CARRIER_AS_POLLER;
static final boolean MONOLITHIC_POLL;
static final boolean CARRIER_GROW;
Expand Down Expand Up @@ -113,7 +114,9 @@ public Properties run() {
WISP_PROFILE_LOG_PATH = "";
}

CARRIER_AS_POLLER = parseBooleanParameter(p, "com.alibaba.wisp.useCarrierAsPoller", ALL_THREAD_AS_WISP);
SEPARATE_IO_POLLER = parseBooleanParameter(p, "com.alibaba.wisp.separateIOPoller", false);
CARRIER_AS_POLLER = parseBooleanParameter(p, "com.alibaba.wisp.useCarrierAsPoller",
ALL_THREAD_AS_WISP && (!SEPARATE_IO_POLLER || WORKER_COUNT / POLLER_SHARDING_SIZE > 1));
MONOLITHIC_POLL = parseBooleanParameter(p, "com.alibaba.wisp.monolithicPoll", true);
WISP_HIGH_PRECISION_TIMER = parseBooleanParameter(p, "com.alibaba.wisp.highPrecisionTimer", false);
WISP_ENGINE_TASK_CACHE_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.engineTaskCache", 20);
Expand Down
33 changes: 23 additions & 10 deletions jdk/src/linux/classes/com/alibaba/wisp/engine/WispEventPump.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ enum Pool {
Pool() {
int n = Math.max(1, WispConfiguration.WORKER_COUNT / WispConfiguration.POLLER_SHARDING_SIZE);
n = (n & (n - 1)) == 0 ? n : Integer.highestOneBit(n) * 2; // next power of 2
if (WispConfiguration.SEPARATE_IO_POLLER) {
n = Math.max(2, n);
}
mask = n - 1;
pumps = new WispEventPump[n];
for (int i = 0; i < pumps.length; i++) {
Expand Down Expand Up @@ -104,16 +107,21 @@ private static int hash(int x) {
return x * (int) 2654435761L;
}

private WispEventPump pumpFromFd(int fd, boolean isRead) {
int offset = WispConfiguration.SEPARATE_IO_POLLER && !isRead ? 0 : 1;
return pumps[(hash(fd) + offset) & mask];
}

void registerEvent(WispTask task, SelectableChannel ch, int event) throws IOException {
if (ch != null && ch.isOpen()) {
int fd = ((SelChImpl) ch).getFDVal();
pumps[hash(fd) & mask].registerEvent(task, fd, event);
pumpFromFd(fd, isNativeReadEvent(toNativeEvent(event))).registerEvent(task, fd, event);
}
}

int epollWaitForWisp(int epfd, long pollArray, int arraySize, long timeout, AtomicReference<Object> status,
final Object INTERRUPTED) throws IOException {
return pumps[hash(epfd) & mask].epollWaitForWisp(epfd, pollArray, arraySize, timeout, status, INTERRUPTED);
return pumpFromFd(epfd, true).epollWaitForWisp(epfd, pollArray, arraySize, timeout, status, INTERRUPTED);
}

void interruptEpoll(AtomicReference<Object> status, Object INTERRUPTED, int interruptFd) {
Expand All @@ -140,18 +148,18 @@ WispEventPump getPump(int ord) {
/**
* whether event is a reading event or an accepting event
*/
private boolean isReadEvent(int events) throws IllegalArgumentException {
private static boolean isNativeReadEvent(int events) throws IllegalArgumentException {
int event = (events & (Net.POLLCONN | Net.POLLIN | Net.POLLOUT));
assert Integer.bitCount(event) == 1;
return (events & Net.POLLIN) != 0;
}

private WispTask[] getFd2TaskLow(int events) {
return isReadEvent(events) ? fd2ReadTaskLow : fd2WriteTaskLow;
return isNativeReadEvent(events) ? fd2ReadTaskLow : fd2WriteTaskLow;
}

private ConcurrentHashMap<Integer, WispTask> getFd2TaskHigh(int events) {
return isReadEvent(events) ? fd2ReadTaskHigh : fd2WriteTaskHigh;
return isNativeReadEvent(events) ? fd2ReadTaskHigh : fd2WriteTaskHigh;
}

private boolean sanityCheck(int fd, WispTask newTask, int events) {
Expand Down Expand Up @@ -187,12 +195,17 @@ private WispTask removeTaskByFD(int fd, int events) {
return task;
}

private void registerEvent(WispTask task, int fd, int event) throws IOException {
int ev = 0;
private static int toNativeEvent(int events) {
// Translates an interest operation set into a native poll event set
if ((event & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) ev |= Net.POLLIN;
if ((event & SelectionKey.OP_WRITE) != 0) ev |= Net.POLLOUT;
if ((event & SelectionKey.OP_CONNECT) != 0) ev |= Net.POLLCONN;
int ev = 0;
if ((events & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) ev |= Net.POLLIN;
if ((events & SelectionKey.OP_WRITE) != 0) ev |= Net.POLLOUT;
if ((events & SelectionKey.OP_CONNECT) != 0) ev |= Net.POLLCONN;
return ev;
}

private void registerEvent(WispTask task, int fd, int event) throws IOException {
int ev = toNativeEvent(event);
// When the socket is closed, the poll event will be triggered
ev |= Net.POLLHUP;
// specify the EPOLLONESHOT flag, to tell epoll to disable the associated
Expand Down
52 changes: 52 additions & 0 deletions jdk/test/com/alibaba/wisp2/bug/TestIssue311.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* @test
* @library /lib/testlibrary
* @summary Test different coroutines waiting on the same socket's read and write events.
* @requires os.family == "linux"
* @run main/timeout=10/othervm -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -Dcom.alibaba.wisp.separateIOPoller=true TestIssue311
*/

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class TestIssue311 {
public static void main(String[] args) throws Exception {
Socket client = startEchoServer();
new Thread(() -> {
try {
byte[] buf = new byte[1024];
while (true) { // discard all data
client.getInputStream().read(buf, 0, buf.length);
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();

byte[] buf = new byte[1024 * 1024];
for (int i = 0; i < 100; i++) {
client.getOutputStream().write(buf, 0, buf.length);
}
}


private static Socket startEchoServer() throws Exception {
ServerSocket serverSocket = new ServerSocket(0);
Socket socket = new Socket("localhost", serverSocket.getLocalPort());
new Thread(() -> {
try {
Socket clientSocket = serverSocket.accept();
byte[] buf = new byte[1024];
int len;
while ((len = clientSocket.getInputStream().read(buf)) > 0) {
clientSocket.getOutputStream().write(buf, 0, len);
}
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
return socket;
}
}

0 comments on commit 494a743

Please sign in to comment.