Skip to content

Commit

Permalink
fix tx ask hard processing (#12648)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 16, 2024
1 parent 01126ac commit a64b1fd
Showing 1 changed file with 48 additions and 37 deletions.
85 changes: 48 additions & 37 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,18 @@ class TPortionConstructorV2 {
: PortionInfo(portionInfo) {
}

bool IsReady() const {
return HasRecords() && HasIndexes();
}

bool HasRecords() const {
return !!Records;
}

bool HasIndexes() const {
return !!Indexes;
}

void SetRecords(NOlap::TColumnChunkLoadContextV2&& records) {
AFL_VERIFY(!Records);
Records = std::move(records);
Expand Down Expand Up @@ -1406,6 +1418,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
std::vector<TPortionConstructorV2> FetchedAccessors;
const TString Consumer;
THashMap<NOlap::TPortionAddress, TPortionConstructorV2> Constructors;

public:
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
Expand All @@ -1428,13 +1441,43 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("size", i.second.size())("path_id", i.first);
for (auto&& p : i.second) {
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
auto itPortionConstructor = Constructors.find(p->GetAddress());
if (itPortionConstructor == Constructors.end()) {
TPortionConstructorV2 constructor(p);
itPortionConstructor = Constructors.emplace(p->GetAddress(), std::move(constructor)).first;
} else if (itPortionConstructor->second.IsReady()) {
continue;
}
{
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!itPortionConstructor->second.HasRecords()) {
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
} else {
AFL_VERIFY(!rowset.EndOfSet())("path_id", p->GetPathId())("portion_id", p->GetPortionId())(
"debug", p->DebugString(true));
NOlap::TColumnChunkLoadContextV2 info(rowset);
itPortionConstructor->second.SetRecords(std::move(info));
}
}
if (!itPortionConstructor->second.HasIndexes()) {
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
itPortionConstructor->second.SetIndexes({});
} else {
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
reask = true;
} else {
std::vector<NOlap::TIndexChunkLoadContext> indexes;
bool localReask = false;
while (!localReask && !rowset.EndOfSet()) {
indexes.emplace_back(NOlap::TIndexChunkLoadContext(rowset, &selector));
if (!rowset.Next()) {
reask = true;
localReask = true;
}
}
itPortionConstructor->second.SetIndexes(std::move(indexes));
}
}
}
}
Expand All @@ -1443,40 +1486,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
return false;
}

for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "processing")("size", i.second.size())(
"path_id", i.first);
while (i.second.size()) {
auto p = i.second.back();
TPortionConstructorV2 constructor(p);
{
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
}
AFL_VERIFY(!rowset.EndOfSet())("path_id", p->GetPathId())("portion_id", p->GetPortionId())("debug", p->DebugString(true));
NOlap::TColumnChunkLoadContextV2 info(rowset);
constructor.SetRecords(std::move(info));
}
std::vector<NOlap::TIndexChunkLoadContext> indexes;
if (p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
if (!rowset.IsReady()) {
return false;
}
while (!rowset.EndOfSet()) {
indexes.emplace_back(NOlap::TIndexChunkLoadContext(rowset, &selector));
if (!rowset.Next()) {
return false;
}
}
}
constructor.SetIndexes(std::move(indexes));
FetchedAccessors.emplace_back(std::move(constructor));
i.second.pop_back();
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished")("size", i.second.size())(
"path_id", i.first);
for (auto&& i : Constructors) {
FetchedAccessors.emplace_back(std::move(i.second));
}

AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished");
Expand Down

0 comments on commit a64b1fd

Please sign in to comment.