diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 436370507f60..f55106599e77 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -1,7 +1,9 @@ #include "columnshard_impl.h" +#include "common/limits.h" #include "blobs_action/transaction/tx_write.h" #include "blobs_action/transaction/tx_draft.h" #include "counters/columnshard.h" +#include "engines/column_engine_logs.h" #include "operations/batch_builder/builder.h" #include "operations/write_data.h" @@ -193,6 +195,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex return returnFail(COUNTER_WRITE_FAIL); } + const ui64 minMemoryRead = TablesManager.GetPrimaryIndexAsVerified() + .GetGranuleVerified(writeMeta.GetTableId()) + .GetPortionsIndex() + .GetMinMemoryRead(); + if (NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit < minMemoryRead) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "overlimit")("reason", "read_memory")("current", minMemoryRead)( + "limit", NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit)("table_id", writeMeta.GetTableId()); + Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::OverlimitReadMemory); + return returnFail(COUNTER_WRITE_FAIL); + } + const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); auto arrowData = std::make_shared(snapshotSchema); if (!arrowData->ParseFromProto(record)) { diff --git a/ydb/core/tx/columnshard/common/limits.h b/ydb/core/tx/columnshard/common/limits.h index 796ffef309ac..0f27c1213d93 100644 --- a/ydb/core/tx/columnshard/common/limits.h +++ b/ydb/core/tx/columnshard/common/limits.h @@ -9,5 +9,9 @@ class TGlobalLimits { static constexpr inline ui64 InsertCompactionMemoryLimit = 1ULL << 30; static constexpr inline ui64 GeneralCompactionMemoryLimit = 3ULL << 30; static constexpr inline ui64 ScanMemoryLimit = 3ULL << 30; + + static constexpr inline ui64 DefaultRejectMemoryIntervalLimit = ScanMemoryLimit; + static constexpr inline ui64 DefaultReduceMemoryIntervalLimit = 0.8 * ScanMemoryLimit; + static constexpr inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20; }; } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index 87bd2c16b26b..300691ed711e 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -1,7 +1,7 @@ LIBRARY() SRCS( - limits.h + limits.cpp reverse_accessor.cpp scalars.cpp snapshot.cpp diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index b08ad71355e3..35773ceb8df0 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -14,7 +14,8 @@ enum class EWriteFailReason { LongTxDuplication /* "long_tx_duplication" */, NoTable /* "no_table" */, IncorrectSchema /* "incorrect_schema" */, - Overload /* "overload" */ + Overload /* "overload" */, + OverlimitReadMemory /* "overlimit_read_memory" */ }; class TCSInitialization: public TCommonCountersOwner { diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h index 97a4716652c3..2dfcba2cd6ed 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.h +++ b/ydb/core/tx/columnshard/counters/engine_logs.h @@ -85,17 +85,32 @@ class TAgentDataClassCounters: public TCommonCountersOwner { } }; +class TPortionsIndexCounters { +public: + const std::shared_ptr MinReadBytes; + TPortionsIndexCounters(const std::shared_ptr& minReadBytes) + : MinReadBytes(minReadBytes) { + } +}; + class TGranuleDataCounters { private: const TDataClassCounters InsertedData; const TDataClassCounters CompactedData; const TDataClassCounters FullData; + const TPortionsIndexCounters PortionsIndexCounters; + public: - TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData) + const TPortionsIndexCounters& GetPortionsIndexCounters() const { + return PortionsIndexCounters; + } + + TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData, + const std::shared_ptr& minReadBytes) : InsertedData(insertedData) , CompactedData(compactedData) , FullData(fullData) - { + , PortionsIndexCounters(minReadBytes) { } void OnPortionsDataRefresh(const TBaseGranuleDataClassSummary& inserted, const TBaseGranuleDataClassSummary& compacted) const { @@ -105,20 +120,38 @@ class TGranuleDataCounters { } }; +class TPortionsIndexAgentsCounters: public TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; + +public: + const std::shared_ptr MinReadBytes; + + TPortionsIndexAgentsCounters(const TString& baseName) + : TBase(baseName) + , MinReadBytes(TBase::GetValueAutoAggregations("MinRead/Bytes")) { + } +}; + class TAgentGranuleDataCounters { private: TAgentDataClassCounters InsertedData; TAgentDataClassCounters CompactedData; TAgentDataClassCounters FullData; + TPortionsIndexAgentsCounters PortionsIndex; + public: TAgentGranuleDataCounters(const TString& ownerId) : InsertedData(ownerId, "ByGranule/Inserted") , CompactedData(ownerId, "ByGranule/Compacted") - , FullData(ownerId, "ByGranule/Full") { + , FullData(ownerId, "ByGranule/Full") + , PortionsIndex("ByGranule/PortionsIndex") + { } TGranuleDataCounters RegisterClient() const { - return TGranuleDataCounters(InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient()); + return TGranuleDataCounters( + InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient(), PortionsIndex.MinReadBytes->GetClient()); } }; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h index 5a869c5fc78e..234474695319 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h @@ -1,6 +1,7 @@ #pragma once #include "columns_set.h" #include "fetching.h" +#include #include #include #include @@ -32,9 +33,9 @@ class TSpecialReadContext { std::array, 2>, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts; public: - static const inline ui64 DefaultRejectMemoryIntervalLimit = ((ui64)3) << 30; - static const inline ui64 DefaultReduceMemoryIntervalLimit = DefaultRejectMemoryIntervalLimit; - static const inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20; + static const inline ui64 DefaultRejectMemoryIntervalLimit = TGlobalLimits::DefaultRejectMemoryIntervalLimit; + static const inline ui64 DefaultReduceMemoryIntervalLimit = TGlobalLimits::DefaultReduceMemoryIntervalLimit; + static const inline ui64 DefaultReadSequentiallyBufferSize = TGlobalLimits::DefaultReadSequentiallyBufferSize; const ui64 ReduceMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetReduceMemoryIntervalLimit(DefaultReduceMemoryIntervalLimit); const ui64 RejectMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetRejectMemoryIntervalLimit(DefaultRejectMemoryIntervalLimit); diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp index d38851486e3d..5266e6738fde 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp @@ -138,8 +138,7 @@ TGranuleMeta::TGranuleMeta(const ui64 pathId, const TGranulesStorage& owner, con , PortionInfoGuard(owner.GetCounters().BuildPortionBlobsGuard()) , Stats(owner.GetStats()) , StoragesManager(owner.GetStoragesManager()) - , PortionsIndex(*this) -{ + , PortionsIndex(*this, Counters.GetPortionsIndexCounters()) { NStorageOptimizer::IOptimizerPlannerConstructor::TBuildContext context(PathId, owner.GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetPrimaryKey()); OptimizerPlanner = versionedIndex.GetLastSchema()->GetIndexInfo().GetCompactionPlannerConstructor()->BuildPlanner(context).DetachResult(); AFL_VERIFY(!!OptimizerPlanner); diff --git a/ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp b/ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp index 676d40ea1c48..df96e345d32d 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp @@ -11,7 +11,7 @@ TPortionsIndex::TPortionIntervals TPortionsIndex::GetIntervalFeatures(const TPor TPortionIntervals portionExcludeIntervals; while (true) { std::optional nextKey; - for (auto&& p : itFrom->second.GetPortionIds()) { + for (auto&& [p, _] : itFrom->second.GetPortionIds()) { if (skipPortions.contains(p)) { continue; } @@ -55,9 +55,12 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr& p) { auto itTo = Points.find(p->IndexKeyEnd()); AFL_VERIFY(itTo != Points.end()); { + const ui64 minMemoryRead = p->GetMinMemoryForReadColumns({}); auto it = itFrom; while (true) { - it->second.RemoveContained(p->GetPortionId()); + RemoveFromMemoryUsageControl(it->second.GetMinMemoryRead()); + it->second.RemoveContained(p->GetPortionId(), minMemoryRead); + ++CountMemoryUsages[it->second.GetMinMemoryRead()]; if (it == itTo) { break; } @@ -67,19 +70,27 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr& p) { if (itFrom != itTo) { itFrom->second.RemoveStart(p); if (itFrom->second.IsEmpty()) { + RemoveFromMemoryUsageControl(itFrom->second.GetMinMemoryRead()); Points.erase(itFrom); } itTo->second.RemoveFinish(p); if (itTo->second.IsEmpty()) { + RemoveFromMemoryUsageControl(itTo->second.GetMinMemoryRead()); Points.erase(itTo); } } else { itTo->second.RemoveStart(p); itTo->second.RemoveFinish(p); if (itTo->second.IsEmpty()) { + RemoveFromMemoryUsageControl(itTo->second.GetMinMemoryRead()); Points.erase(itTo); } } + if (CountMemoryUsages.size()) { + Counters.MinReadBytes->SetValue(CountMemoryUsages.rbegin()->first); + } else { + Counters.MinReadBytes->SetValue(0); + } } void TPortionsIndex::AddPortion(const std::shared_ptr& p) { @@ -89,13 +100,21 @@ void TPortionsIndex::AddPortion(const std::shared_ptr& p) { itTo->second.AddFinish(p); auto it = itFrom; + const ui64 minMemoryRead = p->GetMinMemoryForReadColumns({}); while (true) { - it->second.AddContained(p->GetPortionId()); + RemoveFromMemoryUsageControl(it->second.GetMinMemoryRead()); + it->second.AddContained(p->GetPortionId(), minMemoryRead); + ++CountMemoryUsages[it->second.GetMinMemoryRead()]; if (it == itTo) { break; } AFL_VERIFY(++it != Points.end()); } + if (CountMemoryUsages.size()) { + Counters.MinReadBytes->SetValue(CountMemoryUsages.rbegin()->first); + } else { + Counters.MinReadBytes->SetValue(0); + } } } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/storage/granule/portions_index.h b/ydb/core/tx/columnshard/engines/storage/granule/portions_index.h index 09ca2d65e7c0..de8300ce1c35 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/portions_index.h +++ b/ydb/core/tx/columnshard/engines/storage/granule/portions_index.h @@ -1,4 +1,5 @@ #pragma once +#include #include namespace NKikimr::NOlap { @@ -11,22 +12,25 @@ class TPortionsPKPoint { private: THashMap> Start; THashMap> Finish; - THashSet PortionIds; + THashMap PortionIds; + YDB_READONLY(ui64, MinMemoryRead, 0); + public: const THashMap>& GetStart() const { return Start; } void ProvidePortions(const TPortionsPKPoint& source) { - for (auto&& i : source.PortionIds) { + MinMemoryRead = 0; + for (auto&& [i, mem] : source.PortionIds) { if (source.Finish.contains(i)) { continue; } - AFL_VERIFY(PortionIds.emplace(i).second); + AddContained(i, mem); } } - const THashSet& GetPortionIds() const { + const THashMap& GetPortionIds() const { return PortionIds; } @@ -34,12 +38,18 @@ class TPortionsPKPoint { return Start.empty() && Finish.empty(); } - void AddContained(const ui64 portionId) { - AFL_VERIFY(PortionIds.emplace(portionId).second); + void AddContained(const ui32 portionId, const ui64 minMemoryRead) { + MinMemoryRead += minMemoryRead; + AFL_VERIFY(PortionIds.emplace(portionId, minMemoryRead).second); } - void RemoveContained(const ui64 portionId) { + void RemoveContained(const ui32 portionId, const ui64 minMemoryRead) { + AFL_VERIFY(minMemoryRead <= MinMemoryRead); + MinMemoryRead -= minMemoryRead; AFL_VERIFY(PortionIds.erase(portionId)); + if (PortionIds.empty()) { + AFL_VERIFY(!MinMemoryRead); + } } void RemoveStart(const std::shared_ptr& p) { @@ -64,7 +74,9 @@ class TPortionsPKPoint { class TPortionsIndex { private: std::map Points; + std::map CountMemoryUsages; const TGranuleMeta& Owner; + const NColumnShard::TPortionsIndexCounters& Counters; std::map::iterator InsertPoint(const NArrow::TReplaceKey& key) { auto it = Points.find(key); @@ -75,17 +87,35 @@ class TPortionsIndex { --itPred; it->second.ProvidePortions(itPred->second); } + ++CountMemoryUsages[it->second.GetMinMemoryRead()]; } return it; } + void RemoveFromMemoryUsageControl(const ui64 mem) { + auto it = CountMemoryUsages.find(mem); + AFL_VERIFY(it != CountMemoryUsages.end())("mem", mem); + if (!--it->second) { + CountMemoryUsages.erase(it); + } + } + public: - TPortionsIndex(const TGranuleMeta& owner) + TPortionsIndex(const TGranuleMeta& owner, const NColumnShard::TPortionsIndexCounters& counters) : Owner(owner) + , Counters(counters) { } + ui64 GetMinMemoryRead() const { + if (CountMemoryUsages.empty()) { + return 0; + } else { + return CountMemoryUsages.rbegin()->second; + } + } + const std::map& GetPoints() const { return Points; }