Skip to content

Commit

Permalink
improve normalizers control in local db (#5343)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jun 9, 2024
1 parent cbcee71 commit 8d7c078
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 49 deletions.
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
break;
}
NIceDb::TNiceDb db(txc.DB);
Self->NormalizerController.UpdateControllerState(db);
Self->NormalizerController.OnNormalizerFinished(db);
Self->NormalizerController.SwitchNormalizer();
} else {
Self->NormalizerController.GetCounters().OnNormalizerFails();
Expand Down Expand Up @@ -335,7 +335,7 @@ bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&)

if (Self->NormalizerController.GetNormalizer()->GetActiveTasksCount() == 1) {
NIceDb::TNiceDb db(txc.DB);
Self->NormalizerController.UpdateControllerState(db);
Self->NormalizerController.OnNormalizerFinished(db);
}
return true;
}
Expand Down
77 changes: 63 additions & 14 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ struct Schema : NIceDb::Schema {
PortionsTableId,
BackgroundSessionsTableId,
ShardingInfoTableId,
RepairsTableId
RepairsTableId,
NormalizersTableId,
NormalizerEventsTableId
};

enum class ETierTables: ui32 {
Expand Down Expand Up @@ -487,14 +489,26 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<PathId, VersionId, Snapshot, Logic>;
};

struct Repairs: Table<RepairsTableId> {
struct Identifier: Column<1, NScheme::NTypeIds::Utf8> {};
struct UniqueDescription: Column<2, NScheme::NTypeIds::Utf8> {};
// Local-db table that tracks normalizers.
// A row is addressed by the triple (ClassName, Description, Identifier).
struct Normalizers: Table<NormalizersTableId> {
// Normalizer class name (first key component).
struct ClassName: Column<1, NScheme::NTypeIds::Utf8> {};
// Normalizer description (second key component).
struct Description: Column<2, NScheme::NTypeIds::Utf8> {};
// Normalizer identifier (third key component).
struct Identifier: Column<3, NScheme::NTypeIds::Utf8> {};
// Start timestamp; StartNormalizer stores TInstant::Now().MicroSeconds() here.
struct Start: Column<4, NScheme::NTypeIds::Uint64> {};
// Finish timestamp. InitControllerState treats a row that has a value in this
// column as a finished normalizer and a row without it as a started one.
struct Finish: Column<5, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<ClassName, Description, Identifier>;
using TColumns = TableColumns<ClassName, Description, Identifier, Start, Finish>;
};

struct NormalizerEvents: Table<NormalizerEventsTableId> {
struct NormalizerId: Column<1, NScheme::NTypeIds::Utf8> {};
struct EventId: Column<2, NScheme::NTypeIds::Utf8> {};
struct Instant: Column<3, NScheme::NTypeIds::Uint64> {};
struct Event: Column<4, NScheme::NTypeIds::Utf8> {};
struct EventType: Column<4, NScheme::NTypeIds::Utf8> {};
struct Description: Column<5, NScheme::NTypeIds::Utf8> {};

using TKey = TableKey<Identifier>;
using TColumns = TableColumns<Identifier, UniqueDescription, Instant, Event>;
using TKey = TableKey<NormalizerId, EventId>;
using TColumns = TableColumns<NormalizerId, EventId, Instant, EventType, Description>;
};

using TTables = SchemaTables<
Expand Down Expand Up @@ -529,7 +543,8 @@ struct Schema : NIceDb::Schema {
IndexPortions,
BackgroundSessions,
ShardingInfo,
Repairs
Normalizers,
NormalizerEvents
>;

//
Expand Down Expand Up @@ -572,6 +587,22 @@ struct Schema : NIceDb::Schema {
return false;
}

// Reads the special value stored under `key` into `value`.
//
// Integral and enum types are read from Value::Digit, everything else from
// Value::Bytes.
//
// Returns false when the rowset is not ready; `value` is left untouched in
// that case. Otherwise returns true, and `value` holds the stored value if the
// rowset is valid or is reset to empty if it is not (presumably: no row exists
// for this key - confirm against NIceDb semantics).
template <typename T>
static bool GetSpecialValueOpt(NIceDb::TNiceDb& db, EValueIds key, std::optional<T>& value) {
    using TSource = std::conditional_t<std::is_integral_v<T> || std::is_enum_v<T>, Value::Digit, Value::Bytes>;

    auto rowset = db.Table<Value>().Key((ui32)key).Select<TSource>();
    if (!rowset.IsReady()) {
        return false;
    }

    if (rowset.IsValid()) {
        value = T{ rowset.template GetValue<TSource>() };
    } else {
        value.reset();
    }
    return true;
}

template<class TMessage>
static bool GetSpecialProtoValue(NIceDb::TNiceDb& db, EValueIds key, std::optional<TMessage>& value) {
auto rowset = db.Table<Value>().Key(ui32(key)).Select<Value::Bytes>();
Expand All @@ -584,13 +615,31 @@ struct Schema : NIceDb::Schema {
return false;
}

static void AddRepairEvent(NIceDb::TNiceDb& db, const TString& id, const TInstant instant, const TString& description, const TString& eventInfo) {
db.Table<Repairs>().Key(id)
static void AddNormalizerEvent(NIceDb::TNiceDb& db, const TString& normalizerId, const TString& eventType, const TString& description) {
db.Table<NormalizerEvents>().Key(normalizerId, TGUID::CreateTimebased().AsUuidString())
.Update(
NIceDb::TUpdate<Repairs::UniqueDescription>(description),
NIceDb::TUpdate<Repairs::Instant>(instant.Seconds()),
NIceDb::TUpdate<Repairs::Event>(eventInfo))
;
NIceDb::TUpdate<NormalizerEvents::Instant>(TInstant::Now().MicroSeconds()),
NIceDb::TUpdate<NormalizerEvents::EventType>(eventType),
NIceDb::TUpdate<NormalizerEvents::Description>(description)
);
}

// Writes the Start column of the Normalizers row addressed by
// (className, description, normalizerId) with the current time in microseconds.
static void StartNormalizer(NIceDb::TNiceDb& db, const TString& className, const TString& description, const TString& normalizerId) {
    const auto startedAtUs = TInstant::Now().MicroSeconds();
    db.Table<Normalizers>()
        .Key(className, description, normalizerId)
        .Update(NIceDb::TUpdate<Normalizers::Start>(startedAtUs));
}

// Deletes the Normalizers row addressed by (className, description, normalizerId).
static void RemoveNormalizer(NIceDb::TNiceDb& db, const TString& className, const TString& description, const TString& normalizerId) {
db.Table<Normalizers>().Key(className, description, normalizerId).Delete();
}

// Marks the normalizer addressed by (className, description, normalizerId) as
// finished by writing the current time in microseconds to the Finish column.
//
// This must write Finish, not Start: TNormalizationController::InitControllerState
// classifies a row as finished only when it has a Finish value. Writing Start
// would leave the normalizer looking "started" and would overwrite its real
// start time.
static void FinishNormalizer(NIceDb::TNiceDb& db, const TString& className, const TString& description, const TString& normalizerId) {
    db.Table<Normalizers>().Key(className, description, normalizerId)
        .Update(
            NIceDb::TUpdate<Normalizers::Finish>(TInstant::Now().MicroSeconds())
        );
}

static void SaveSpecialValue(NIceDb::TNiceDb& db, EValueIds key, const TString& value) {
Expand Down
65 changes: 46 additions & 19 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,15 @@ void TTrivialNormalizerTask::Start(const TNormalizationController& /* controller
TActorContext::AsActorContext().Send(nCtx.GetShardActor(), std::make_unique<NColumnShard::TEvPrivate::TEvNormalizerResult>(Changes));
}

void TNormalizationController::AddRepairInfo(NIceDb::TNiceDb& db, const TString& info) const {
NColumnShard::Schema::AddRepairEvent(db, GetNormalizer()->GetUniqueId(), TInstant::Now(), GetNormalizer()->GetUniqueDescription(), "INFO:" + info);
void TNormalizationController::AddNormalizerEvent(NIceDb::TNiceDb& db, const TString& eventType, const TString& eventDescription) const {
NColumnShard::Schema::AddNormalizerEvent(db, GetNormalizer()->GetUniqueId(), eventType, eventDescription);
}

void TNormalizationController::UpdateControllerState(NIceDb::TNiceDb& db) const {
void TNormalizationController::OnNormalizerFinished(NIceDb::TNiceDb& db) const {
if (auto seqId = GetNormalizer()->GetSequentialId()) {
NColumnShard::Schema::SaveSpecialValue(db, NColumnShard::Schema::EValueIds::LastNormalizerSequentialId, *seqId);
}
if (GetNormalizer()->GetIsRepair()) {
NColumnShard::Schema::AddRepairEvent(db, GetNormalizer()->GetUniqueId(), TInstant::Now(), GetNormalizer()->GetUniqueDescription(), "FINISHED");
}
NColumnShard::Schema::FinishNormalizer(db, GetNormalizer()->GetClassName(), GetNormalizer()->GetUniqueDescription(), GetNormalizer()->GetUniqueId());
}

void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
Expand All @@ -61,7 +59,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
if (HasAppData()) {
for (auto&& i : AppDataVerified().ColumnShardConfig.GetRepairs()) {
AFL_VERIFY(i.GetDescription())("error", "repair normalization have to has unique description");
if (FinishedRepairs.contains(i.GetDescription())) {
if (FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "repair already processed")("description", i.GetDescription());
} else {
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(INormalizerComponent::TFactory::Construct(i.GetClassName(), ctx)));
Expand All @@ -87,37 +85,66 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx)));
AFL_VERIFY(normalizer->GetEnumSequentialIdVerified() == nType);
AFL_VERIFY(lastRegisteredNormalizer <= nType)("current", ToString(nType))("last", ToString(lastRegisteredNormalizer));
normalizer->SetUniqueDescription(normalizer->GetClassName());
lastRegisteredNormalizer = nType;
}

for (auto&& i : Normalizers) {
auto it = StartedNormalizers.find(i->GetNormalizerFullId());
if (it != StartedNormalizers.end()) {
i->SetUniqueId(it->second);
}
if (!i->GetUniqueDescription()) {
i->SetUniqueDescription(i->GetClassName());
}
}
}

bool TNormalizationController::InitControllerState(NIceDb::TNiceDb& db) {
{
auto rowset = db.Table<NColumnShard::Schema::Repairs>().Select();
auto rowset = db.Table<NColumnShard::Schema::Normalizers>().Select();
if (!rowset.IsReady()) {
return false;
}
THashSet<TString> descriptions;
std::set<TNormalizerFullId> finished;
std::map<TNormalizerFullId, TString> started;
while (!rowset.EndOfSet()) {
if (rowset.GetValue<NColumnShard::Schema::Repairs::Event>() != "FINISHED") {
continue;
const TNormalizerFullId id(
rowset.GetValue<NColumnShard::Schema::Normalizers::ClassName>(),
rowset.GetValue<NColumnShard::Schema::Normalizers::Description>());
if (!rowset.HaveValue<NColumnShard::Schema::Normalizers::Finish>()) {
started.emplace(id, rowset.GetValue<NColumnShard::Schema::Normalizers::Identifier>());
} else {
finished.emplace(id);
}
descriptions.emplace(rowset.GetValue<NColumnShard::Schema::Repairs::UniqueDescription>());
if (!rowset.Next()) {
return false;
}
}
FinishedRepairs = descriptions;
FinishedNormalizers = finished;
StartedNormalizers = started;
}

ui64 lastNormalizerId;
if (NColumnShard::Schema::GetSpecialValue(db, NColumnShard::Schema::EValueIds::LastNormalizerSequentialId, lastNormalizerId)) {
LastSavedNormalizerId = lastNormalizerId;
} else {
LastSavedNormalizerId = {};
std::optional<ui64> lastNormalizerId;
if (!NColumnShard::Schema::GetSpecialValueOpt(db, NColumnShard::Schema::EValueIds::LastNormalizerSequentialId, lastNormalizerId)) {
return false;
}
LastSavedNormalizerId = lastNormalizerId.value_or(0);
return true;
}

// Common entry point for initializing a normalizer.
//
// Logs the initialization, delegates to DoInit() and, if that succeeds:
//   * writes a Normalizers row via Schema::StartNormalizer unless the
//     controller already lists this normalizer in StartedNormalizers;
//   * sets ActiveTasksCount to the number of tasks produced.
// A failed DoInit() result is returned unchanged and nothing is written.
NKikimr::TConclusion<std::vector<NKikimr::NOlap::INormalizerTask::TPtr>> TNormalizationController::INormalizerComponent::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
    AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_init")("last", controller.GetLastSavedNormalizerId())
        ("seq_id", GetSequentialId())("type", GetEnumSequentialId());

    auto tasks = DoInit(controller, txc);
    if (tasks.IsSuccess()) {
        NIceDb::TNiceDb db(txc.DB);
        const bool alreadyStarted = controller.StartedNormalizers.contains(GetNormalizerFullId());
        if (!alreadyStarted) {
            NColumnShard::Schema::StartNormalizer(db, GetClassName(), GetUniqueDescription(), UniqueId);
        }
        AtomicSet(ActiveTasksCount, tasks.GetResult().size());
    }
    return tasks;
}

}
42 changes: 29 additions & 13 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,26 @@ class TNormalizationController {
}
};

// Identifies a normalizer by the pair (class name, description).
// Ordered by class name first, then by description, so it can be used as a
// key in ordered containers such as std::set / std::map.
class TNormalizerFullId {
private:
    YDB_READONLY_DEF(TString, ClassName);
    YDB_READONLY_DEF(TString, Description);

public:
    TNormalizerFullId(const TString& className, const TString& description)
        : ClassName(className)
        , Description(description) {
    }

    // Lexicographic comparison on (ClassName, Description).
    bool operator<(const TNormalizerFullId& item) const {
        if (ClassName != item.ClassName) {
            return ClassName < item.ClassName;
        }
        return Description < item.Description;
    }
};

class INormalizerComponent {
private:
YDB_ACCESSOR(bool, IsRepair, false);
Expand All @@ -137,6 +157,10 @@ class TNormalizationController {

// Virtual so that implementations can be destroyed through a pointer to this interface.
virtual ~INormalizerComponent() {}

// Builds the (class name, description) identifier of this normalizer from
// GetClassName() and the UniqueDescription member.
TNormalizerFullId GetNormalizerFullId() const {
return TNormalizerFullId(GetClassName(), UniqueDescription);
}

// Returns true while ActiveTasksCount (read via AtomicGet) is greater than zero.
bool HasActiveTasks() const {
return AtomicGet(ActiveTasksCount) > 0;
}
Expand Down Expand Up @@ -191,16 +215,7 @@ class TNormalizationController {
}
}

TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_init")("last", controller.GetLastSavedNormalizerId())
("seq_id", GetSequentialId())("type", GetEnumSequentialId());
auto result = DoInit(controller, txc);
if (!result.IsSuccess()) {
return result;
}
AtomicSet(ActiveTasksCount, result.GetResult().size());
return result;
}
TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc);

private:
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0;
Expand All @@ -215,7 +230,8 @@ class TNormalizationController {

std::deque<INormalizerComponent::TPtr> Normalizers;
std::deque<TNormalizerCounters> Counters;
THashSet<TString> FinishedRepairs;
std::set<TNormalizerFullId> FinishedNormalizers;
std::map<TNormalizerFullId, TString> StartedNormalizers;
YDB_READONLY_DEF(std::optional<ui32>, LastSavedNormalizerId);
private:
INormalizerComponent::TPtr RegisterNormalizer(INormalizerComponent::TPtr normalizer);
Expand All @@ -231,8 +247,8 @@ class TNormalizationController {
}

void InitNormalizers(const TInitContext& ctx);
void UpdateControllerState(NIceDb::TNiceDb& db) const;
void AddRepairInfo(NIceDb::TNiceDb& db, const TString& info) const;
void OnNormalizerFinished(NIceDb::TNiceDb& db) const;
void AddNormalizerEvent(NIceDb::TNiceDb& db, const TString& eventType, const TString& eventDescription) const;
bool InitControllerState(NIceDb::TNiceDb& db);

std::shared_ptr<IStoragesManager> GetStoragesManager() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class TNormalizerResult : public INormalizerChanges {
sb << "];";
sb << "records_count:" << recordsCount;
NIceDb::TNiceDb db(txc.DB);
normController.AddRepairInfo(db, sb);
normController.AddNormalizerEvent(db, "REMOVE_PORTIONS", sb);
}
return true;
}
Expand Down

0 comments on commit 8d7c078

Please sign in to comment.