From e69d5d312d7d3e3bb69a037e87c718946366dfe8 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 14 Feb 2025 15:45:15 +0300 Subject: [PATCH] [+] comments --- ydb/core/persqueue/partition_init.cpp | 25 +++++++++++++------ ydb/core/persqueue/partition_init.h | 1 + ydb/core/persqueue/ut/common/pq_ut_common.cpp | 3 ++- ydb/core/persqueue/ut/common/pq_ut_common.h | 1 - ydb/core/persqueue/ut/ya.make | 1 - 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 360c2641e63d..05a8762b6b71 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -457,6 +457,7 @@ void TInitDataRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor } if (WaitForDeleteAndRename) { + // The tablet deleted and renamed the blobs PoisonPill(ctx); return; } @@ -502,6 +503,7 @@ enum EKeyPosition { LhsContainsRhs }; +// Calculates the location of keys relative to each other static EKeyPosition KeyPosition(const TKey& lhs, const TKey& rhs) { Y_ABORT_UNLESS(lhs.GetOffset() <= rhs.GetOffset(), @@ -522,6 +524,7 @@ static EKeyPosition KeyPosition(const TKey& lhs, const TKey& rhs) } // case lhs.GetOffset() < rhs.GetOffset() + if (ui64 nextOffset = lhs.GetOffset() + lhs.GetCount(); nextOffset > rhs.GetOffset()) { return LhsContainsRhs; } else if (nextOffset == rhs.GetOffset()) { @@ -548,29 +551,32 @@ static THashSet FilterBlobsMetaData(const NKikimrClient::TKeyValueRespo TDeque filtered; TKey lastKey; - for (size_t i = 0; i < keys.size(); ++i) { + for (auto& key : keys) { if (filtered.empty()) { - filtered.push_back(keys[i]); + filtered.push_back(std::move(key)); lastKey = MakeKeyFromString(filtered.back(), partitionId); continue; } - auto candidate = MakeKeyFromString(keys[i], partitionId); + auto candidate = MakeKeyFromString(key, partitionId); switch (KeyPosition(lastKey, candidate)) { case RhsContainsLhs: - filtered.back() = keys[i]; + // We found a key that is wider than the previous key + filtered.back() = std::move(key); lastKey = MakeKeyFromString(filtered.back(), partitionId); break; case RhsAfterLhs: - filtered.push_back(keys[i]); + // The new key is adjacent to the previous key. + filtered.push_back(std::move(key)); lastKey = MakeKeyFromString(filtered.back(), partitionId); break; case LhsContainsRhs: + // The current key already contains this key break; default: Y_ABORT("A strange key %s, last key %s", - keys[i].data(), filtered.back().data()); + key.data(), filtered.back().data()); } } @@ -606,10 +612,9 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons auto& gapSize = Partition()->GapSize; auto& bodySize = Partition()->BodySize; - Y_ABORT_UNLESS(!CompatibilityRequest); - const auto actualKeys = FilterBlobsMetaData(range, PartitionId()); const TString firstHeadKey = FindFirstHeadKey(actualKeys); + CompatibilityRequest = MakeHolder(); for (ui32 i = 0; i < range.PairSize(); ++i) { @@ -617,6 +622,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons Y_ABORT_UNLESS(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here if (!actualKeys.contains(pair.GetKey())) { + // It is necessary to remove the extra blob auto* cmd = CompatibilityRequest->Record.AddCmdDeleteRange(); auto* range = cmd->MutableRange(); range->SetFrom(pair.GetKey()); @@ -626,6 +632,8 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons continue; } if (pair.GetKey().back() == '?') { + // We need to rename the new keys. At the same time, the location relative to the "head" must be + // taken into account TString newKey = pair.GetKey(); if (newKey < firstHeadKey) { newKey.resize(newKey.size() - 1); @@ -669,6 +677,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons } if ((CompatibilityRequest->Record.CmdDeleteRangeSize() == 0) && (CompatibilityRequest->Record.CmdRenameSize() == 0)) { + // All the keys are correct. We don't need to delete or rename anything CompatibilityRequest = nullptr; } diff --git a/ydb/core/persqueue/partition_init.h b/ydb/core/persqueue/partition_init.h index b6b755033010..b0eb8688c38a 100644 --- a/ydb/core/persqueue/partition_init.h +++ b/ydb/core/persqueue/partition_init.h @@ -143,6 +143,7 @@ class TInitDataRangeStep: public TBaseKVStep { void FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); void FormHeadAndProceed(); + // request to delete and rename keys from the new version THolder CompatibilityRequest; bool WaitForDeleteAndRename = false; }; diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index 0607ff3e23e9..9255c5d995c3 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -139,6 +139,8 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, PQTabletPrepare(parameters, users, *context.Runtime, context.TabletId, context.Edge); } +// Allows you to create a topic with the required set of keys. The keys and values are taken from the resource file. +// The values are expected to be BZIP2 compressed and BASE64 encoded. void PQTabletPrepareFromResource(const TTabletPreparationParameters& parameters, const TVector>& users, const TString& resourceName, @@ -354,7 +356,6 @@ void PQTabletRestart(TTestActorRuntime& runtime, ui64 tabletId, TActorId edge) { rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2)); runtime.DispatchEvents(rebootOptions); } - TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) { return SetOwner(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, owner, force); } diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index c54f9c8a20df..94dda0f32a23 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -262,7 +262,6 @@ struct TTabletPreparationParameters { TString account{"federationAccount"}; ::NKikimrPQ::TPQTabletConfig_EMeteringMode meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY; }; - void PQTabletPrepare( const TTabletPreparationParameters& parameters, const TVector>& users, diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make index 7237ea9b7149..3f70fd50f5f6 100644 --- a/ydb/core/persqueue/ut/ya.make +++ b/ydb/core/persqueue/ut/ya.make @@ -17,7 +17,6 @@ PEERDIR( library/cpp/getopt library/cpp/regex/pcre library/cpp/svnversion - library/cpp/resource ydb/core/persqueue/ut/common ydb/core/testlib/default ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils