Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory usage for heavy normalizers #2204

Merged
merged 1 commit into the base branch (branch names not preserved in this capture)
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ namespace NKikimr::NOlap {
// Per-run context handed to normalization tasks: the actor ids they report to
// plus the resource-broker guard that accompanies the run.
class TNormalizationContext {
YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
YDB_ACCESSOR_DEF(TActorId, ColumnshardActor);
// Holds the broker-granted resources (presumably the memory quota for the
// read — TODO confirm) alive for as long as this context is alive.
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
public:
// Stores the guard so the granted resources are released only when the task
// owning this context is destroyed, not when the read callback returns.
void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
ResourcesGuard = rg;
}
};

class TNormalizationController;
Expand Down Expand Up @@ -117,7 +122,7 @@ namespace NKikimr::NOlap {
// Human-readable summary for logs: normalizer count, current index, and the
// current normalizer's name (empty string when the index is past the end).
TString DebugString() const {
    return TStringBuilder() << "normalizers_count=" << Normalizers.size()
        << ";current_normalizer_idx=" << CurrentNormalizerIndex
        // The ternary MUST be parenthesized: `operator<<` binds tighter than
        // `?:`, so without parentheses the whole stream expression up to this
        // point becomes the condition and the name is never streamed.
        << ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size() ? Normalizers[CurrentNormalizerIndex]->GetName() : "");
}

const INormalizerComponent::TPtr& GetNormalizer() const;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TRowsAndBytesChangesTask: public NConveyor::ITask {
}

public:
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, THashMap<ui64, ISnapshotSchema::TPtr>&&)
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>>)
: Blobs(std::move(blobs))
, Chunks(std::move(chunks))
, NormContext(nCtx)
Expand Down
18 changes: 9 additions & 9 deletions ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
private:
THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
TDataContainer Portions;
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
TNormalizationContext NormContext;
protected:
virtual bool DoExecute() override {
Y_ABORT_UNLESS(!Schemas.empty());
auto pkColumnIds = Schemas.begin()->second->GetPkColumnsIds();
Y_ABORT_UNLESS(!Schemas->empty());
auto pkColumnIds = Schemas->begin()->second->GetPkColumnsIds();
pkColumnIds.insert(TIndexInfo::GetSpecialColumnIds().begin(), TIndexInfo::GetSpecialColumnIds().end());

for (auto&& portionInfo : Portions) {
auto blobSchema = Schemas.FindPtr(portionInfo->GetPortionId());
auto blobSchema = Schemas->FindPtr(portionInfo->GetPortionId());
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
for (auto&& i : portionInfo->Records) {
auto blobIt = Blobs.find(i.BlobRange);
Expand All @@ -47,10 +47,10 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
}

public:
// Takes ownership of the fetched blob payloads and the portions to repair;
// the schema map is shared (one instance across all sibling tasks) to avoid
// duplicating it per task.
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
    : Blobs(std::move(blobs))
    , Portions(std::move(portions))
    // By-value shared_ptr sink: move into the member instead of copying,
    // saving an atomic ref-count increment/decrement pair.
    , Schemas(std::move(schemas))
    , NormContext(nCtx)
{}

Expand Down Expand Up @@ -135,7 +135,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
}

THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
THashMap<ui64, ISnapshotSchema::TPtr> schemas;
auto schemas = std::make_shared<THashMap<ui64, ISnapshotSchema::TPtr>>();
auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());

{
Expand All @@ -161,7 +161,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
auto portionMeta = loadContext.GetPortionMeta();
if (it == portions.end()) {
Y_ABORT_UNLESS(portion.Records.empty());
schemas[portion.GetPortionId()] = currentSchema;
(*schemas)[portion.GetPortionId()] = currentSchema;
auto portionNew = std::make_shared<TPortionInfo>(portion);
portionNew->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
it = portions.emplace(portion.GetPortion(), portionNew).first;
Expand Down Expand Up @@ -202,7 +202,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
}
++brokenPortioncCount;
package.emplace_back(portion.second);
if (package.size() == 100) {
if (package.size() == 1000) {
std::vector<std::shared_ptr<TPortionInfo>> local;
local.swap(package);
tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(local), schemas));
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/tx/columnshard/normalizer/portion/normalizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
private:
using TBase = NOlap::NBlobOperations::NRead::ITask;
typename TConveyorTask::TDataContainer Data;
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
TNormalizationContext NormContext;

public:
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
: TBase(actions, "CS::NORMALIZER")
, Data(std::move(data))
, Schemas(std::move(schemas))
Expand All @@ -32,8 +32,8 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {

protected:
// Blob data arrived: move the guard into the context so the broker-granted
// resources stay held for the lifetime of the conveyor task (per the PR
// intent, releasing them here under-counted memory for heavy normalizers —
// TODO confirm against the resource broker contract), then hand the work off
// to the conveyor service.
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
    NormContext.SetResourcesGuard(resourcesGuard);
    // Note: ExtractBlobsData() already yields an rvalue, so no std::move is
    // needed around the call; Schemas is copied (shared) rather than moved.
    std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(ExtractBlobsData(), NormContext, std::move(Data), Schemas);
    NConveyor::TCompServiceOperator::SendTaskToExecute(task);
}

Expand All @@ -49,13 +49,13 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
template <class TConveyorTask>
class TPortionsNormalizerTask : public INormalizerTask {
typename TConveyorTask::TDataContainer Package;
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
public:
// Schema-less variant: the conveyor task will run with an empty schema map.
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package)
: Package(std::move(package))
{}

// Takes ownership of the package; the schema map is shared across sibling
// tasks, so only the shared_ptr is copied here.
// By-value non-const shared_ptr sink: a top-level `const` on the parameter
// would forbid moving it into the member and force an extra ref-count bump.
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
    : Package(std::move(package))
    , Schemas(std::move(schemas))
{}
Expand All @@ -71,7 +71,7 @@ class TPortionsNormalizerTask : public INormalizerTask {
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction};
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
std::make_shared<TReadPortionsTask<TConveyorTask>>( nCtx, actions, std::move(Package), std::move(Schemas) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
std::make_shared<TReadPortionsTask<TConveyorTask>>(nCtx, actions, std::move(Package), Schemas), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
}
};
}
Loading