Skip to content

Commit

Permalink
fix: epoll socket async chaining and refactor (#374)
Browse files Browse the repository at this point in the history
* fix: epoll socket async chaining and refactor

---------

Signed-off-by: kostas <kostas@dragonflydb.io>
  • Loading branch information
kostasrim authored Feb 4, 2025
1 parent a3b25ee commit b18d739
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 57 deletions.
103 changes: 48 additions & 55 deletions util/fibers/epoll_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void EpollSocket::PendingReq::Activate(error_code ec) {
ActivateSameThread(detail::FiberActive(), context_);
}

bool EpollSocket::AsyncReq::Run(int fd, bool is_send) {
std::pair<bool, EpollSocket::Result<size_t>> EpollSocket::AsyncReq::Run(int fd, bool is_send) {
msghdr msg;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = vec;
Expand All @@ -166,22 +166,18 @@ bool EpollSocket::AsyncReq::Run(int fd, bool is_send) {
res = is_send ? sendmsg(fd, &msg, MSG_NOSIGNAL) : recvmsg(fd, &msg, 0);

if (res > 0) {
cb(res);
return true;
return {true, res};
}

if (res == 0) {
CHECK(!is_send); // can only happen with recvmsg
cb(MakeUnexpected(errc::connection_aborted));
return true;
return {true, MakeUnexpected(errc::connection_aborted)};
}

if (errno == EAGAIN)
return false;
return {false, 0};

error_code ec = from_errno();
cb(make_unexpected(ec));
return true;
return {true, make_unexpected(from_errno())};
}

EpollSocket::EpollSocket(int fd)
Expand Down Expand Up @@ -381,8 +377,11 @@ void EpollSocket::AsyncWriteSome(const iovec* v, uint32_t len, io::AsyncProgress
CHECK(async_write_req_ == nullptr); // we do not allow queuing multiple async requests.

AsyncReq req{const_cast<iovec*>(v), len, std::move(cb)};
if (req.Run(native_handle(), true))
auto [completed, result] = req.Run(native_handle(), true);
if (completed) {
req.cb(result);
return;
}

async_write_req_ = new AsyncReq(std::move(req));
async_write_pending_ = 1;
Expand Down Expand Up @@ -568,6 +567,41 @@ void EpollSocket::CancelOnErrorCb() {
error_cb_ = {};
}

void EpollSocket::HandleAsyncRequest(error_code ec, bool is_send) {
auto async_pending = is_send ? async_write_pending_ : async_read_pending_;
if (async_pending) {
auto& async_request = is_send ? async_write_req_ : async_read_req_;
DCHECK(async_request);

auto finalize_and_fetch_cb = [this, &async_request, is_send]() {
auto cb = std::move(async_request->cb);
delete async_request;
async_request = nullptr;
if (is_send)
async_write_pending_ = 0;
else
async_read_pending_ = 0;
return cb;
};

if (ec) {
auto cb = finalize_and_fetch_cb();
cb(make_unexpected(ec));
} else if (auto res = async_request->Run(native_handle(), is_send); res.first) {
auto cb = finalize_and_fetch_cb();
cb(res.second);
}
} else {
auto& sync_request = is_send ? write_req_ : read_req_;
// It could be that we activated context already, but has not switched to it yet.
// Meanwhile a new event has arrived that triggered this callback again.
if (sync_request && sync_request->IsSuspended()) {
DVSOCK(2) << "Wakey: Schedule read in " << sync_request->name();
sync_request->Activate(ec);
}
}
}

void EpollSocket::Wakey(uint32_t ev_mask, int error, EpollProactor* cntr) {
DVSOCK(2) << "Wakey " << ev_mask;
#ifdef __linux__
Expand All @@ -584,54 +618,13 @@ void EpollSocket::Wakey(uint32_t ev_mask, int error, EpollProactor* cntr) {
}

if (ev_mask & (EpollProactor::EPOLL_IN | kErrMask)) {
if (async_read_pending_) {
DCHECK(async_read_req_);

auto finalize = [this] {
delete async_read_req_;
async_read_req_ = nullptr;
async_read_pending_ = 0;
};
if (ec) {
async_read_req_->cb(make_unexpected(ec));
finalize();
} else if (async_read_req_->Run(native_handle(), false)) {
finalize();
}
} else {
// It could be that we activated context already, but has not switched to it yet.
// Meanwhile a new event has arrived that triggered this callback again.
if (read_req_ && read_req_->IsSuspended()) {
DVSOCK(2) << "Wakey: Schedule read in " << read_req_->name();
read_req_->Activate(ec);
}
}
bool is_send = false;
HandleAsyncRequest(ec, is_send);
}

if (ev_mask & (EpollProactor::EPOLL_OUT | kErrMask)) {
if (async_write_pending_) {
DCHECK(async_write_req_);

auto finalize = [this] {
delete async_write_req_;
async_write_req_ = nullptr;
async_write_pending_ = 0;
};

if (ec) {
async_write_req_->cb(make_unexpected(ec));
finalize();
} else if (async_write_req_->Run(native_handle(), true)) {
finalize();
}
} else {
// It could be that we activated context already but has not switched to it yet.
// Meanwhile a new event has arrived that triggered this callback again.
if (write_req_ && write_req_->IsSuspended()) {
DVSOCK(2) << "Wakey: Schedule write in " << write_req_->name();
write_req_->Activate(ec);
}
}
bool is_send = true;
HandleAsyncRequest(ec, is_send);
}

if (error_cb_ && (ev_mask & kErrMask)) {
Expand Down
8 changes: 6 additions & 2 deletions util/fibers/epoll_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#pragma once

#include <utility>

#include "util/fiber_socket_base.h"
#include "util/fibers/epoll_proactor.h"

Expand Down Expand Up @@ -54,8 +56,8 @@ class EpollSocket : public LinuxSocketBase {
AsyncReq(iovec* v, uint32_t l, io::AsyncProgressCb _cb) : len(l), vec(v), cb(std::move(_cb)) {
}

// Returns true if it has been fullfilled.
bool Run(int fd, bool is_send);
// Caller is responsible for *calling* cb.
std::pair<bool, Result<size_t>> Run(int fd, bool is_send);
};

EpollProactor* GetProactor() {
Expand All @@ -67,6 +69,8 @@ class EpollSocket : public LinuxSocketBase {
// kevent pass error code together with completion event.
void Wakey(uint32_t event_flags, int error, EpollProactor* cntr);

void HandleAsyncRequest(error_code ec, bool is_send);

union {
PendingReq* write_req_;
AsyncReq* async_write_req_;
Expand Down

0 comments on commit b18d739

Please sign in to comment.