diff --git a/src/core/lib/event_engine/posix_engine/file_descriptors.cc b/src/core/lib/event_engine/posix_engine/file_descriptors.cc index bfa81b1c27cfd..2919c8019bf5f 100644 --- a/src/core/lib/event_engine/posix_engine/file_descriptors.cc +++ b/src/core/lib/event_engine/posix_engine/file_descriptors.cc @@ -14,10 +14,13 @@ #include "src/core/lib/event_engine/posix_engine/file_descriptors.h" +#include + #include #include #include +#include "absl/log/log.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" @@ -26,20 +29,63 @@ namespace grpc_event_engine { namespace experimental { namespace { -thread_local std::unordered_map thread_locks_count; +grpc_core::Mutex gone_threads_mu; +std::unordered_set gone_threads ABSL_GUARDED_BY(&gone_threads_mu); -void FdLocked(const FileDescriptors* descriptors) { - if (descriptors != nullptr) { - ++thread_locks_count[descriptors]; +class ThreadLocalCounter { + public: + ThreadLocalCounter() { + grpc_core::MutexLock lock(&gone_threads_mu); + gone_threads.erase(gettid()); + LOG(INFO) << "Counter created " << this << " tid: " << gettid(); } -} -void FdUnlocked(const FileDescriptors* descriptors) { - if (descriptors != nullptr) { - --thread_locks_count[descriptors]; - CHECK_GE(thread_locks_count[descriptors], 0); + ~ThreadLocalCounter() { + grpc_core::MutexLock lock(&gone_threads_mu); + gone_threads.emplace(gettid()); + LOG(INFO) << "Counter destroyed " << this << " tid: " << gettid(); } -} + + void FdLocked(const FileDescriptors* descriptors) { + if (descriptors != nullptr && !CheckAndReportThreadThatIsGone()) { + ++thread_locks_count_[descriptors]; + } + } + + void FdUnlocked(const FileDescriptors* descriptors) { + if (descriptors != nullptr && !CheckAndReportThreadThatIsGone()) { + --thread_locks_count_[descriptors]; + CHECK_GE(thread_locks_count_[descriptors], 0); + } + } + + int count(const FileDescriptors* descriptors) const { + if (!CheckAndReportThreadThatIsGone()) { + return 0; + } + auto c = thread_locks_count_.find(descriptors); + if (c != thread_locks_count_.end()) { + return c->second; + } else { + return 0; + } + } + + private: + static bool CheckAndReportThreadThatIsGone() { + int tid = gettid(); + grpc_core::MutexLock lock(&gone_threads_mu); + if (gone_threads.find(tid) != gone_threads.end()) { + LOG(INFO) << "Thread " << tid << " is gone"; + return false; + } + return true; + } + + std::unordered_map thread_locks_count_; +}; + +thread_local ThreadLocalCounter counter; } // namespace @@ -47,14 +93,14 @@ ReentrantLock::ReentrantLock(const FileDescriptors* descriptors) : descriptors_(descriptors) { if (descriptors_ != nullptr) { descriptors_->IncrementCounter(); - FdLocked(descriptors); + counter.FdLocked(descriptors); } } ReentrantLock::~ReentrantLock() noexcept { if (descriptors_ != nullptr) { descriptors_->DecrementCounter(); - FdUnlocked(descriptors_); + counter.FdUnlocked(descriptors_); } } @@ -117,11 +163,11 @@ void FileDescriptors::DecrementCounter() const { } absl::Status FileDescriptors::Stop() { - if (thread_locks_count[this] > 0) { + if (counter.count(this) > 0) { return absl::FailedPreconditionError( absl::StrFormat("Current thread holds %d i/o locks that need to be " "released before calling fork", - thread_locks_count[this])); + counter.count(this))); } grpc_core::MutexLock lock(&mu_); CHECK(state_ == State::kReady) diff --git a/src/core/lib/event_engine/posix_engine/posix_system_api.cc b/src/core/lib/event_engine/posix_engine/posix_system_api.cc index 798706beeb5f3..2e2cae58b7ca6 100644 --- a/src/core/lib/event_engine/posix_engine/posix_system_api.cc +++ b/src/core/lib/event_engine/posix_engine/posix_system_api.cc @@ -17,17 +17,14 @@ #include #include -#include #include #include -#include #include #include "absl/log/check.h" #include "absl/log/log.h" #include "absl/status/status.h" #include "absl/strings/str_cat.h" -#include "absl/synchronization/mutex.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/iomgr/port.h" @@ -690,7 +687,7 @@ std::pair> SystemApi::Pipe() { return {status, {AdoptExternalFd(fds[0]), AdoptExternalFd(fds[1])}}; } -#ifdef GRPC_LINUX_EPOLL +#ifdef GRPC_LINUX_EVENTFD absl::StatusOr SystemApi::EventFdRead(FileDescriptor fd, uint64_t* value) const { return WithFd(fd, @@ -883,7 +880,8 @@ std::pair> SystemApi::SocketPair( int domain, int type, int protocol) { grpc_core::Crash("unimplemented"); } -#ifdef GRPC_LINUX_EPOLL + +#ifdef GRPC_LINUX_EVENTFD absl::StatusOr SystemApi::EventFdRead(FileDescriptor fd, uint64_t* value) const { @@ -900,7 +898,7 @@ absl::StatusOr EpollCtl(FileDescriptor epfd, int op, FileDescriptor fd, grpc_core::Crash("unimplemented"); } -#endif // GRPC_LINUX_EPOLL +#endif // GRPC_LINUX_EVENTFD } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/posix_system_api.h b/src/core/lib/event_engine/posix_engine/posix_system_api.h index ceeba9dc2fdb4..1582e5cfb0c80 100644 --- a/src/core/lib/event_engine/posix_engine/posix_system_api.h +++ b/src/core/lib/event_engine/posix_engine/posix_system_api.h @@ -119,14 +119,14 @@ class SystemApi { socklen_t addrlen) const; absl::StatusOr PosixLock() const { return fds_.PosixLock(); } -#ifdef GRPC_LINUX_EPOLL +#ifdef GRPC_LINUX_EVENTFD absl::StatusOr EventFdRead(FileDescriptor fd, uint64_t* value) const; absl::StatusOr EventFdWrite(FileDescriptor fd, uint64_t value) const; absl::StatusOr EpollCtl(FileDescriptor epfd, int op, FileDescriptor fd, struct epoll_event* event) const; absl::StatusOr EpollWait(FileDescriptor epfd, struct epoll_event* events, int maxevents, int timeout) const; -#endif // GRPC_LINUX_EPOLL +#endif // GRPC_LINUX_EVENTFD absl::StatusOr GetSockOpt(FileDescriptor fd, int level, int optname, void* optval, socklen_t* optlen) const; absl::StatusOr GetSockName(FileDescriptor fd, struct sockaddr* addr, diff --git a/test/core/event_engine/posix/posix_system_api_test.cc b/test/core/event_engine/posix/posix_system_api_test.cc index fa7e603d5ea68..4979cfdd9ebb0 100644 --- a/test/core/event_engine/posix/posix_system_api_test.cc +++ b/test/core/event_engine/posix/posix_system_api_test.cc @@ -190,7 +190,7 @@ TEST(PosixSystemApiTest, PosixLevel) { ASSERT_EQ(client_api.Write(server_client->client, buf.data(), buf.size()) .status() .code(), - absl::StatusCode::kNotFound); + absl::StatusCode::kInternal); // Send using the new connection server_client = EstablishConnection(server_api, client_api, *server, port); ASSERT_TRUE(server_client.ok()) << server_client.status();