diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 9463a9e82edd..57ed8eaa0ecd 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -258,7 +258,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) { } } { - NOlap::TNormalizationController::TInitContext initCtx(Self->Info()); + NOlap::TNormalizationController::TInitContext initCtx(Self->Info(), Self->TabletID(), Self->SelfId()); Self->NormalizerController.InitNormalizers(initCtx); } diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index 0c3f07ee67c4..a201ed881a67 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -7,8 +7,8 @@ namespace NKikimr::NOlap { TNormalizationController::INormalizerComponent::TPtr TNormalizationController::RegisterNormalizer(INormalizerComponent::TPtr normalizer) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString()); AFL_VERIFY(normalizer); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString()); Counters.emplace_back(normalizer->GetClassName()); Normalizers.emplace_back(normalizer); return normalizer; @@ -74,7 +74,9 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { if (FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "repair already processed")("description", i.GetDescription()); } else { - auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(i.GetClassName(), ctx))); + auto component = INormalizerComponent::TFactory::MakeHolder(i.GetClassName(), ctx); + AFL_VERIFY(component)("class_name", i.GetClassName()); + auto normalizer = RegisterNormalizer(std::shared_ptr(component.Release())); normalizer->SetIsRepair(true).SetUniqueDescription(i.GetDescription()); } } diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index 0d1f0b7def98..d0629298e59f 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -1,15 +1,16 @@ #pragma once #include -#include - #include #include + +#include #include + #include namespace NKikimr::NIceDb { - class TNiceDb; +class TNiceDb; } namespace NKikimr::NOlap { @@ -21,6 +22,7 @@ class TNormalizerCounters: public NColumnShard::TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr StartedCount; NMonitoring::TDynamicCounters::TCounterPtr FinishedCount; NMonitoring::TDynamicCounters::TCounterPtr FailedCount; + public: TNormalizerCounters(const TString& normalizerName) : TBase("Normalizer") { @@ -49,7 +51,7 @@ class TNormalizerCounters: public NColumnShard::TCommonCountersOwner { } }; -enum class ENormalizerSequentialId: ui32 { +enum class ENormalizerSequentialId : ui32 { Granules = 1, Chunks, DeprecatedPortionsCleaner, @@ -74,19 +76,20 @@ class TNormalizationContext { YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor); YDB_ACCESSOR_DEF(TActorId, ShardActor); std::shared_ptr ResourcesGuard; + public: void SetResourcesGuard(std::shared_ptr rg) { ResourcesGuard = rg; } }; - class TNormalizationController; class INormalizerTask { public: using TPtr = std::shared_ptr; - virtual ~INormalizerTask() {} + virtual ~INormalizerTask() { + } virtual void Start(const TNormalizationController& controller, const TNormalizationContext& nCtx) = 0; }; @@ -94,7 +97,8 @@ class INormalizerTask { class INormalizerChanges { public: using TPtr = std::shared_ptr; - virtual ~INormalizerChanges() {} + virtual ~INormalizerChanges() { + } virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& normalizationContext) const = 0; virtual void ApplyOnComplete(const TNormalizationController& normalizationContext) const { @@ -106,6 +110,7 @@ class INormalizerChanges { class TTrivialNormalizerTask: public INormalizerTask { INormalizerChanges::TPtr Changes; + public: TTrivialNormalizerTask(const INormalizerChanges::TPtr& changes) : Changes(changes) { @@ -118,10 +123,24 @@ class TTrivialNormalizerTask: public INormalizerTask { class TNormalizationController { public: class TInitContext { + private: TIntrusiveConstPtr StorageInfo; + const ui64 TabletId; + const NActors::TActorId TabletActorId; + public: - TInitContext(TTabletStorageInfo* info) - : StorageInfo(info) { + TInitContext(TTabletStorageInfo* info, const ui64 tabletId, const NActors::TActorId& actorId) + : StorageInfo(info) + , TabletId(tabletId) + , TabletActorId(actorId) { + } + + ui64 GetTabletId() const { + return TabletId; + } + + const NActors::TActorId& GetTabletActorId() const { + return TabletActorId; } TIntrusiveConstPtr GetStorageInfo() const { @@ -133,6 +152,7 @@ class TNormalizationController { private: YDB_READONLY_DEF(TString, ClassName); YDB_READONLY_DEF(TString, Description); + public: bool operator<(const TNormalizerFullId& item) const { if (ClassName == item.ClassName) { @@ -143,9 +163,7 @@ class TNormalizationController { TNormalizerFullId(const TString& className, const TString& description) : ClassName(className) - , Description(description) - { - + , Description(description) { } }; @@ -154,18 +172,26 @@ class TNormalizationController { YDB_ACCESSOR(bool, IsRepair, false); YDB_ACCESSOR_DEF(TString, UniqueDescription); YDB_ACCESSOR(TString, UniqueId, TGUID::CreateTimebased().AsUuidString()); - + virtual TString DoDebugString() const { return ""; } virtual std::optional DoGetEnumSequentialId() const = 0; + protected: + const ui64 TabletId; + const NActors::TActorId TabletActorId; + public: using TPtr = std::shared_ptr; using TFactory = NObjectFactory::TParametrizedObjectFactory; - virtual ~INormalizerComponent() {} + virtual ~INormalizerComponent() = default; + INormalizerComponent(const TInitContext& context) + : TabletId(context.GetTabletId()) + , TabletActorId(context.GetTabletActorId()) { + } TNormalizerFullId GetNormalizerFullId() const { return TNormalizerFullId(GetClassName(), UniqueDescription); @@ -222,10 +248,12 @@ class TNormalizationController { } } - TConclusion> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc); + TConclusion> Init( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc); private: - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0; + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0; TAtomic ActiveTasksCount = 0; }; @@ -240,11 +268,13 @@ class TNormalizationController { std::set FinishedNormalizers; std::map StartedNormalizers; YDB_READONLY_DEF(std::optional, LastSavedNormalizerId); + private: INormalizerComponent::TPtr RegisterNormalizer(INormalizerComponent::TPtr normalizer); public: - TNormalizationController(std::shared_ptr storagesManager, const std::shared_ptr& counters) + TNormalizationController(std::shared_ptr storagesManager, + const std::shared_ptr& counters) : StoragesManager(storagesManager) , TaskSubscription("CS::NORMALIZER", counters) { } @@ -265,7 +295,7 @@ class TNormalizationController { TString DebugString() const { return TStringBuilder() << "normalizers_count=" << Normalizers.size() - << ";current_normalizer=" << (Normalizers.size() ? Normalizers.front()->DebugString() : "NO_DATA"); + << ";current_normalizer=" << (Normalizers.size() ? Normalizers.front()->DebugString() : "NO_DATA"); } const INormalizerComponent::TPtr& GetNormalizer() const; @@ -273,4 +303,4 @@ class TNormalizationController { bool SwitchNormalizer(); const TNormalizerCounters& GetCounters() const; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/granule/clean_granule.h b/ydb/core/tx/columnshard/normalizer/granule/clean_granule.h index 7cf6dd032057..cfb1402d8233 100644 --- a/ydb/core/tx/columnshard/normalizer/granule/clean_granule.h +++ b/ydb/core/tx/columnshard/normalizer/granule/clean_granule.h @@ -1,21 +1,25 @@ #pragma once -#include #include - +#include namespace NKikimr::NOlap { class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return ::ToString(ENormalizerSequentialId::CleanGranuleId); } + private: class TNormalizerResult; static inline INormalizerComponent::TFactory::TRegistrator Registrator = INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); + public: virtual std::optional DoGetEnumSequentialId() const override { return ENormalizerSequentialId::CleanGranuleId; @@ -25,10 +29,12 @@ class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerCom return GetClassNameStatic(); } - TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext&) { + TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext& context) + : TBase(context) { } - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.h b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h index a8d09c80a781..333dd3f0d57a 100644 --- a/ydb/core/tx/columnshard/normalizer/granule/normalizer.h +++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h @@ -1,23 +1,28 @@ #pragma once -#include #include - +#include namespace NKikimr::NOlap { class TGranulesNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return ::ToString(ENormalizerSequentialId::Granules); } + private: class TNormalizerResult; - static const inline INormalizerComponent::TFactory::TRegistrator Registrator = INormalizerComponent::TFactory::TRegistrator( - GetClassNameStatic()); + static const inline INormalizerComponent::TFactory::TRegistrator Registrator = + INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); + public: - TGranulesNormalizer(const TNormalizationController::TInitContext&) { + TGranulesNormalizer(const TNormalizationController::TInitContext& context) + : TBase(context) { } virtual std::optional DoGetEnumSequentialId() const override { @@ -28,7 +33,8 @@ class TGranulesNormalizer: public TNormalizationController::INormalizerComponent return GetClassNameStatic(); } - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/insert_table/broken_dedup.h b/ydb/core/tx/columnshard/normalizer/insert_table/broken_dedup.h index c9a935e24371..232f0ee9a562 100644 --- a/ydb/core/tx/columnshard/normalizer/insert_table/broken_dedup.h +++ b/ydb/core/tx/columnshard/normalizer/insert_table/broken_dedup.h @@ -1,16 +1,19 @@ #pragma once -#include #include - +#include namespace NKikimr::NOlap::NInsertionDedup { class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return "CleanInsertionDedup"; } + private: class TNormalizerResult; @@ -18,7 +21,8 @@ class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerCo INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); public: - TInsertionsDedupNormalizer(const TNormalizationController::TInitContext&) { + TInsertionsDedupNormalizer(const TNormalizationController::TInitContext& context) + : TBase(context) { } virtual std::optional DoGetEnumSequentialId() const override { @@ -29,7 +33,8 @@ class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerCo return GetClassNameStatic(); } - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; }; -} +} // namespace NKikimr::NOlap::NInsertionDedup diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.h b/ydb/core/tx/columnshard/normalizer/portion/chunks.h index 46c1462a8c86..5c20b9b0b21f 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/chunks.h +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.h @@ -1,114 +1,119 @@ #pragma once -#include #include - #include - +#include namespace NKikimr::NColumnShard { - class TTablesManager; +class TTablesManager; } namespace NKikimr::NOlap { - class TChunksNormalizer : public TNormalizationController::INormalizerComponent { +class TChunksNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + +public: + static TString GetClassNameStatic() { + return ::ToString(ENormalizerSequentialId::Chunks); + } + + virtual std::optional DoGetEnumSequentialId() const override { + return ENormalizerSequentialId::Chunks; + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + class TNormalizerResult; + + class TKey { + YDB_READONLY(ui64, Index, 0); + YDB_READONLY(ui64, Granule, 0); + YDB_READONLY(ui64, ColumnIdx, 0); + YDB_READONLY(ui64, PlanStep, 0); + YDB_READONLY(ui64, TxId, 0); + YDB_READONLY(ui64, Portion, 0); + YDB_READONLY(ui64, Chunk, 0); + + public: + template + void Load(TRowset& rowset) { + using namespace NColumnShard; + Index = rowset.template GetValue(); + Granule = rowset.template GetValue(); + ColumnIdx = rowset.template GetValue(); + PlanStep = rowset.template GetValue(); + TxId = rowset.template GetValue(); + Portion = rowset.template GetValue(); + Chunk = rowset.template GetValue(); + } + + bool operator<(const TKey& other) const { + return std::make_tuple(Portion, Chunk, ColumnIdx) < std::make_tuple(other.Portion, other.Chunk, other.ColumnIdx); + } + }; + + class TUpdate { + YDB_ACCESSOR(ui64, RecordsCount, 0); + YDB_ACCESSOR(ui64, RawBytes, 0); + }; + + class TChunkInfo { + YDB_READONLY_DEF(TKey, Key); + TColumnChunkLoadContext CLContext; + ISnapshotSchema::TPtr Schema; + + YDB_ACCESSOR_DEF(TUpdate, Update); + public: + template + TChunkInfo(TKey&& key, const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) + : Key(std::move(key)) + , CLContext(rowset, dsGroupSelector) { + } - static TString GetClassNameStatic() { - return ::ToString(ENormalizerSequentialId::Chunks); + ui32 GetRecordsCount() const { + return CLContext.GetMetaProto().GetNumRows(); } - virtual std::optional DoGetEnumSequentialId() const override { - return ENormalizerSequentialId::Chunks; + const TBlobRange& GetBlobRange() const { + return CLContext.GetBlobRange(); } - virtual TString GetClassName() const override { - return GetClassNameStatic(); + const NKikimrTxColumnShard::TIndexColumnMeta& GetMetaProto() const { + return CLContext.GetMetaProto(); } - class TNormalizerResult; - - class TKey { - YDB_READONLY(ui64, Index, 0); - YDB_READONLY(ui64, Granule, 0); - YDB_READONLY(ui64, ColumnIdx, 0); - YDB_READONLY(ui64, PlanStep, 0); - YDB_READONLY(ui64, TxId, 0); - YDB_READONLY(ui64, Portion, 0); - YDB_READONLY(ui64, Chunk, 0); - - public: - template - void Load(TRowset& rowset) { - using namespace NColumnShard; - Index = rowset.template GetValue(); - Granule = rowset.template GetValue(); - ColumnIdx = rowset.template GetValue(); - PlanStep = rowset.template GetValue(); - TxId = rowset.template GetValue(); - Portion = rowset.template GetValue(); - Chunk = rowset.template GetValue(); - } - - bool operator<(const TKey& other) const { - return std::make_tuple(Portion, Chunk, ColumnIdx) < std::make_tuple(other.Portion, other.Chunk, other.ColumnIdx); - } - }; - - class TUpdate { - YDB_ACCESSOR(ui64, RecordsCount, 0); - YDB_ACCESSOR(ui64, RawBytes, 0); - }; - - class TChunkInfo { - YDB_READONLY_DEF(TKey, Key); - TColumnChunkLoadContext CLContext; - ISnapshotSchema::TPtr Schema; - - YDB_ACCESSOR_DEF(TUpdate, Update); - public: - template - TChunkInfo(TKey&& key, const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) - : Key(std::move(key)) - , CLContext(rowset, dsGroupSelector) - {} - - ui32 GetRecordsCount() const { - return CLContext.GetMetaProto().GetNumRows(); - } - - const TBlobRange& GetBlobRange() const { - return CLContext.GetBlobRange(); - } - - const NKikimrTxColumnShard::TIndexColumnMeta& GetMetaProto() const { - return CLContext.GetMetaProto(); - } - - bool NormalizationRequired() const { - return !CLContext.GetMetaProto().HasNumRows() || !CLContext.GetMetaProto().HasRawBytes(); - } - - std::shared_ptr GetLoader() const { - return Schema->GetColumnLoaderVerified(Key.GetColumnIdx()); - } - void InitSchema(const NColumnShard::TTablesManager& tm); - - bool operator<(const TChunkInfo& other) const { - return Key < other.Key; - } - }; - - static inline INormalizerComponent::TFactory::TRegistrator Registrator = INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); - public: - TChunksNormalizer(const TNormalizationController::TInitContext& info) - : DsGroupSelector(info.GetStorageInfo()) - {} + bool NormalizationRequired() const { + return !CLContext.GetMetaProto().HasNumRows() || !CLContext.GetMetaProto().HasRawBytes(); + } - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + std::shared_ptr GetLoader() const { + return Schema->GetColumnLoaderVerified(Key.GetColumnIdx()); + } + void InitSchema(const NColumnShard::TTablesManager& tm); - private: - NColumnShard::TBlobGroupSelector DsGroupSelector; + bool operator<(const TChunkInfo& other) const { + return Key < other.Key; + } }; -} + + static inline INormalizerComponent::TFactory::TRegistrator Registrator = + INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); + +public: + TChunksNormalizer(const TNormalizationController::TInitContext& info) + : TBase(info) + , DsGroupSelector(info.GetStorageInfo()) { + } + + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + +private: + NColumnShard::TBlobGroupSelector DsGroupSelector; +}; +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h index 71b1b1fd44f1..8daa81015836 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h @@ -11,6 +11,9 @@ class TTablesManager; namespace NKikimr::NOlap::NSyncChunksWithPortions1 { class TNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return ::ToString(ENormalizerSequentialId::SyncPortionFromChunks); @@ -31,7 +34,8 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { public: TNormalizer(const TNormalizationController::TInitContext& info) - : DsGroupSelector(info.GetStorageInfo()) { + : TBase(info) + , DsGroupSelector(info.GetStorageInfo()) { } virtual TConclusion> DoInit( @@ -40,4 +44,4 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { private: NColumnShard::TBlobGroupSelector DsGroupSelector; }; -} // namespace NKikimr::NOlap::NChunksActualization +} // namespace NKikimr::NOlap::NSyncChunksWithPortions1 diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h index 77e7cffed989..9b4104365d9a 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h +++ b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h @@ -5,7 +5,8 @@ namespace NKikimr::NOlap::NSyncChunksWithPortions { class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormalizerComponent { - +private: + using TBase = TNormalizationController::INormalizerComponent; static TString ClassName() { return "EmptyPortionsCleaner"; } @@ -15,7 +16,8 @@ class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormaliz public: TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext& info) - : DsGroupSelector(info.GetStorageInfo()) { + : TBase(info) + , DsGroupSelector(info.GetStorageInfo()) { } std::optional DoGetEnumSequentialId() const override { diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp new file mode 100644 index 000000000000..e719c01942c6 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp @@ -0,0 +1,298 @@ +#include "leaked_blobs.h" + +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NOlap { + +class TLeakedBlobsNormalizerChanges: public INormalizerChanges { +private: + THashSet Leaks; + const ui64 TabletId; + NColumnShard::TBlobGroupSelector DsGroupSelector; + +public: + TLeakedBlobsNormalizerChanges(THashSet&& leaks, const ui64 tabletId, NColumnShard::TBlobGroupSelector dsGroupSelector) + : Leaks(std::move(leaks)) + , TabletId(tabletId) + , DsGroupSelector(dsGroupSelector) { + } + + bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normController*/) const override { + NIceDb::TNiceDb db(txc.DB); + for (auto&& i : Leaks) { + TUnifiedBlobId blobId(DsGroupSelector.GetGroup(i), i); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("normalizer", "leaked_blobs")("blob_id", blobId.ToStringLegacy()); + db.Table().Key(blobId.ToStringLegacy(), TabletId).Update(); + } + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("normalizer", "leaked_blobs")("removed_blobs", Leaks.size()); + + return true; + } + + void ApplyOnComplete(const TNormalizationController& /* normController */) const override { + } + + ui64 GetSize() const override { + return Leaks.size(); + } +}; + +class TRemoveLeakedBlobsActor: public TActorBootstrapped { +private: + TVector Channels; + THashSet CSBlobIds; + THashSet BSBlobIds; + TActorId CSActorId; + ui64 CSTabletId; + i32 WaitingCount = 0; + THashSet WaitingRequests; + NColumnShard::TBlobGroupSelector DsGroupSelector; + + class TIterator { + private: + TVector::const_iterator ChannelsIterator; + TVector::const_iterator ChannelsEnd; + TVector::const_iterator HistoryIterator; + TVector::const_iterator HistoryEnd; + + void InitIterators() { + while (IsValid() && HistoryIterator == HistoryEnd) { + ++ChannelsIterator; + if (IsValid()) { + HistoryIterator = ChannelsIterator->History.begin(); + HistoryEnd = ChannelsIterator->History.end(); + } + } + } + + public: + ui32 GetChannel() const { + AFL_VERIFY(IsValid()); + return ChannelsIterator->Channel; + } + + ui32 GetGroupID() const { + AFL_VERIFY(IsValid()); + return HistoryIterator->GroupID; + } + + TIterator(const TVector& v) + : ChannelsIterator(v.begin()) + , ChannelsEnd(v.end()) + { + if (IsValid()) { + HistoryIterator = ChannelsIterator->History.begin(); + HistoryEnd = ChannelsIterator->History.end(); + InitIterators(); + } + } + + bool IsValid() const { + return ChannelsIterator != ChannelsEnd; + } + + void Next() { + AFL_VERIFY(IsValid()); + if (HistoryIterator != HistoryEnd) { + ++HistoryIterator; + } + InitIterators(); + } + }; + + void CheckFinish() { + AFL_VERIFY(WaitingRequests.empty()); + if (Iterator.IsValid()) { + TLogoBlobID from(CSTabletId, 0, 0, Iterator.GetChannel(), 0, 0); + TLogoBlobID to(CSTabletId, Max(), Max(), Iterator.GetChannel(), TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie); + auto request = MakeHolder(CSTabletId, from, to, false, TInstant::Max(), true); + SendToBSProxy(SelfId(), Iterator.GetGroupID(), request.Release(), ++WaitingCount); + WaitingRequests.emplace(WaitingCount); + Iterator.Next(); + while (Iterator.IsValid() && Iterator.GetChannel() < 2) { + Iterator.Next(); + } + } else { + AFL_VERIFY(CSBlobIds.size() <= BSBlobIds.size())("cs", CSBlobIds.size())("bs", BSBlobIds.size())( + "error", "have to use broken blobs repair"); + for (auto&& i : CSBlobIds) { + AFL_VERIFY(BSBlobIds.erase(i))("error", "have to use broken blobs repair")("blob_id", i); + } + TActorContext::AsActorContext().Send( + CSActorId, std::make_unique( + std::make_shared(std::move(BSBlobIds), CSTabletId, DsGroupSelector))); + PassAway(); + } + } + + TIterator Iterator; + +public: + TRemoveLeakedBlobsActor(TVector&& channels, THashSet&& csBlobIDs, TActorId csActorId, ui64 csTabletId, + const NColumnShard::TBlobGroupSelector& dsGroupSelector) + : Channels(std::move(channels)) + , CSBlobIds(std::move(csBlobIDs)) + , CSActorId(csActorId) + , CSTabletId(csTabletId) + , DsGroupSelector(dsGroupSelector) + , Iterator(Channels) + { + } + + void Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateWait); + CheckFinish(); + } + + void Handle(TEvBlobStorage::TEvRangeResult::TPtr& ev, const TActorContext& /*ctx*/) { + TEvBlobStorage::TEvRangeResult* msg = ev->Get(); + AFL_VERIFY(msg->Status == NKikimrProto::OK)("status", msg->Status)("error", msg->ErrorReason); + AFL_VERIFY(WaitingRequests.erase(ev->Cookie)); + for (auto& resp : msg->Responses) { + AFL_VERIFY(!resp.Buffer); + BSBlobIds.emplace(resp.Id); + } + CheckFinish(); + } + + STFUNC(StateWait) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvBlobStorage::TEvRangeResult, Handle); + default: + AFL_VERIFY(false); + } + } +}; + +TLeakedBlobsNormalizer::TLeakedBlobsNormalizer(const TNormalizationController::TInitContext& info) + : TBase(info) + , Channels(info.GetStorageInfo()->Channels) + , DsGroupSelector(info.GetStorageInfo()) { +} + +class TRemoveLeakedBlobsTask: public INormalizerTask { + TVector Channels; + THashSet CSBlobIDs; + ui64 TabletId; + TActorId ActorId; + NColumnShard::TBlobGroupSelector DsGroupSelector; + +public: + TRemoveLeakedBlobsTask(TVector&& channels, THashSet&& csBlobIDs, ui64 tabletId, TActorId actorId, + const NColumnShard::TBlobGroupSelector& dsGroupSelector) + : Channels(std::move(channels)) + , CSBlobIDs(std::move(csBlobIDs)) + , TabletId(tabletId) + , ActorId(actorId) + , DsGroupSelector(dsGroupSelector) { + } + void Start(const TNormalizationController& /*controller*/, const TNormalizationContext& /*nCtx*/) override { + NActors::TActivationContext::Register( + new TRemoveLeakedBlobsActor(std::move(Channels), std::move(CSBlobIDs), ActorId, TabletId, DsGroupSelector)); + } +}; + +TConclusion> TLeakedBlobsNormalizer::DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + AFL_VERIFY(AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); + NIceDb::TNiceDb db(txc.DB); + const bool ready = (int)Schema::Precharge(db, txc.DB.GetScheme()) & + (int)Schema::Precharge(db, txc.DB.GetScheme()) & + (int)Schema::Precharge(db, txc.DB.GetScheme()); + if (!ready) { + return TConclusionStatus::Fail("Not ready"); + } + + NColumnShard::TTablesManager tablesManager( + controller.GetStoragesManager(), std::make_shared(nullptr), TabletId); + + if (!tablesManager.InitFromDB(db)) { + ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); + return TConclusionStatus::Fail("Can't load index"); + } + + if (!tablesManager.HasPrimaryIndex()) { + return std::vector{}; + } + + THashSet csBlobIDs; + auto conclusion = LoadPortionBlobIds(tablesManager, db, csBlobIDs); + if (conclusion.IsFail()) { + return conclusion; + } + + return std::vector{ std::make_shared( + std::move(Channels), std::move(csBlobIDs), TabletId, TabletActorId, DsGroupSelector) }; +} + +TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds( + const NColumnShard::TTablesManager& tablesManager, NIceDb::TNiceDb& db, THashSet& result) { + TDbWrapper wrapper(db.GetDatabase(), nullptr); + if (Portions.empty()) { + THashMap portionsLocal; + if (!wrapper.LoadPortions({}, [&](TPortionInfoConstructor&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) { + const TIndexInfo& indexInfo = + portion.GetSchema(tablesManager.GetPrimaryIndexAsVerified().GetVersionedIndex())->GetIndexInfo(); + AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo, DsGroupSelector)); + const ui64 portionId = portion.GetPortionIdVerified(); + AFL_VERIFY(portionsLocal.emplace(portionId, std::move(portion)).second); + })) { + return TConclusionStatus::Fail("repeated read db"); + } + Portions = std::move(portionsLocal); + } + if (Records.empty()) { + THashMap> recordsLocal; + if (!wrapper.LoadColumns(std::nullopt, [&](TColumnChunkLoadContextV1&& chunk) { + const ui64 portionId = chunk.GetPortionId(); + recordsLocal[portionId].emplace_back(std::move(chunk)); + })) { + return TConclusionStatus::Fail("repeated read db"); + } + Records = std::move(recordsLocal); + } + if (Indexes.empty()) { + THashMap> indexesLocal; + if (!wrapper.LoadIndexes(std::nullopt, [&](const ui64 /*pathId*/, const ui64 /*portionId*/, TIndexChunkLoadContext&& indexChunk) { + const ui64 portionId = indexChunk.GetPortionId(); + indexesLocal[portionId].emplace_back(std::move(indexChunk)); + })) { + return TConclusionStatus::Fail("repeated read db"); + } + Indexes = std::move(indexesLocal); + } + AFL_VERIFY(Portions.size() == Records.size())("portions", Portions.size())("records", Records.size()); + THashSet resultLocal; + for (auto&& i : Portions) { + auto itRecords = Records.find(i.first); + AFL_VERIFY(itRecords != Records.end()); + auto itIndexes = Indexes.find(i.first); + std::vector indexes; + if (itIndexes != Indexes.end()) { + indexes = std::move(itIndexes->second); + } + TPortionDataAccessor accessor = + TPortionAccessorConstructor::BuildForLoading(i.second.Build(), std::move(itRecords->second), std::move(indexes)); + THashMap> blobIdsByStorage; + accessor.FillBlobIdsByStorage(blobIdsByStorage, tablesManager.GetPrimaryIndexAsVerified().GetVersionedIndex()); + auto it = blobIdsByStorage.find(NBlobOperations::TGlobal::DefaultStorageId); + if (it == blobIdsByStorage.end()) { + continue; + } + for (auto&& c : it->second) { + resultLocal.emplace(c.GetLogoBlobId()); + } + } + std::swap(resultLocal, result); + return TConclusionStatus::Success(); +} + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.h b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.h new file mode 100644 index 000000000000..4545ea3b605b --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.h @@ -0,0 +1,49 @@ +#pragma once + +#include "normalizer.h" + +#include +#include + +namespace NKikimr::NOlap { +class TLeakedBlobsNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; +public: + static TString GetClassNameStatic() { + return "LeakedBlobsNormalizer"; + } + +private: + static inline TFactory::TRegistrator Registrator = + TFactory::TRegistrator(GetClassNameStatic()); + +public: + class TNormalizerResult; + class TTask; + +public: + virtual std::optional DoGetEnumSequentialId() const override { + return std::nullopt; + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + TLeakedBlobsNormalizer(const TNormalizationController::TInitContext& info); + + TConclusionStatus LoadPortionBlobIds(const NColumnShard::TTablesManager& tablesManager, NIceDb::TNiceDb& db, THashSet& result); + + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + +private: + TVector Channels; + TActorId TRemoveLeakedBlobsActorId; + NColumnShard::TBlobGroupSelector DsGroupSelector; + THashMap Portions; + THashMap> Records; + THashMap> Indexes; +}; +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.h b/ydb/core/tx/columnshard/normalizer/portion/normalizer.h index 38ac921cf6cc..edfdad23ff4c 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.h +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.h @@ -81,9 +81,12 @@ class TPortionsNormalizerTask: public INormalizerTask { }; class TPortionsNormalizerBase: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; public: TPortionsNormalizerBase(const TNormalizationController::TInitContext& info) - : DsGroupSelector(info.GetStorageInfo()) { + : TBase(info) + , DsGroupSelector(info.GetStorageInfo()) { } TConclusionStatus InitPortions( diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h index f8120c16ef36..110b0e35a7c4 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h @@ -11,6 +11,9 @@ class TTablesManager; namespace NKikimr::NOlap::NRestorePortionsFromChunks { class TNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return ::ToString(ENormalizerSequentialId::RestorePortionFromChunks); @@ -31,7 +34,8 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { public: TNormalizer(const TNormalizationController::TInitContext& info) - : DsGroupSelector(info.GetStorageInfo()) { + : TBase(info) + , DsGroupSelector(info.GetStorageInfo()) { } virtual TConclusion> DoInit( @@ -40,4 +44,4 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { private: NColumnShard::TBlobGroupSelector DsGroupSelector; }; -} // namespace NKikimr::NOlap::NChunksActualization +} // namespace NKikimr::NOlap::NRestorePortionsFromChunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.h index 0d5f750c3f96..eecfa5602200 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.h +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.h @@ -11,6 +11,9 @@ class TTablesManager; namespace NKikimr::NOlap::NRestoreV1Chunks { class TNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return ::ToString(ENormalizerSequentialId::RestoreV1Chunks_V2); @@ -31,7 +34,8 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { public: TNormalizer(const TNormalizationController::TInitContext& info) - : DsGroupSelector(info.GetStorageInfo()) { + : TBase(info) + , DsGroupSelector(info.GetStorageInfo()) { } virtual TConclusion> DoInit( @@ -40,4 +44,4 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { private: NColumnShard::TBlobGroupSelector DsGroupSelector; }; -} // namespace NKikimr::NOlap::NChunksActualization +} // namespace NKikimr::NOlap::NRestoreV1Chunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.h index 46e7d06ddc95..c872da0af365 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.h +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.h @@ -11,6 +11,9 @@ class TTablesManager; namespace NKikimr::NOlap::NRestoreV2Chunks { class TNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return ::ToString(ENormalizerSequentialId::RestoreV2Chunks); @@ -31,7 +34,8 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { public: TNormalizer(const TNormalizationController::TInitContext& info) - : DsGroupSelector(info.GetStorageInfo()) { + : TBase(info) + , DsGroupSelector(info.GetStorageInfo()) { } virtual TConclusion> DoInit( @@ -40,4 +44,4 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { private: NColumnShard::TBlobGroupSelector DsGroupSelector; }; -} // namespace NKikimr::NOlap::NChunksActualization +} // namespace NKikimr::NOlap::NRestoreV2Chunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h index 8f189d4d49ba..5272bd1dadc8 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h +++ b/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h @@ -11,6 +11,9 @@ class TTablesManager; namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks { class TNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return ::ToString(ENormalizerSequentialId::SyncMinSnapshotFromChunks); @@ -31,7 +34,8 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { public: TNormalizer(const TNormalizationController::TInitContext& info) - : DsGroupSelector(info.GetStorageInfo()) { + : TBase(info) + , DsGroupSelector(info.GetStorageInfo()) { } virtual TConclusion> DoInit( @@ -40,4 +44,4 @@ class TNormalizer: public TNormalizationController::INormalizerComponent { private: NColumnShard::TBlobGroupSelector DsGroupSelector; }; -} // namespace NKikimr::NOlap::NChunksActualization +} // namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h index bac01c3f3d27..4af64a29fccc 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h +++ b/ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h @@ -6,6 +6,9 @@ namespace NKikimr::NOlap::NNormalizer::NSpecialColumns { class TDeleteTrashImpl: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: struct TKey { ui32 Index; @@ -20,13 +23,13 @@ class TDeleteTrashImpl: public TNormalizationController::INormalizerComponent { using TKeyBatch = std::vector; private: - std::optional> KeysToDelete(NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize); virtual std::set GetColumnIdsToDelete() const = 0; public: - TDeleteTrashImpl(const TNormalizationController::TInitContext&) { + TDeleteTrashImpl(const TNormalizationController::TInitContext& context) + : TBase(context) { } virtual TConclusion> DoInit( @@ -36,6 +39,7 @@ class TDeleteTrashImpl: public TNormalizationController::INormalizerComponent { class TRemoveDeleteFlag: public TDeleteTrashImpl { private: using TBase = TDeleteTrashImpl; + public: static TString GetClassNameStatic() { return "RemoveDeleteFlag"; @@ -86,9 +90,8 @@ class TRemoveWriteId: public TDeleteTrashImpl { public: TRemoveWriteId(const TNormalizationController::TInitContext& context) - : TBase(context) - { + : TBase(context) { } }; -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap::NNormalizer::NSpecialColumns diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index 077cea88a48d..e7eaf752badd 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -13,6 +13,7 @@ SRCS( GLOBAL restore_v1_chunks.cpp GLOBAL restore_v2_chunks.cpp GLOBAL snapshot_from_chunks.cpp + GLOBAL leaked_blobs.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/normalizer/schema_version/version.h b/ydb/core/tx/columnshard/normalizer/schema_version/version.h index 48c8d0b4ea15..4d526734fa81 100644 --- a/ydb/core/tx/columnshard/normalizer/schema_version/version.h +++ b/ydb/core/tx/columnshard/normalizer/schema_version/version.h @@ -1,25 +1,28 @@ #pragma once -#include #include #include - +#include namespace NKikimr::NColumnShard { - class TTablesManager; +class TTablesManager; } namespace NKikimr::NOlap { -class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent { +class TSchemaVersionNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return "SchemaVersionCleaner"; } private: - static inline TFactory::TRegistrator Registrator = TFactory::TRegistrator( - GetClassNameStatic()); + static inline TFactory::TRegistrator Registrator = + TFactory::TRegistrator(GetClassNameStatic()); + public: class TNormalizerResult; class TTask; @@ -33,10 +36,12 @@ class TSchemaVersionNormalizer : public TNormalizationController::INormalizerCom return GetClassNameStatic(); } - TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) { + TSchemaVersionNormalizer(const TNormalizationController::TInitContext& context) + : TBase(context) { } - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/tables/normalizer.h b/ydb/core/tx/columnshard/normalizer/tables/normalizer.h index cfab565e5a67..21826e7605c0 100644 --- a/ydb/core/tx/columnshard/normalizer/tables/normalizer.h +++ b/ydb/core/tx/columnshard/normalizer/tables/normalizer.h @@ -1,21 +1,24 @@ #pragma once -#include #include - +#include namespace NKikimr::NOlap { class TRemovedTablesNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return ::ToString(ENormalizerSequentialId::TablesCleaner); } private: - static inline INormalizerComponent::TFactory::TRegistrator Registrator = INormalizerComponent::TFactory::TRegistrator( - GetClassNameStatic()); + static inline INormalizerComponent::TFactory::TRegistrator Registrator = + INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); class TNormalizerResult; + public: virtual std::optional DoGetEnumSequentialId() const override { return ENormalizerSequentialId::TablesCleaner; @@ -25,10 +28,12 @@ class TRemovedTablesNormalizer: public TNormalizationController::INormalizerComp return GetClassNameStatic(); } - TRemovedTablesNormalizer(const TNormalizationController::TInitContext&) - {} + TRemovedTablesNormalizer(const TNormalizationController::TInitContext& context) + : TBase(context) { + } - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/tablet/broken_txs.h b/ydb/core/tx/columnshard/normalizer/tablet/broken_txs.h index 1ff68530bf35..45befd504b61 100644 --- a/ydb/core/tx/columnshard/normalizer/tablet/broken_txs.h +++ b/ydb/core/tx/columnshard/normalizer/tablet/broken_txs.h @@ -1,24 +1,28 @@ #pragma once -#include #include - +#include namespace NKikimr::NOlap { class TBrokenTxsNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return "BrokenTxsNormalizer"; } + private: class TNormalizerResult; - static const inline INormalizerComponent::TFactory::TRegistrator Registrator = + static const inline INormalizerComponent::TFactory::TRegistrator Registrator = INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); public: - TBrokenTxsNormalizer(const TNormalizationController::TInitContext&) { + TBrokenTxsNormalizer(const TNormalizationController::TInitContext& context) + : TBase(context) { } virtual std::optional DoGetEnumSequentialId() const override { @@ -29,7 +33,8 @@ class TBrokenTxsNormalizer: public TNormalizationController::INormalizerComponen return GetClassNameStatic(); } - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/tablet/gc_counters.h b/ydb/core/tx/columnshard/normalizer/tablet/gc_counters.h index 8787da559489..c48aee45d796 100644 --- a/ydb/core/tx/columnshard/normalizer/tablet/gc_counters.h +++ b/ydb/core/tx/columnshard/normalizer/tablet/gc_counters.h @@ -1,23 +1,28 @@ #pragma once -#include #include - +#include namespace NKikimr::NOlap { class TGCCountersNormalizer: public TNormalizationController::INormalizerComponent { +private: + using TBase = TNormalizationController::INormalizerComponent; + public: static TString GetClassNameStatic() { return "GCCountersNormalizer"; } + private: class TNormalizerResult; - static const inline INormalizerComponent::TFactory::TRegistrator Registrator = + static const inline INormalizerComponent::TFactory::TRegistrator Registrator = INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); + public: - TGCCountersNormalizer(const TNormalizationController::TInitContext&) { + TGCCountersNormalizer(const TNormalizationController::TInitContext& context) + : TBase(context) { } virtual std::optional DoGetEnumSequentialId() const override { @@ -28,7 +33,8 @@ class TGCCountersNormalizer: public TNormalizationController::INormalizerCompone return GetClassNameStatic(); } - virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 85184496046b..5364cbe10d4a 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -37,7 +37,9 @@ class TNormalizerChecker { } virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& /*columnShardConfig*/) const { + } + virtual void CorrectFeatureFlagsOnStart(TFeatureFlags& /*featuresFlags*/) const { } virtual ui64 RecordsCountAfterReboot(const ui64 initialRecodsCount) const { @@ -254,6 +256,7 @@ Y_UNIT_TEST_SUITE(Normalizers) { TTester::Setup(runtime); checker.CorrectConfigurationOnStart(runtime.GetAppData().ColumnShardConfig); + checker.CorrectFeatureFlagsOnStart(runtime.GetAppData().FeatureFlags); const ui64 tableId = 1; const std::vector schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)), @@ -292,12 +295,36 @@ Y_UNIT_TEST_SUITE(Normalizers) { } Y_UNIT_TEST(PortionsNormalizer) { - TestNormalizerImpl(); + class TLocalNormalizerChecker: public TNormalizerChecker { + public: + virtual ui64 RecordsCountAfterReboot(const ui64 /*initialRecodsCount*/) const override { + return 0; + } + virtual void CorrectFeatureFlagsOnStart(TFeatureFlags & featuresFlags) const override{ + featuresFlags.SetEnableWritePortionsOnInsert(true); + } + virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override { + { + auto* repair = columnShardConfig.MutableRepairs()->Add(); + repair->SetClassName("EmptyPortionsCleaner"); + repair->SetDescription("Removing unsync portions"); + } + { + auto* repair = columnShardConfig.MutableRepairs()->Add(); + repair->SetClassName("LeakedBlobsNormalizer"); + repair->SetDescription("Removing leaked blobs"); + } + } + }; + TestNormalizerImpl(TLocalNormalizerChecker()); } Y_UNIT_TEST(SchemaVersionsNormalizer) { class TLocalNormalizerChecker: public TNormalizerChecker { public: + virtual void CorrectFeatureFlagsOnStart(TFeatureFlags& featuresFlags) const override { + featuresFlags.SetEnableWritePortionsOnInsert(true); + } virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override { auto* repair = columnShardConfig.MutableRepairs()->Add(); repair->SetClassName("SchemaVersionCleaner");