From 567dc75d7baab106a43fa03534ffa8a28c511fcd Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 7 Feb 2025 15:28:10 +0300 Subject: [PATCH] [-] broken tests --- ydb/core/persqueue/cache_eviction.h | 4 +- ydb/core/persqueue/pq_l2_cache.cpp | 7 +- ydb/core/persqueue/pq_l2_cache.h | 2 + ydb/core/persqueue/pq_l2_service.h | 9 +- .../src/client/topic/ut/topic_to_table_ut.cpp | 94 ++++--------------- 5 files changed, 25 insertions(+), 91 deletions(-) diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h index a0306da974ae..6826fb8164ca 100644 --- a/ydb/core/persqueue/cache_eviction.h +++ b/ydb/core/persqueue/cache_eviction.h @@ -198,10 +198,10 @@ namespace NPQ { , Source(SourcePrefetch) {} - const TCacheValue::TPtr GetBlob() const { return Blob.lock(); } + const TCacheValue::TPtr GetBlob() const { return Blob; } private: - TCacheValue::TWeakPtr Blob; + TCacheValue::TPtr Blob; }; using TMapType = TMap; diff --git a/ydb/core/persqueue/pq_l2_cache.cpp b/ydb/core/persqueue/pq_l2_cache.cpp index 0440ecb600bc..d8ca1cb39e20 100644 --- a/ydb/core/persqueue/pq_l2_cache.cpp +++ b/ydb/core/persqueue/pq_l2_cache.cpp @@ -76,10 +76,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap(); - for (auto i = Cache.Begin(); i != Cache.End(); ++i) { - const auto& key = i.Key(); - response->Keys.emplace_back(key.TabletId, key.Partition, key.Offset, key.PartNo); - } + response->RenamedKeys = RenamedKeys; ctx.Send(ev->Sender, response.Release()); } @@ -170,6 +167,8 @@ void TPersQueueCacheL2::RemoveBlobs(const TActorContext& ctx, ui64 tabletId, con void TPersQueueCacheL2::RenameBlobs(const TActorContext& ctx, ui64 tabletId, const TVector>& blobs) { + RenamedKeys += blobs.size(); + for (const auto& [oldBlob, newBlob] : blobs) { TKey oldKey(tabletId, oldBlob); auto it = Cache.FindWithoutPromote(oldKey); diff --git a/ydb/core/persqueue/pq_l2_cache.h b/ydb/core/persqueue/pq_l2_cache.h index 3a43685fb833..3747da4d1e5f 100644 --- a/ydb/core/persqueue/pq_l2_cache.h +++ b/ydb/core/persqueue/pq_l2_cache.h @@ -135,6 +135,8 @@ class TPersQueueCacheL2 : public TActorBootstrapped { TL2Counters Counters; TString HttpForm() const; + + size_t RenamedKeys = 0; }; } // NPQ diff --git a/ydb/core/persqueue/pq_l2_service.h b/ydb/core/persqueue/pq_l2_service.h index 28cffc751722..a02b2d9bfbfd 100644 --- a/ydb/core/persqueue/pq_l2_service.h +++ b/ydb/core/persqueue/pq_l2_service.h @@ -130,14 +130,7 @@ struct TEvPqCache { }; struct TEvCacheKeysResponse : TEventLocal { - struct TKey { - ui64 TabletId; - TPartitionId Partition; - ui64 Offset; - ui16 PartNo; - }; - - TVector Keys; + size_t RenamedKeys = 0; }; }; diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp index c626972807b5..a057b113f85e 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp @@ -191,8 +191,6 @@ class TFixture : public NUnitTest::TBaseFixture { void CheckTabletKeys(const TString& topicName); void DumpPQTabletKeys(const TString& topicName); - void EnsureKeysIsEqual(const TString& topicName, unsigned id); - NTable::TDataQueryResult ExecuteDataQuery(NTable::TSession session, const TString& query, const NTable::TTxControl& control); TVector Read_Exactly_N_Messages_From_Topic(const TString& topicPath, @@ -240,6 +238,8 @@ class TFixture : public NUnitTest::TBaseFixture { virtual bool GetEnableHtapTx() const; virtual bool GetAllowOlapDataQuery() const; + size_t GetPQCacheRenameKeysCount(); + private: template E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx); @@ -253,8 +253,6 @@ class TFixture : public NUnitTest::TBaseFixture { ui64 tabletId); std::vector GetPQTabletDataKeys(const TActorId& actorId, ui64 tabletId); - std::vector GetPQCacheKeys(const TActorId& actorId, - ui64 tabletId); NPQ::TWriteId GetTransactionWriteId(const TActorId& actorId, ui64 tabletId); void SendLongTxLockStatus(const TActorId& actorId, @@ -1111,12 +1109,12 @@ std::vector TFixture::GetPQTabletDataKeys(const TActorId& actorId, return keys; } -std::vector TFixture::GetPQCacheKeys(const TActorId& edge, - ui64 tabletId) +size_t TFixture::GetPQCacheRenameKeysCount() { using namespace NKikimr::NPQ; auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); auto request = MakeHolder(); @@ -1125,15 +1123,7 @@ std::vector TFixture::GetP TAutoPtr handle; auto* result = runtime.GrabEdgeEvent(handle); - std::vector keys; - - for (const auto& key : result->Keys) { - if (key.TabletId == tabletId) { - keys.push_back(key); - } - } - - return keys; + return result->RenamedKeys; } void TFixture::RestartLongTxService() @@ -1842,51 +1832,6 @@ void TFixture::CheckTabletKeys(const TString& topicName) } } -void TFixture::EnsureKeysIsEqual(const TString& topicName, unsigned id) -{ - using namespace NKikimr::NPQ; - - auto& runtime = Setup->GetRuntime(); - TActorId edge = runtime.AllocateEdgeActor(); - ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, 0); - - auto tabletKeys = GetPQTabletDataKeys(edge, tabletId); - auto cacheKeys = GetPQCacheKeys(edge, tabletId); - - UNIT_ASSERT_VALUES_EQUAL_C(tabletKeys.size(), cacheKeys.size(), "id=" << id); - - auto compareBlobId = [](const TEvPqCache::TEvCacheKeysResponse::TKey& lhs, - const TEvPqCache::TEvCacheKeysResponse::TKey& rhs) { - auto makeTuple = [](const TEvPqCache::TEvCacheKeysResponse::TKey& v) { - return std::make_tuple(v.TabletId, - v.Partition, - v.Offset, - v.PartNo); - }; - - return makeTuple(lhs) < makeTuple(rhs); - }; - - std::sort(tabletKeys.begin(), tabletKeys.end()); - std::sort(cacheKeys.begin(), cacheKeys.end(), compareBlobId); - - auto toString = [](const std::string& s) -> TString { - return {s.begin(), s.end()}; - }; - - for (size_t i = 0; i < tabletKeys.size(); ++i) { - const TKey key(toString(tabletKeys[i])); - const auto& blobId = cacheKeys[i]; - - UNIT_ASSERT_VALUES_EQUAL_C(key.GetPartition().InternalPartitionId, blobId.Partition.InternalPartitionId, - "id=" << id << ", i=" << i); - UNIT_ASSERT_VALUES_EQUAL_C(key.GetOffset(), blobId.Offset, - "id=" << id << ", i=" << i); - UNIT_ASSERT_VALUES_EQUAL_C(key.GetPartNo(), blobId.PartNo, - "id=" << id << ", i=" << i); - } -} - void TFixture::DumpPQTabletKeys(const TString& topicName) { auto& runtime = Setup->GetRuntime(); @@ -2676,22 +2621,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture) UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2); } -Y_UNIT_TEST_F(WriteToTopic_Demo_49, TFixture) -{ - // We write 252 128KB messages to the topic. After each write operation, we check that the partition - // and cache have the same set of keys. The test will include both adding new keys and deleting old ones. - CreateTopic("topic_A", TEST_CONSUMER); - - TString message(128_KB, 'x'); - - for (unsigned i = 0; i < 252; ++i) { - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, message); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - - EnsureKeysIsEqual("topic_A", i); - } -} - Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture) { // We write to the topic in the transaction. When a transaction is committed, the keys in the blob @@ -2707,14 +2636,23 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture) auto session = CreateTableSession(); // tx #1 + // After the transaction commit, there will be no large blobs in the batches. The number of renames + // will not change in the cache. auto tx = BeginTx(session); WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx); WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx); + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + CommitTx(tx, EStatus::SUCCESS); + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + // tx #2 + // After the commit, the party will rename one big blob tx = BeginTx(session); for (unsigned i = 0; i < 80; ++i) { @@ -2723,11 +2661,13 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture) WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx); + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + CommitTx(tx, EStatus::SUCCESS); Sleep(TDuration::Seconds(5)); - EnsureKeysIsEqual("topic_A", 0); + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 1); } class TFixtureSinks : public TFixture {