Skip to content

Commit

Permalink
Repair portions and async proto parser (ydb-platform#11636)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 8, 2025
1 parent d8bd6b9 commit de3c519
Show file tree
Hide file tree
Showing 7 changed files with 396 additions and 96 deletions.
75 changes: 69 additions & 6 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1256,12 +1256,72 @@ void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::T
}
};

class TPortionConstructorV2 {
private:
NOlap::TPortionInfo::TConstPtr PortionInfo;
std::optional<NOlap::TColumnChunkLoadContextV2> Records;
std::optional<std::vector<NOlap::TIndexChunkLoadContext>> Indexes;

public:
TPortionConstructorV2(const NOlap::TPortionInfo::TConstPtr& portionInfo)
: PortionInfo(portionInfo) {
}

void SetRecords(NOlap::TColumnChunkLoadContextV2&& records) {
AFL_VERIFY(!Records);
Records = std::move(records);
}

void SetIndexes(std::vector<NOlap::TIndexChunkLoadContext>&& indexes) {
AFL_VERIFY(!Indexes);
Indexes = std::move(indexes);
}

NOlap::TPortionDataAccessor BuildAccessor() {
AFL_VERIFY(PortionInfo && Records && Indexes);
std::vector<NOlap::TColumnChunkLoadContextV1> records = Records->BuildRecordsV1();
return NOlap::TPortionAccessorConstructor::BuildForLoading(std::move(PortionInfo), std::move(records), std::move(*Indexes));
}
};

// Conveyor task that builds a data accessor from every collected portion
// constructor and hands the resulting list to the fetch callback.
class TAccessorsParsingTask: public NConveyor::ITask {
private:
    std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
    std::vector<TPortionConstructorV2> Portions;

    // Builds one accessor per stored constructor (in order) and reports them
    // through FetchCallback->OnAccessorsFetched(); always returns success.
    virtual TConclusionStatus DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) override {
        std::vector<NOlap::TPortionDataAccessor> built;
        built.reserve(Portions.size());
        for (TPortionConstructorV2& portion : Portions) {
            built.emplace_back(portion.BuildAccessor());
        }
        FetchCallback->OnAccessorsFetched(std::move(built));
        return TConclusionStatus::Success();
    }

    // There is no fallback path here: the reason is attached to a failing AFL_VERIFY.
    virtual void DoOnCannotExecute(const TString& reason) override {
        AFL_VERIFY(false)("cannot parse metadata", reason);
    }

public:
    virtual TString GetTaskClassIdentifier() const override {
        return "ASKED_METADATA_PARSER";
    }

    TAccessorsParsingTask(
        const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& callback, std::vector<TPortionConstructorV2>&& portions)
        : FetchCallback(callback)
        , Portions(std::move(portions))
    {
    }
};

class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
private:
using TBase = TTransactionBase<TColumnShard>;
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
std::vector<NOlap::TPortionDataAccessor> FetchedAccessors;
std::vector<TPortionConstructorV2> FetchedAccessors;

public:
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
Expand All @@ -1275,6 +1335,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {

bool Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) override {
NIceDb::TNiceDb db(txc.DB);

TBlobGroupSelector selector(Self->Info());
bool reask = false;
for (auto&& i : PortionsByPath) {
Expand Down Expand Up @@ -1302,21 +1363,22 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first);
while (i.second.size()) {
auto p = i.second.back();
std::vector<NOlap::TColumnChunkLoadContextV1> records;
std::vector<NOlap::TIndexChunkLoadContext> indexes;
TPortionConstructorV2 constructor(p);
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
}
while (!rowset.EndOfSet()) {
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
NOlap::TColumnChunkLoadContextV2 info(rowset);
constructor.SetRecords(std::move(info));
if (!rowset.Next()) {
return false;
}
}
}
{
std::vector<NOlap::TIndexChunkLoadContext> indexes;
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
Expand All @@ -1327,16 +1389,17 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
return false;
}
}
constructor.SetIndexes(std::move(indexes));
}
FetchedAccessors.emplace_back(NOlap::TPortionAccessorConstructor::BuildForLoading(p, std::move(records), std::move(indexes)));
FetchedAccessors.emplace_back(std::move(constructor));
i.second.pop_back();
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())(
"path_id", i.first);
}

AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished");
FetchCallback->OnAccessorsFetched(std::move(FetchedAccessors));
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)));
return true;
}
void Complete(const TActorContext& /*ctx*/) override {
Expand Down
45 changes: 31 additions & 14 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,10 @@ class TColumnChunkLoadContext {
YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());

public:
// Returns the portion address built from this context's PathId and PortionId.
TPortionAddress GetPortionAddress() const {
    TPortionAddress address(PathId, PortionId);
    return address;
}

// Returns the stored chunk address by const reference (no copy is made).
const TChunkAddress& GetAddress() const {
return Address;
}
Expand Down Expand Up @@ -1012,20 +1016,6 @@ class TColumnChunkLoadContextV1 {
return TPortionAddress(PathId, PortionId);
}

template <class TSource>
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
for (auto&& i : metaProto.GetChunks()) {
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
records.emplace_back(std::move(result));
}
}

NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
NKikimrTxColumnShard::TColumnChunkInfo proto;
proto.SetSSColumnId(Address.GetColumnId());
Expand Down Expand Up @@ -1068,6 +1058,33 @@ class TColumnChunkLoadContextV1 {
}
};

class TColumnChunkLoadContextV2 {
private:
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(TString, MetadataProto);

public:
template <class TSource>
TColumnChunkLoadContextV2(const TSource& rowset) {
PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
MetadataProto = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
}

std::vector<TColumnChunkLoadContextV1> BuildRecordsV1() const {
std::vector<TColumnChunkLoadContextV1> records;
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
AFL_VERIFY(metaProto.ParseFromArray(MetadataProto.data(), MetadataProto.size()))("event", "cannot parse metadata as protobuf");
for (auto&& i : metaProto.GetChunks()) {
TColumnChunkLoadContextV1 result(PathId, PortionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
records.emplace_back(std::move(result));
}
return records;
}
};

class TIndexChunkLoadContext {
private:
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ void TDbWrapper::WritePortion(const NOlap::TPortionInfo& portion) {

// Deletes the rows keyed by (path id, portion id) of the given portion from
// both the IndexPortions and the IndexColumnsV2 tables.
// Each table is touched exactly once; the earlier alias-based delete of the
// same IndexPortions key was a redundant duplicate.
void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) {
    NIceDb::TNiceDb db(Database);
    db.Table<NColumnShard::Schema::IndexPortions>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
    db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
}

void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ enum class ENormalizerSequentialId: ui32 {
TablesCleaner,
DeprecatedPortionsMetadata,
CleanGranuleId,
EmptyPortionsCleaner,
DeprecatedEmptyPortionsCleaner,
CleanInsertionDedup,
GCCountersNormalizer,
RestorePortionFromChunks,
Expand Down
Loading

0 comments on commit de3c519

Please sign in to comment.