Skip to content

Commit

Permalink
correct portions index usage for control memory scan intervals (#8135)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 22, 2024
1 parent 60183c7 commit bb1c803
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 44 deletions.
24 changes: 11 additions & 13 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,20 +510,18 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
return out;
}

for (const auto& [indexKey, keyPortions] : spg->GetPortionsIndex().GetPoints()) {
for (auto&& [_, portionInfo] : keyPortions.GetStart()) {
if (!portionInfo->IsVisible(snapshot)) {
continue;
}
Y_ABORT_UNLESS(portionInfo->Produced());
const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")
("pathId", pathId)("portion", portionInfo->DebugString());
if (skipPortion) {
continue;
}
out->PortionsOrderedPK.emplace_back(portionInfo);
for (const auto& [_, portionInfo] : spg->GetPortions()) {
if (!portionInfo->IsVisible(snapshot)) {
continue;
}
Y_ABORT_UNLESS(portionInfo->Produced());
const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
"portion", portionInfo->DebugString());
if (skipPortion) {
continue;
}
out->PortionsOrderedPK.emplace_back(portionInfo);
}

return out;
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) {

void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard) {
if (portionAfter) {
PortionsIndex.AddPortion(portionAfter);

PortionInfoGuard.OnNewPortion(portionAfter);
if (!portionAfter->HasRemoveSnapshot()) {
PortionsIndex.AddPortion(portionAfter);
if (modificationGuard) {
modificationGuard->AddPortion(portionAfter);
} else {
Expand All @@ -74,10 +73,9 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port

void TGranuleMeta::OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore) {
if (portionBefore) {
PortionsIndex.RemovePortion(portionBefore);

PortionInfoGuard.OnDropPortion(portionBefore);
if (!portionBefore->HasRemoveSnapshot()) {
PortionsIndex.RemovePortion(portionBefore);
OptimizerPlanner->StartModificationGuard().RemovePortion(portionBefore);
ActualizationIndex->RemovePortion(portionBefore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
auto itTo = Points.find(p->IndexKeyEnd());
AFL_VERIFY(itTo != Points.end());
{
const TPortionInfoStat stat(p->GetMinMemoryForReadColumns({}), p->GetTotalBlobBytes());
const TPortionInfoStat stat(p);
auto it = itFrom;
while (true) {
RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
it->second.RemoveContained(p->GetPortionId(), stat);
it->second.RemoveContained(stat);
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
if (it == itTo) {
Expand Down Expand Up @@ -98,10 +98,10 @@ void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
itTo->second.AddFinish(p);

auto it = itFrom;
const TPortionInfoStat stat(p->GetMinMemoryForReadColumns({}), p->GetTotalBlobBytes());
const TPortionInfoStat stat(p);
while (true) {
RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
it->second.AddContained(p->GetPortionId(), stat);
it->second.AddContained(stat);
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
if (it == itTo) {
Expand Down
60 changes: 37 additions & 23 deletions ydb/core/tx/columnshard/engines/storage/granule/portions_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,41 @@ namespace NKikimr::NOlap::NGranule::NPortionsIndex {

class TPortionInfoStat {
private:
std::shared_ptr<TPortionInfo> PortionInfo;
YDB_READONLY(ui64, MinRawBytes, 0);
YDB_READONLY(ui64, BlobBytes, 0);

public:
TPortionInfoStat() = default;

TPortionInfoStat(const ui64 rawBytes, const ui64 blobBytes)
: MinRawBytes(rawBytes)
, BlobBytes(blobBytes)
TPortionInfoStat(const std::shared_ptr<TPortionInfo>& portionInfo)
: PortionInfo(portionInfo)
, MinRawBytes(PortionInfo->GetMinMemoryForReadColumns({}))
, BlobBytes(PortionInfo->GetTotalBlobBytes())
{

}

const TPortionInfo& GetPortionInfoVerified() const {
AFL_VERIFY(PortionInfo);
return *PortionInfo;
}
};

class TIntervalInfoStat {
private:
YDB_READONLY(ui64, MinRawBytes, 0);
YDB_READONLY(ui64, BlobBytes, 0);

public:
void Add(const TPortionInfoStat& source) {
MinRawBytes += source.MinRawBytes;
BlobBytes += source.BlobBytes;
MinRawBytes += source.GetMinRawBytes();
BlobBytes += source.GetBlobBytes();
}

void Sub(const TPortionInfoStat& source) {
AFL_VERIFY(MinRawBytes >= source.MinRawBytes);
MinRawBytes -= source.MinRawBytes;
AFL_VERIFY(BlobBytes >= source.BlobBytes);
BlobBytes -= source.BlobBytes;
AFL_VERIFY(MinRawBytes >= source.GetMinRawBytes());
MinRawBytes -= source.GetMinRawBytes();
AFL_VERIFY(BlobBytes >= source.GetBlobBytes());
BlobBytes -= source.GetBlobBytes();
AFL_VERIFY(!!BlobBytes == !!MinRawBytes);
}

Expand All @@ -46,20 +58,20 @@ class TPortionsPKPoint {
THashMap<ui64, std::shared_ptr<TPortionInfo>> Start;
THashMap<ui64, std::shared_ptr<TPortionInfo>> Finish;
THashMap<ui64, TPortionInfoStat> PortionIds;
YDB_READONLY_DEF(TPortionInfoStat, IntervalStats);
YDB_READONLY_DEF(TIntervalInfoStat, IntervalStats);

public:
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetStart() const {
return Start;
}

void ProvidePortions(const TPortionsPKPoint& source) {
IntervalStats = TPortionInfoStat();
IntervalStats = TIntervalInfoStat();
for (auto&& [i, stat] : source.PortionIds) {
if (source.Finish.contains(i)) {
continue;
}
AddContained(i, stat);
AddContained(stat);
}
}

Expand All @@ -71,17 +83,19 @@ class TPortionsPKPoint {
return Start.empty() && Finish.empty();
}

void AddContained(const ui32 portionId, const TPortionInfoStat& stat) {
IntervalStats.Add(stat);
AFL_VERIFY(PortionIds.emplace(portionId, stat).second);
void AddContained(const TPortionInfoStat& stat) {
if (!stat.GetPortionInfoVerified().HasRemoveSnapshot()) {
IntervalStats.Add(stat);
}
AFL_VERIFY(PortionIds.emplace(stat.GetPortionInfoVerified().GetPortionId(), stat).second);
}

void RemoveContained(const ui32 portionId, const TPortionInfoStat& stat) {
IntervalStats.Sub(stat);
AFL_VERIFY(PortionIds.erase(portionId));
if (PortionIds.empty()) {
AFL_VERIFY(!IntervalStats);
void RemoveContained(const TPortionInfoStat& stat) {
if (!stat.GetPortionInfoVerified().HasRemoveSnapshot()) {
IntervalStats.Sub(stat);
}
AFL_VERIFY(PortionIds.erase(stat.GetPortionInfoVerified().GetPortionId()));
AFL_VERIFY(PortionIds.size() || !IntervalStats);
}

void RemoveStart(const std::shared_ptr<TPortionInfo>& p) {
Expand Down Expand Up @@ -162,7 +176,7 @@ class TPortionsIndex {
return it;
}

void RemoveFromMemoryUsageControl(const TPortionInfoStat& stat) {
void RemoveFromMemoryUsageControl(const TIntervalInfoStat& stat) {
RawMemoryUsage.Remove(stat.GetMinRawBytes());
BlobMemoryUsage.Remove(stat.GetBlobBytes());
}
Expand Down

0 comments on commit bb1c803

Please sign in to comment.