diff --git a/jdk/src/linux/classes/com/alibaba/wisp/engine/WispConfiguration.java b/jdk/src/linux/classes/com/alibaba/wisp/engine/WispConfiguration.java index d47f598dfed..6e7c7d4618a 100644 --- a/jdk/src/linux/classes/com/alibaba/wisp/engine/WispConfiguration.java +++ b/jdk/src/linux/classes/com/alibaba/wisp/engine/WispConfiguration.java @@ -60,6 +60,7 @@ class WispConfiguration { static final int WISP_SCHEDULE_HELP_STEAL_RETRY; static final WispScheduler.SchedulingPolicy SCHEDULING_POLICY; static final boolean USE_DIRECT_SELECTOR_WAKEUP; + static final boolean SEPARATE_IO_POLLER; static final boolean CARRIER_AS_POLLER; static final boolean MONOLITHIC_POLL; static final boolean CARRIER_GROW; @@ -113,7 +114,9 @@ public Properties run() { WISP_PROFILE_LOG_PATH = ""; } - CARRIER_AS_POLLER = parseBooleanParameter(p, "com.alibaba.wisp.useCarrierAsPoller", ALL_THREAD_AS_WISP); + SEPARATE_IO_POLLER = parseBooleanParameter(p, "com.alibaba.wisp.separateIOPoller", false); + CARRIER_AS_POLLER = parseBooleanParameter(p, "com.alibaba.wisp.useCarrierAsPoller", + ALL_THREAD_AS_WISP && (!SEPARATE_IO_POLLER || WORKER_COUNT / POLLER_SHARDING_SIZE > 1)); MONOLITHIC_POLL = parseBooleanParameter(p, "com.alibaba.wisp.monolithicPoll", true); WISP_HIGH_PRECISION_TIMER = parseBooleanParameter(p, "com.alibaba.wisp.highPrecisionTimer", false); WISP_ENGINE_TASK_CACHE_SIZE = parsePositiveIntegerParameter(p, "com.alibaba.wisp.engineTaskCache", 20); diff --git a/jdk/src/linux/classes/com/alibaba/wisp/engine/WispEventPump.java b/jdk/src/linux/classes/com/alibaba/wisp/engine/WispEventPump.java index a84e2e74ee1..c8d613ed44a 100644 --- a/jdk/src/linux/classes/com/alibaba/wisp/engine/WispEventPump.java +++ b/jdk/src/linux/classes/com/alibaba/wisp/engine/WispEventPump.java @@ -76,6 +76,9 @@ enum Pool { Pool() { int n = Math.max(1, WispConfiguration.WORKER_COUNT / WispConfiguration.POLLER_SHARDING_SIZE); n = (n & (n - 1)) == 0 ? n : Integer.highestOneBit(n) * 2; // next power of 2 + if (WispConfiguration.SEPARATE_IO_POLLER) { + n = Math.max(2, n); + } mask = n - 1; pumps = new WispEventPump[n]; for (int i = 0; i < pumps.length; i++) { @@ -104,16 +107,21 @@ private static int hash(int x) { return x * (int) 2654435761L; } + private WispEventPump pumpFromFd(int fd, boolean isRead) { + int offset = WispConfiguration.SEPARATE_IO_POLLER && !isRead ? 0 : 1; + return pumps[(hash(fd) + offset) & mask]; + } + void registerEvent(WispTask task, SelectableChannel ch, int event) throws IOException { if (ch != null && ch.isOpen()) { int fd = ((SelChImpl) ch).getFDVal(); - pumps[hash(fd) & mask].registerEvent(task, fd, event); + pumpFromFd(fd, isNativeReadEvent(toNativeEvent(event))).registerEvent(task, fd, event); } } int epollWaitForWisp(int epfd, long pollArray, int arraySize, long timeout, AtomicReference status, final Object INTERRUPTED) throws IOException { - return pumps[hash(epfd) & mask].epollWaitForWisp(epfd, pollArray, arraySize, timeout, status, INTERRUPTED); + return pumpFromFd(epfd, true).epollWaitForWisp(epfd, pollArray, arraySize, timeout, status, INTERRUPTED); } void interruptEpoll(AtomicReference status, Object INTERRUPTED, int interruptFd) { @@ -140,18 +148,18 @@ WispEventPump getPump(int ord) { /** * whether event is a reading event or an accepting event */ - private boolean isReadEvent(int events) throws IllegalArgumentException { + private static boolean isNativeReadEvent(int events) throws IllegalArgumentException { int event = (events & (Net.POLLCONN | Net.POLLIN | Net.POLLOUT)); assert Integer.bitCount(event) == 1; return (events & Net.POLLIN) != 0; } private WispTask[] getFd2TaskLow(int events) { - return isReadEvent(events) ? fd2ReadTaskLow : fd2WriteTaskLow; + return isNativeReadEvent(events) ? fd2ReadTaskLow : fd2WriteTaskLow; } private ConcurrentHashMap getFd2TaskHigh(int events) { - return isReadEvent(events) ? fd2ReadTaskHigh : fd2WriteTaskHigh; + return isNativeReadEvent(events) ? fd2ReadTaskHigh : fd2WriteTaskHigh; } private boolean sanityCheck(int fd, WispTask newTask, int events) { @@ -187,12 +195,17 @@ private WispTask removeTaskByFD(int fd, int events) { return task; } - private void registerEvent(WispTask task, int fd, int event) throws IOException { - int ev = 0; + private static int toNativeEvent(int events) { // Translates an interest operation set into a native poll event set - if ((event & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) ev |= Net.POLLIN; - if ((event & SelectionKey.OP_WRITE) != 0) ev |= Net.POLLOUT; - if ((event & SelectionKey.OP_CONNECT) != 0) ev |= Net.POLLCONN; + int ev = 0; + if ((events & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) ev |= Net.POLLIN; + if ((events & SelectionKey.OP_WRITE) != 0) ev |= Net.POLLOUT; + if ((events & SelectionKey.OP_CONNECT) != 0) ev |= Net.POLLCONN; + return ev; + } + + private void registerEvent(WispTask task, int fd, int event) throws IOException { + int ev = toNativeEvent(event); // When the socket is closed, the poll event will be triggered ev |= Net.POLLHUP; // specify the EPOLLONESHOT flag, to tell epoll to disable the associated diff --git a/jdk/test/com/alibaba/wisp2/bug/TestIssue311.java b/jdk/test/com/alibaba/wisp2/bug/TestIssue311.java new file mode 100644 index 00000000000..26853dccfc2 --- /dev/null +++ b/jdk/test/com/alibaba/wisp2/bug/TestIssue311.java @@ -0,0 +1,52 @@ +/* + * @test + * @library /lib/testlibrary + * @summary Test different coroutines waiting on the same socket's read and write events. + * @requires os.family == "linux" + * @run main/timeout=10/othervm -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -Dcom.alibaba.wisp.separateIOPoller=true TestIssue311 + */ + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +public class TestIssue311 { + public static void main(String[] args) throws Exception { + Socket client = startEchoServer(); + new Thread(() -> { + try { + byte[] buf = new byte[1024]; + while (true) { // discard all data + client.getInputStream().read(buf, 0, buf.length); + } + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + + byte[] buf = new byte[1024 * 1024]; + for (int i = 0; i < 100; i++) { + client.getOutputStream().write(buf, 0, buf.length); + } + } + + + private static Socket startEchoServer() throws Exception { + ServerSocket serverSocket = new ServerSocket(0); + Socket socket = new Socket("localhost", serverSocket.getLocalPort()); + new Thread(() -> { + try { + Socket clientSocket = serverSocket.accept(); + byte[] buf = new byte[1024]; + int len; + while ((len = clientSocket.getInputStream().read(buf)) > 0) { + clientSocket.getOutputStream().write(buf, 0, len); + } + clientSocket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + return socket; + } +} \ No newline at end of file