Skip to content

Commit

Permalink
[-] broken tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov committed Feb 10, 2025
1 parent 163d5a9 commit 567dc75
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 91 deletions.
4 changes: 2 additions & 2 deletions ydb/core/persqueue/cache_eviction.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ namespace NPQ {
, Source(SourcePrefetch)
{}

const TCacheValue::TPtr GetBlob() const { return Blob.lock(); }
const TCacheValue::TPtr GetBlob() const { return Blob; }

private:
TCacheValue::TWeakPtr Blob;
TCacheValue::TPtr Blob;
};

using TMapType = TMap<TBlobId, TValueL1>;
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/persqueue/pq_l2_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T
void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheKeysRequest::TPtr& ev, const TActorContext& ctx)
{
auto response = MakeHolder<TEvPqCache::TEvCacheKeysResponse>();
for (auto i = Cache.Begin(); i != Cache.End(); ++i) {
const auto& key = i.Key();
response->Keys.emplace_back(key.TabletId, key.Partition, key.Offset, key.PartNo);
}
response->RenamedKeys = RenamedKeys;
ctx.Send(ev->Sender, response.Release());
}

Expand Down Expand Up @@ -170,6 +167,8 @@ void TPersQueueCacheL2::RemoveBlobs(const TActorContext& ctx, ui64 tabletId, con
void TPersQueueCacheL2::RenameBlobs(const TActorContext& ctx, ui64 tabletId,
const TVector<std::pair<TCacheBlobL2, TCacheBlobL2>>& blobs)
{
RenamedKeys += blobs.size();

for (const auto& [oldBlob, newBlob] : blobs) {
TKey oldKey(tabletId, oldBlob);
auto it = Cache.FindWithoutPromote(oldKey);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_l2_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class TPersQueueCacheL2 : public TActorBootstrapped<TPersQueueCacheL2> {
TL2Counters Counters;

TString HttpForm() const;

size_t RenamedKeys = 0;
};

} // NPQ
Expand Down
9 changes: 1 addition & 8 deletions ydb/core/persqueue/pq_l2_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,7 @@ struct TEvPqCache {
};

struct TEvCacheKeysResponse : TEventLocal<TEvCacheKeysResponse, EvCacheKeysResponse> {
struct TKey {
ui64 TabletId;
TPartitionId Partition;
ui64 Offset;
ui16 PartNo;
};

TVector<TKey> Keys;
size_t RenamedKeys = 0;
};
};

Expand Down
94 changes: 17 additions & 77 deletions ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ class TFixture : public NUnitTest::TBaseFixture {
void CheckTabletKeys(const TString& topicName);
void DumpPQTabletKeys(const TString& topicName);

void EnsureKeysIsEqual(const TString& topicName, unsigned id);

NTable::TDataQueryResult ExecuteDataQuery(NTable::TSession session, const TString& query, const NTable::TTxControl& control);

TVector<TString> Read_Exactly_N_Messages_From_Topic(const TString& topicPath,
Expand Down Expand Up @@ -240,6 +238,8 @@ class TFixture : public NUnitTest::TBaseFixture {
virtual bool GetEnableHtapTx() const;
virtual bool GetAllowOlapDataQuery() const;

size_t GetPQCacheRenameKeysCount();

private:
template<class E>
E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx);
Expand All @@ -253,8 +253,6 @@ class TFixture : public NUnitTest::TBaseFixture {
ui64 tabletId);
std::vector<std::string> GetPQTabletDataKeys(const TActorId& actorId,
ui64 tabletId);
std::vector<NKikimr::NPQ::TEvPqCache::TEvCacheKeysResponse::TKey> GetPQCacheKeys(const TActorId& actorId,
ui64 tabletId);
NPQ::TWriteId GetTransactionWriteId(const TActorId& actorId,
ui64 tabletId);
void SendLongTxLockStatus(const TActorId& actorId,
Expand Down Expand Up @@ -1111,12 +1109,12 @@ std::vector<std::string> TFixture::GetPQTabletDataKeys(const TActorId& actorId,
return keys;
}

std::vector<NKikimr::NPQ::TEvPqCache::TEvCacheKeysResponse::TKey> TFixture::GetPQCacheKeys(const TActorId& edge,
ui64 tabletId)
size_t TFixture::GetPQCacheRenameKeysCount()
{
using namespace NKikimr::NPQ;

auto& runtime = Setup->GetRuntime();
TActorId edge = runtime.AllocateEdgeActor();

auto request = MakeHolder<TEvPqCache::TEvCacheKeysRequest>();

Expand All @@ -1125,15 +1123,7 @@ std::vector<NKikimr::NPQ::TEvPqCache::TEvCacheKeysResponse::TKey> TFixture::GetP
TAutoPtr<IEventHandle> handle;
auto* result = runtime.GrabEdgeEvent<TEvPqCache::TEvCacheKeysResponse>(handle);

std::vector<NKikimr::NPQ::TEvPqCache::TEvCacheKeysResponse::TKey> keys;

for (const auto& key : result->Keys) {
if (key.TabletId == tabletId) {
keys.push_back(key);
}
}

return keys;
return result->RenamedKeys;
}

void TFixture::RestartLongTxService()
Expand Down Expand Up @@ -1842,51 +1832,6 @@ void TFixture::CheckTabletKeys(const TString& topicName)
}
}

void TFixture::EnsureKeysIsEqual(const TString& topicName, unsigned id)
{
using namespace NKikimr::NPQ;

auto& runtime = Setup->GetRuntime();
TActorId edge = runtime.AllocateEdgeActor();
ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, 0);

auto tabletKeys = GetPQTabletDataKeys(edge, tabletId);
auto cacheKeys = GetPQCacheKeys(edge, tabletId);

UNIT_ASSERT_VALUES_EQUAL_C(tabletKeys.size(), cacheKeys.size(), "id=" << id);

auto compareBlobId = [](const TEvPqCache::TEvCacheKeysResponse::TKey& lhs,
const TEvPqCache::TEvCacheKeysResponse::TKey& rhs) {
auto makeTuple = [](const TEvPqCache::TEvCacheKeysResponse::TKey& v) {
return std::make_tuple(v.TabletId,
v.Partition,
v.Offset,
v.PartNo);
};

return makeTuple(lhs) < makeTuple(rhs);
};

std::sort(tabletKeys.begin(), tabletKeys.end());
std::sort(cacheKeys.begin(), cacheKeys.end(), compareBlobId);

auto toString = [](const std::string& s) -> TString {
return {s.begin(), s.end()};
};

for (size_t i = 0; i < tabletKeys.size(); ++i) {
const TKey key(toString(tabletKeys[i]));
const auto& blobId = cacheKeys[i];

UNIT_ASSERT_VALUES_EQUAL_C(key.GetPartition().InternalPartitionId, blobId.Partition.InternalPartitionId,
"id=" << id << ", i=" << i);
UNIT_ASSERT_VALUES_EQUAL_C(key.GetOffset(), blobId.Offset,
"id=" << id << ", i=" << i);
UNIT_ASSERT_VALUES_EQUAL_C(key.GetPartNo(), blobId.PartNo,
"id=" << id << ", i=" << i);
}
}

void TFixture::DumpPQTabletKeys(const TString& topicName)
{
auto& runtime = Setup->GetRuntime();
Expand Down Expand Up @@ -2676,22 +2621,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture)
UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_49, TFixture)
{
// We write 252 128KB messages to the topic. After each write operation, we check that the partition
// and cache have the same set of keys. The test will include both adding new keys and deleting old ones.
CreateTopic("topic_A", TEST_CONSUMER);

TString message(128_KB, 'x');

for (unsigned i = 0; i < 252; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, message);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);

EnsureKeysIsEqual("topic_A", i);
}
}

Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture)
{
// We write to the topic in the transaction. When a transaction is committed, the keys in the blob
Expand All @@ -2707,14 +2636,23 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture)
auto session = CreateTableSession();

// tx #1
// After the transaction commit, there will be no large blobs in the batches. The number of renames
// will not change in the cache.
auto tx = BeginTx(session);

WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx);

UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);

CommitTx(tx, EStatus::SUCCESS);

Sleep(TDuration::Seconds(5));

UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);

// tx #2
// After the commit, the party will rename one big blob
tx = BeginTx(session);

for (unsigned i = 0; i < 80; ++i) {
Expand All @@ -2723,11 +2661,13 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture)

WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx);

UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);

CommitTx(tx, EStatus::SUCCESS);

Sleep(TDuration::Seconds(5));

EnsureKeysIsEqual("topic_A", 0);
UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 1);
}

class TFixtureSinks : public TFixture {
Expand Down

0 comments on commit 567dc75

Please sign in to comment.