diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index c650ee040288..935fbc36f41d 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -281,7 +281,7 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) { break; } NIceDb::TNiceDb db(txc.DB); - Self->NormalizerController.UpdateControllerState(db); + Self->NormalizerController.OnNormalizerFinished(db); Self->NormalizerController.SwitchNormalizer(); } else { Self->NormalizerController.GetCounters().OnNormalizerFails(); @@ -335,7 +335,7 @@ bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) if (Self->NormalizerController.GetNormalizer()->GetActiveTasksCount() == 1) { NIceDb::TNiceDb db(txc.DB); - Self->NormalizerController.UpdateControllerState(db); + Self->NormalizerController.OnNormalizerFinished(db); } return true; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 5098b6e7573c..9f90e648452f 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -56,7 +56,9 @@ struct Schema : NIceDb::Schema { PortionsTableId, BackgroundSessionsTableId, ShardingInfoTableId, - RepairsTableId + RepairsTableId, + NormalizersTableId, + NormalizerEventsTableId }; enum class ETierTables: ui32 { @@ -487,14 +489,26 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns; }; - struct Repairs: Table { - struct Identifier: Column<1, NScheme::NTypeIds::Utf8> {}; - struct UniqueDescription: Column<2, NScheme::NTypeIds::Utf8> {}; + struct Normalizers: Table { + struct ClassName: Column<1, NScheme::NTypeIds::Utf8> {}; + struct Description: Column<2, NScheme::NTypeIds::Utf8> {}; + struct Identifier: Column<3, NScheme::NTypeIds::Utf8> {}; + struct Start: Column<4, NScheme::NTypeIds::Uint64> {}; + struct Finish: Column<5, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + + struct NormalizerEvents: Table { + struct NormalizerId: Column<1, NScheme::NTypeIds::Utf8> {}; + struct EventId: Column<2, NScheme::NTypeIds::Utf8> {}; struct Instant: Column<3, NScheme::NTypeIds::Uint64> {}; - struct Event: Column<4, NScheme::NTypeIds::Utf8> {}; + struct EventType: Column<4, NScheme::NTypeIds::Utf8> {}; + struct Description: Column<5, NScheme::NTypeIds::Utf8> {}; - using TKey = TableKey; - using TColumns = TableColumns; + using TKey = TableKey; + using TColumns = TableColumns; }; using TTables = SchemaTables< @@ -529,7 +543,8 @@ struct Schema : NIceDb::Schema { IndexPortions, BackgroundSessions, ShardingInfo, - Repairs + Normalizers, + NormalizerEvents >; // @@ -572,6 +587,22 @@ struct Schema : NIceDb::Schema { return false; } + template + static bool GetSpecialValueOpt(NIceDb::TNiceDb& db, EValueIds key, std::optional& value) { + using TSource = std::conditional_t || std::is_enum_v, Value::Digit, Value::Bytes>; + + auto rowset = db.Table().Key((ui32)key).Select(); + if (rowset.IsReady()) { + if (rowset.IsValid()) { + value = T{ rowset.template GetValue() }; + } else { + value = {}; + } + return true; + } + return false; + } + template static bool GetSpecialProtoValue(NIceDb::TNiceDb& db, EValueIds key, std::optional& value) { auto rowset = db.Table().Key(ui32(key)).Select(); @@ -584,13 +615,31 @@ struct Schema : NIceDb::Schema { return false; } - static void AddRepairEvent(NIceDb::TNiceDb& db, const TString& id, const TInstant instant, const TString& description, const TString& eventInfo) { - db.Table().Key(id) + static void AddNormalizerEvent(NIceDb::TNiceDb& db, const TString& normalizerId, const TString& eventType, const TString& description) { + db.Table().Key(normalizerId, TGUID::CreateTimebased().AsUuidString()) .Update( - NIceDb::TUpdate(description), - NIceDb::TUpdate(instant.Seconds()), - NIceDb::TUpdate(eventInfo)) - ; + NIceDb::TUpdate(TInstant::Now().MicroSeconds()), + NIceDb::TUpdate(eventType), + NIceDb::TUpdate(description) + ); + } + + static void StartNormalizer(NIceDb::TNiceDb& db, const TString& className, const TString& description, const TString& normalizerId) { + db.Table().Key(className, description, normalizerId) + .Update( + NIceDb::TUpdate(TInstant::Now().MicroSeconds()) + ); + } + + static void RemoveNormalizer(NIceDb::TNiceDb& db, const TString& className, const TString& description, const TString& normalizerId) { + db.Table().Key(className, description, normalizerId).Delete(); + } + + static void FinishNormalizer(NIceDb::TNiceDb& db, const TString& className, const TString& description, const TString& normalizerId) { + db.Table().Key(className, description, normalizerId) + .Update( + NIceDb::TUpdate(TInstant::Now().MicroSeconds()) + ); } static void SaveSpecialValue(NIceDb::TNiceDb& db, EValueIds key, const TString& value) { diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index 4cf067a44d44..94642ed75361 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -42,17 +42,15 @@ void TTrivialNormalizerTask::Start(const TNormalizationController& /* controller TActorContext::AsActorContext().Send(nCtx.GetShardActor(), std::make_unique(Changes)); } -void TNormalizationController::AddRepairInfo(NIceDb::TNiceDb& db, const TString& info) const { - NColumnShard::Schema::AddRepairEvent(db, GetNormalizer()->GetUniqueId(), TInstant::Now(), GetNormalizer()->GetUniqueDescription(), "INFO:" + info); +void TNormalizationController::AddNormalizerEvent(NIceDb::TNiceDb& db, const TString& eventType, const TString& eventDescription) const { + NColumnShard::Schema::AddNormalizerEvent(db, GetNormalizer()->GetUniqueId(), eventType, eventDescription); } -void TNormalizationController::UpdateControllerState(NIceDb::TNiceDb& db) const { +void TNormalizationController::OnNormalizerFinished(NIceDb::TNiceDb& db) const { if (auto seqId = GetNormalizer()->GetSequentialId()) { NColumnShard::Schema::SaveSpecialValue(db, NColumnShard::Schema::EValueIds::LastNormalizerSequentialId, *seqId); } - if (GetNormalizer()->GetIsRepair()) { - NColumnShard::Schema::AddRepairEvent(db, GetNormalizer()->GetUniqueId(), TInstant::Now(), GetNormalizer()->GetUniqueDescription(), "FINISHED"); - } + NColumnShard::Schema::FinishNormalizer(db, GetNormalizer()->GetClassName(), GetNormalizer()->GetUniqueDescription(), GetNormalizer()->GetUniqueId()); } void TNormalizationController::InitNormalizers(const TInitContext& ctx) { @@ -61,7 +59,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { if (HasAppData()) { for (auto&& i : AppDataVerified().ColumnShardConfig.GetRepairs()) { AFL_VERIFY(i.GetDescription())("error", "repair normalization have to has unique description"); - if (FinishedRepairs.contains(i.GetDescription())) { + if (FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "repair already processed")("description", i.GetDescription()); } else { auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(i.GetClassName(), ctx))); @@ -87,37 +85,66 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx))); AFL_VERIFY(normalizer->GetEnumSequentialIdVerified() == nType); AFL_VERIFY(lastRegisteredNormalizer <= nType)("current", ToString(nType))("last", ToString(lastRegisteredNormalizer)); - normalizer->SetUniqueDescription(normalizer->GetClassName()); lastRegisteredNormalizer = nType; } + + for (auto&& i : Normalizers) { + auto it = StartedNormalizers.find(i->GetNormalizerFullId()); + if (it != StartedNormalizers.end()) { + i->SetUniqueId(it->second); + } + if (!i->GetUniqueDescription()) { + i->SetUniqueDescription(i->GetClassName()); + } + } } bool TNormalizationController::InitControllerState(NIceDb::TNiceDb& db) { { - auto rowset = db.Table().Select(); + auto rowset = db.Table().Select(); if (!rowset.IsReady()) { return false; } - THashSet descriptions; + std::set finished; + std::map started; while (!rowset.EndOfSet()) { - if (rowset.GetValue() != "FINISHED") { - continue; + const TNormalizerFullId id( + rowset.GetValue(), + rowset.GetValue()); + if (!rowset.HaveValue()) { + started.emplace(id, rowset.GetValue()); + } else { + finished.emplace(id); } - descriptions.emplace(rowset.GetValue()); if (!rowset.Next()) { return false; } } - FinishedRepairs = descriptions; + FinishedNormalizers = finished; + StartedNormalizers = started; } - ui64 lastNormalizerId; - if (NColumnShard::Schema::GetSpecialValue(db, NColumnShard::Schema::EValueIds::LastNormalizerSequentialId, lastNormalizerId)) { - LastSavedNormalizerId = lastNormalizerId; - } else { - LastSavedNormalizerId = {}; + std::optional lastNormalizerId; + if (!NColumnShard::Schema::GetSpecialValueOpt(db, NColumnShard::Schema::EValueIds::LastNormalizerSequentialId, lastNormalizerId)) { + return false; } + LastSavedNormalizerId = lastNormalizerId.value_or(0); return true; } +NKikimr::TConclusion> TNormalizationController::INormalizerComponent::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_init")("last", controller.GetLastSavedNormalizerId()) + ("seq_id", GetSequentialId())("type", GetEnumSequentialId()); + auto result = DoInit(controller, txc); + if (!result.IsSuccess()) { + return result; + } + NIceDb::TNiceDb db(txc.DB); + if (!controller.StartedNormalizers.contains(GetNormalizerFullId())) { + NColumnShard::Schema::StartNormalizer(db, GetClassName(), GetUniqueDescription(), UniqueId); + } + AtomicSet(ActiveTasksCount, result.GetResult().size()); + return result; +} + } diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index 18f967020b65..aaa6b6be1d87 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -119,6 +119,26 @@ class TNormalizationController { } }; + class TNormalizerFullId { + private: + YDB_READONLY_DEF(TString, ClassName); + YDB_READONLY_DEF(TString, Description); + public: + bool operator<(const TNormalizerFullId& item) const { + if (ClassName == item.ClassName) { + return Description < item.Description; + } + return ClassName < item.ClassName; + } + + TNormalizerFullId(const TString& className, const TString& description) + : ClassName(className) + , Description(description) + { + + } + }; + class INormalizerComponent { private: YDB_ACCESSOR(bool, IsRepair, false); @@ -137,6 +157,10 @@ class TNormalizationController { virtual ~INormalizerComponent() {} + TNormalizerFullId GetNormalizerFullId() const { + return TNormalizerFullId(GetClassName(), UniqueDescription); + } + bool HasActiveTasks() const { return AtomicGet(ActiveTasksCount) > 0; } @@ -191,16 +215,7 @@ class TNormalizationController { } } - TConclusion> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { - AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_init")("last", controller.GetLastSavedNormalizerId()) - ("seq_id", GetSequentialId())("type", GetEnumSequentialId()); - auto result = DoInit(controller, txc); - if (!result.IsSuccess()) { - return result; - } - AtomicSet(ActiveTasksCount, result.GetResult().size()); - return result; - } + TConclusion> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc); private: virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0; @@ -215,7 +230,8 @@ class TNormalizationController { std::deque Normalizers; std::deque Counters; - THashSet FinishedRepairs; + std::set FinishedNormalizers; + std::map StartedNormalizers; YDB_READONLY_DEF(std::optional, LastSavedNormalizerId); private: INormalizerComponent::TPtr RegisterNormalizer(INormalizerComponent::TPtr normalizer); @@ -231,8 +247,8 @@ class TNormalizationController { } void InitNormalizers(const TInitContext& ctx); - void UpdateControllerState(NIceDb::TNiceDb& db) const; - void AddRepairInfo(NIceDb::TNiceDb& db, const TString& info) const; + void OnNormalizerFinished(NIceDb::TNiceDb& db) const; + void AddNormalizerEvent(NIceDb::TNiceDb& db, const TString& eventType, const TString& eventDescription) const; bool InitControllerState(NIceDb::TNiceDb& db); std::shared_ptr GetStoragesManager() const { diff --git a/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp index d552b8182282..238638d9596a 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp @@ -41,7 +41,7 @@ class TNormalizerResult : public INormalizerChanges { sb << "];"; sb << "records_count:" << recordsCount; NIceDb::TNiceDb db(txc.DB); - normController.AddRepairInfo(db, sb); + normController.AddNormalizerEvent(db, "REMOVE_PORTIONS", sb); } return true; }