Skip to content

Commit

Permalink
fix sys view chunks reply construction (#12787)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 19, 2024
1 parent 8596497 commit c66a319
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ class TStatsIteratorBase: public TScanIteratorBase {
virtual TConclusion<std::shared_ptr<TPartialReadResult>> GetBatch() override {
while (!Finished()) {
if (!IsReadyForBatch()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "batch_not_ready");
return std::shared_ptr<TPartialReadResult>();
}
auto batchOpt = ExtractStatsBatch();
if (!batchOpt) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "no_batch_on_finished");
AFL_VERIFY(Finished());
return std::shared_ptr<TPartialReadResult>();
}
Expand Down Expand Up @@ -70,6 +72,7 @@ class TStatsIteratorBase: public TScanIteratorBase {
auto table = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({resultBatch}));
return std::make_shared<TPartialReadResult>(table, std::make_shared<TPlainScanCursor>(lastKey), Context, std::nullopt);
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "finished_iterator");
return std::shared_ptr<TPartialReadResult>();
}

Expand All @@ -89,10 +92,7 @@ class TStatsIteratorBase: public TScanIteratorBase {
AFL_VERIFY(*count == i->length());
}
}
auto result = arrow::RecordBatch::Make(DataSchema, columns.front()->length(), columns);
if (result->num_rows()) {
return result;
}
return arrow::RecordBatch::Make(DataSchema, columns.front()->length(), columns);
}
return std::nullopt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ ui32 TStatsIterator::PredictRecordsCount(const NAbstract::TGranuleMetaView& gran
break;
}
}
AFL_VERIFY(recordsCount);
AFL_VERIFY(recordsCount || granule.GetPortions().empty());
return recordsCount;
}

Expand All @@ -189,6 +189,22 @@ TConclusionStatus TStatsIterator::Start() {
return TConclusionStatus::Success();
}

bool TStatsIterator::IsReadyForBatch() const {
if (!IndexGranules.size()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "batch_ready_check")("result", false)("reason", "no_granules");
return false;
}
if (!IndexGranules.front().GetPortions().size()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "batch_ready_check")("result", true)("reason", "no_granule_portions");
return true;
}
if (FetchedAccessors.contains(IndexGranules.front().GetPortions().front()->GetPortionId())) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "batch_ready_check")("result", true)("reason", "portion_fetched");
return true;
}
return false;
}

TStatsIterator::TFetchingAccessorAllocation::TFetchingAccessorAllocation(
const std::shared_ptr<TDataAccessorsRequest>& request, const ui64 mem, const std::shared_ptr<NReader::TReadContext>& context)
: TBase(mem)
Expand All @@ -200,6 +216,7 @@ TStatsIterator::TFetchingAccessorAllocation::TFetchingAccessorAllocation(
}

void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
Request = nullptr;
Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,7 @@ class TStatsIterator: public NAbstract::TStatsIterator<NKikimr::NSysView::Schema

using TBase = NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexStats>;

virtual bool IsReadyForBatch() const override {
return IndexGranules.size() && IndexGranules.front().GetPortions().size() &&
FetchedAccessors.contains(IndexGranules.front().GetPortions().front()->GetPortionId());
}

virtual bool IsReadyForBatch() const override;
virtual bool AppendStats(
const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const override;
virtual ui32 PredictRecordsCount(const NAbstract::TGranuleMetaView& granule) const override;
Expand Down

0 comments on commit c66a319

Please sign in to comment.