Skip to content

Commit

Permalink
[fork] More migration to posix fds
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Jan 16, 2025
1 parent 668e517 commit 023438a
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 91 deletions.
4 changes: 4 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2264,6 +2264,7 @@ grpc_cc_library(
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
"posix_event_engine_file_descriptors",
"posix_event_engine_internal_errqueue",
"posix_event_engine_lockfree_event",
"posix_event_engine_wakeup_fd_posix",
Expand Down Expand Up @@ -2302,6 +2303,7 @@ grpc_cc_library(
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
"posix_event_engine_file_descriptors",
"posix_event_engine_wakeup_fd_posix",
"posix_event_engine_wakeup_fd_posix_default",
"status_helper",
Expand Down Expand Up @@ -2402,6 +2404,7 @@ grpc_cc_library(
"posix_event_engine_base_hdrs",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
"posix_event_engine_file_descriptors",
"posix_event_engine_internal_errqueue",
"posix_event_engine_tcp_socket_utils",
"posix_event_engine_traced_buffer_list",
Expand Down Expand Up @@ -2570,6 +2573,7 @@ grpc_cc_library(
"posix_event_engine_closure",
"posix_event_engine_endpoint",
"posix_event_engine_event_poller",
"posix_event_engine_file_descriptors",
"posix_event_engine_listener",
"posix_event_engine_poller_posix_default",
"posix_event_engine_tcp_socket_utils",
Expand Down
51 changes: 25 additions & 26 deletions src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/file_descriptors.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/util/crash.h"
Expand Down Expand Up @@ -57,7 +58,7 @@ namespace grpc_event_engine::experimental {

class Epoll1EventHandle : public EventHandle {
public:
Epoll1EventHandle(int fd, Epoll1Poller* poller)
Epoll1EventHandle(const FileDescriptor& fd, Epoll1Poller* poller)
: fd_(fd),
list_(this),
poller_(poller),
Expand All @@ -72,7 +73,7 @@ class Epoll1EventHandle : public EventHandle {
pending_write_.store(false, std::memory_order_relaxed);
pending_error_.store(false, std::memory_order_relaxed);
}
void ReInit(int fd) {
void ReInit(FileDescriptor fd) {
fd_ = fd;
read_closure_->InitEvent();
write_closure_->InitEvent();
Expand Down Expand Up @@ -106,8 +107,8 @@ class Epoll1EventHandle : public EventHandle {

return pending_read || pending_write || pending_error;
}
int WrappedFd() override { return fd_; }
void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
FileDescriptor WrappedFd() override { return fd_; }
void OrphanHandle(PosixEngineClosure* on_done, FileDescriptor* release_fd,
absl::string_view reason) override;
void ShutdownHandle(absl::Status why) override;
void NotifyOnRead(PosixEngineClosure* on_read) override;
Expand Down Expand Up @@ -142,7 +143,7 @@ class Epoll1EventHandle : public EventHandle {
// See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
// required.
grpc_core::Mutex mu_;
int fd_;
FileDescriptor fd_;
// See Epoll1Poller::SetPendingActions for explanation on why pending_<***>_
// need to be atomic.
std::atomic<bool> pending_read_{false};
Expand Down Expand Up @@ -237,7 +238,7 @@ bool InitEpoll1PollerLinux() {
} // namespace

void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
int* release_fd,
FileDescriptor* release_fd,
absl::string_view reason) {
bool is_release_fd = (release_fd != nullptr);
bool was_shutdown = false;
Expand All @@ -246,22 +247,20 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
HandleShutdownInternal(absl::Status(absl::StatusCode::kUnknown, reason),
is_release_fd);
}

auto& fds = poller_->GetFileDescriptors();
// If release_fd is not NULL, we should be relinquishing control of the file
// descriptor fd->fd (but we still own the grpc_fd structure).
if (is_release_fd) {
if (!was_shutdown) {
epoll_event phony_event;
if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
&phony_event) != 0) {
LOG(ERROR) << "OrphanHandle: epoll_ctl failed: "
<< grpc_core::StrError(errno);
auto result = fds.EpollCtlDel(poller_->g_epoll_set_.epfd, fd_);
if (!result.ok()) {
LOG(ERROR) << "OrphanHandle: epoll_ctl failed: " << result.status();
}
}
*release_fd = fd_;
} else {
shutdown(fd_, SHUT_RDWR);
close(fd_);
fds.Shutdown(fd_, SHUT_RDWR);
fds.Close(fd_);
}

{
Expand Down Expand Up @@ -294,11 +293,11 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
GRPC_STATUS_UNAVAILABLE);
if (read_closure_->SetShutdown(why)) {
if (releasing_fd) {
epoll_event phony_event;
if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
&phony_event) != 0) {
auto result = poller_->GetFileDescriptors().EpollCtlDel(
poller_->g_epoll_set_.epfd, fd_);
if (!result.ok()) {
LOG(ERROR) << "HandleShutdownInternal: epoll_ctl failed: "
<< grpc_core::StrError(errno);
<< result.status();
}
}
write_closure_->SetShutdown(why);
Expand Down Expand Up @@ -353,25 +352,25 @@ EventHandle* Epoll1Poller::CreateHandle(FileDescriptor fd,
{
grpc_core::MutexLock lock(&mu_);
if (free_epoll1_handles_list_.empty()) {
new_handle = new Epoll1EventHandle(fd.fd(), this);
new_handle = new Epoll1EventHandle(fd, this);
} else {
new_handle = reinterpret_cast<Epoll1EventHandle*>(
free_epoll1_handles_list_.front());
free_epoll1_handles_list_.pop_front();
new_handle->ReInit(fd.fd());
new_handle->ReInit(fd);
}
}
struct epoll_event ev;
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
// Use the least significant bit of ev.data.ptr to store track_err. We expect
// the addresses to be word aligned. We need to store track_err to avoid
// synchronization issues when accessing it after receiving an event.
// Accessing fd would be a data race there because the fd might have been
// returned to the free list at that point.
ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_handle) |
(track_err ? 1 : 0));
if (epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, fd.fd(), &ev) != 0) {
LOG(ERROR) << "epoll_ctl failed: " << grpc_core::StrError(errno);
auto result = GetFileDescriptors().EpollCtlAdd(
g_epoll_set_.epfd, fd,
reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_handle) |
(track_err ? 1 : 0)));
if (!result.ok()) {
LOG(ERROR) << "epoll_ctl failed: " << result.status();
}

return new_handle;
Expand Down
20 changes: 11 additions & 9 deletions src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "absl/strings/str_format.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/file_descriptors.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/util/crash.h"
Expand Down Expand Up @@ -68,7 +69,7 @@ using Events = absl::InlinedVector<PollEventHandle*, 5>;

class PollEventHandle : public EventHandle {
public:
PollEventHandle(int fd, std::shared_ptr<PollPoller> poller)
PollEventHandle(FileDescriptor fd, std::shared_ptr<PollPoller> poller)
: fd_(fd),
pending_actions_(0),
fork_fd_list_(this),
Expand Down Expand Up @@ -108,14 +109,14 @@ class PollEventHandle : public EventHandle {
grpc_core::MutexLock lock(&poller_->mu_);
poller_->PollerHandlesListRemoveHandle(this);
}
int WrappedFd() override { return fd_; }
FileDescriptor WrappedFd() override { return fd_; }
bool IsOrphaned() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return is_orphaned_;
}
void CloseFd() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (!released_ && !closed_) {
closed_ = true;
close(fd_);
poller_->GetFileDescriptors().Close(fd_);
}
}
bool IsPollhup() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return pollhup_; }
Expand All @@ -132,7 +133,7 @@ class PollEventHandle : public EventHandle {
void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
watch_mask_ = watch_mask;
}
void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
void OrphanHandle(PosixEngineClosure* on_done, FileDescriptor* release_fd,
absl::string_view reason) override;
void ShutdownHandle(absl::Status why) override;
void NotifyOnRead(PosixEngineClosure* on_read) override;
Expand Down Expand Up @@ -199,7 +200,7 @@ class PollEventHandle : public EventHandle {
// required.
grpc_core::Mutex mu_;
std::atomic<int> ref_count_{1};
int fd_;
FileDescriptor fd_;
int pending_actions_;
PollPoller::HandlesList fork_fd_list_;
PollPoller::HandlesList poller_handles_list_;
Expand Down Expand Up @@ -296,14 +297,15 @@ EventHandle* PollPoller::CreateHandle(FileDescriptor fd,
// Avoid unused-parameter warning for debug-only parameter
(void)track_err;
DCHECK(track_err == false);
PollEventHandle* handle = new PollEventHandle(fd.fd(), shared_from_this());
PollEventHandle* handle = new PollEventHandle(fd, shared_from_this());
// We need to send a kick to the thread executing Work(..) so that it can
// add this new Fd into the list of Fds to poll.
KickExternal(false);
return handle;
}

void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done,
FileDescriptor* release_fd,
absl::string_view /*reason*/) {
ForceRemoveHandleFromPoller();
{
Expand All @@ -328,7 +330,7 @@ void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
}
// signal read/write closed to OS so that future operations fail.
if (!released_) {
shutdown(fd_, SHUT_RDWR);
poller_->GetFileDescriptors().Shutdown(fd_, SHUT_RDWR);
}
if (!IsWatched()) {
CloseFd();
Expand Down Expand Up @@ -641,7 +643,7 @@ Poller::WorkResult PollPoller::Work(
// poll handle list for the poller under the poller lock.
CHECK(!head->IsOrphaned());
if (!head->IsPollhup()) {
pfds[pfd_count].fd = head->WrappedFd();
pfds[pfd_count].fd = head->WrappedFd().fd();
watchers[pfd_count] = head;
// BeginPollLocked takes a ref of the handle. It also marks the
// fd as Watched with an appropriate watch_mask. The watch_mask
Expand Down
5 changes: 3 additions & 2 deletions src/core/lib/event_engine/posix_engine/event_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ class PosixEventPoller;

class EventHandle {
public:
virtual int WrappedFd() = 0;
virtual FileDescriptor WrappedFd() = 0;
// Delete the handle and optionally close the underlying file descriptor if
// release_fd != nullptr. The on_done closure is scheduled to be invoked
// after the operation is complete. After this operation, NotifyXXX and SetXXX
// operations cannot be performed on the handle. In general, this method
// should only be called after ShutdownHandle and after all existing NotifyXXX
// closures have run and there is no waiting NotifyXXX closure.
virtual void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
virtual void OrphanHandle(PosixEngineClosure* on_done,
FileDescriptor* release_fd,
absl::string_view reason) = 0;
// Shutdown a handle. If there is an attempt to call NotifyXXX operations
// after Shutdown handle, those closures will be run immediately with the
Expand Down
77 changes: 64 additions & 13 deletions src/core/lib/event_engine/posix_engine/file_descriptors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,18 @@
#include <unistd.h>
#endif // GRPC_POSIX_SOCKET_UTILS_COMMON

#ifdef GRPC_POSIX_SOCKET
#ifdef GRPC_LINUX_EPOLL
#include <sys/epoll.h>
#else
#include "src/core/util/crash.h"
#endif

#ifdef GRPC_POSIX_SOCKET
#define IF_POSIX_SOCKET(signature, body) signature body

#else // GRPC_POSIX_SOCKET

#include "src/core/util/crash.h"

#define IF_POSIX_SOCKET(signature, body) \
signature { grpc_core::Crash("unimplemented"); }

#endif // GRPC_POSIX_SOCKET

namespace grpc_event_engine::experimental {
Expand All @@ -66,15 +67,17 @@ IF_POSIX_SOCKET(void FileDescriptors::Close(const FileDescriptor& fd),
//
// Factories
//
IF_POSIX_SOCKET(FileDescriptorResult FileDescriptors::Accept(
int sockfd, struct sockaddr* addr, socklen_t* addrlen),
{ return RegisterPosixResult(accept(sockfd, addr, addrlen)); })
IF_POSIX_SOCKET(
FileDescriptorResult FileDescriptors::Accept(const FileDescriptor& sockfd,
struct sockaddr* addr,
socklen_t* addrlen),
{ return RegisterPosixResult(accept(sockfd.fd(), addr, addrlen)); })

#ifdef GRPC_POSIX_SOCKETUTILS

IF_POSIX_SOCKET(
FileDescriptorResult FileDescriptors::Accept4(
int sockfd,
const FileDescriptor& sockfd,
grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
int nonblock, int cloexec),
{
Expand Down Expand Up @@ -112,7 +115,7 @@ IF_POSIX_SOCKET(

IF_POSIX_SOCKET(
FileDescriptorResult FileDescriptors::Accept4(
int sockfd,
const FileDescriptor& sockfd,
grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
int nonblock, int cloexec),
{
Expand All @@ -121,12 +124,60 @@ IF_POSIX_SOCKET(
flags |= cloexec ? SOCK_CLOEXEC : 0;
EventEngine::ResolvedAddress peer_addr;
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
FileDescriptorResult ret = RegisterPosixResult(accept4(
sockfd, const_cast<sockaddr*>(peer_addr.address()), &len, flags));
addr = EventEngine::ResolvedAddress(peer_addr.address(), len);
FileDescriptorResult ret = RegisterPosixResult(
accept4(sockfd.fd(), const_cast<sockaddr*>(peer_addr.address()), &len,
flags));
if (ret.ok()) {
addr = EventEngine::ResolvedAddress(peer_addr.address(), len);
}
return ret;
})

#endif // GRPC_POSIX_SOCKETUTILS

IF_POSIX_SOCKET(PosixResult FileDescriptors::Ioctl(const FileDescriptor& fd,
int op, void* arg),
{ return PosixResult::Wrap(ioctl(fd.fd(), op, arg)); });

IF_POSIX_SOCKET(PosixResult FileDescriptors::Shutdown(const FileDescriptor& fd,
int how),
{ return PosixResult::Wrap(shutdown(fd.fd(), how)); })

IF_POSIX_SOCKET(PosixResult FileDescriptors::GetSockOpt(
const FileDescriptor& fd, int level, int optname,
void* optval, uint32_t* optlen),
{
return PosixResult::Wrap(
getsockopt(fd.fd(), level, optname, optval, optlen));
})

//
// Epoll
//
IF_POSIX_SOCKET(PosixResult FileDescriptors::EpollCtlDel(
int epfd, const FileDescriptor& fd),
{
#ifdef GRPC_LINUX_EPOLL
epoll_event phony_event;
return PosixResult::Wrap(
epoll_ctl(epfd, EPOLL_CTL_DEL, fd.fd(), &phony_event));
#else // GRPC_LINUX_EPOLL
grpc_core::Crash("Epoll not supported");
#endif // GRPC_LINUX_EPOLL
})

IF_POSIX_SOCKET(
PosixResult FileDescriptors::EpollCtlAdd(int epfd, const FileDescriptor& fd,
void* data),
{
#ifdef GRPC_LINUX_EPOLL
epoll_event event;
event.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
event.data.ptr = data;
return PosixResult::Wrap(epoll_ctl(epfd, EPOLL_CTL_ADD, fd.fd(), &event));
#else // GRPC_LINUX_EPOLL
grpc_core::Crash("Epoll not supported");
#endif // GRPC_LINUX_EPOLL
})

} // namespace grpc_event_engine::experimental
Loading

0 comments on commit 023438a

Please sign in to comment.