Skip to content

Commit

Permalink
use policies for memory prediction on compaction (#1130)
Browse files Browse the repository at this point in the history
* use raw bytes for optimizer limiter

* use policies for memory prediction on compaction

* fix build

* fixes

* fix build

* fix

* fix
  • Loading branch information
ivanmorozov333 authored Jan 19, 2024
1 parent 462ee71 commit 6457070
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,12 @@ void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSo
AFL_VERIFY(CheckPoints.emplace(position, include).second || !validationDuplications);
}

std::shared_ptr<TGeneralCompactColumnEngineChanges::IMemoryPredictor> TGeneralCompactColumnEngineChanges::BuildMemoryPredictor() {
if (!HasAppData() || AppDataVerified().ColumnShardConfig.GetUseChunkedMergeOnCompaction()) {
return std::make_shared<TMemoryPredictorChunkedPolicy>();
} else {
return std::make_shared<TMemoryPredictorSimplePolicy>();
}
}

}
49 changes: 48 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/general_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,62 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override;
virtual ui64 DoCalcMemoryForUsage() const override {
auto predictor = BuildMemoryPredictor();
ui64 result = 0;
for (auto& p : SwitchedPortions) {
result += 2 * p.GetBlobBytes();
result = predictor->AddPortion(p);
}
return result;
}
public:
using TBase::TBase;

class IMemoryPredictor {
public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) = 0;
virtual ~IMemoryPredictor() = default;
};

class TMemoryPredictorSimplePolicy: public IMemoryPredictor {
private:
ui64 SumMemory = 0;
public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override {
for (auto&& i : portionInfo.GetRecords()) {
SumMemory += i.BlobRange.Size;
SumMemory += 2 * i.GetMeta().GetRawBytesVerified();
}
return SumMemory;
}
};

class TMemoryPredictorChunkedPolicy: public IMemoryPredictor {
private:
ui64 SumMemory = 0;
ui32 PortionsCount = 0;
THashMap<ui32, ui64> MaxMemoryByColumnChunk;
public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override {
SumMemory += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16));
for (auto&& i : portionInfo.GetRecords()) {
SumMemory += i.BlobRange.Size;
auto it = MaxMemoryByColumnChunk.find(i.GetColumnId());
++PortionsCount;
if (it == MaxMemoryByColumnChunk.end()) {
it = MaxMemoryByColumnChunk.emplace(i.GetColumnId(), i.GetMeta().GetRawBytesVerified()).first;
SumMemory += it->second * PortionsCount;
} else if (it->second < i.GetMeta().GetRawBytesVerified()) {
SumMemory -= it->second * (PortionsCount - 1);
it->second = i.GetMeta().GetRawBytesVerified();
SumMemory += it->second * PortionsCount;
}
}
return SumMemory;
}
};

static std::shared_ptr<IMemoryPredictor> BuildMemoryPredictor();

void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include = true, const bool validationDuplications = true);

virtual TString TypeString() const override {
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ class TPortionInfo {
std::shared_ptr<NOlap::IBlobsStorageOperator> BlobsOperator;
ui64 DeprecatedGranuleId = 0;
public:
std::vector<TColumnRecord> Records;

const std::vector<TColumnRecord>& GetRecords() const {
return Records;
}

ui64 GetPathId() const {
return PathId;
}
Expand Down Expand Up @@ -117,8 +123,6 @@ class TPortionInfo {
return Meta;
}

std::vector<TColumnRecord> Records;

const TColumnRecord* GetRecordPointer(const TChunkAddress& address) const {
for (auto&& i : Records) {
if (i.GetAddress() == address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,12 @@ class TPortionsPool {
std::sort(sorted.begin(), sorted.end(), pred);

std::vector<std::shared_ptr<TPortionInfo>> result;
ui64 currentSize = 0;
std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges::IMemoryPredictor> predictor = NCompaction::TGeneralCompactColumnEngineChanges::BuildMemoryPredictor();
for (auto&& i : sorted) {
if (currentSize > sizeLimit && result.size() > 1) {
result.emplace_back(i);
if (predictor->AddPortion(*i) > sizeLimit && result.size() > 1) {
break;
}
result.emplace_back(i);
currentSize += i->GetBlobBytes();
}
if (result.size() < sorted.size()) {
separatePoint = sorted[result.size()]->IndexKeyStart();
Expand Down

0 comments on commit 6457070

Please sign in to comment.