Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leak bs normalizer #11682

Merged
merged 6 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}
}
{
NOlap::TNormalizationController::TInitContext initCtx(Self->Info());
NOlap::TNormalizationController::TInitContext initCtx(Self->Info(), Self->TabletID(), Self->SelfId());
Self->NormalizerController.InitNormalizers(initCtx);
}

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
namespace NKikimr::NOlap {

TNormalizationController::INormalizerComponent::TPtr TNormalizationController::RegisterNormalizer(INormalizerComponent::TPtr normalizer) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString());
AFL_VERIFY(normalizer);
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString());
Counters.emplace_back(normalizer->GetClassName());
Normalizers.emplace_back(normalizer);
return normalizer;
Expand Down Expand Up @@ -74,7 +74,9 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
if (FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "repair already processed")("description", i.GetDescription());
} else {
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(INormalizerComponent::TFactory::Construct(i.GetClassName(), ctx)));
auto component = INormalizerComponent::TFactory::MakeHolder(i.GetClassName(), ctx);
AFL_VERIFY(component)("class_name", i.GetClassName());
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(component.Release()));
normalizer->SetIsRepair(true).SetUniqueDescription(i.GetDescription());
}
}
Expand Down
68 changes: 49 additions & 19 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#pragma once

#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/library/accessor/accessor.h>

#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/result.h>

#include <library/cpp/object_factory/object_factory.h>

namespace NKikimr::NIceDb {
class TNiceDb;
class TNiceDb;
}

namespace NKikimr::NOlap {
Expand All @@ -21,6 +22,7 @@ class TNormalizerCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr StartedCount;
NMonitoring::TDynamicCounters::TCounterPtr FinishedCount;
NMonitoring::TDynamicCounters::TCounterPtr FailedCount;

public:
TNormalizerCounters(const TString& normalizerName)
: TBase("Normalizer") {
Expand Down Expand Up @@ -49,7 +51,7 @@ class TNormalizerCounters: public NColumnShard::TCommonCountersOwner {
}
};

enum class ENormalizerSequentialId: ui32 {
enum class ENormalizerSequentialId : ui32 {
Granules = 1,
Chunks,
DeprecatedPortionsCleaner,
Expand All @@ -74,27 +76,29 @@ class TNormalizationContext {
YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
YDB_ACCESSOR_DEF(TActorId, ShardActor);
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;

public:
void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
ResourcesGuard = rg;
}
};


class TNormalizationController;

class INormalizerTask {
public:
using TPtr = std::shared_ptr<INormalizerTask>;
virtual ~INormalizerTask() {}
virtual ~INormalizerTask() {
}

virtual void Start(const TNormalizationController& controller, const TNormalizationContext& nCtx) = 0;
};

class INormalizerChanges {
public:
using TPtr = std::shared_ptr<INormalizerChanges>;
virtual ~INormalizerChanges() {}
virtual ~INormalizerChanges() {
}

virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& normalizationContext) const = 0;
virtual void ApplyOnComplete(const TNormalizationController& normalizationContext) const {
Expand All @@ -106,6 +110,7 @@ class INormalizerChanges {

class TTrivialNormalizerTask: public INormalizerTask {
INormalizerChanges::TPtr Changes;

public:
TTrivialNormalizerTask(const INormalizerChanges::TPtr& changes)
: Changes(changes) {
Expand All @@ -118,10 +123,24 @@ class TTrivialNormalizerTask: public INormalizerTask {
class TNormalizationController {
public:
class TInitContext {
private:
TIntrusiveConstPtr<TTabletStorageInfo> StorageInfo;
const ui64 TabletId;
const NActors::TActorId TabletActorId;

public:
TInitContext(TTabletStorageInfo* info)
: StorageInfo(info) {
TInitContext(TTabletStorageInfo* info, const ui64 tabletId, const NActors::TActorId& actorId)
: StorageInfo(info)
, TabletId(tabletId)
, TabletActorId(actorId) {
}

ui64 GetTabletId() const {
return TabletId;
}

const NActors::TActorId& GetTabletActorId() const {
return TabletActorId;
}

TIntrusiveConstPtr<TTabletStorageInfo> GetStorageInfo() const {
Expand All @@ -133,6 +152,7 @@ class TNormalizationController {
private:
YDB_READONLY_DEF(TString, ClassName);
YDB_READONLY_DEF(TString, Description);

public:
bool operator<(const TNormalizerFullId& item) const {
if (ClassName == item.ClassName) {
Expand All @@ -143,9 +163,7 @@ class TNormalizationController {

TNormalizerFullId(const TString& className, const TString& description)
: ClassName(className)
, Description(description)
{

, Description(description) {
}
};

Expand All @@ -154,18 +172,26 @@ class TNormalizationController {
YDB_ACCESSOR(bool, IsRepair, false);
YDB_ACCESSOR_DEF(TString, UniqueDescription);
YDB_ACCESSOR(TString, UniqueId, TGUID::CreateTimebased().AsUuidString());

virtual TString DoDebugString() const {
return "";
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const = 0;

protected:
const ui64 TabletId;
const NActors::TActorId TabletActorId;

public:
using TPtr = std::shared_ptr<INormalizerComponent>;
using TFactory = NObjectFactory::TParametrizedObjectFactory<INormalizerComponent, TString, TInitContext>;

virtual ~INormalizerComponent() {}
virtual ~INormalizerComponent() = default;
INormalizerComponent(const TInitContext& context)
: TabletId(context.GetTabletId())
, TabletActorId(context.GetTabletActorId()) {
}

TNormalizerFullId GetNormalizerFullId() const {
return TNormalizerFullId(GetClassName(), UniqueDescription);
Expand Down Expand Up @@ -222,10 +248,12 @@ class TNormalizationController {
}
}

TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc);
TConclusion<std::vector<INormalizerTask::TPtr>> Init(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc);

private:
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0;
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) = 0;

TAtomic ActiveTasksCount = 0;
};
Expand All @@ -240,11 +268,13 @@ class TNormalizationController {
std::set<TNormalizerFullId> FinishedNormalizers;
std::map<TNormalizerFullId, TString> StartedNormalizers;
YDB_READONLY_DEF(std::optional<ui32>, LastSavedNormalizerId);

private:
INormalizerComponent::TPtr RegisterNormalizer(INormalizerComponent::TPtr normalizer);

public:
TNormalizationController(std::shared_ptr<IStoragesManager> storagesManager, const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>& counters)
TNormalizationController(std::shared_ptr<IStoragesManager> storagesManager,
const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>& counters)
: StoragesManager(storagesManager)
, TaskSubscription("CS::NORMALIZER", counters) {
}
Expand All @@ -265,12 +295,12 @@ class TNormalizationController {

TString DebugString() const {
return TStringBuilder() << "normalizers_count=" << Normalizers.size()
<< ";current_normalizer=" << (Normalizers.size() ? Normalizers.front()->DebugString() : "NO_DATA");
<< ";current_normalizer=" << (Normalizers.size() ? Normalizers.front()->DebugString() : "NO_DATA");
}

const INormalizerComponent::TPtr& GetNormalizer() const;
bool IsNormalizationFinished() const;
bool SwitchNormalizer();
const TNormalizerCounters& GetCounters() const;
};
}
} // namespace NKikimr::NOlap
16 changes: 11 additions & 5 deletions ydb/core/tx/columnshard/normalizer/granule/clean_granule.h
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap {

class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerComponent {
private:
using TBase = TNormalizationController::INormalizerComponent;

public:
static TString GetClassNameStatic() {
return ::ToString(ENormalizerSequentialId::CleanGranuleId);
}

private:
class TNormalizerResult;

static inline INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer>(GetClassNameStatic());

public:
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return ENormalizerSequentialId::CleanGranuleId;
Expand All @@ -25,10 +29,12 @@ class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerCom
return GetClassNameStatic();
}

TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext&) {
TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext& context)
: TBase(context) {
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
} // namespace NKikimr::NOlap
20 changes: 13 additions & 7 deletions ydb/core/tx/columnshard/normalizer/granule/normalizer.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap {

class TGranulesNormalizer: public TNormalizationController::INormalizerComponent {
private:
using TBase = TNormalizationController::INormalizerComponent;

public:
static TString GetClassNameStatic() {
return ::ToString(ENormalizerSequentialId::Granules);
}

private:
class TNormalizerResult;

static const inline INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer> Registrator = INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer>(
GetClassNameStatic());
static const inline INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TGranulesNormalizer>(GetClassNameStatic());

public:
TGranulesNormalizer(const TNormalizationController::TInitContext&) {
TGranulesNormalizer(const TNormalizationController::TInitContext& context)
: TBase(context) {
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
Expand All @@ -28,7 +33,8 @@ class TGranulesNormalizer: public TNormalizationController::INormalizerComponent
return GetClassNameStatic();
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
} // namespace NKikimr::NOlap
15 changes: 10 additions & 5 deletions ydb/core/tx/columnshard/normalizer/insert_table/broken_dedup.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap::NInsertionDedup {

class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerComponent {
private:
using TBase = TNormalizationController::INormalizerComponent;

public:
static TString GetClassNameStatic() {
return "CleanInsertionDedup";
}

private:
class TNormalizerResult;

static const inline INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer>(GetClassNameStatic());

public:
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext&) {
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext& context)
: TBase(context) {
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
Expand All @@ -29,7 +33,8 @@ class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerCo
return GetClassNameStatic();
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
} // namespace NKikimr::NOlap::NInsertionDedup
Loading
Loading