diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 5d866ae4e32e..ff67fcbba702 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -266,4 +266,12 @@ void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSo AFL_VERIFY(CheckPoints.emplace(position, include).second || !validationDuplications); } +std::shared_ptr TGeneralCompactColumnEngineChanges::BuildMemoryPredictor() { + /* Chooses the predictor policy: the first branch is taken when app data is unavailable or the UseChunkedMergeOnCompaction config flag is set. */ if (!HasAppData() || AppDataVerified().ColumnShardConfig.GetUseChunkedMergeOnCompaction()) { + return std::make_shared(); + } else { + return std::make_shared(); + } +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index 811eab38aa60..506966ac3494 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -19,15 +19,63 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; virtual ui64 DoCalcMemoryForUsage() const override { + auto predictor = BuildMemoryPredictor(); ui64 result = 0; for (auto& p : SwitchedPortions) { - result += 2 * p.GetBlobBytes(); + /* AddPortion returns the running total, so it is assigned rather than accumulated. */ result = predictor->AddPortion(p); } return result; } public: using TBase::TBase; + /* Accumulating estimator of the memory required to compact a set of portions. */ class IMemoryPredictor { + public: + /* Registers one more portion and returns the estimate for everything added so far. */ virtual ui64 AddPortion(const TPortionInfo& portionInfo) = 0; + virtual ~IMemoryPredictor() = default; + }; + + /* Policy that charges every record its blob range size plus twice the raw byte size reported by its meta. */ class TMemoryPredictorSimplePolicy: public IMemoryPredictor { + private: + ui64 SumMemory = 0; + public: + virtual ui64 AddPortion(const TPortionInfo& portionInfo) override { + for (auto&& i : portionInfo.GetRecords()) { + SumMemory += 
i.BlobRange.Size; + SumMemory += 2 * i.GetMeta().GetRawBytesVerified(); + } + return SumMemory; + } + }; + + /* Policy that charges every portion the largest raw chunk of each column, on top of a fixed per-record cost and the blob range sizes. */ class TMemoryPredictorChunkedPolicy: public IMemoryPredictor { + private: + /* Part of the estimate that does not depend on the portion count: per-record bookkeeping plus blob range sizes. */ ui64 SumMemory = 0; + /* Sum over all columns of the largest raw chunk size seen so far. */ ui64 SumMaxChunkMemory = 0; + ui32 PortionsCount = 0; + THashMap MaxMemoryByColumnChunk; + public: + virtual ui64 AddPortion(const TPortionInfo& portionInfo) override { + SumMemory += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)); + /* Count the portion once per call, not once per record. */ ++PortionsCount; + for (auto&& i : portionInfo.GetRecords()) { + SumMemory += i.BlobRange.Size; + const ui64 rawBytes = i.GetMeta().GetRawBytesVerified(); + auto it = MaxMemoryByColumnChunk.find(i.GetColumnId()); + if (it == MaxMemoryByColumnChunk.end()) { + MaxMemoryByColumnChunk.emplace(i.GetColumnId(), rawBytes); + SumMaxChunkMemory += rawBytes; + } else if (it->second < rawBytes) { + SumMaxChunkMemory += rawBytes - it->second; + it->second = rawBytes; + } + } + /* The per-column maxima are multiplied by the portion count only here, so earlier portions are re-charged whenever a maximum grows. */ return SumMemory + SumMaxChunkMemory * PortionsCount; + } + }; + + static std::shared_ptr BuildMemoryPredictor(); + void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include = true, const bool validationDuplications = true); virtual TString TypeString() const override { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 2ba089bb80d9..b182e74dbf5a 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -25,6 +25,12 @@ class TPortionInfo { std::shared_ptr BlobsOperator; ui64 DeprecatedGranuleId = 0; public: + std::vector Records; + + /* Read-only access to the Records member. */ const std::vector& GetRecords() const { + return Records; + } + ui64 GetPathId() const { return PathId; } @@ -117,8 +123,6 @@ class TPortionInfo { return Meta; } - std::vector Records; - const TColumnRecord* GetRecordPointer(const TChunkAddress& address) const { for (auto&& i : Records) { if 
(i.GetAddress() == address) { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h index 8ebfc8681af8..531279ccd244 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h @@ -360,13 +360,12 @@ class TPortionsPool { std::sort(sorted.begin(), sorted.end(), pred); std::vector> result; - ui64 currentSize = 0; + std::shared_ptr predictor = NCompaction::TGeneralCompactColumnEngineChanges::BuildMemoryPredictor(); for (auto&& i : sorted) { - if (currentSize > sizeLimit && result.size() > 1) { + /* The portion is appended before the limit check, so the one that pushes the estimate past sizeLimit is still included. */ result.emplace_back(i); + if (predictor->AddPortion(*i) > sizeLimit && result.size() > 1) { break; } - result.emplace_back(i); - currentSize += i->GetBlobBytes(); } if (result.size() < sorted.size()) { separatePoint = sorted[result.size()]->IndexKeyStart();