Skip to content

Commit

Permalink
fixup: object oriented
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Jan 15, 2025
1 parent 903d53b commit cbce4f7
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 28 deletions.
53 changes: 36 additions & 17 deletions src/core/lib/event_engine/posix_engine/file_descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <grpc/event_engine/event_engine.h>

#include <cerrno>

#include "absl/log/check.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
Expand Down Expand Up @@ -53,46 +55,63 @@ void AbslStringify(Sink& sink, OperationResultKind kind) {

// Result of the factory call. kWrongGeneration may happen in the call to
// Accept*
struct FileDescriptorResult {
OperationResultKind kind;
// gRPC wrapped FileDescriptor, as described above
FileDescriptor fd;
// errno value on call completion, in order to reduce the race conditions
// from relying on global variable.
int errno_value;

class FileDescriptorResult {
public:
static FileDescriptorResult FD(const FileDescriptor& fd) {
return {OperationResultKind::kSuccess, fd, 0};
return FileDescriptorResult(OperationResultKind::kSuccess, fd, 0);
}

static FileDescriptorResult Error() {
return {OperationResultKind::kError, {}, errno};
return FileDescriptorResult(OperationResultKind::kError, FileDescriptor(),
errno);
}

int operator*() const {
FileDescriptorResult() = default;

FileDescriptor operator*() const {
CHECK_OK(status());
return fd.fd();
return fd_;
}

bool ok() const {
return kind == OperationResultKind::kSuccess && fd.fd() > 0;
int fd() const {
CHECK_OK(status());
return fd_.fd();
}

absl::Status status() const {
switch (kind) {
switch (kind_) {
case OperationResultKind::kSuccess:
return absl::OkStatus();
case OperationResultKind::kError:
return absl::ErrnoToStatus(errno_value, "");
return absl::ErrnoToStatus(errno_value_, "");
case OperationResultKind::kWrongGeneration:
return absl::InternalError(
"File descriptor is from the wrong generation");
}
}

bool ok() const {
return kind_ == OperationResultKind::kSuccess && fd_.fd() > 0;
}

bool IsPosixError(int err) const {
return kind == OperationResultKind::kError && errno_value == err;
return kind_ == OperationResultKind::kError && errno_value_ == err;
}

OperationResultKind kind() const { return kind_; }
int errno_value() const { return errno_value_; }

private:
FileDescriptorResult(OperationResultKind kind, const FileDescriptor& fd,
int errno_value = 0)
: kind_(kind), fd_(fd), errno_value_(errno_value) {}

OperationResultKind kind_;
// gRPC wrapped FileDescriptor, as described above
FileDescriptor fd_;
// errno value on call completion, in order to reduce the race conditions
// from relying on global variable.
int errno_value_;
};

class FileDescriptors {
Expand Down
15 changes: 8 additions & 7 deletions src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,16 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
// strip off the ::ffff:0.0.0.0/96 prefix first.
FileDescriptorResult fd = handle_->Poller()->GetFileDescriptors().Accept4(
handle_->WrappedFd(), addr, 1, 1);
if (fd.kind == OperationResultKind::kWrongGeneration) {
if (fd.kind() == OperationResultKind::kWrongGeneration) {
LOG(ERROR) << "Closing acceptor. accept4 was called with fd from a wrong "
"generation";
// Shutting down the acceptor. Unref the ref grabbed in
// AsyncConnectionAcceptor::Start().
Unref();
return;
}
if (fd.kind == OperationResultKind::kError) {
switch (fd.errno_value) {
if (fd.kind() == OperationResultKind::kError) {
switch (fd.errno_value()) {
case EINTR:
continue;
case EMFILE:
Expand Down Expand Up @@ -196,22 +196,23 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
// sun_path of sockaddr_un, so explicitly call getpeername to get it.
if (addr.address()->sa_family == AF_UNIX) {
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
if (getpeername(*fd, const_cast<sockaddr*>(addr.address()), &len) < 0) {
if (getpeername(fd.fd(), const_cast<sockaddr*>(addr.address()), &len) <
0) {
auto listener_addr_uri = ResolvedAddressToURI(socket_.addr);
LOG(ERROR) << "Failed getpeername: " << grpc_core::StrError(errno)
<< ". Dropping the connection, and continuing "
"to listen on "
<< (listener_addr_uri.ok() ? *listener_addr_uri
: "<unknown>")
<< ":" << socket_.port;
handle_->Poller()->GetFileDescriptors().Close(fd.fd);
handle_->Poller()->GetFileDescriptors().Close(*fd);
handle_->NotifyOnRead(notify_on_accept_);
return;
}
addr = EventEngine::ResolvedAddress(addr.address(), len);
}

PosixSocketWrapper sock(*fd);
PosixSocketWrapper sock(fd.fd());
(void)sock.SetSocketNoSigpipeIfPossible();
auto result = sock.ApplySocketMutatorInOptions(
GRPC_FD_SERVER_CONNECTION_USAGE, listener_->options_);
Expand All @@ -235,7 +236,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
}
auto endpoint = CreatePosixEndpoint(
/*handle=*/listener_->poller_->CreateHandle(
fd.fd, *peer_name, listener_->poller_->CanTrackErrors()),
*fd, *peer_name, listener_->poller_->CanTrackErrors()),
/*on_shutdown=*/nullptr, /*engine=*/listener_->engine_,
// allocator=
listener_->memory_allocator_factory_->CreateMemoryAllocator(
Expand Down
8 changes: 4 additions & 4 deletions test/core/event_engine/posix/event_poller_posix_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,12 @@ void ListenCb(server* sv, absl::Status status) {
<< fd.status();
}
ASSERT_TRUE(fd.ok()) << fd.status();
EXPECT_LT(*fd, FD_SETSIZE);
flags = fcntl(*fd, F_GETFL, 0);
fcntl(*fd, F_SETFL, flags | O_NONBLOCK);
EXPECT_LT(fd.fd(), FD_SETSIZE);
flags = fcntl(fd.fd(), F_GETFL, 0);
fcntl(fd.fd(), F_SETFL, flags | O_NONBLOCK);
se = static_cast<session*>(gpr_malloc(sizeof(*se)));
se->sv = sv;
se->em_fd = g_event_poller->CreateHandle(fd.fd, "listener", false);
se->em_fd = g_event_poller->CreateHandle(*fd, "listener", false);
se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
[se](absl::Status status) { SessionReadCb(se, status); });
se->em_fd->NotifyOnRead(se->session_read_closure);
Expand Down

0 comments on commit cbce4f7

Please sign in to comment.