Skip to content

Commit

Permalink
Normalizer insert records (#9010)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 11, 2024
1 parent 19eab67 commit 9b1e630
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 37 deletions.
44 changes: 9 additions & 35 deletions ydb/core/tx/columnshard/columnshard_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,18 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
}

while (!rowset.EndOfSet()) {
EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>();
const ui64 planStep = rowset.GetValue<InsertTable::PlanStep>();
const ui64 writeTxId = rowset.GetValueOrDefault<InsertTable::WriteTxId>();
const ui64 pathId = rowset.GetValue<InsertTable::PathId>();
const TString dedupId = rowset.GetValue<InsertTable::DedupId>();
const ui64 schemaVersion = rowset.HaveValue<InsertTable::SchemaVersion>() ? rowset.GetValue<InsertTable::SchemaVersion>() : 0;
NOlap::TInsertTableRecordLoadContext constructor;
constructor.ParseFromDatabase(rowset);

TString error;
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(rowset.GetValue<InsertTable::BlobId>(), dsGroupSelector, error);
Y_ABORT_UNLESS(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());

NKikimrTxColumnShard::TLogicalMetadata meta;
if (auto metaStr = rowset.GetValue<InsertTable::Meta>()) {
Y_ABORT_UNLESS(meta.ParseFromString(metaStr));
}

std::optional<ui64> rangeOffset;
if (rowset.HaveValue<InsertTable::BlobRangeOffset>()) {
rangeOffset = rowset.GetValue<InsertTable::BlobRangeOffset>();
}
std::optional<ui64> rangeSize;
if (rowset.HaveValue<InsertTable::BlobRangeSize>()) {
rangeSize = rowset.GetValue<InsertTable::BlobRangeSize>();
}
AFL_VERIFY(!!rangeOffset == !!rangeSize);

auto userData = std::make_shared<NOlap::TUserData>(pathId,
NOlap::TBlobRange(blobId, rangeOffset.value_or(0), rangeSize.value_or(blobId.BlobSize())), meta, schemaVersion, std::nullopt);

switch (recType) {
case EInsertTableIds::Inserted:
insertTable.AddInserted(NOlap::TInsertedData((TInsertWriteId)writeTxId, userData), true);
switch (constructor.GetRecType()) {
case Schema::EInsertTableIds::Inserted:
insertTable.AddInserted(constructor.BuildInsertedOrAborted(dsGroupSelector), true);
break;
case EInsertTableIds::Committed:
insertTable.AddCommitted(NOlap::TCommittedData(userData, planStep, writeTxId, dedupId), true);
case Schema::EInsertTableIds::Committed:
insertTable.AddCommitted(constructor.BuildCommitted(dsGroupSelector), true);
break;
case EInsertTableIds::Aborted:
insertTable.AddAborted(NOlap::TInsertedData((TInsertWriteId)writeTxId, userData), true);
case Schema::EInsertTableIds::Aborted:
insertTable.AddAborted(constructor.BuildInsertedOrAborted(dsGroupSelector), true);
break;
}
if (!rowset.Next()) {
Expand Down
122 changes: 121 additions & 1 deletion ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -977,4 +977,124 @@ class TIndexChunkLoadContext {
}
};

}
class TInsertTableRecordLoadContext {
private:
NColumnShard::Schema::EInsertTableIds RecType;
ui64 PlanStep;
ui64 WriteTxId;
ui64 PathId;
YDB_ACCESSOR_DEF(TString, DedupId);
ui64 SchemaVersion;
TString BlobIdString;
std::optional<NOlap::TUnifiedBlobId> BlobId;
TString MetadataString;
std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
std::optional<ui64> RangeOffset;
std::optional<ui64> RangeSize;

void Prepare(const IBlobGroupSelector* dsGroupSelector) {
AFL_VERIFY(!PreparedFlag);
PreparedFlag = true;
TString error;
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(BlobIdString, dsGroupSelector, error);
Y_ABORT_UNLESS(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());
BlobId = blobId;

NKikimrTxColumnShard::TLogicalMetadata meta;
AFL_VERIFY(MetadataString);
Y_ABORT_UNLESS(meta.ParseFromString(MetadataString));
Metadata = std::move(meta);
AFL_VERIFY(!!RangeOffset == !!RangeSize);
}

bool PreparedFlag = false;
bool ParsedFlag = false;

public:
TInsertWriteId GetInsertWriteId() const {
AFL_VERIFY(ParsedFlag);
AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed);
return (TInsertWriteId)WriteTxId;
}

NColumnShard::Schema::EInsertTableIds GetRecType() const {
AFL_VERIFY(ParsedFlag);
return RecType;
}

ui64 GetPlanStep() const {
AFL_VERIFY(ParsedFlag);
return PlanStep;
}

void Remove(NIceDb::TNiceDb& db) const {
AFL_VERIFY(ParsedFlag);
db.Table<NColumnShard::Schema::InsertTable>().Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId).Delete();
}

void Upsert(NIceDb::TNiceDb& db) const {
AFL_VERIFY(ParsedFlag);
using namespace NColumnShard;
if (RangeOffset) {
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
} else {
db.Table<Schema::InsertTable>()
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
}
}

template <class TRowset>
void ParseFromDatabase(TRowset& rowset) {
AFL_VERIFY(!ParsedFlag)("problem", "duplication parsing");
ParsedFlag = true;
using namespace NColumnShard;
RecType = (Schema::EInsertTableIds)rowset.template GetValue<Schema::InsertTable::Committed>();
PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>();
WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>();
AFL_VERIFY(WriteTxId);

PathId = rowset.template GetValue<Schema::InsertTable::PathId>();
DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>();
SchemaVersion =
rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0;
BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>();
MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>();
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) {
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
}
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) {
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
}
}

NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed);
auto userData = std::make_shared<NOlap::TUserData>(PathId,
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!!DedupId);
AFL_VERIFY(PlanStep);
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId);
}

NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) {
Prepare(dsGroupSelector);
using namespace NColumnShard;
AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed);
auto userData = std::make_shared<NOlap::TUserData>(PathId,
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
AFL_VERIFY(!DedupId);
AFL_VERIFY(!PlanStep);
return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData);
}
};

} // namespace NKikimr::NOlap
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ enum class ENormalizerSequentialId: ui32 {
PortionsMetadata,
CleanGranuleId,
EmptyPortionsCleaner,
CleanInsertionDedup,

MAX
};
Expand Down
151 changes: 151 additions & 0 deletions ydb/core/tx/columnshard/normalizer/tablet/broken_insertion_dedup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#include "broken_insertion_dedup.h"

#include <ydb/core/tx/columnshard/columnshard_private_events.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

namespace NKikimr::NOlap::NInsertionDedup {

class TNormalizerRemoveChanges: public INormalizerChanges {
private:
std::vector<TInsertTableRecordLoadContext> Insertions;
public:
virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normalizationContext*/) const override {
NIceDb::TNiceDb db(txc.DB);
for (auto&& i : Insertions) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "remove_aborted_record")("write_id", i.GetInsertWriteId());
i.Remove(db);
}
return true;
}
virtual void ApplyOnComplete(const TNormalizationController& /*normalizationContext*/) const override {

}

virtual ui64 GetSize() const override {
return Insertions.size();
}

TNormalizerRemoveChanges(const std::vector<TInsertTableRecordLoadContext>& insertions)
: Insertions(insertions)
{

}
};

class TNormalizerCleanDedupChanges: public INormalizerChanges {
private:
mutable std::vector<TInsertTableRecordLoadContext> Insertions;

public:
virtual bool ApplyOnExecute(
NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normalizationContext*/) const override {
NIceDb::TNiceDb db(txc.DB);
for (auto&& i : Insertions) {
AFL_VERIFY(i.GetDedupId());
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "correct_record")("dedup", i.GetDedupId());
i.Remove(db);
i.SetDedupId("");
i.Upsert(db);
}
return true;
}
virtual void ApplyOnComplete(const TNormalizationController& /*normalizationContext*/) const override {
}

virtual ui64 GetSize() const override {
return Insertions.size();
}

TNormalizerCleanDedupChanges(const std::vector<TInsertTableRecordLoadContext>& insertions)
: Insertions(insertions) {
}
};


// Per-write-id pair of rows seen in InsertTable: at most one Inserted row and
// at most one Aborted row. Setting the same slot twice aborts.
class TCollectionStates {
private:
    YDB_READONLY_DEF(std::optional<TInsertTableRecordLoadContext>, Inserted);
    YDB_READONLY_DEF(std::optional<TInsertTableRecordLoadContext>, Aborted);

public:
    // Remembers the Aborted row; the slot must still be empty.
    void SetAborted(const TInsertTableRecordLoadContext& context) {
        AFL_VERIFY(!Aborted.has_value());
        Aborted.emplace(context);
    }

    // Remembers the Inserted row; the slot must still be empty.
    void SetInserted(const TInsertTableRecordLoadContext& context) {
        AFL_VERIFY(!Inserted.has_value());
        Inserted.emplace(context);
    }
};

// Scans the whole InsertTable and produces normalization tasks:
//  * for a write id that has both an Inserted and an Aborted row, the Inserted row is scheduled for removal;
//  * an Inserted/Aborted row that carries a DedupId is scheduled for DedupId cleanup
//    (the Aborted row is the one checked when both rows exist).
// Work is packed into tasks of at most 1000 records each.
// Returns a failure status when the table rows cannot be read.
TConclusion<std::vector<INormalizerTask::TPtr>> TInsertionsDedupNormalizer::DoInit(
    const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
    using namespace NColumnShard;
    using ERecType = NColumnShard::Schema::EInsertTableIds;

    NIceDb::TNiceDb db(txc.DB);
    auto rowset = db.Table<NColumnShard::Schema::InsertTable>().Select();
    if (!rowset.IsReady()) {
        return TConclusionStatus::Fail("cannot read insertion info");
    }

    // Group Inserted/Aborted rows by write id; Committed rows are only sanity-checked.
    THashMap<TInsertWriteId, TCollectionStates> statesByWriteId;
    while (!rowset.EndOfSet()) {
        TInsertTableRecordLoadContext record;
        record.ParseFromDatabase(rowset);
        const ERecType recType = record.GetRecType();
        if (recType != ERecType::Committed) {
            AFL_VERIFY(!record.GetPlanStep());
            if (recType == ERecType::Inserted) {
                statesByWriteId[record.GetInsertWriteId()].SetInserted(record);
            } else if (recType == ERecType::Aborted) {
                statesByWriteId[record.GetInsertWriteId()].SetAborted(record);
            } else {
                AFL_VERIFY(false);
            }
        } else {
            AFL_VERIFY(record.GetPlanStep());
        }
        if (!rowset.Next()) {
            return TConclusionStatus::Fail("cannot read insertion info");
        }
    }

    constexpr size_t batchLimit = 1000;
    std::vector<INormalizerTask::TPtr> result;
    std::vector<TInsertTableRecordLoadContext> removeBatch;
    std::vector<TInsertTableRecordLoadContext> cleanDedupBatch;

    // Each flush wraps the accumulated batch into a task and starts a new batch.
    const auto flushCleanDedup = [&result, &cleanDedupBatch]() {
        result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TNormalizerCleanDedupChanges>(cleanDedupBatch)));
        cleanDedupBatch.clear();
    };
    const auto flushRemove = [&result, &removeBatch]() {
        result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TNormalizerRemoveChanges>(removeBatch)));
        removeBatch.clear();
    };

    for (auto&& [writeId, states] : statesByWriteId) {
        const auto& inserted = states.GetInserted();
        const auto& aborted = states.GetAborted();
        if (!inserted && !aborted) {
            AFL_VERIFY(false);
        } else {
            if (inserted && aborted) {
                removeBatch.emplace_back(*inserted);
            }
            // The Aborted row takes precedence as the DedupId cleanup candidate.
            const auto& candidate = aborted ? aborted : inserted;
            if (candidate->GetDedupId()) {
                cleanDedupBatch.emplace_back(*candidate);
            }
        }
        if (cleanDedupBatch.size() == batchLimit) {
            flushCleanDedup();
        }
        if (removeBatch.size() == batchLimit) {
            flushRemove();
        }
    }
    // Emit the incomplete trailing batches, if any.
    if (!cleanDedupBatch.empty()) {
        flushCleanDedup();
    }
    if (!removeBatch.empty()) {
        flushRemove();
    }

    return result;
}

} // namespace NKikimr::NOlap
35 changes: 35 additions & 0 deletions ydb/core/tx/columnshard/normalizer/tablet/broken_insertion_dedup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>


namespace NKikimr::NOlap::NInsertionDedup {

// Normalizer component identified by the name "CleanInsertionDedup" and by
// ENormalizerSequentialId::CleanInsertionDedup. DoInit is only declared here.
class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerComponent {
public:
// Name passed to the factory registrator below and returned by GetClassName().
static TString GetClassNameStatic() {
return "CleanInsertionDedup";
}
private:
// Forward declaration only; no definition is visible in this header.
class TNormalizerResult;

// Static registrator constructed with GetClassNameStatic().
// NOTE(review): presumably this makes the component creatable by name through
// INormalizerComponent::TFactory — confirm against the factory implementation.
static const inline INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer>(GetClassNameStatic());

public:
// The init context is accepted but not used.
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext&) {
}

// Reports ENormalizerSequentialId::CleanInsertionDedup as this component's sequential id.
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return ENormalizerSequentialId::CleanInsertionDedup;
}

// Same value as GetClassNameStatic().
virtual TString GetClassName() const override {
return GetClassNameStatic();
}

// Produces the list of normalization tasks using the given transaction context.
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/tablet/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ LIBRARY()
SRCS(
GLOBAL gc_counters.cpp
GLOBAL broken_txs.cpp
GLOBAL broken_insertion_dedup.cpp
)

PEERDIR(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/operations/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TLockFeatures: TMoveOnly {

void SetBroken() {
SharingInfo->Broken = 1;
SharingInfo->InternalGenerationCounter = (ui64)TSysTables::TLocksTable::TLock::ESetErrors::ErrorBroken;
SharingInfo->InternalGenerationCounter = (i64)TSysTables::TLocksTable::TLock::ESetErrors::ErrorBroken;
}

bool IsBroken() const {
Expand Down

0 comments on commit 9b1e630

Please sign in to comment.