Skip to content

Commit

Permalink
Use io_uring native pollable fd instead of an event fd
Browse files Browse the repository at this point in the history
Summary:
Use the io_uring ring fd, which is natively pollable, as the poll fd instead of a separate eventfd.
This should avoid an unnecessary system call to read the eventfd value.

Reviewed By: spikeh

Differential Revision: D54550325

fbshipit-source-id: 8c1d566b82d9345222a90ba83fea197766848155
  • Loading branch information
Dan Melnic authored and facebook-github-bot committed Mar 9, 2024
1 parent 66a7788 commit 213881d
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 38 deletions.
33 changes: 5 additions & 28 deletions folly/experimental/io/AsyncBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
#include <folly/portability/Filesystem.h>
#include <folly/portability/Unistd.h>

#if __has_include(<sys/eventfd.h>)
#include <sys/eventfd.h>
#endif

namespace folly {

AsyncBaseOp::AsyncBaseOp(NotificationCallback cb)
Expand Down Expand Up @@ -90,24 +86,15 @@ std::string AsyncBaseOp::fd2name(int fd) {
return fs::read_symlink(link).string();
}

AsyncBase::AsyncBase(size_t capacity, PollMode pollMode) : capacity_(capacity) {
AsyncBase::AsyncBase(size_t capacity, PollMode pollMode)
: capacity_(capacity), pollMode_(pollMode) {
CHECK_GT(capacity_, 0);
completed_.reserve(capacity_);
if (pollMode == POLLABLE) {
#if __has_include(<sys/eventfd.h>)
pollFd_ = eventfd(0, EFD_NONBLOCK);
checkUnixError(pollFd_, "AsyncBase: eventfd creation failed");
#else
// fallthrough to not-pollable, observed as: pollFd() == -1
#endif
}
}

AsyncBase::~AsyncBase() {
CHECK_EQ(pending_, 0);
if (pollFd_ != -1) {
CHECK_ERR(close(pollFd_));
}
CHECK_EQ(pollFd_, -1);
}

void AsyncBase::decrementPending(size_t n) {
Expand Down Expand Up @@ -190,20 +177,10 @@ Range<AsyncBase::Op**> AsyncBase::cancel() {
Range<AsyncBase::Op**> AsyncBase::pollCompleted() {
CHECK(isInit());
CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
uint64_t numEvents;
// This sets the eventFd counter to 0, see
// http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
ssize_t rc;
do {
rc = ::read(pollFd_, &numEvents, 8);
} while (rc == -1 && errno == EINTR);
if (FOLLY_UNLIKELY(rc == -1 && errno == EAGAIN)) {

if (drainPollFd() <= 0) {
return Range<Op**>(); // nothing completed
}
checkUnixError(rc, "AsyncBase: read from event fd failed");
DCHECK_EQ(rc, 8);

DCHECK_GT(numEvents, 0);

// Don't reap more than pending_, as we've just reset the counter to 0.
return doWait(WaitType::COMPLETE, 0, pending_.load(), completed_);
Expand Down
2 changes: 2 additions & 0 deletions folly/experimental/io/AsyncBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class AsyncBase {
int submit(Range<Op**> ops);

protected:
// Backend-specific hook used by pollCompleted() to consume or inspect the
// poll notification state. pollCompleted() treats a result <= 0 as
// "nothing completed" and only reaps completions when the result is positive.
virtual int drainPollFd() = 0;
// Forwards a completion result to the given op.
void complete(Op* op, ssize_t result) {
  op->complete(result);
}

// Forwards a cancellation to the given op.
void cancel(Op* op) {
  op->cancel();
}
Expand All @@ -268,6 +269,7 @@ class AsyncBase {
std::atomic<size_t> pending_{0};
std::atomic<size_t> submitted_{0};
const size_t capacity_;
const PollMode pollMode_;
int pollFd_{-1};
std::vector<Op*> completed_;
std::vector<Op*> canceled_;
Expand Down
42 changes: 41 additions & 1 deletion folly/experimental/io/AsyncIO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
#include <folly/portability/Unistd.h>
#include <folly/small_vector.h>

#if __has_include(<sys/eventfd.h>)
#include <sys/eventfd.h>
#endif

#if __has_include(<libaio.h>)

// debugging helpers
Expand Down Expand Up @@ -146,14 +150,32 @@ std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
}

AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
: AsyncBase(capacity, pollMode) {}
: AsyncBase(capacity, pollMode) {
// we need to create the eventfd in the constructor
// since we have code that relies on registering the pollFd_
// before any operation is started

if (pollMode_ == POLLABLE) {
#if __has_include(<sys/eventfd.h>)
pollFd_ = eventfd(0, EFD_NONBLOCK);
checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
#else
// fallthrough to not-pollable, observed as: pollFd() == -1
#endif
}
}

// Tears down the AIO context (if one was created) and then closes the
// poll fd (if one was created). Aborts if operations are still pending or
// if either teardown step fails.
AsyncIO::~AsyncIO() {
  CHECK_EQ(pending_, 0);

  // Release the kernel AIO context first; ctx_ is only set once the context
  // has been initialized.
  if (ctx_) {
    int rc = io_queue_release(ctx_);
    CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
  }

  // Nothing more to do when no poll fd was ever created.
  if (pollFd_ == -1) {
    return;
  }

  // This class owns the poll fd; reset it to -1 after closing so it is not
  // seen as still open afterwards.
  CHECK_ERR(close(pollFd_));
  pollFd_ = -1;
}

void AsyncIO::initializeContext() {
Expand All @@ -180,11 +202,29 @@ void AsyncIO::initializeContext() {

checkKernelError(rc, "AsyncIO: io_queue_init failed");
DCHECK(ctx_);

init_.store(true, std::memory_order_release);
}
}
}

// Reads the eventfd counter behind pollFd_ and returns it as an int.
// Returns 0 when the read fails with EAGAIN (no events to report); throws
// via checkUnixError on any other read failure.
int AsyncIO::drainPollFd() {
  uint64_t numEvents;
  ssize_t rc;

  // This sets the eventFd counter to 0, see
  // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
  // The read is retried for as long as it is interrupted by a signal.
  for (;;) {
    rc = ::read(pollFd_, &numEvents, sizeof(numEvents));
    const bool interrupted = (rc == -1 && errno == EINTR);
    if (!interrupted) {
      break;
    }
  }

  // EAGAIN means there was nothing to read; report "no events".
  if (FOLLY_UNLIKELY(rc == -1 && errno == EAGAIN)) {
    return 0;
  }

  checkUnixError(rc, "AsyncIO: read from event fd failed");
  DCHECK_EQ(rc, 8);
  DCHECK_GT(numEvents, 0);

  return static_cast<int>(numEvents);
}

int AsyncIO::submitOne(AsyncBase::Op* op) {
// -1 return here will trigger throw if op isn't an AsyncIOOp
AsyncIOOp* aop = op->getAsyncIOOp();
Expand Down
1 change: 1 addition & 0 deletions folly/experimental/io/AsyncIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class AsyncIO : public AsyncBase {
void initializeContext() override;

protected:
int drainPollFd() override;
int submitOne(AsyncBase::Op* op) override;
int submitRange(Range<AsyncBase::Op**> ops) override;

Expand Down
23 changes: 14 additions & 9 deletions folly/experimental/io/IoUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ IoUring::IoUring(

params_.flags |= IORING_SETUP_CQSIZE;
params_.cq_entries = roundUpToNextPowerOfTwo(capacity_);

// we need to call initializeContext() in the constructor
// since we have code that relies on registering the pollFd_
// before any operation is started
initializeContext();
}

IoUring::~IoUring() {
Expand All @@ -219,13 +224,13 @@ IoUring::~IoUring() {
::io_uring_queue_exit(&ioRing_);
ioRing_.ring_fd = -1;
}

pollFd_ = -1;
}

bool IoUring::isAvailable() {
IoUring ioUring(1);

try {
ioUring.initializeContext();
IoUring ioUring(1);
} catch (...) {
return false;
}
Expand All @@ -235,16 +240,12 @@ bool IoUring::isAvailable() {

int IoUring::register_buffers(
const struct iovec* iovecs, unsigned int nr_iovecs) {
initializeContext();

std::unique_lock lk(submitMutex_);

return io_uring_register_buffers(&ioRing_, iovecs, nr_iovecs);
}

int IoUring::unregister_buffers() {
initializeContext();

std::unique_lock lk(submitMutex_);
return io_uring_unregister_buffers(&ioRing_);
}
Expand All @@ -257,14 +258,18 @@ void IoUring::initializeContext() {
roundUpToNextPowerOfTwo(maxSubmit_), &ioRing_, &params_);
checkKernelError(rc, "IoUring: io_uring_queue_init_params failed");
DCHECK_GT(ioRing_.ring_fd, 0);
if (pollFd_ != -1) {
CHECK_ERR(io_uring_register_eventfd(&ioRing_, pollFd_));
if (pollMode_ == POLLABLE) {
pollFd_ = ioRing_.ring_fd;
}
init_.store(true, std::memory_order_release);
}
}
}

// Returns the value of io_uring_cq_ready() for this ring, converted to int.
// Unlike an eventfd-based poll fd, no read() on the poll fd is issued here.
int IoUring::drainPollFd() {
  const auto ready = ::io_uring_cq_ready(&ioRing_);
  return static_cast<int>(ready);
}

int IoUring::submitOne(AsyncBase::Op* op) {
// -1 return here will trigger throw if op isn't an IoUringOp
IoUringOp* iop = op->getIoUringOp();
Expand Down
1 change: 1 addition & 0 deletions folly/experimental/io/IoUring.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ class IoUring : public AsyncBase {
void initializeContext() override;

protected:
int drainPollFd() override;
int submitOne(AsyncBase::Op* op) override;
int submitRange(Range<AsyncBase::Op**> ops) override;

Expand Down

0 comments on commit 213881d

Please sign in to comment.