diff --git a/folly/coro/BUCK b/folly/coro/BUCK index ec39159ea50..6c4ec71e3bd 100644 --- a/folly/coro/BUCK +++ b/folly/coro/BUCK @@ -510,6 +510,18 @@ cpp_library( ], ) +cpp_library( + name = "synchronized", + headers = ["Synchronized.h"], + exported_deps = [ + ":shared_lock", + ":shared_mutex", + ":task", + ":traits", + "//folly:utility", + ], +) + cpp_library( name = "task", headers = ["Task.h"], diff --git a/folly/coro/Synchronized.h b/folly/coro/Synchronized.h new file mode 100644 index 00000000000..010a8bd545e --- /dev/null +++ b/folly/coro/Synchronized.h @@ -0,0 +1,277 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace folly::coro { + +namespace detail { + +template +struct SynchronizedMutexTraits; + +template <> +struct SynchronizedMutexTraits { + using CoroMutex = SharedMutexFair; + using ReadLock = SharedLock; + using WriteLock = std::unique_lock; + + static inline auto co_readLock(CoroMutex& mutex) { + return mutex.co_scoped_lock_shared(); + } + + static inline ReadLock tryReadLock(CoroMutex& mutex) noexcept( + noexcept(ReadLock(mutex, std::try_to_lock))) { + return ReadLock(mutex, std::try_to_lock); + } + + static inline auto co_writeLock(CoroMutex& mutex) { + return mutex.co_scoped_lock(); + } + + static inline auto tryWriteLock(CoroMutex& mutex) noexcept( + noexcept(WriteLock(mutex, std::try_to_lock))) { + return WriteLock(mutex, std::try_to_lock); + } + + static inline void unlock(ReadLock& lock) noexcept(noexcept(lock.unlock())) { + lock.unlock(); + } + + static inline void unlock(WriteLock& lock) noexcept(noexcept(lock.unlock())) { + lock.unlock(); + } + + static inline auto ownsLock(const ReadLock& lock) noexcept( + noexcept(lock.owns_lock())) { + return lock.owns_lock(); + } + + static inline auto ownsLock(const WriteLock& lock) noexcept( + noexcept(lock.owns_lock())) { + return lock.owns_lock(); + } +}; + +} // namespace detail + +/** + * This class is an adaptation of the folly::Synchronized class but is designed + * to work with coro-compatible mutexes like coro::SharedMutexFair instead. + * + * In practice what this means is + * that we can co_await gaining the read/write lock rather than blocking whilst + * acquiring it. + * + * The API is not a complete clone of everything that folly::Synchronized + * supports but is instead the minimum of what we need. Ultimately this classes + * main job is to abstract away gaining the locks. + */ +template < + typename Inner, + typename CoroMutexType = SharedMutexFair, + typename CoroMutexTraits = detail::SynchronizedMutexTraits> +class Synchronized : public NonCopyableNonMovable { + public: + using Traits = CoroMutexTraits; + using CoroMutex = typename Traits::CoroMutex; + using ReadLock = typename Traits::ReadLock; + using WriteLock = typename Traits::WriteLock; + + Synchronized() noexcept(noexcept(CoroMutex{}) && noexcept(Inner{})) = default; + + explicit Synchronized(const Inner& rhs) noexcept(noexcept(Inner(rhs))) + : inner_(rhs) {} + + explicit Synchronized(Inner&& rhs) noexcept(noexcept(Inner(std::move(rhs)))) + : inner_(std::move(rhs)) {} + + template + explicit Synchronized(std::in_place_t, Args&&... args) + : inner_(std::forward(args)...) {} + + /** + * A RAII wrapper around a pointer to the underlying object together with + * a lock on the underlying mutex. + * + * If acquired with a try-lock style method, you must check the boolean + * value of the locked pointer before dereferencing it. + */ + template + class GenericLockedPtr : public MoveOnly { + public: + GenericLockedPtr(GenericLockedPtr&& other) noexcept( + noexcept(LockType(std::move(other.lock_)))) + : lock_(std::move(other.lock_)), + ptr_(std::exchange(other.ptr_, nullptr)) {} + + GenericLockedPtr& operator=(GenericLockedPtr&& other) noexcept( + noexcept(lock_ = std::move(other.lock_))) { + if (this != &other) { + lock_ = std::move(other.lock_); + ptr_ = std::exchange(other.ptr_, nullptr); + } + return *this; + } + + ValueType* operator->() const noexcept { + DCHECK_NE(ptr_, nullptr); + return ptr_; + } + + ValueType& operator*() const noexcept { + DCHECK_NE(ptr_, nullptr); + return *ptr_; + } + + void unlock() { + DCHECK_NE(ptr_, nullptr); + ptr_ = nullptr; + Traits::unlock(lock_); + } + + explicit operator bool() const noexcept { return Traits::ownsLock(lock_); } + + private: + friend class Synchronized; + explicit GenericLockedPtr(LockType&& lock, ValueType* ptr) + : lock_(std::move(lock)), ptr_(ptr) {} + + LockType lock_; + ValueType* ptr_ = nullptr; + }; + + using ReadLockedPtr = GenericLockedPtr; + using WriteLockedPtr = GenericLockedPtr; + + Task wLock() { + auto lock = co_await Traits::co_writeLock(mutex_); + co_return WriteLockedPtr{std::move(lock), &inner_}; + } + + Task rLock() const { + auto lock = co_await Traits::co_readLock(mutex_); + co_return ReadLockedPtr{std::move(lock), &inner_}; + } + + ReadLockedPtr tryRLock() const { + auto lock = Traits::tryReadLock(mutex_); + auto* ptr = Traits::ownsLock(lock) ? &inner_ : nullptr; + return ReadLockedPtr{std::move(lock), ptr}; + } + + WriteLockedPtr tryWLock() { + auto lock = WriteLock{mutex_, std::try_to_lock}; + auto* ptr = Traits::ownsLock(lock) ? &inner_ : nullptr; + return WriteLockedPtr{std::move(lock), ptr}; + } + + template + using rlock_result_t = std::invoke_result_t; + + template + using wlock_result_t = std::invoke_result_t; + + template > + typename std::enable_if, Task>::type + withRLock(FuncT func) const { + auto lock = co_await Traits::co_readLock(mutex_); + co_return func(ReadLockedPtr{std::move(lock), &inner_}); + } + + template > + typename std::enable_if< + is_semi_awaitable_v, + Task>>::type + withRLock(FuncT func) const { + auto lock = co_await Traits::co_readLock(mutex_); + co_return co_await func(ReadLockedPtr{std::move(lock), &inner_}); + } + + template > + typename std::enable_if, Task>::type + withWLock(FuncT func) { + auto lock = co_await Traits::co_writeLock(mutex_); + co_return func(WriteLockedPtr{std::move(lock), &inner_}); + } + + template > + typename std::enable_if< + is_semi_awaitable_v, + Task>>::type + withWLock(FuncT func) { + auto lock = co_await Traits::co_writeLock(mutex_); + co_return co_await func(WriteLockedPtr{std::move(lock), &inner_}); + } + + /** + * Temporarlily locks both objects and swaps their underlying data. + * + * Mimics the behaviour of folly::Synchronized in that we return early if you + * try to swap with itself and gains locks in ascending memory order to + * prevent deadlocks. + */ + Task swap(Synchronized& rhs) { + if (this == &rhs) { + co_return; + } + + // Can't compare pointers for inequality with operator> because it's + // unspecified behavior unless they share provenance, see: + // - https://en.wikipedia.org/wiki/Unspecified_behavior, + // - https://en.cppreference.com/w/cpp/language/operator_comparison. + if (std::greater<>()(this, &rhs)) { + co_return co_await rhs.swap(*this); + } + + auto guard1 = co_await wLock(); + auto guard2 = co_await rhs.wLock(); + + using std::swap; + swap(inner_, rhs.inner_); + + co_return; + } + + Task copy() const { + auto lock = co_await Traits::co_readLock(mutex_); + Inner res = folly::copy(inner_); + co_return res; + } + + Task swap(Inner& newInner) { + auto lock = co_await Traits::co_writeLock(mutex_); + + using std::swap; + swap(inner_, newInner); + } + + private: + mutable CoroMutex mutex_; + Inner inner_; +}; + +} // namespace folly::coro diff --git a/folly/coro/test/BUCK b/folly/coro/test/BUCK index 707219949d8..374d83e8173 100644 --- a/folly/coro/test/BUCK +++ b/folly/coro/test/BUCK @@ -334,6 +334,21 @@ cpp_unittest( ], ) +cpp_unittest( + name = "synchronized_test", + srcs = ["SynchronizedTest.cpp"], + deps = [ + "//folly/coro:baton", + "//folly/coro:blocking_wait", + "//folly/coro:gtest_helpers", + "//folly/coro:synchronized", + "//folly/coro:task", + "//folly/executors:cpu_thread_pool_executor", + "//folly/executors:manual_executor", + "//folly/portability:gtest", + ], +) + cpp_benchmark( name = "task_bench", srcs = ["TaskBenchmark.cpp"], diff --git a/folly/coro/test/SynchronizedTest.cpp b/folly/coro/test/SynchronizedTest.cpp new file mode 100644 index 00000000000..aa853e85a04 --- /dev/null +++ b/folly/coro/test/SynchronizedTest.cpp @@ -0,0 +1,242 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include + +using folly::coro::Baton; +using folly::coro::Synchronized; +using folly::coro::Task; + +namespace { + +// Silences clang-tidy's warning about use-after-move +template +T& assumeInitialized(T& value) { + return value; +} + +} // namespace + +class SynchronizedTest : public testing::Test {}; + +TEST_F(SynchronizedTest, MoveLockingPtr) { + Synchronized counter{5}; + + auto readPtr = counter.tryRLock(); + ASSERT_TRUE(readPtr); + EXPECT_EQ(*readPtr, 5); + + auto readPtr2 = std::move(readPtr); + assumeInitialized(readPtr); + EXPECT_FALSE(readPtr); + EXPECT_DEATH({ (void)*readPtr; }, ""); + ASSERT_TRUE(readPtr2); + EXPECT_EQ(*readPtr2, 5); + + auto readPtr3 = counter.tryRLock(); + EXPECT_TRUE(readPtr3); + readPtr3 = std::move(readPtr2); + assumeInitialized(readPtr2); + EXPECT_FALSE(readPtr2); + EXPECT_DEATH({ (void)*readPtr2; }, ""); + ASSERT_TRUE(readPtr3); + EXPECT_EQ(*readPtr3, 5); +} + +TEST_F(SynchronizedTest, ConcurrentReads) { + Synchronized counter; + + folly::ManualExecutor executor; + Baton b; + + static auto kNumReaders = 10; + int64_t activeReaders = 0; + for (auto i = 0; i < kNumReaders; i++) { + auto readTask = counter.withRLock( + [&b, &activeReaders](auto /* unused */) mutable -> Task { + activeReaders++; + co_await b; + activeReaders--; + co_return folly::unit; + }); + std::move(readTask).scheduleOn(&executor).start(); + } + executor.drain(); + EXPECT_EQ(activeReaders, kNumReaders); + + bool writeComplete = false; + auto writeTask = counter.withWLock( + [&writeComplete, + &activeReaders](auto /* unused */) mutable -> Task { + EXPECT_EQ(activeReaders, 0); + writeComplete = true; + co_return folly::unit; + }); + std::move(writeTask).scheduleOn(&executor).start(); + + // Readers have the lock. Writing doesn't run. + executor.drain(); + EXPECT_EQ(activeReaders, kNumReaders); + + // Post the baton. Writing will succeed. + b.post(); + executor.drain(); + EXPECT_TRUE(writeComplete); +} + +TEST_F(SynchronizedTest, ConcurrentReadsWithUnlock) { + Synchronized counter; + + folly::ManualExecutor executor; + Baton b1, b2; + + static auto kNumReaders = 10; + int64_t activeReaders = 0; + for (auto i = 0; i < kNumReaders; i++) { + auto readTask = counter.withRLock( + [&b1, &b2, &activeReaders]( + auto counterReadPtr) mutable -> folly::coro::Task { + activeReaders++; + co_await b1; + + counterReadPtr.unlock(); + co_await b2; + + activeReaders--; + co_return folly::unit; + }); + std::move(readTask).scheduleOn(&executor).start(); + } + executor.drain(); + EXPECT_EQ(activeReaders, kNumReaders); + + bool writeComplete = false; + auto writeTask = counter.withWLock( + [&writeComplete]( + auto /* unused */) mutable -> folly::coro::Task { + writeComplete = true; + co_return folly::unit; + }); + std::move(writeTask).scheduleOn(&executor).start(); + + // Readers have the lock. Writing doesn't run. + executor.drain(); + EXPECT_EQ(activeReaders, kNumReaders); + + // Post the first baton. Lock released, writing will succeed, + // activeReaders won't change. + b1.post(); + executor.drain(); + EXPECT_TRUE(writeComplete); + EXPECT_EQ(activeReaders, kNumReaders); + + // Post the second baton. Lock already released, activeReaders will change. + b2.post(); + executor.drain(); + EXPECT_EQ(activeReaders, 0); +} + +TEST_F(SynchronizedTest, ThreadSafety) { + folly::CPUThreadPoolExecutor threadPool{ + 2, std::make_shared("CPUThreadPool")}; + + static auto kBumpCount = 10000; + Synchronized counter = {}; + + auto makeTask = [&]() -> Task { + for (int i = 0; i < kBumpCount; ++i) { + co_await counter.withWLock([](auto cnt) -> void { (*cnt)++; }); + } + }; + + auto f1 = makeTask().scheduleOn(&threadPool).start(); + auto f2 = makeTask().scheduleOn(&threadPool).start(); + auto f3 = makeTask().scheduleOn(&threadPool).start(); + + std::move(f1).get(); + std::move(f2).get(); + std::move(f3).get(); + + auto finalValue = folly::coro::blockingWait(counter.copy()); + EXPECT_EQ(3 * kBumpCount, finalValue); +} + +CO_TEST_F(SynchronizedTest, SwapAndCopy) { + Synchronized> protectedList({1}); + std::vector newValues = {2}; + + co_await protectedList.swap(newValues); + auto updatedValues = co_await protectedList.copy(); + + EXPECT_EQ(newValues[0], 1); + EXPECT_EQ(updatedValues[0], 2); +} + +TEST_F(SynchronizedTest, TryLock) { + Synchronized sync(1); + + { + const Synchronized& constSynch = sync; + auto lock = constSynch.tryRLock(); + EXPECT_TRUE(lock); + } + + { + auto lock = sync.tryRLock(); + EXPECT_TRUE(lock); + + // Multiple read locks can be acquired + EXPECT_TRUE(sync.tryRLock()); + + // Exclusive lock cannot be acquired + auto writeLock = sync.tryWLock(); + EXPECT_FALSE(writeLock); + EXPECT_DEATH({ (void)*writeLock; }, ""); + + lock.unlock(); + + // After unlock another lock can be acquired + EXPECT_TRUE(sync.tryRLock()); + } + + { + auto lock = sync.tryWLock(); + EXPECT_TRUE(lock); + + // Another exclusive lock cannot be acquired + auto writeLock = sync.tryWLock(); + EXPECT_FALSE(writeLock); + EXPECT_DEATH({ (void)*writeLock; }, ""); + + // A read lock cannot be acquired + auto readLock = sync.tryRLock(); + EXPECT_FALSE(readLock); + EXPECT_DEATH({ (void)*readLock; }, ""); + + lock.unlock(); + + // After unlock another lock can be acquired + EXPECT_TRUE(sync.tryRLock()); + } +}