From 023438aa00405fa992553ea12cf9ccaa28c1340f Mon Sep 17 00:00:00 2001 From: Eugene Ostroukhov Date: Thu, 16 Jan 2025 14:31:42 -0800 Subject: [PATCH] [fork] More migration to posix fds --- src/core/BUILD | 4 + .../posix_engine/ev_epoll1_linux.cc | 51 ++++++------ .../posix_engine/ev_poll_posix.cc | 20 +++-- .../event_engine/posix_engine/event_poller.h | 5 +- .../posix_engine/file_descriptors.cc | 77 ++++++++++++++--- .../posix_engine/file_descriptors.h | 82 +++++++++++++++++-- .../posix_engine/grpc_polled_fd_posix.h | 8 +- .../posix_engine/posix_endpoint.cc | 33 ++++---- .../posix_engine/posix_endpoint.h | 7 +- .../event_engine/posix_engine/posix_engine.cc | 15 ++-- .../posix_engine/posix_engine_listener.cc | 2 +- .../posix/event_poller_posix_test.cc | 6 +- 12 files changed, 219 insertions(+), 91 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index 002cb05aaa992..7d1fe7c8094a8 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -2264,6 +2264,7 @@ grpc_cc_library( "iomgr_port", "posix_event_engine_closure", "posix_event_engine_event_poller", + "posix_event_engine_file_descriptors", "posix_event_engine_internal_errqueue", "posix_event_engine_lockfree_event", "posix_event_engine_wakeup_fd_posix", @@ -2302,6 +2303,7 @@ grpc_cc_library( "iomgr_port", "posix_event_engine_closure", "posix_event_engine_event_poller", + "posix_event_engine_file_descriptors", "posix_event_engine_wakeup_fd_posix", "posix_event_engine_wakeup_fd_posix_default", "status_helper", @@ -2402,6 +2404,7 @@ grpc_cc_library( "posix_event_engine_base_hdrs", "posix_event_engine_closure", "posix_event_engine_event_poller", + "posix_event_engine_file_descriptors", "posix_event_engine_internal_errqueue", "posix_event_engine_tcp_socket_utils", "posix_event_engine_traced_buffer_list", @@ -2570,6 +2573,7 @@ grpc_cc_library( "posix_event_engine_closure", "posix_event_engine_endpoint", "posix_event_engine_event_poller", + "posix_event_engine_file_descriptors", "posix_event_engine_listener", "posix_event_engine_poller_posix_default", "posix_event_engine_tcp_socket_utils", diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc index 53c908f4c5f65..bd921ece40e87 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc @@ -28,6 +28,7 @@ #include "absl/status/statusor.h" #include "absl/strings/str_format.h" #include "src/core/lib/event_engine/poller.h" +#include "src/core/lib/event_engine/posix_engine/file_descriptors.h" #include "src/core/lib/event_engine/time_util.h" #include "src/core/lib/iomgr/port.h" #include "src/core/util/crash.h" @@ -57,7 +58,7 @@ namespace grpc_event_engine::experimental { class Epoll1EventHandle : public EventHandle { public: - Epoll1EventHandle(int fd, Epoll1Poller* poller) + Epoll1EventHandle(const FileDescriptor& fd, Epoll1Poller* poller) : fd_(fd), list_(this), poller_(poller), @@ -72,7 +73,7 @@ class Epoll1EventHandle : public EventHandle { pending_write_.store(false, std::memory_order_relaxed); pending_error_.store(false, std::memory_order_relaxed); } - void ReInit(int fd) { + void ReInit(FileDescriptor fd) { fd_ = fd; read_closure_->InitEvent(); write_closure_->InitEvent(); @@ -106,8 +107,8 @@ class Epoll1EventHandle : public EventHandle { return pending_read || pending_write || pending_error; } - int WrappedFd() override { return fd_; } - void OrphanHandle(PosixEngineClosure* on_done, int* release_fd, + FileDescriptor WrappedFd() override { return fd_; } + void OrphanHandle(PosixEngineClosure* on_done, FileDescriptor* release_fd, absl::string_view reason) override; void ShutdownHandle(absl::Status why) override; void NotifyOnRead(PosixEngineClosure* on_read) override; @@ -142,7 +143,7 @@ class Epoll1EventHandle : public EventHandle { // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is // required. grpc_core::Mutex mu_; - int fd_; + FileDescriptor fd_; // See Epoll1Poller::SetPendingActions for explanation on why pending_<***>_ // need to be atomic. std::atomic pending_read_{false}; @@ -237,7 +238,7 @@ bool InitEpoll1PollerLinux() { } // namespace void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done, - int* release_fd, + FileDescriptor* release_fd, absl::string_view reason) { bool is_release_fd = (release_fd != nullptr); bool was_shutdown = false; @@ -246,22 +247,20 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done, HandleShutdownInternal(absl::Status(absl::StatusCode::kUnknown, reason), is_release_fd); } - + auto& fds = poller_->GetFileDescriptors(); // If release_fd is not NULL, we should be relinquishing control of the file // descriptor fd->fd (but we still own the grpc_fd structure). if (is_release_fd) { if (!was_shutdown) { - epoll_event phony_event; - if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_, - &phony_event) != 0) { - LOG(ERROR) << "OrphanHandle: epoll_ctl failed: " - << grpc_core::StrError(errno); + auto result = fds.EpollCtlDel(poller_->g_epoll_set_.epfd, fd_); + if (!result.ok()) { + LOG(ERROR) << "OrphanHandle: epoll_ctl failed: " << result.status(); } } *release_fd = fd_; } else { - shutdown(fd_, SHUT_RDWR); - close(fd_); + fds.Shutdown(fd_, SHUT_RDWR); + fds.Close(fd_); } { @@ -294,11 +293,11 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why, GRPC_STATUS_UNAVAILABLE); if (read_closure_->SetShutdown(why)) { if (releasing_fd) { - epoll_event phony_event; - if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_, - &phony_event) != 0) { + auto result = poller_->GetFileDescriptors().EpollCtlDel( + poller_->g_epoll_set_.epfd, fd_); + if (!result.ok()) { LOG(ERROR) << "HandleShutdownInternal: epoll_ctl failed: " - << grpc_core::StrError(errno); + << result.status(); } } write_closure_->SetShutdown(why); @@ -353,25 +352,25 @@ EventHandle* Epoll1Poller::CreateHandle(FileDescriptor fd, { grpc_core::MutexLock lock(&mu_); if (free_epoll1_handles_list_.empty()) { - new_handle = new Epoll1EventHandle(fd.fd(), this); + new_handle = new Epoll1EventHandle(fd, this); } else { new_handle = reinterpret_cast( free_epoll1_handles_list_.front()); free_epoll1_handles_list_.pop_front(); - new_handle->ReInit(fd.fd()); + new_handle->ReInit(fd); } } - struct epoll_event ev; - ev.events = static_cast(EPOLLIN | EPOLLOUT | EPOLLET); // Use the least significant bit of ev.data.ptr to store track_err. We expect // the addresses to be word aligned. We need to store track_err to avoid // synchronization issues when accessing it after receiving an event. // Accessing fd would be a data race there because the fd might have been // returned to the free list at that point. - ev.data.ptr = reinterpret_cast(reinterpret_cast(new_handle) | - (track_err ? 1 : 0)); - if (epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, fd.fd(), &ev) != 0) { - LOG(ERROR) << "epoll_ctl failed: " << grpc_core::StrError(errno); + auto result = GetFileDescriptors().EpollCtlAdd( + g_epoll_set_.epfd, fd, + reinterpret_cast(reinterpret_cast(new_handle) | + (track_err ? 1 : 0))); + if (!result.ok()) { + LOG(ERROR) << "epoll_ctl failed: " << result.status(); } return new_handle; diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc index 41c2bdc3a29bb..776b4dedda3e3 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc @@ -34,6 +34,7 @@ #include "absl/strings/str_format.h" #include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" +#include "src/core/lib/event_engine/posix_engine/file_descriptors.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/iomgr/port.h" #include "src/core/util/crash.h" @@ -68,7 +69,7 @@ using Events = absl::InlinedVector; class PollEventHandle : public EventHandle { public: - PollEventHandle(int fd, std::shared_ptr poller) + PollEventHandle(FileDescriptor fd, std::shared_ptr poller) : fd_(fd), pending_actions_(0), fork_fd_list_(this), @@ -108,14 +109,14 @@ class PollEventHandle : public EventHandle { grpc_core::MutexLock lock(&poller_->mu_); poller_->PollerHandlesListRemoveHandle(this); } - int WrappedFd() override { return fd_; } + FileDescriptor WrappedFd() override { return fd_; } bool IsOrphaned() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return is_orphaned_; } void CloseFd() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { if (!released_ && !closed_) { closed_ = true; - close(fd_); + poller_->GetFileDescriptors().Close(fd_); } } bool IsPollhup() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return pollhup_; } @@ -132,7 +133,7 @@ class PollEventHandle : public EventHandle { void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { watch_mask_ = watch_mask; } - void OrphanHandle(PosixEngineClosure* on_done, int* release_fd, + void OrphanHandle(PosixEngineClosure* on_done, FileDescriptor* release_fd, absl::string_view reason) override; void ShutdownHandle(absl::Status why) override; void NotifyOnRead(PosixEngineClosure* on_read) override; @@ -199,7 +200,7 @@ class PollEventHandle : public EventHandle { // required. grpc_core::Mutex mu_; std::atomic ref_count_{1}; - int fd_; + FileDescriptor fd_; int pending_actions_; PollPoller::HandlesList fork_fd_list_; PollPoller::HandlesList poller_handles_list_; @@ -296,14 +297,15 @@ EventHandle* PollPoller::CreateHandle(FileDescriptor fd, // Avoid unused-parameter warning for debug-only parameter (void)track_err; DCHECK(track_err == false); - PollEventHandle* handle = new PollEventHandle(fd.fd(), shared_from_this()); + PollEventHandle* handle = new PollEventHandle(fd, shared_from_this()); // We need to send a kick to the thread executing Work(..) so that it can // add this new Fd into the list of Fds to poll. KickExternal(false); return handle; } -void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd, +void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, + FileDescriptor* release_fd, absl::string_view /*reason*/) { ForceRemoveHandleFromPoller(); { @@ -328,7 +330,7 @@ void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd, } // signal read/write closed to OS so that future operations fail. if (!released_) { - shutdown(fd_, SHUT_RDWR); + poller_->GetFileDescriptors().Shutdown(fd_, SHUT_RDWR); } if (!IsWatched()) { CloseFd(); @@ -641,7 +643,7 @@ Poller::WorkResult PollPoller::Work( // poll handle list for the poller under the poller lock. CHECK(!head->IsOrphaned()); if (!head->IsPollhup()) { - pfds[pfd_count].fd = head->WrappedFd(); + pfds[pfd_count].fd = head->WrappedFd().fd(); watchers[pfd_count] = head; // BeginPollLocked takes a ref of the handle. It also marks the // fd as Watched with an appropriate watch_mask. The watch_mask diff --git a/src/core/lib/event_engine/posix_engine/event_poller.h b/src/core/lib/event_engine/posix_engine/event_poller.h index f54d8501e568f..9e5285c313969 100644 --- a/src/core/lib/event_engine/posix_engine/event_poller.h +++ b/src/core/lib/event_engine/posix_engine/event_poller.h @@ -40,14 +40,15 @@ class PosixEventPoller; class EventHandle { public: - virtual int WrappedFd() = 0; + virtual FileDescriptor WrappedFd() = 0; // Delete the handle and optionally close the underlying file descriptor if // release_fd != nullptr. The on_done closure is scheduled to be invoked // after the operation is complete. After this operation, NotifyXXX and SetXXX // operations cannot be performed on the handle. In general, this method // should only be called after ShutdownHandle and after all existing NotifyXXX // closures have run and there is no waiting NotifyXXX closure. - virtual void OrphanHandle(PosixEngineClosure* on_done, int* release_fd, + virtual void OrphanHandle(PosixEngineClosure* on_done, + FileDescriptor* release_fd, absl::string_view reason) = 0; // Shutdown a handle. If there is an attempt to call NotifyXXX operations // after Shutdown handle, those closures will be run immediately with the diff --git a/src/core/lib/event_engine/posix_engine/file_descriptors.cc b/src/core/lib/event_engine/posix_engine/file_descriptors.cc index a82edfb0a84e0..5bb6176662f2a 100644 --- a/src/core/lib/event_engine/posix_engine/file_descriptors.cc +++ b/src/core/lib/event_engine/posix_engine/file_descriptors.cc @@ -35,17 +35,18 @@ #include #endif // GRPC_POSIX_SOCKET_UTILS_COMMON -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_LINUX_EPOLL +#include +#else +#include "src/core/util/crash.h" +#endif +#ifdef GRPC_POSIX_SOCKET #define IF_POSIX_SOCKET(signature, body) signature body - #else // GRPC_POSIX_SOCKET - #include "src/core/util/crash.h" - #define IF_POSIX_SOCKET(signature, body) \ signature { grpc_core::Crash("unimplemented"); } - #endif // GRPC_POSIX_SOCKET namespace grpc_event_engine::experimental { @@ -66,15 +67,17 @@ IF_POSIX_SOCKET(void FileDescriptors::Close(const FileDescriptor& fd), // // Factories // -IF_POSIX_SOCKET(FileDescriptorResult FileDescriptors::Accept( - int sockfd, struct sockaddr* addr, socklen_t* addrlen), - { return RegisterPosixResult(accept(sockfd, addr, addrlen)); }) +IF_POSIX_SOCKET( + FileDescriptorResult FileDescriptors::Accept(const FileDescriptor& sockfd, + struct sockaddr* addr, + socklen_t* addrlen), + { return RegisterPosixResult(accept(sockfd.fd(), addr, addrlen)); }) #ifdef GRPC_POSIX_SOCKETUTILS IF_POSIX_SOCKET( FileDescriptorResult FileDescriptors::Accept4( - int sockfd, + const FileDescriptor& sockfd, grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, int nonblock, int cloexec), { @@ -112,7 +115,7 @@ IF_POSIX_SOCKET( IF_POSIX_SOCKET( FileDescriptorResult FileDescriptors::Accept4( - int sockfd, + const FileDescriptor& sockfd, grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, int nonblock, int cloexec), { @@ -121,12 +124,60 @@ IF_POSIX_SOCKET( flags |= cloexec ? SOCK_CLOEXEC : 0; EventEngine::ResolvedAddress peer_addr; socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; - FileDescriptorResult ret = RegisterPosixResult(accept4( - sockfd, const_cast(peer_addr.address()), &len, flags)); - addr = EventEngine::ResolvedAddress(peer_addr.address(), len); + FileDescriptorResult ret = RegisterPosixResult( + accept4(sockfd.fd(), const_cast(peer_addr.address()), &len, + flags)); + if (ret.ok()) { + addr = EventEngine::ResolvedAddress(peer_addr.address(), len); + } return ret; }) #endif // GRPC_POSIX_SOCKETUTILS +IF_POSIX_SOCKET(PosixResult FileDescriptors::Ioctl(const FileDescriptor& fd, + int op, void* arg), + { return PosixResult::Wrap(ioctl(fd.fd(), op, arg)); }); + +IF_POSIX_SOCKET(PosixResult FileDescriptors::Shutdown(const FileDescriptor& fd, + int how), + { return PosixResult::Wrap(shutdown(fd.fd(), how)); }) + +IF_POSIX_SOCKET(PosixResult FileDescriptors::GetSockOpt( + const FileDescriptor& fd, int level, int optname, + void* optval, uint32_t* optlen), + { + return PosixResult::Wrap( + getsockopt(fd.fd(), level, optname, optval, optlen)); + }) + +// +// Epoll +// +IF_POSIX_SOCKET(PosixResult FileDescriptors::EpollCtlDel( + int epfd, const FileDescriptor& fd), + { +#ifdef GRPC_LINUX_EPOLL + epoll_event phony_event; + return PosixResult::Wrap( + epoll_ctl(epfd, EPOLL_CTL_DEL, fd.fd(), &phony_event)); +#else // GRPC_LINUX_EPOLL + grpc_core::Crash("Epoll not supported"); +#endif // GRPC_LINUX_EPOLL + }) + +IF_POSIX_SOCKET( + PosixResult FileDescriptors::EpollCtlAdd(int epfd, const FileDescriptor& fd, + void* data), + { +#ifdef GRPC_LINUX_EPOLL + epoll_event event; + event.events = static_cast(EPOLLIN | EPOLLOUT | EPOLLET); + event.data.ptr = data; + return PosixResult::Wrap(epoll_ctl(epfd, EPOLL_CTL_ADD, fd.fd(), &event)); +#else // GRPC_LINUX_EPOLL + grpc_core::Crash("Epoll not supported"); +#endif // GRPC_LINUX_EPOLL + }) + } // namespace grpc_event_engine::experimental \ No newline at end of file diff --git a/src/core/lib/event_engine/posix_engine/file_descriptors.h b/src/core/lib/event_engine/posix_engine/file_descriptors.h index 9a6cc639a6961..5c55f325697db 100644 --- a/src/core/lib/event_engine/posix_engine/file_descriptors.h +++ b/src/core/lib/event_engine/posix_engine/file_descriptors.h @@ -18,6 +18,7 @@ #include #include +#include #include "absl/log/check.h" #include "absl/status/status.h" @@ -32,6 +33,11 @@ class FileDescriptor { int fd() const { return fd_; } bool ready() const { return fd_ > 0; } + template + friend void AbslStringify(Sink& sink, FileDescriptor fd) { + sink.Append(absl::StrFormat("FD(%d)", fd.fd())); + } + private: int fd_ = 0; }; @@ -47,12 +53,60 @@ enum class OperationResultKind { template void AbslStringify(Sink& sink, OperationResultKind kind) { - absl::Format(sink, "%s", - kind == OperationResultKind::kSuccess ? "(Success)" - : kind == OperationResultKind::kError ? "(Success)" - : "(Success)"); + sink.Append(kind == OperationResultKind::kSuccess ? "(Success)" + : kind == OperationResultKind::kError ? "(Success)" + : "(Success)"); } +// Result of the factory call. kWrongGeneration may happen in the call to +// Accept* +class PosixResult { + public: + static constexpr PosixResult Success() { + return PosixResult(OperationResultKind::kSuccess); + } + + static PosixResult Error() { + return PosixResult(OperationResultKind::kError, errno); + } + + static PosixResult Wrap(int result) { + return result == 0 ? Success() : Error(); + } + + PosixResult() = default; + + absl::Status status() const { + switch (kind_) { + case OperationResultKind::kSuccess: + return absl::OkStatus(); + case OperationResultKind::kError: + return absl::ErrnoToStatus(errno_value_, ""); + case OperationResultKind::kWrongGeneration: + return absl::InternalError( + "File descriptor is from the wrong generation"); + } + } + + bool ok() const { return kind_ == OperationResultKind::kSuccess; } + + bool IsPosixError(int err) const { + return kind_ == OperationResultKind::kError && errno_value_ == err; + } + + OperationResultKind kind() const { return kind_; } + int errno_value() const { return errno_value_; } + + private: + explicit constexpr PosixResult(OperationResultKind kind, int errno_value = 0) + : kind_(kind), errno_value_(errno_value) {} + + OperationResultKind kind_ = OperationResultKind::kSuccess; + // errno value on call completion, in order to reduce the race conditions + // from relying on global variable. + int errno_value_ = 0; +}; + // Result of the factory call. kWrongGeneration may happen in the call to // Accept* class FileDescriptorResult { @@ -116,15 +170,25 @@ class FileDescriptorResult { class FileDescriptors { public: - FileDescriptorResult Accept(int sockfd, struct sockaddr* addr, - socklen_t* addrlen); - FileDescriptorResult Accept4(int sockfd, EventEngine::ResolvedAddress& addr, - int nonblock, int cloexec); - + FileDescriptorResult Accept(const FileDescriptor& sockfd, + struct sockaddr* addr, socklen_t* addrlen); + FileDescriptorResult Accept4(const FileDescriptor& sockfd, + EventEngine::ResolvedAddress& addr, int nonblock, + int cloexec); FileDescriptor Adopt(int fd); void Close(const FileDescriptor& fd); + // Posix + PosixResult Ioctl(const FileDescriptor& fd, int op, void* arg); + PosixResult Shutdown(const FileDescriptor& fd, int how); + PosixResult GetSockOpt(const FileDescriptor& fd, int level, int optname, + void* optval, uint32_t* optlen); + + // Epoll + PosixResult EpollCtlAdd(int epfd, const FileDescriptor& fd, void* data); + PosixResult EpollCtlDel(int epfd, const FileDescriptor& fd); + private: FileDescriptorResult RegisterPosixResult(int result); }; diff --git a/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h b/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h index bbcbd2dc8bd3c..da2a02b9af037 100644 --- a/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h +++ b/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h @@ -20,6 +20,7 @@ #include +#include "file_descriptors.h" #include "src/core/lib/iomgr/port.h" #include "src/core/util/sync.h" @@ -57,7 +58,7 @@ class GrpcPolledFdPosix : public GrpcPolledFd { ~GrpcPolledFdPosix() override { // c-ares library will close the fd. This fd may be picked up immediately by // another thread and should not be closed by the following OrphanHandle. - int phony_release_fd; + FileDescriptor phony_release_fd; handle_->OrphanHandle(/*on_done=*/nullptr, &phony_release_fd, "c-ares query finished"); } @@ -76,7 +77,10 @@ class GrpcPolledFdPosix : public GrpcPolledFd { bool IsFdStillReadableLocked() override { size_t bytes_available = 0; - return ioctl(handle_->WrappedFd(), FIONREAD, &bytes_available) == 0 && + return handle_->Poller() + ->GetFileDescriptors() + .Ioctl(handle_->WrappedFd(), FIONREAD, &bytes_available) + .ok() && bytes_available > 0; } diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc index 75ab2799ee003..f640192390460 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc @@ -37,6 +37,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" +#include "file_descriptors.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/internal_errqueue.h" @@ -98,13 +99,13 @@ namespace { // A wrapper around sendmsg. It sends \a msg over \a fd and returns the number // of bytes sent. -ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno, - int additional_flags = 0) { +ssize_t TcpSend(const FileDescriptor& fd, const struct msghdr* msg, + int* saved_errno, int additional_flags = 0) { GRPC_LATENT_SEE_PARENT_SCOPE("TcpSend"); ssize_t sent_length; do { grpc_core::global_stats().IncrementSyscallWrite(); - sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags); + sent_length = sendmsg(fd.fd(), msg, SENDMSG_FLAGS | additional_flags); } while (sent_length < 0 && (*saved_errno = errno) == EINTR); return sent_length; } @@ -278,7 +279,7 @@ void PosixEndpointImpl::FinishEstimate() { absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) const { grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kFd, - handle_->WrappedFd()); + handle_->WrappedFd().fd()); grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE); return src_error; @@ -334,7 +335,7 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { incoming_buffer_->Count()); do { grpc_core::global_stats().IncrementSyscallRead(); - read_bytes = recvmsg(fd_, &msg, 0); + read_bytes = recvmsg(fd_.fd(), &msg, 0); } while (read_bytes < 0 && errno == EINTR); if (read_bytes < 0 && errno == EAGAIN) { @@ -711,7 +712,7 @@ bool PosixEndpointImpl::ProcessErrors() { while (true) { msg.msg_controllen = sizeof(aligned_buf.rbuf); do { - r = recvmsg(fd_, &msg, MSG_ERRQUEUE); + r = recvmsg(fd_.fd(), &msg, MSG_ERRQUEUE); saved_errno = errno; } while (r < 0 && saved_errno == EINTR); @@ -849,8 +850,8 @@ bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg, int additional_flags) { if (!socket_ts_enabled_) { uint32_t opt = kTimestampingSocketOptions; - if (setsockopt(fd_, SOL_SOCKET, SO_TIMESTAMPING, static_cast(&opt), - sizeof(opt)) != 0) { + if (setsockopt(fd_.fd(), SOL_SOCKET, SO_TIMESTAMPING, + static_cast(&opt), sizeof(opt)) != 0) { return false; } bytes_counter_ = -1; @@ -876,7 +877,7 @@ bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg, // Only save timestamps if all the bytes were taken by sendmsg. if (sending_length == static_cast(length)) { traced_buffers_.AddNewEntry(static_cast(bytes_counter_ + length), - fd_, outgoing_buffer_arg_); + fd_.fd(), outgoing_buffer_arg_); outgoing_buffer_arg_ = nullptr; } return true; @@ -1250,12 +1251,12 @@ void PosixEndpointImpl::MaybeShutdown( } PosixEndpointImpl ::~PosixEndpointImpl() { - int release_fd = -1; + FileDescriptor release_fd; handle_->OrphanHandle(on_done_, on_release_fd_ == nullptr ? nullptr : &release_fd, ""); if (on_release_fd_ != nullptr) { engine_->Run([on_release_fd = std::move(on_release_fd_), - release_fd]() mutable { on_release_fd(release_fd); }); + release_fd]() mutable { on_release_fd(release_fd.fd()); }); } delete on_read_; delete on_write_; @@ -1267,13 +1268,13 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, std::shared_ptr engine, MemoryAllocator&& /*allocator*/, const PosixTcpOptions& options) - : sock_(PosixSocketWrapper(handle->WrappedFd())), + : sock_(PosixSocketWrapper(handle->WrappedFd().fd())), on_done_(on_done), traced_buffers_(), handle_(handle), poller_(handle->Poller()), engine_(engine) { - PosixSocketWrapper sock(handle->WrappedFd()); + PosixSocketWrapper sock(handle->WrappedFd().fd()); fd_ = handle_->WrappedFd(); CHECK(options.resource_quota != nullptr); auto peer_addr_string = sock.PeerAddressString(); @@ -1308,8 +1309,8 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, << "value."; } else { const int enable = 1; - if (setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)) != - 0) { + if (setsockopt(fd_.fd(), SOL_SOCKET, SO_ZEROCOPY, &enable, + sizeof(enable)) != 0) { zerocopy_enabled = false; LOG(ERROR) << "Failed to set zerocopy options on the socket."; } @@ -1327,7 +1328,7 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, options.tcp_tx_zerocopy_send_bytes_threshold); #ifdef GRPC_HAVE_TCP_INQ int one = 1; - if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) { + if (setsockopt(fd_.fd(), SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) { inq_capable_ = true; } else { VLOG(2) << "cannot set inq fd=" << fd_ << " errno=" << errno; diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index 3ddd318af7542..57a99ef3247ea 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -41,6 +41,7 @@ #include "src/core/lib/event_engine/extensions/supports_fd.h" #include "src/core/lib/event_engine/posix.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" +#include "src/core/lib/event_engine/posix_engine/file_descriptors.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h" @@ -489,7 +490,7 @@ class PosixEndpointImpl : public grpc_core::RefCounted { return local_address_; } - int GetWrappedFd() { return fd_; } + FileDescriptor GetWrappedFd() { return fd_; } bool CanTrackErrors() const { return poller_->CanTrackErrors(); } @@ -532,7 +533,7 @@ class PosixEndpointImpl : public grpc_core::RefCounted { #endif // GRPC_LINUX_ERRQUEUE grpc_core::Mutex read_mu_; PosixSocketWrapper sock_; - int fd_; + FileDescriptor fd_; bool is_first_read_ = true; bool has_posted_reclaimer_ ABSL_GUARDED_BY(read_mu_) = false; double target_length_; @@ -635,7 +636,7 @@ class PosixEndpoint : public PosixEndpointWithFdSupport { return impl_->GetLocalAddress(); } - int GetWrappedFd() override { return impl_->GetWrappedFd(); } + int GetWrappedFd() override { return impl_->GetWrappedFd().fd(); } bool CanTrackErrors() override { return impl_->CanTrackErrors(); } diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 2908bcea14e6e..4a417a7a4dd3f 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -39,6 +39,7 @@ #include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix.h" +#include "src/core/lib/event_engine/posix_engine/file_descriptors.h" #include "src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h" #include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" @@ -119,9 +120,7 @@ void AsyncConnect::OnTimeoutExpired(absl::Status status) { void AsyncConnect::OnWritable(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS { - int so_error = 0; socklen_t so_error_size; - int err; int done; int consumed_refs = 1; EventHandle* fd; @@ -184,14 +183,16 @@ void AsyncConnect::OnWritable(absl::Status status) return; } + int so_error = 0; + PosixResult err; do { so_error_size = sizeof(so_error); - err = getsockopt(fd->WrappedFd(), SOL_SOCKET, SO_ERROR, &so_error, - &so_error_size); - } while (err < 0 && errno == EINTR); - if (err < 0) { + err = fd->Poller()->GetFileDescriptors().GetSockOpt( + fd->WrappedFd(), SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); + } while (err.IsPosixError(EINTR)); + if (!err.ok()) { status = absl::FailedPreconditionError( - absl::StrCat("getsockopt: ", std::strerror(errno))); + absl::StrCat("getsockopt: ", err.status())); return; } diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc index 28c76e6d49ecc..ee610f06dbc39 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -248,7 +248,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( // Call on_accept_ and then resume accepting new connections // by continuing the parent for-loop. listener_->on_accept_( - /*listener_fd=*/handle_->WrappedFd(), + /*listener_fd=*/handle_->WrappedFd().fd(), /*endpoint=*/std::move(endpoint), /*is_external=*/false, /*memory_allocator=*/ diff --git a/test/core/event_engine/posix/event_poller_posix_test.cc b/test/core/event_engine/posix/event_poller_posix_test.cc index 87a3fd62ba1c8..801af2c4311ad 100644 --- a/test/core/event_engine/posix/event_poller_posix_test.cc +++ b/test/core/event_engine/posix/event_poller_posix_test.cc @@ -161,7 +161,7 @@ void SessionShutdownCb(session* se, bool /*success*/) { // Called when data become readable in a session. void SessionReadCb(session* se, absl::Status status) { - int fd = se->em_fd->WrappedFd(); + FileDescriptor fd = se->em_fd->WrappedFd(); ssize_t read_once = 0; ssize_t read_total = 0; @@ -172,7 +172,7 @@ void SessionReadCb(session* se, absl::Status status) { } do { - read_once = read(fd, se->read_buf, BUF_SIZE); + read_once = read(fd.fd(), se->read_buf, BUF_SIZE); if (read_once > 0) read_total += read_once; } while (read_once > 0); se->sv->read_bytes_total += read_total; @@ -307,7 +307,7 @@ void ClientSessionShutdownCb(client* cl) { // Write as much as possible, then register notify_on_write. void ClientSessionWrite(client* cl, absl::Status status) { - int fd = cl->em_fd->WrappedFd(); + int fd = cl->em_fd->WrappedFd().fd(); ssize_t write_once = 0; if (!status.ok()) {