From a742240953cc541e386bf156ee252c222722ed40 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 3 Dec 2024 15:19:53 +0300 Subject: [PATCH 1/2] fix cleanup volume limits and speed up versions index copy --- ydb/core/tx/columnshard/columnshard_impl.cpp | 10 +++++----- ydb/core/tx/columnshard/engines/column_engine.h | 1 + ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 5 +++-- ydb/core/tx/columnshard/engines/column_engine_logs.h | 10 +++++++++- .../tx/columnshard/engines/portions/portion_info.cpp | 2 +- .../engines/scheme/versions/versioned_index.h | 5 +++++ 6 files changed, 24 insertions(+), 9 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index f38dad89222e..aa50c52623c2 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -717,7 +717,7 @@ void TColumnShard::StartIndexTask(std::vector&& da auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data)); Y_ABORT_UNLESS(indexChanges); - auto actualIndexInfo = std::make_shared(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); indexChanges->Start(*this); auto ev = std::make_unique(actualIndexInfo, indexChanges, Settings.CacheDataAfterIndexing); @@ -875,7 +875,7 @@ void TColumnShard::StartCompaction(const std::shared_ptrSetQueueGuard(guard); compaction->Start(*this); - auto actualIndexInfo = std::make_shared(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); auto request = compaction->ExtractDataAccessorsRequest(); const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) + indexChanges->CalcMemoryForUsage(); @@ -973,7 +973,7 @@ bool TColumnShard::SetupTtl(const THashMap& pathTtls) { return false; } - auto actualIndexInfo = std::make_shared(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); for (auto&& i : indexChanges) { i->Start(*this); auto request = i->ExtractDataAccessorsRequest(); @@ -1033,7 +1033,7 @@ void TColumnShard::SetupCleanupPortions() { changes->Start(*this); auto request = changes->ExtractDataAccessorsRequest(); - auto actualIndexInfo = std::make_shared(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()); const auto subscriber = std::make_shared(SelfId(), changes, actualIndexInfo); @@ -1064,7 +1064,7 @@ void TColumnShard::SetupCleanupTables() { } ACFL_DEBUG("background", "cleanup")("changes_info", changes->DebugString()); - auto actualIndexInfo = std::make_shared(TablesManager.GetPrimaryIndex()->GetVersionedIndex()); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); auto ev = std::make_unique(actualIndexInfo, changes, false); ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 58581bab51e2..29dc37d9f734 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -329,6 +329,7 @@ class IColumnEngine { virtual std::vector CollectMetadataRequests() const = 0; virtual const TVersionedIndex& GetVersionedIndex() const = 0; + virtual const std::shared_ptr& GetVersionedIndexReadonlyCopy() = 0; virtual std::shared_ptr CopyVersionedIndexPtr() const = 0; virtual const std::shared_ptr& GetReplaceKey() const; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 8fd014582c66..4e05f1846dae 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -333,7 +333,7 @@ std::shared_ptr TColumnEngineForLogs::Start ui32 skipLocked = 0; ui32 portionsFromDrop = 0; bool limitExceeded = false; - const ui32 maxChunksCount = 100000; + const ui32 maxChunksCount = 500000; const ui32 maxPortionsCount = 1000; for (ui64 pathId : pathsToDrop) { auto g = GranulesStorage->GetGranuleOptional(pathId); @@ -401,7 +401,8 @@ std::shared_ptr TColumnEngineForLogs::Start } } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())( - "portions_prepared", changes->GetPortionsToDrop().size())("drop", portionsFromDrop)("skip", skipLocked); + "portions_prepared", changes->GetPortionsToDrop().size())("drop", portionsFromDrop)("skip", skipLocked)("portions_counter", portionsCount)( + "chunks", chunksCount)("limit", limitExceeded)("max_portions", maxPortionsCount)("max_chunks", maxChunksCount); if (changes->GetPortionsToDrop().empty()) { return nullptr; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 92f1a4ed5e8a..5c6d5b988e5e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -63,8 +63,17 @@ class TColumnEngineForLogs: public IColumnEngine { std::shared_ptr ActualizationController; std::shared_ptr SchemaObjectsCache = std::make_shared(); + TVersionedIndex VersionedIndex; + std::shared_ptr VersionedIndexCopy; public: + virtual const std::shared_ptr& GetVersionedIndexReadonlyCopy() override { + if (!VersionedIndexCopy || !VersionedIndexCopy->IsEqualTo(VersionedIndex)) { + VersionedIndexCopy = std::make_shared(VersionedIndex); + } + return VersionedIndexCopy; + } + const std::shared_ptr& GetActualizationController() const { return ActualizationController; } @@ -224,7 +233,6 @@ class TColumnEngineForLogs: public IColumnEngine { void AppendPortion(const TPortionDataAccessor& portionInfo, const bool addAsAccessor = true); private: - TVersionedIndex VersionedIndex; ui64 TabletId; TMap> PathStats; // per path_id stats sorted by path_id std::map> CleanupPortions; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 8cc9c4e3be65..96d307390e83 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -51,7 +51,7 @@ ui64 TPortionInfo::GetMetadataMemorySize() const { } ui64 TPortionInfo::GetApproxChunksCount(const ui32 schemaColumnsCount) const { - return schemaColumnsCount * 256 * (GetRecordsCount() / 10000 + 1); + return schemaColumnsCount * (GetRecordsCount() / 10000 + 1); } void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index e320089f85df..390ed04a9963 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -33,6 +33,11 @@ class TVersionedIndex { ISnapshotSchema::TPtr SchemeForActualization; public: + bool IsEqualTo(const TVersionedIndex& vIndex) { + return LastSchemaVersion != vIndex.LastSchemaVersion || SnapshotByVersion.size() != vIndex.SnapshotByVersion.size() || + ShardingInfo.size() != vIndex.ShardingInfo.size() || SchemeVersionForActualization != vIndex.SchemeVersionForActualization; + } + ISnapshotSchema::TPtr GetLastCriticalSchema() const { return SchemeForActualization; } From b729c1f6ab2c157ee7cd9d0d0febf98ca69983ae Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 3 Dec 2024 17:04:40 +0300 Subject: [PATCH 2/2] fix --- .../tx/columnshard/engines/scheme/versions/versioned_index.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index 390ed04a9963..c5fc018e7e32 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -34,8 +34,8 @@ class TVersionedIndex { public: bool IsEqualTo(const TVersionedIndex& vIndex) { - return LastSchemaVersion != vIndex.LastSchemaVersion || SnapshotByVersion.size() != vIndex.SnapshotByVersion.size() || - ShardingInfo.size() != vIndex.ShardingInfo.size() || SchemeVersionForActualization != vIndex.SchemeVersionForActualization; + return LastSchemaVersion == vIndex.LastSchemaVersion && SnapshotByVersion.size() == vIndex.SnapshotByVersion.size() && + ShardingInfo.size() == vIndex.ShardingInfo.size() && SchemeVersionForActualization == vIndex.SchemeVersionForActualization; } ISnapshotSchema::TPtr GetLastCriticalSchema() const {