Skip to content

Commit

Permalink
fixup: shutdown problem
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Dec 17, 2024
1 parent 0e49489 commit 1ed9eae
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 23 deletions.
74 changes: 60 additions & 14 deletions src/core/lib/event_engine/posix_engine/file_descriptors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

#include "src/core/lib/event_engine/posix_engine/file_descriptors.h"

#include <unistd.h>

#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
Expand All @@ -26,35 +29,78 @@ namespace grpc_event_engine {
namespace experimental {
namespace {

thread_local std::unordered_map<const FileDescriptors*, int> thread_locks_count;
grpc_core::Mutex gone_threads_mu;
std::unordered_set<int> gone_threads ABSL_GUARDED_BY(&gone_threads_mu);

void FdLocked(const FileDescriptors* descriptors) {
if (descriptors != nullptr) {
++thread_locks_count[descriptors];
class ThreadLocalCounter {
public:
ThreadLocalCounter() {
grpc_core::MutexLock lock(&gone_threads_mu);
gone_threads.erase(gettid());
LOG(INFO) << "Counter created " << this << " tid: " << gettid();
}
}

void FdUnlocked(const FileDescriptors* descriptors) {
if (descriptors != nullptr) {
--thread_locks_count[descriptors];
CHECK_GE(thread_locks_count[descriptors], 0);
~ThreadLocalCounter() {
grpc_core::MutexLock lock(&gone_threads_mu);
gone_threads.emplace(gettid());
LOG(INFO) << "Counter destroyed " << this << " tid: " << gettid();
}
}

void FdLocked(const FileDescriptors* descriptors) {
if (descriptors != nullptr && !CheckAndReportThreadThatIsGone()) {
++thread_locks_count_[descriptors];
}
}

void FdUnlocked(const FileDescriptors* descriptors) {
if (descriptors != nullptr && !CheckAndReportThreadThatIsGone()) {
--thread_locks_count_[descriptors];
CHECK_GE(thread_locks_count_[descriptors], 0);
}
}

int count(const FileDescriptors* descriptors) const {
if (!CheckAndReportThreadThatIsGone()) {
return 0;
}
auto c = thread_locks_count_.find(descriptors);
if (c != thread_locks_count_.end()) {
return c->second;
} else {
return 0;
}
}

private:
static bool CheckAndReportThreadThatIsGone() {
int tid = gettid();
grpc_core::MutexLock lock(&gone_threads_mu);
if (gone_threads.find(tid) != gone_threads.end()) {
LOG(INFO) << "Thread " << tid << " is gone";
return false;
}
return true;
}

std::unordered_map<const FileDescriptors*, int> thread_locks_count_;
};

thread_local ThreadLocalCounter counter;

} // namespace

ReentrantLock::ReentrantLock(const FileDescriptors* descriptors)
: descriptors_(descriptors) {
if (descriptors_ != nullptr) {
descriptors_->IncrementCounter();
FdLocked(descriptors);
counter.FdLocked(descriptors);
}
}

ReentrantLock::~ReentrantLock() noexcept {
if (descriptors_ != nullptr) {
descriptors_->DecrementCounter();
FdUnlocked(descriptors_);
counter.FdUnlocked(descriptors_);
}
}

Expand Down Expand Up @@ -117,11 +163,11 @@ void FileDescriptors::DecrementCounter() const {
}

absl::Status FileDescriptors::Stop() {
if (thread_locks_count[this] > 0) {
if (counter.count(this) > 0) {
return absl::FailedPreconditionError(
absl::StrFormat("Current thread holds %d i/o locks that need to be "
"released before calling fork",
thread_locks_count[this]));
counter.count(this)));
}
grpc_core::MutexLock lock(&mu_);
CHECK(state_ == State::kReady)
Expand Down
10 changes: 4 additions & 6 deletions src/core/lib/event_engine/posix_engine/posix_system_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/port_platform.h>

#include <algorithm>
#include <array>
#include <cerrno>
#include <unordered_set>
#include <utility>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/iomgr/port.h"
Expand Down Expand Up @@ -690,7 +687,7 @@ std::pair<int, std::array<FileDescriptor, 2>> SystemApi::Pipe() {
return {status, {AdoptExternalFd(fds[0]), AdoptExternalFd(fds[1])}};
}

#ifdef GRPC_LINUX_EPOLL
#ifdef GRPC_LINUX_EVENTFD
absl::StatusOr<long> SystemApi::EventFdRead(FileDescriptor fd,
uint64_t* value) const {
return WithFd(fd,
Expand Down Expand Up @@ -883,7 +880,8 @@ std::pair<int, std::array<FileDescriptor, 2>> SystemApi::SocketPair(
int domain, int type, int protocol) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_LINUX_EPOLL

#ifdef GRPC_LINUX_EVENTFD

absl::StatusOr<long> SystemApi::EventFdRead(FileDescriptor fd,
uint64_t* value) const {
Expand All @@ -900,7 +898,7 @@ absl::StatusOr<int> EpollCtl(FileDescriptor epfd, int op, FileDescriptor fd,
grpc_core::Crash("unimplemented");
}

#endif // GRPC_LINUX_EPOLL
#endif // GRPC_LINUX_EVENTFD

} // namespace experimental
} // namespace grpc_event_engine
Expand Down
4 changes: 2 additions & 2 deletions src/core/lib/event_engine/posix_engine/posix_system_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ class SystemApi {
socklen_t addrlen) const;
absl::StatusOr<ReentrantLock> PosixLock() const { return fds_.PosixLock(); }

#ifdef GRPC_LINUX_EPOLL
#ifdef GRPC_LINUX_EVENTFD
absl::StatusOr<long> EventFdRead(FileDescriptor fd, uint64_t* value) const;
absl::StatusOr<int> EventFdWrite(FileDescriptor fd, uint64_t value) const;
absl::StatusOr<int> EpollCtl(FileDescriptor epfd, int op, FileDescriptor fd,
struct epoll_event* event) const;
absl::StatusOr<int> EpollWait(FileDescriptor epfd, struct epoll_event* events,
int maxevents, int timeout) const;
#endif // GRPC_LINUX_EPOLL
#endif // GRPC_LINUX_EVENTFD
absl::StatusOr<int> GetSockOpt(FileDescriptor fd, int level, int optname,
void* optval, socklen_t* optlen) const;
absl::StatusOr<int> GetSockName(FileDescriptor fd, struct sockaddr* addr,
Expand Down
2 changes: 1 addition & 1 deletion test/core/event_engine/posix/posix_system_api_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ TEST(PosixSystemApiTest, PosixLevel) {
ASSERT_EQ(client_api.Write(server_client->client, buf.data(), buf.size())
.status()
.code(),
absl::StatusCode::kNotFound);
absl::StatusCode::kInternal);
// Send using the new connection
server_client = EstablishConnection(server_api, client_api, *server, port);
ASSERT_TRUE(server_client.ok()) << server_client.status();
Expand Down

0 comments on commit 1ed9eae

Please sign in to comment.