Skip to content

Commit

Permalink
ydb_topic: do not call lock_shared recursively (24-1) (#4387)
Browse files Browse the repository at this point in the history
Co-authored-by: ildar-khisambeev <ikhis@ydb.tech>
  • Loading branch information
qyryq and ildar-khisambeev authored May 8, 2024
1 parent c1f7310 commit 9c47d51
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 13 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ ydb/library/yql/providers/generic/connector/tests sole*
ydb/library/yql/providers/generic/connector/tests test.py.*
ydb/library/yql/sql/pg/ut PgSqlParsingAutoparam.AutoParamValues_DifferentTypes
ydb/library/yql/tests/sql/dq_file/part* *
ydb/public/sdk/cpp/client/ydb_federated_topic/ut BasicUsage.SimpleHandlers
ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut RetryPolicy.RetryWithBatching
ydb/public/sdk/cpp/client/ydb_topic/ut BasicUsage.WriteRead
ydb/services/datastreams/ut DataStreams.TestPutRecordsWithRead
Expand Down
13 changes: 8 additions & 5 deletions ydb/core/persqueue/ut/mirrorer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {

}

srcReader->Close(TDuration::Zero());
dstReader->Close(TDuration::Zero());

// write to source topic
TVector<ui32> messagesPerPartition(partitionsCount, 0);
for (ui32 partition = 0; partition < partitionsCount; ++partition) {
Expand All @@ -163,7 +166,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
{"some_extra_field2", "another_value" + ToString(partition)},
{"file", "/home/user/log" + ToString(partition)}
};
auto writer = CreateSimpleWriter(*driver, srcTopic, sourceId, partition + 1, std::nullopt, std::nullopt, sessionMeta);
auto writer = CreateSimpleWriter(*driver, srcTopic, sourceId, partition + 1, std::nullopt, std::nullopt, sessionMeta);

ui64 seqNo = writer->GetInitSeqNo();

Expand Down Expand Up @@ -211,10 +214,10 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
auto dstReader = createReader(dstTopic, partition);

for (ui32 i = 0; i < messagesPerPartition[partition]; ++i) {
auto dstEvent = GetNextMessageSkipAssignment(dstReader);
auto dstEvent = GetNextMessageSkipAssignment(dstReader, TDuration::Seconds(1));
UNIT_ASSERT(dstEvent);
Cerr << "Destination read message: " << dstEvent->DebugString() << "\n";
auto srcEvent = GetNextMessageSkipAssignment(srcReader);
auto srcEvent = GetNextMessageSkipAssignment(srcReader, TDuration::Seconds(1));
UNIT_ASSERT(srcEvent);
Cerr << "Source read message: " << srcEvent->DebugString() << "\n";

Expand Down Expand Up @@ -263,7 +266,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
server.AnnoyingClient->CreateTopic(topicFullName, 1);

auto driver = server.AnnoyingClient->GetDriver();
auto writer = CreateSimpleWriter(*driver, topic, "src-id-test");
auto writer = CreateSimpleWriter(*driver, topic, "src-id-test");
for (auto i = 0u; i < 5; i++) {
auto res = writer->Write(TString(10, 'a'));
UNIT_ASSERT(res);
Expand Down Expand Up @@ -299,7 +302,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
break;
}
}

for (auto i = 0u; i < 5; i++) {
auto res = writer->Write(TString(10, 'b'));
UNIT_ASSERT(res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings&
template <typename TEvent, typename TFederatedEvent>
typename std::function<void(TEvent&)> WrapFederatedHandler(std::function<void(TFederatedEvent&)> outerHandler, std::shared_ptr<TDbInfo> db, std::shared_ptr<TEventFederator> federator) {
if (outerHandler) {
return [outerHandler, db = std::move(db), &federator](TEvent& ev) {
return [outerHandler, db = std::move(db), federator = std::move(federator)](TEvent& ev) {
auto fev = federator->LocateFederate(ev, std::move(db));
return outerHandler(fev);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
#include <util/system/guard.h>
#include <util/system/spinlock.h>

#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>
#include <thread>

namespace NYdb::NPersQueue {

Expand All @@ -17,18 +19,62 @@ template <typename TGuardedObject>
class TCallbackContext {
friend class TContextOwner<TGuardedObject>;

// thread_id -> number of LockShared calls from this thread
using TSharedLockCounter = std::map<std::thread::id, size_t>;
using TSharedLockCounterPtr = std::shared_ptr<TSharedLockCounter>;
using TSpinLockPtr = std::shared_ptr<TSpinLock>;

public:
using TMutexPtr = std::shared_ptr<std::shared_mutex>;

class TBorrowed {
public:
explicit TBorrowed(const TCallbackContext& parent) : Mutex(parent.Mutex) {
Mutex->lock_shared();
explicit TBorrowed(const TCallbackContext& parent)
: Mutex(parent.Mutex)
, SharedLockCounterMutex(parent.SharedLockCounterMutex)
, SharedLockCounter(parent.SharedLockCounter)
{
// "Recursive shared lock".
//
// https://en.cppreference.com/w/cpp/thread/shared_mutex/lock_shared says:
// If lock_shared is called by a thread that already owns the mutex
// in any mode (exclusive or shared), the behavior is UNDEFINED.
//
// So if a thread calls LockShared more than once without releasing the lock,
// we should call lock_shared only on the first call.

bool takeLock = false;

with_lock(*SharedLockCounterMutex) {
auto& counter = SharedLockCounter->emplace(std::this_thread::get_id(), 0).first->second;
++counter;
takeLock = counter == 1;
}

if (takeLock) {
Mutex->lock_shared();
}

Ptr = parent.GuardedObjectPtr.get();
}

~TBorrowed() {
Mutex->unlock_shared();
bool releaseLock = false;

with_lock(*SharedLockCounterMutex) {
auto it = SharedLockCounter->find(std::this_thread::get_id());
Y_ABORT_UNLESS(it != SharedLockCounter->end());
auto& counter = it->second;
--counter;
if (counter == 0) {
releaseLock = true;
SharedLockCounter->erase(it);
}
}

if (releaseLock) {
Mutex->unlock_shared();
}
}

TGuardedObject* operator->() {
Expand All @@ -46,12 +92,17 @@ class TCallbackContext {
private:
TMutexPtr Mutex;
TGuardedObject* Ptr = nullptr;

TSpinLockPtr SharedLockCounterMutex;
TSharedLockCounterPtr SharedLockCounter;
};

public:
explicit TCallbackContext(std::shared_ptr<TGuardedObject> ptr)
: Mutex(std::make_shared<std::shared_mutex>())
, GuardedObjectPtr(std::move(ptr))
, SharedLockCounterMutex(std::make_shared<TSpinLock>())
, SharedLockCounter(std::make_shared<TSharedLockCounter>())
{}

TBorrowed LockShared() {
Expand All @@ -75,8 +126,12 @@ class TCallbackContext {
}

private:

TMutexPtr Mutex;
std::shared_ptr<TGuardedObject> GuardedObjectPtr;

TSpinLockPtr SharedLockCounterMutex;
TSharedLockCounterPtr SharedLockCounter;
};

template<typename T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,4 @@ namespace NKikimr::NPersQueueTests {
}
return {};
}


}

0 comments on commit 9c47d51

Please sign in to comment.