Skip to content

Commit

Permalink
Leak bs normalizer (#11682)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 19, 2024
1 parent c5d16f6 commit 481ccad
Show file tree
Hide file tree
Showing 23 changed files with 608 additions and 180 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}
}
{
NOlap::TNormalizationController::TInitContext initCtx(Self->Info());
NOlap::TNormalizationController::TInitContext initCtx(Self->Info(), Self->TabletID(), Self->SelfId());
Self->NormalizerController.InitNormalizers(initCtx);
}

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
namespace NKikimr::NOlap {

TNormalizationController::INormalizerComponent::TPtr TNormalizationController::RegisterNormalizer(INormalizerComponent::TPtr normalizer) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString());
AFL_VERIFY(normalizer);
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString());
Counters.emplace_back(normalizer->GetClassName());
Normalizers.emplace_back(normalizer);
return normalizer;
Expand Down Expand Up @@ -74,7 +74,9 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
if (FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "repair already processed")("description", i.GetDescription());
} else {
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(INormalizerComponent::TFactory::Construct(i.GetClassName(), ctx)));
auto component = INormalizerComponent::TFactory::MakeHolder(i.GetClassName(), ctx);
AFL_VERIFY(component)("class_name", i.GetClassName());
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(component.Release()));
normalizer->SetIsRepair(true).SetUniqueDescription(i.GetDescription());
}
}
Expand Down
68 changes: 49 additions & 19 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#pragma once

#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/library/accessor/accessor.h>

#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/result.h>

#include <library/cpp/object_factory/object_factory.h>

namespace NKikimr::NIceDb {
class TNiceDb;
class TNiceDb;
}

namespace NKikimr::NOlap {
Expand All @@ -21,6 +22,7 @@ class TNormalizerCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr StartedCount;
NMonitoring::TDynamicCounters::TCounterPtr FinishedCount;
NMonitoring::TDynamicCounters::TCounterPtr FailedCount;

public:
TNormalizerCounters(const TString& normalizerName)
: TBase("Normalizer") {
Expand Down Expand Up @@ -49,7 +51,7 @@ class TNormalizerCounters: public NColumnShard::TCommonCountersOwner {
}
};

enum class ENormalizerSequentialId: ui32 {
enum class ENormalizerSequentialId : ui32 {
Granules = 1,
Chunks,
DeprecatedPortionsCleaner,
Expand All @@ -74,27 +76,29 @@ class TNormalizationContext {
YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
YDB_ACCESSOR_DEF(TActorId, ShardActor);
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;

public:
void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
ResourcesGuard = rg;
}
};


class TNormalizationController;

class INormalizerTask {
public:
using TPtr = std::shared_ptr<INormalizerTask>;
virtual ~INormalizerTask() {}
virtual ~INormalizerTask() {
}

virtual void Start(const TNormalizationController& controller, const TNormalizationContext& nCtx) = 0;
};

class INormalizerChanges {
public:
using TPtr = std::shared_ptr<INormalizerChanges>;
virtual ~INormalizerChanges() {}
virtual ~INormalizerChanges() {
}

virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& normalizationContext) const = 0;
virtual void ApplyOnComplete(const TNormalizationController& normalizationContext) const {
Expand All @@ -106,6 +110,7 @@ class INormalizerChanges {

class TTrivialNormalizerTask: public INormalizerTask {
INormalizerChanges::TPtr Changes;

public:
TTrivialNormalizerTask(const INormalizerChanges::TPtr& changes)
: Changes(changes) {
Expand All @@ -118,10 +123,24 @@ class TTrivialNormalizerTask: public INormalizerTask {
class TNormalizationController {
public:
class TInitContext {
private:
TIntrusiveConstPtr<TTabletStorageInfo> StorageInfo;
const ui64 TabletId;
const NActors::TActorId TabletActorId;

public:
TInitContext(TTabletStorageInfo* info)
: StorageInfo(info) {
TInitContext(TTabletStorageInfo* info, const ui64 tabletId, const NActors::TActorId& actorId)
: StorageInfo(info)
, TabletId(tabletId)
, TabletActorId(actorId) {
}

ui64 GetTabletId() const {
return TabletId;
}

const NActors::TActorId& GetTabletActorId() const {
return TabletActorId;
}

TIntrusiveConstPtr<TTabletStorageInfo> GetStorageInfo() const {
Expand All @@ -133,6 +152,7 @@ class TNormalizationController {
private:
YDB_READONLY_DEF(TString, ClassName);
YDB_READONLY_DEF(TString, Description);

public:
bool operator<(const TNormalizerFullId& item) const {
if (ClassName == item.ClassName) {
Expand All @@ -143,9 +163,7 @@ class TNormalizationController {

TNormalizerFullId(const TString& className, const TString& description)
: ClassName(className)
, Description(description)
{

, Description(description) {
}
};

Expand All @@ -154,18 +172,26 @@ class TNormalizationController {
YDB_ACCESSOR(bool, IsRepair, false);
YDB_ACCESSOR_DEF(TString, UniqueDescription);
YDB_ACCESSOR(TString, UniqueId, TGUID::CreateTimebased().AsUuidString());

virtual TString DoDebugString() const {
return "";
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const = 0;

protected:
const ui64 TabletId;
const NActors::TActorId TabletActorId;

public:
using TPtr = std::shared_ptr<INormalizerComponent>;
using TFactory = NObjectFactory::TParametrizedObjectFactory<INormalizerComponent, TString, TInitContext>;

virtual ~INormalizerComponent() {}
virtual ~INormalizerComponent() = default;
INormalizerComponent(const TInitContext& context)
: TabletId(context.GetTabletId())
, TabletActorId(context.GetTabletActorId()) {
}

TNormalizerFullId GetNormalizerFullId() const {
return TNormalizerFullId(GetClassName(), UniqueDescription);
Expand Down Expand Up @@ -222,10 +248,12 @@ class TNormalizationController {
}
}

TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc);
TConclusion<std::vector<INormalizerTask::TPtr>> Init(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc);

private:
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0;
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0;

TAtomic ActiveTasksCount = 0;
};
Expand All @@ -240,11 +268,13 @@ class TNormalizationController {
std::set<TNormalizerFullId> FinishedNormalizers;
std::map<TNormalizerFullId, TString> StartedNormalizers;
YDB_READONLY_DEF(std::optional<ui32>, LastSavedNormalizerId);

private:
INormalizerComponent::TPtr RegisterNormalizer(INormalizerComponent::TPtr normalizer);

public:
TNormalizationController(std::shared_ptr<IStoragesManager> storagesManager, const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>& counters)
TNormalizationController(std::shared_ptr<IStoragesManager> storagesManager,
const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>& counters)
: StoragesManager(storagesManager)
, TaskSubscription("CS::NORMALIZER", counters) {
}
Expand All @@ -265,12 +295,12 @@ class TNormalizationController {

TString DebugString() const {
return TStringBuilder() << "normalizers_count=" << Normalizers.size()
<< ";current_normalizer=" << (Normalizers.size() ? Normalizers.front()->DebugString() : "NO_DATA");
<< ";current_normalizer=" << (Normalizers.size() ? Normalizers.front()->DebugString() : "NO_DATA");
}

const INormalizerComponent::TPtr& GetNormalizer() const;
bool IsNormalizationFinished() const;
bool SwitchNormalizer();
const TNormalizerCounters& GetCounters() const;
};
}
} // namespace NKikimr::NOlap
16 changes: 11 additions & 5 deletions ydb/core/tx/columnshard/normalizer/granule/clean_granule.h
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap {

class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerComponent {
private:
using TBase = TNormalizationController::INormalizerComponent;

public:
static TString GetClassNameStatic() {
return ::ToString(ENormalizerSequentialId::CleanGranuleId);
}

private:
class TNormalizerResult;

static inline INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer>(GetClassNameStatic());

public:
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return ENormalizerSequentialId::CleanGranuleId;
Expand All @@ -25,10 +29,12 @@ class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerCom
return GetClassNameStatic();
}

TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext&) {
TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext& context)
: TBase(context) {
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
} // namespace NKikimr::NOlap
20 changes: 13 additions & 7 deletions ydb/core/tx/columnshard/normalizer/granule/normalizer.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap {

class TGranulesNormalizer: public TNormalizationController::INormalizerComponent {
private:
using TBase = TNormalizationController::INormalizerComponent;

public:
static TString GetClassNameStatic() {
return ::ToString(ENormalizerSequentialId::Granules);
}

private:
class TNormalizerResult;

static const inline INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer> Registrator = INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer>(
GetClassNameStatic());
static const inline INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer>(GetClassNameStatic());

public:
TGranulesNormalizer(const TNormalizationController::TInitContext&) {
TGranulesNormalizer(const TNormalizationController::TInitContext& context)
: TBase(context) {
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
Expand All @@ -28,7 +33,8 @@ class TGranulesNormalizer: public TNormalizationController::INormalizerComponent
return GetClassNameStatic();
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
} // namespace NKikimr::NOlap
15 changes: 10 additions & 5 deletions ydb/core/tx/columnshard/normalizer/insert_table/broken_dedup.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap::NInsertionDedup {

class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerComponent {
private:
using TBase = TNormalizationController::INormalizerComponent;

public:
static TString GetClassNameStatic() {
return "CleanInsertionDedup";
}

private:
class TNormalizerResult;

static const inline INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer>(GetClassNameStatic());

public:
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext&) {
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext& context)
: TBase(context) {
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
Expand All @@ -29,7 +33,8 @@ class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerCo
return GetClassNameStatic();
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
} // namespace NKikimr::NOlap::NInsertionDedup
Loading

0 comments on commit 481ccad

Please sign in to comment.