Skip to content

Commit

Permalink
[+] comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov committed Feb 14, 2025
1 parent 9fd96b5 commit e69d5d3
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
25 changes: 17 additions & 8 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ void TInitDataRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
}

if (WaitForDeleteAndRename) {
// The tablet deleted and renamed the blobs
PoisonPill(ctx);
return;
}
Expand Down Expand Up @@ -502,6 +503,7 @@ enum EKeyPosition {
LhsContainsRhs
};

// Calculates the location of keys relative to each other
static EKeyPosition KeyPosition(const TKey& lhs, const TKey& rhs)
{
Y_ABORT_UNLESS(lhs.GetOffset() <= rhs.GetOffset(),
Expand All @@ -522,6 +524,7 @@ static EKeyPosition KeyPosition(const TKey& lhs, const TKey& rhs)
}

// case lhs.GetOffset() < rhs.GetOffset()

if (ui64 nextOffset = lhs.GetOffset() + lhs.GetCount(); nextOffset > rhs.GetOffset()) {
return LhsContainsRhs;
} else if (nextOffset == rhs.GetOffset()) {
Expand All @@ -548,29 +551,32 @@ static THashSet<TString> FilterBlobsMetaData(const NKikimrClient::TKeyValueRespo
TDeque<TString> filtered;
TKey lastKey;

for (size_t i = 0; i < keys.size(); ++i) {
for (auto& key : keys) {
if (filtered.empty()) {
filtered.push_back(keys[i]);
filtered.push_back(std::move(key));
lastKey = MakeKeyFromString(filtered.back(), partitionId);
continue;
}

auto candidate = MakeKeyFromString(keys[i], partitionId);
auto candidate = MakeKeyFromString(key, partitionId);

switch (KeyPosition(lastKey, candidate)) {
case RhsContainsLhs:
filtered.back() = keys[i];
// We found a key that is wider than the previous key
filtered.back() = std::move(key);
lastKey = MakeKeyFromString(filtered.back(), partitionId);
break;
case RhsAfterLhs:
filtered.push_back(keys[i]);
// The new key is adjacent to the previous key.
filtered.push_back(std::move(key));
lastKey = MakeKeyFromString(filtered.back(), partitionId);
break;
case LhsContainsRhs:
// The current key already contains this key
break;
default:
Y_ABORT("A strange key %s, last key %s",
keys[i].data(), filtered.back().data());
key.data(), filtered.back().data());
}
}

Expand Down Expand Up @@ -606,17 +612,17 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
auto& gapSize = Partition()->GapSize;
auto& bodySize = Partition()->BodySize;

Y_ABORT_UNLESS(!CompatibilityRequest);

const auto actualKeys = FilterBlobsMetaData(range, PartitionId());
const TString firstHeadKey = FindFirstHeadKey(actualKeys);

CompatibilityRequest = MakeHolder<TEvKeyValue::TEvRequest>();

for (ui32 i = 0; i < range.PairSize(); ++i) {
const auto& pair = range.GetPair(i);
Y_ABORT_UNLESS(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here

if (!actualKeys.contains(pair.GetKey())) {
// It is necessary to remove the extra blob
auto* cmd = CompatibilityRequest->Record.AddCmdDeleteRange();
auto* range = cmd->MutableRange();
range->SetFrom(pair.GetKey());
Expand All @@ -626,6 +632,8 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
continue;
}
if (pair.GetKey().back() == '?') {
// We need to rename the new keys. At the same time, the location relative to the "head" must be
// taken into account
TString newKey = pair.GetKey();
if (newKey < firstHeadKey) {
newKey.resize(newKey.size() - 1);
Expand Down Expand Up @@ -669,6 +677,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
}

if ((CompatibilityRequest->Record.CmdDeleteRangeSize() == 0) && (CompatibilityRequest->Record.CmdRenameSize() == 0)) {
// All the keys are correct. We don't need to delete or rename anything
CompatibilityRequest = nullptr;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class TInitDataRangeStep: public TBaseKVStep {
void FillBlobsMetaData(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
void FormHeadAndProceed();

// request to delete and rename keys from the new version
THolder<TEvKeyValue::TEvRequest> CompatibilityRequest;
bool WaitForDeleteAndRename = false;
};
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
PQTabletPrepare(parameters, users, *context.Runtime, context.TabletId, context.Edge);
}

// Allows you to create a topic with the required set of keys. The keys and values are taken from the resource file.
// The values are expected to be BZIP2 compressed and BASE64 encoded.
void PQTabletPrepareFromResource(const TTabletPreparationParameters& parameters,
const TVector<std::pair<TString, bool>>& users,
const TString& resourceName,
Expand Down Expand Up @@ -354,7 +356,6 @@ void PQTabletRestart(TTestActorRuntime& runtime, ui64 tabletId, TActorId edge) {
rebootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvRestored, 2));
runtime.DispatchEvents(rebootOptions);
}

TActorId SetOwner(const ui32 partition, TTestContext& tc, const TString& owner, bool force) {
return SetOwner(tc.Runtime.Get(), tc.TabletId, tc.Edge, partition, owner, force);
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/ut/common/pq_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ struct TTabletPreparationParameters {
TString account{"federationAccount"};
::NKikimrPQ::TPQTabletConfig_EMeteringMode meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY;
};

void PQTabletPrepare(
const TTabletPreparationParameters& parameters,
const TVector<std::pair<TString, bool>>& users,
Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ PEERDIR(
library/cpp/getopt
library/cpp/regex/pcre
library/cpp/svnversion
library/cpp/resource
ydb/core/persqueue/ut/common
ydb/core/testlib/default
ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils
Expand Down

0 comments on commit e69d5d3

Please sign in to comment.