From b18d7396336020a20c234b9dfdbf86421dfa6208 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Tue, 4 Feb 2025 21:41:50 +0200 Subject: [PATCH] fix: epoll socket async chaining and refactor (#374) * fix: epoll socket async chaining and refactor --------- Signed-off-by: kostas --- util/fibers/epoll_socket.cc | 103 +++++++++++++++++------------------- util/fibers/epoll_socket.h | 8 ++- 2 files changed, 54 insertions(+), 57 deletions(-) diff --git a/util/fibers/epoll_socket.cc b/util/fibers/epoll_socket.cc index e9536fc1..db3f91bb 100644 --- a/util/fibers/epoll_socket.cc +++ b/util/fibers/epoll_socket.cc @@ -156,7 +156,7 @@ void EpollSocket::PendingReq::Activate(error_code ec) { ActivateSameThread(detail::FiberActive(), context_); } -bool EpollSocket::AsyncReq::Run(int fd, bool is_send) { +std::pair> EpollSocket::AsyncReq::Run(int fd, bool is_send) { msghdr msg; memset(&msg, 0, sizeof(msg)); msg.msg_iov = vec; @@ -166,22 +166,18 @@ bool EpollSocket::AsyncReq::Run(int fd, bool is_send) { res = is_send ? sendmsg(fd, &msg, MSG_NOSIGNAL) : recvmsg(fd, &msg, 0); if (res > 0) { - cb(res); - return true; + return {true, res}; } if (res == 0) { CHECK(!is_send); // can only happen with recvmsg - cb(MakeUnexpected(errc::connection_aborted)); - return true; + return {true, MakeUnexpected(errc::connection_aborted)}; } if (errno == EAGAIN) - return false; + return {false, 0}; - error_code ec = from_errno(); - cb(make_unexpected(ec)); - return true; + return {true, make_unexpected(from_errno())}; } EpollSocket::EpollSocket(int fd) @@ -381,8 +377,11 @@ void EpollSocket::AsyncWriteSome(const iovec* v, uint32_t len, io::AsyncProgress CHECK(async_write_req_ == nullptr); // we do not allow queuing multiple async requests. AsyncReq req{const_cast(v), len, std::move(cb)}; - if (req.Run(native_handle(), true)) + auto [completed, result] = req.Run(native_handle(), true); + if (completed) { + req.cb(result); return; + } async_write_req_ = new AsyncReq(std::move(req)); async_write_pending_ = 1; @@ -568,6 +567,41 @@ void EpollSocket::CancelOnErrorCb() { error_cb_ = {}; } +void EpollSocket::HandleAsyncRequest(error_code ec, bool is_send) { + auto async_pending = is_send ? async_write_pending_ : async_read_pending_; + if (async_pending) { + auto& async_request = is_send ? async_write_req_ : async_read_req_; + DCHECK(async_request); + + auto finalize_and_fetch_cb = [this, &async_request, is_send]() { + auto cb = std::move(async_request->cb); + delete async_request; + async_request = nullptr; + if (is_send) + async_write_pending_ = 0; + else + async_read_pending_ = 0; + return cb; + }; + + if (ec) { + auto cb = finalize_and_fetch_cb(); + cb(make_unexpected(ec)); + } else if (auto res = async_request->Run(native_handle(), is_send); res.first) { + auto cb = finalize_and_fetch_cb(); + cb(res.second); + } + } else { + auto& sync_request = is_send ? write_req_ : read_req_; + // It could be that we activated context already, but has not switched to it yet. + // Meanwhile a new event has arrived that triggered this callback again. + if (sync_request && sync_request->IsSuspended()) { + DVSOCK(2) << "Wakey: Schedule read in " << sync_request->name(); + sync_request->Activate(ec); + } + } +} + void EpollSocket::Wakey(uint32_t ev_mask, int error, EpollProactor* cntr) { DVSOCK(2) << "Wakey " << ev_mask; #ifdef __linux__ @@ -584,54 +618,13 @@ void EpollSocket::Wakey(uint32_t ev_mask, int error, EpollProactor* cntr) { } if (ev_mask & (EpollProactor::EPOLL_IN | kErrMask)) { - if (async_read_pending_) { - DCHECK(async_read_req_); - - auto finalize = [this] { - delete async_read_req_; - async_read_req_ = nullptr; - async_read_pending_ = 0; - }; - if (ec) { - async_read_req_->cb(make_unexpected(ec)); - finalize(); - } else if (async_read_req_->Run(native_handle(), false)) { - finalize(); - } - } else { - // It could be that we activated context already, but has not switched to it yet. - // Meanwhile a new event has arrived that triggered this callback again. - if (read_req_ && read_req_->IsSuspended()) { - DVSOCK(2) << "Wakey: Schedule read in " << read_req_->name(); - read_req_->Activate(ec); - } - } + bool is_send = false; + HandleAsyncRequest(ec, is_send); } if (ev_mask & (EpollProactor::EPOLL_OUT | kErrMask)) { - if (async_write_pending_) { - DCHECK(async_write_req_); - - auto finalize = [this] { - delete async_write_req_; - async_write_req_ = nullptr; - async_write_pending_ = 0; - }; - - if (ec) { - async_write_req_->cb(make_unexpected(ec)); - finalize(); - } else if (async_write_req_->Run(native_handle(), true)) { - finalize(); - } - } else { - // It could be that we activated context already but has not switched to it yet. - // Meanwhile a new event has arrived that triggered this callback again. - if (write_req_ && write_req_->IsSuspended()) { - DVSOCK(2) << "Wakey: Schedule write in " << write_req_->name(); - write_req_->Activate(ec); - } - } + bool is_send = true; + HandleAsyncRequest(ec, is_send); } if (error_cb_ && (ev_mask & kErrMask)) { diff --git a/util/fibers/epoll_socket.h b/util/fibers/epoll_socket.h index d5c3cbff..9fa21049 100644 --- a/util/fibers/epoll_socket.h +++ b/util/fibers/epoll_socket.h @@ -4,6 +4,8 @@ #pragma once +#include + #include "util/fiber_socket_base.h" #include "util/fibers/epoll_proactor.h" @@ -54,8 +56,8 @@ class EpollSocket : public LinuxSocketBase { AsyncReq(iovec* v, uint32_t l, io::AsyncProgressCb _cb) : len(l), vec(v), cb(std::move(_cb)) { } - // Returns true if it has been fullfilled. - bool Run(int fd, bool is_send); + // Caller is responsible for *calling* cb. + std::pair> Run(int fd, bool is_send); }; EpollProactor* GetProactor() { @@ -67,6 +69,8 @@ class EpollSocket : public LinuxSocketBase { // kevent pass error code together with completion event. void Wakey(uint32_t event_flags, int error, EpollProactor* cntr); + void HandleAsyncRequest(error_code ec, bool is_send); + union { PendingReq* write_req_; AsyncReq* async_write_req_;