Commit

fix normalizers for v1 migration chunks (#11308)

ivanmorozov333 authored Nov 6, 2024
1 parent dbd5f95 commit 6c24540
Showing 12 changed files with 243 additions and 40 deletions.
@@ -16,12 +16,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)

ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", changes->DebugString());
if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) {
NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
Y_ABORT_UNLESS(Ev->Get()->IndexInfo->GetLastSchema()->GetSnapshot() <= snapshot);
AFL_VERIFY(Ev->Get()->IndexInfo->GetLastSchema()->GetSnapshot() <= Self->GetLastTxSnapshot())
("schema_last", Ev->Get()->IndexInfo->GetLastSchema()->GetSnapshot().DebugString())(
"planned_last", Self->GetLastTxSnapshot().DebugString());

TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnExecute(dbWrap, changes, snapshot));
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnExecute(dbWrap, changes, Self->GetLastTxSnapshot()));
LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix());
NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>(), CurrentSnapshot);
changes->WriteIndexOnExecute(Self, context);
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/columnshard_schema.h
@@ -918,13 +918,19 @@ class TPortionLoadContext {
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, MetaProto);
YDB_READONLY_DEF(std::optional<NOlap::TSnapshot>, DeprecatedMinSnapshot);

public:
template <class TSource>
TPortionLoadContext(const TSource& rowset) {
PathId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PathId>();
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
AFL_VERIFY(rowset.template HaveValue<NColumnShard::Schema::IndexPortions::MinSnapshotPlanStep>() == rowset.template HaveValue<NColumnShard::Schema::IndexPortions::MinSnapshotTxId>());
if (rowset.template HaveValue<NColumnShard::Schema::IndexPortions::MinSnapshotPlanStep>()) {
DeprecatedMinSnapshot = NOlap::TSnapshot(rowset.template GetValue<NColumnShard::Schema::IndexPortions::MinSnapshotPlanStep>(),
rowset.template GetValue<NColumnShard::Schema::IndexPortions::MinSnapshotTxId>());
}
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
}
};
16 changes: 15 additions & 1 deletion ydb/core/tx/columnshard/common/blob.cpp
@@ -141,6 +141,16 @@ TString TBlobRange::GetData(const TString& blobData) const {
return blobData.substr(Offset, Size);
}

TBlobRange::TBlobRange(const TUnifiedBlobId& blobId /*= TUnifiedBlobId()*/, ui32 offset /*= 0*/, ui32 size /*= 0*/)
: BlobId(blobId)
, Offset(offset)
, Size(size) {
if (Size > 0) {
AFL_VERIFY(Offset < BlobId.BlobSize())("offset", Offset)("size", Size)("blob", BlobId.ToStringNew());
AFL_VERIFY(Offset + Size <= BlobId.BlobSize())("offset", Offset)("size", Size)("blob", BlobId.ToStringNew());
}
}

NKikimr::TConclusionStatus TBlobRangeLink16::DeserializeFromProto(const NKikimrColumnShardProto::TBlobRangeLink16& proto) {
BlobIdx = proto.GetBlobIdx();
Offset = proto.GetOffset();
@@ -171,8 +181,12 @@ ui16 TBlobRangeLink16::GetBlobIdxVerified() const {
return *BlobIdx;
}

NKikimr::NOlap::TBlobRange TBlobRangeLink16::RestoreRange(const TUnifiedBlobId& blobId) const {
TBlobRange TBlobRangeLink16::RestoreRange(const TUnifiedBlobId& blobId) const {
return TBlobRange(blobId, Offset, Size);
}

bool TBlobRangeLink16::CheckBlob(const TUnifiedBlobId& blobId) const {
return Offset + Size <= blobId.BlobSize();
}

} // namespace NKikimr::NOlap
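
The constructor definition above (moved out of the header; the matching header change follows below) now reports the offending offset, size, and blob id when a non-empty range does not fit, and `CheckBlob` exposes the same bounds test without aborting. The following is a minimal standalone sketch of that invariant using plain illustrative types (`BlobId`, `BlobRange`, `Fits` are stand-ins, not the YDB classes):

```cpp
#include <cstdint>
#include <stdexcept>

// Simplified stand-ins used only to illustrate the bounds invariant above.
struct BlobId {
    uint32_t Size = 0;                          // total blob size in bytes
    uint32_t BlobSize() const { return Size; }
};

struct BlobRange {
    BlobId Blob;
    uint32_t Offset = 0;
    uint32_t Size = 0;

    BlobRange(const BlobId& blob, uint32_t offset, uint32_t size)
        : Blob(blob), Offset(offset), Size(size) {
        // An empty range is always allowed; a non-empty one must lie inside the blob.
        if (Size > 0 && (Offset >= Blob.BlobSize() || Offset + Size > Blob.BlobSize())) {
            throw std::out_of_range("blob range does not fit into blob");
        }
    }

    // Non-aborting counterpart of the CheckBlob idea: true iff the range fits.
    static bool Fits(const BlobId& blob, uint32_t offset, uint32_t size) {
        return offset + size <= blob.BlobSize();
    }
};
```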
12 changes: 2 additions & 10 deletions ydb/core/tx/columnshard/common/blob.h
@@ -189,6 +189,7 @@ class TBlobRangeLink16 {
}

TBlobRange RestoreRange(const TUnifiedBlobId& blobId) const;
bool CheckBlob(const TUnifiedBlobId& blobId) const;
};

struct TBlobRange {
@@ -267,16 +268,7 @@ struct TBlobRange {
return Size == BlobId.BlobSize();
}

explicit TBlobRange(const TUnifiedBlobId& blobId = TUnifiedBlobId(), ui32 offset = 0, ui32 size = 0)
: BlobId(blobId)
, Offset(offset)
, Size(size)
{
if (Size > 0) {
Y_ABORT_UNLESS(Offset < BlobId.BlobSize());
Y_ABORT_UNLESS(Offset + Size <= BlobId.BlobSize());
}
}
explicit TBlobRange(const TUnifiedBlobId& blobId = TUnifiedBlobId(), ui32 offset = 0, ui32 size = 0);

static TBlobRange FromBlobId(const TUnifiedBlobId& blobId) {
return TBlobRange(blobId, 0, blobId.BlobSize());
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/constructor.cpp
@@ -125,6 +125,8 @@ ISnapshotSchema::TPtr TPortionInfoConstructor::GetSchema(const TVersionedIndex&

void TPortionInfoConstructor::LoadRecord(const TColumnChunkLoadContextV1& loadContext) {
AFL_VERIFY(loadContext.GetBlobRange().GetBlobIdxVerified() < MetaConstructor.BlobIds.size());
AFL_VERIFY(loadContext.GetBlobRange().CheckBlob(MetaConstructor.BlobIds[loadContext.GetBlobRange().GetBlobIdxVerified()]))(
"blobs", JoinSeq(",", MetaConstructor.BlobIds))("range", loadContext.GetBlobRange().ToString());
TColumnRecord rec(loadContext);
Records.push_back(std::move(rec));
}
5 changes: 4 additions & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.h
@@ -61,7 +61,10 @@ enum class ENormalizerSequentialId: ui32 {
GCCountersNormalizer,
RestorePortionFromChunks,
SyncPortionFromChunks,
RestoreV1Chunks,
DeprecatedRestoreV1Chunks,
SyncMinSnapshotFromChunks,
DeprecatedRestoreV1Chunks_V1,
RestoreV1Chunks_V2,

MAX
};
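
Renaming `RestoreV1Chunks` to `DeprecatedRestoreV1Chunks` in place and appending the new entries at the end of the enum presumably keeps the numeric value of every previously persisted sequential id stable, so a shard that already recorded an old normalizer as applied does not re-run it under a different id. A hedged sketch of that pattern, with illustrative names and values rather than the real ones:

```cpp
#include <cstdint>

// Illustrative only: retired members are renamed in place and new members are
// appended, so the underlying values of already-persisted ids never move.
enum class ESequentialId : uint32_t {
    DeprecatedRestoreV1Chunks = 0,   // formerly RestoreV1Chunks; value must not change
    SyncMinSnapshotFromChunks = 1,
    DeprecatedRestoreV1Chunks_V1 = 2,
    RestoreV1Chunks_V2 = 3,          // new entries only ever go at the end

    MAX
};

static_assert(static_cast<uint32_t>(ESequentialId::DeprecatedRestoreV1Chunks) == 0,
              "persisted ids must keep their numeric values");
```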
@@ -46,7 +46,7 @@ class TChanges: public INormalizerChanges {
ui64 indexRawBytes = 0;
ui32 columnBlobBytes = 0;
ui32 indexBlobBytes = 0;

for (auto&& c : i.GetRecords()) {
columnRawBytes += c.GetMetaProto().GetRawBytes();
columnBlobBytes += c.GetBlobRange().GetSize();
48 changes: 26 additions & 22 deletions ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp
@@ -61,9 +61,15 @@ class TChangesAddV1: public INormalizerChanges {
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
for (auto&& i : Patches) {
auto metaProto = i.GetPortionInfo().GetMetaProto();
metaProto.ClearBlobIds();
AFL_VERIFY(!metaProto.GetBlobIds().size());
for (auto&& b : i.GetBlobIds()) {
*metaProto.AddBlobIds() = b.SerializeBinary();
*metaProto.AddBlobIds() = b.GetLogoBlobId().AsBinaryString();
}
ui32 idx = 0;
for (auto&& b : metaProto.GetBlobIds()) {
auto logo = TLogoBlobID::FromBinary(b);
AFL_VERIFY(i.GetBlobIds()[idx++].GetLogoBlobId() == logo);
}
db.Table<IndexPortions>()
.Key(i.GetPortionInfo().GetPathId(), i.GetPortionInfo().GetPortionId())
@@ -154,9 +160,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
while (!rowset.EndOfSet()) {
TPortionLoadContext portion(rowset);
existPortions0.emplace(portion.GetPortionId());
if (!portion.GetMetaProto().GetBlobIds().size()) {
AFL_VERIFY(portions0.emplace(portion.GetPortionId(), portion).second);
}
AFL_VERIFY(portions0.emplace(portion.GetPortionId(), portion).second);

if (!rowset.Next()) {
return TConclusionStatus::Fail("Not ready");
@@ -190,10 +194,10 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(

while (!rowset.EndOfSet()) {
TColumnChunkLoadContextV1 chunk(rowset);
AFL_VERIFY(!portions0.contains(chunk.GetPortionId()));
if (!existPortions0.contains(chunk.GetPortionId())) {
// AFL_VERIFY(!portions0.contains(chunk.GetPortionId()));
// if (!existPortions0.contains(chunk.GetPortionId())) {
AFL_VERIFY(columns1Remove.emplace(chunk.GetFullChunkAddress(), chunk).second);
}
// }

if (!rowset.Next()) {
return TConclusionStatus::Fail("Not ready");
@@ -207,37 +211,37 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
}

{
std::vector<TPatchItemAddV1> package;
for (auto&& [portionId, portionInfo] : portions0) {
auto it = columns0.find(portionId);
AFL_VERIFY(it != columns0.end());
package.emplace_back(portionInfo, std::move(it->second));
columns0.erase(it);
std::vector<TPatchItemRemoveV1> package;
for (auto&& [portionId, chunkInfo] : columns1Remove) {
package.emplace_back(chunkInfo);
if (package.size() == 100) {
std::vector<TPatchItemAddV1> local;
std::vector<TPatchItemRemoveV1> local;
local.swap(package);
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV1>(std::move(local))));
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesRemoveV1>(std::move(local))));
}
}

if (package.size() > 0) {
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV1>(std::move(package))));
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesRemoveV1>(std::move(package))));
}
}

{
std::vector<TPatchItemRemoveV1> package;
for (auto&& [portionId, chunkInfo] : columns1Remove) {
package.emplace_back(chunkInfo);
std::vector<TPatchItemAddV1> package;
for (auto&& [portionId, portionInfo] : portions0) {
auto it = columns0.find(portionId);
AFL_VERIFY(it != columns0.end());
package.emplace_back(portionInfo, std::move(it->second));
columns0.erase(it);
if (package.size() == 100) {
std::vector<TPatchItemRemoveV1> local;
std::vector<TPatchItemAddV1> local;
local.swap(package);
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesRemoveV1>(std::move(local))));
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV1>(std::move(local))));
}
}

if (package.size() > 0) {
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesRemoveV1>(std::move(package))));
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChangesAddV1>(std::move(package))));
}
}

@@ -13,11 +13,11 @@ namespace NKikimr::NOlap::NRestoreV1Chunks {
class TNormalizer: public TNormalizationController::INormalizerComponent {
public:
static TString GetClassNameStatic() {
return ::ToString(ENormalizerSequentialId::RestoreV1Chunks);
return ::ToString(ENormalizerSequentialId::RestoreV1Chunks_V2);
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return ENormalizerSequentialId::RestoreV1Chunks;
return ENormalizerSequentialId::RestoreV1Chunks_V2;
}

virtual TString GetClassName() const override {
137 changes: 137 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.cpp
@@ -0,0 +1,137 @@
#include "snapshot_from_chunks.h"
#include "normalizer.h"

#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/tables_manager.h>

namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks {

class TPatchItem {
private:
TPortionLoadContext PortionInfo;
YDB_READONLY(NOlap::TSnapshot, Snapshot, NOlap::TSnapshot::Zero());

public:
const TPortionLoadContext& GetPortionInfo() const {
return PortionInfo;
}

TPatchItem(TPortionLoadContext&& portion, const NOlap::TSnapshot& snapshot)
: PortionInfo(std::move(portion))
, Snapshot(snapshot) {
}
};

class TChanges: public INormalizerChanges {
private:
std::vector<TPatchItem> Patches;

public:
TChanges(std::vector<TPatchItem>&& patches)
: Patches(std::move(patches)) {
}
virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for (auto&& i : Patches) {
db.Table<Schema::IndexPortions>()
.Key(i.GetPortionInfo().GetPathId(), i.GetPortionInfo().GetPortionId())
.Update(NIceDb::TUpdate<Schema::IndexPortions::MinSnapshotPlanStep>(i.GetSnapshot().GetPlanStep()),
NIceDb::TUpdate<Schema::IndexPortions::MinSnapshotTxId>(i.GetSnapshot().GetTxId())
);
}

return true;
}

virtual ui64 GetSize() const override {
return Patches.size();
}

};

TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);

bool ready = true;
ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme());
ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
if (!ready) {
return TConclusionStatus::Fail("Not ready");
}

THashMap<ui64, TPortionLoadContext> dbPortions;
THashMap<ui64, NOlap::TSnapshot> initSnapshot;

{
auto rowset = db.Table<Schema::IndexPortions>().Select();
if (!rowset.IsReady()) {
return TConclusionStatus::Fail("Not ready");
}

while (!rowset.EndOfSet()) {
TPortionLoadContext portion(rowset);
if (!portion.GetDeprecatedMinSnapshot()) {
AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second);
}

if (!rowset.Next()) {
return TConclusionStatus::Fail("Not ready");
}
}
}

{
auto rowset = db.Table<Schema::IndexColumns>().Select();
if (!rowset.IsReady()) {
return TConclusionStatus::Fail("Not ready");
}

while (!rowset.EndOfSet()) {
TColumnChunkLoadContext chunk(rowset, &DsGroupSelector);
const ui64 portionId = chunk.GetPortionId();
if (dbPortions.contains(portionId)) {
auto it = initSnapshot.find(portionId);
if (it == initSnapshot.end()) {
initSnapshot.emplace(portionId, chunk.GetMinSnapshotDeprecated());
} else {
AFL_VERIFY(it->second == chunk.GetMinSnapshotDeprecated());
}
}

if (!rowset.Next()) {
return TConclusionStatus::Fail("Not ready");
}
}
}
AFL_VERIFY(dbPortions.size() == initSnapshot.size())("portions", dbPortions.size())("records", initSnapshot.size());

std::vector<INormalizerTask::TPtr> tasks;
if (dbPortions.empty()) {
return tasks;
}

std::vector<TPatchItem> package;

for (auto&& [portionId, portion] : dbPortions) {
auto it = initSnapshot.find(portionId);
AFL_VERIFY(it != initSnapshot.end());
package.emplace_back(std::move(portion), it->second);
if (package.size() == 100) {
std::vector<TPatchItem> local;
local.swap(package);
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local))));
}
}

if (package.size() > 0) {
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package))));
}
return tasks;
}

} // namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks
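
Both this new normalizer and the reworked `restore_v1_chunks.cpp` above slice their patches into fixed-size packages of 100 items and wrap each package into its own trivial task. The following is a simplified sketch of that batching pattern with generic placeholder types (`BuildTasks`, `TPatch`, `TTask` are assumptions for illustration, not the real YDB API):

```cpp
#include <memory>
#include <utility>
#include <vector>

// Generic stand-ins for TPatchItem / the normalizer task type.
// Splits the input into packages of `packageSize` and builds one task per package.
template <class TPatch, class TTask>
std::vector<std::shared_ptr<TTask>> BuildTasks(std::vector<TPatch>&& patches,
                                               size_t packageSize = 100) {
    std::vector<std::shared_ptr<TTask>> tasks;
    std::vector<TPatch> package;
    for (auto&& patch : patches) {
        package.emplace_back(std::move(patch));
        if (package.size() == packageSize) {
            std::vector<TPatch> local;
            local.swap(package);                         // hand off the full package
            tasks.emplace_back(std::make_shared<TTask>(std::move(local)));
        }
    }
    if (!package.empty()) {                              // flush the trailing partial package
        tasks.emplace_back(std::make_shared<TTask>(std::move(package)));
    }
    return tasks;
}
```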