Fix memory usage for heavy normalizers (ydb-platform#2204)
Co-authored-by: nsofya <nsofya@yandex.ru>
2 people authored and ivanmorozov333 committed Feb 29, 2024
1 parent 18848f2 commit b4e9dfe
Showing 4 changed files with 23 additions and 18 deletions.
ydb/core/tx/columnshard/normalizer/abstract/abstract.h (6 additions, 1 deletion)
@@ -49,6 +49,11 @@ namespace NKikimr::NOlap {
 class TNormalizationContext {
     YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
     YDB_ACCESSOR_DEF(TActorId, ColumnshardActor);
+    std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
+public:
+    void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
+        ResourcesGuard = rg;
+    }
 };
 
 class TNormalizationController;
@@ -117,7 +122,7 @@ namespace NKikimr::NOlap {
     TString DebugString() const {
         return TStringBuilder() << "normalizers_count=" << Normalizers.size()
             << ";current_normalizer_idx=" << CurrentNormalizerIndex
-            << ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size()) ? Normalizers[CurrentNormalizerIndex]->GetName() : "";
+            << ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size() ? Normalizers[CurrentNormalizerIndex]->GetName() : "");
     }
 
     const INormalizerComponent::TPtr& GetNormalizer() const;
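Besides the new ResourcesGuard accessor, the second hunk fixes an operator-precedence bug in DebugString(): operator<< binds tighter than the conditional operator, so without the outer parentheses the ternary was applied to the whole stream expression rather than selecting which string gets appended. A standalone sketch of the same pitfall, using plain std::ostringstream and illustrative names rather than the YDB types:

// Illustration of the precedence fix in DebugString() (plain C++, not YDB code).
// operator<< binds tighter than ?:, so the unparenthesized form applies the
// ternary to the stream expression instead of choosing what is streamed.
#include <iostream>
#include <sstream>
#include <string>

int main() {
    std::ostringstream buggy, fixed;
    const std::string name = "SomeNormalizer";  // illustrative value only
    const bool haveCurrent = true;

    // Parsed as ((buggy << "current_normalizer=" << haveCurrent) ? name : "");
    // the bool itself is streamed and the selected string is discarded.
    // Some compilers (e.g. Clang) flag this with -Wparentheses.
    buggy << "current_normalizer=" << haveCurrent ? name : "";

    // Parenthesizing the whole conditional streams the chosen string instead.
    fixed << "current_normalizer=" << (haveCurrent ? name : "");

    std::cout << "buggy: " << buggy.str() << "\n";  // current_normalizer=1
    std::cout << "fixed: " << fixed.str() << "\n";  // current_normalizer=SomeNormalizer
}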
ydb/core/tx/columnshard/normalizer/portion/chunks.cpp (1 addition, 1 deletion)
@@ -68,7 +68,7 @@ class TRowsAndBytesChangesTask: public NConveyor::ITask {
     }
 
 public:
-    TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, THashMap<ui64, ISnapshotSchema::TPtr>&&)
+    TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>>)
         : Blobs(std::move(blobs))
         , Chunks(std::move(chunks))
         , NormContext(nCtx)
ydb/core/tx/columnshard/normalizer/portion/min_max.cpp (9 additions, 9 deletions)
@@ -16,16 +16,16 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
 private:
     THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
     TDataContainer Portions;
-    THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
+    std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
     TNormalizationContext NormContext;
 protected:
     virtual bool DoExecute() override {
-        Y_ABORT_UNLESS(!Schemas.empty());
-        auto pkColumnIds = Schemas.begin()->second->GetPkColumnsIds();
+        Y_ABORT_UNLESS(!Schemas->empty());
+        auto pkColumnIds = Schemas->begin()->second->GetPkColumnsIds();
         pkColumnIds.insert(TIndexInfo::GetSpecialColumnIds().begin(), TIndexInfo::GetSpecialColumnIds().end());
 
         for (auto&& portionInfo : Portions) {
-            auto blobSchema = Schemas.FindPtr(portionInfo->GetPortionId());
+            auto blobSchema = Schemas->FindPtr(portionInfo->GetPortionId());
             THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
             for (auto&& i : portionInfo->Records) {
                 auto blobIt = Blobs.find(i.BlobRange);
@@ -47,10 +47,10 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
     }
 
 public:
-    TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
+    TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
         : Blobs(std::move(blobs))
         , Portions(std::move(portions))
-        , Schemas(std::move(schemas))
+        , Schemas(schemas)
         , NormContext(nCtx)
     {}
 
@@ -135,7 +135,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
     }
 
     THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
-    THashMap<ui64, ISnapshotSchema::TPtr> schemas;
+    auto schemas = std::make_shared<THashMap<ui64, ISnapshotSchema::TPtr>>();
     auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());
 
     {
@@ -161,7 +161,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
             auto portionMeta = loadContext.GetPortionMeta();
             if (it == portions.end()) {
                 Y_ABORT_UNLESS(portion.Records.empty());
-                schemas[portion.GetPortionId()] = currentSchema;
+                (*schemas)[portion.GetPortionId()] = currentSchema;
                 auto portionNew = std::make_shared<TPortionInfo>(portion);
                 portionNew->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
                 it = portions.emplace(portion.GetPortion(), portionNew).first;
@@ -202,7 +202,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
         }
         ++brokenPortioncCount;
         package.emplace_back(portion.second);
-        if (package.size() == 100) {
+        if (package.size() == 1000) {
            std::vector<std::shared_ptr<TPortionInfo>> local;
            local.swap(package);
            tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(local), schemas));
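The recurring change in this file (and in chunks.cpp above and normalizer.h below) is that each task's own THashMap<ui64, ISnapshotSchema::TPtr> is replaced by a std::shared_ptr to a single map, so the batched normalizer tasks reference one schema table instead of each carrying a copy; the batch size also grows from 100 to 1000 portions per task. A minimal ownership sketch with plain STL containers (Schema, SchemaMap and NormalizerTask are stand-in names, not the YDB classes):

// Every batched task shares one schema map through shared_ptr instead of
// owning a full copy, so memory stays proportional to the map itself rather
// than to (map size) x (number of batches).
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

struct Schema {                                  // stand-in for ISnapshotSchema
    std::string Description;
};
using SchemaMap = std::unordered_map<uint64_t, std::shared_ptr<Schema>>;

struct NormalizerTask {                          // stand-in for the per-batch task
    std::vector<uint64_t> PortionIds;            // the "package" of portions
    std::shared_ptr<const SchemaMap> Schemas;    // shared, read-only view
};

int main() {
    auto schemas = std::make_shared<SchemaMap>();
    (*schemas)[1] = std::make_shared<Schema>(Schema{"version 1"});
    (*schemas)[2] = std::make_shared<Schema>(Schema{"version 2"});

    // Build batches of up to 1000 portions; all batches point at the same map.
    std::vector<NormalizerTask> tasks;
    for (uint64_t portionId = 0; portionId < 5000; ++portionId) {
        if (tasks.empty() || tasks.back().PortionIds.size() == 1000) {
            tasks.push_back(NormalizerTask{{}, schemas});
        }
        tasks.back().PortionIds.push_back(portionId);
    }
    return 0;
}

The constructor changes and the (*schemas)[...] writes in Init follow the same idea: the map is built once and then handed around by pointer.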
ydb/core/tx/columnshard/normalizer/portion/normalizer.h (7 additions, 7 deletions)
@@ -18,11 +18,11 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
 private:
     using TBase = NOlap::NBlobOperations::NRead::ITask;
     typename TConveyorTask::TDataContainer Data;
-    THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
+    std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
     TNormalizationContext NormContext;
 
 public:
-    TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
+    TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
         : TBase(actions, "CS::NORMALIZER")
         , Data(std::move(data))
         , Schemas(std::move(schemas))
@@ -32,8 +32,8 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
 
 protected:
     virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
-        Y_UNUSED(resourcesGuard);
-        std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), std::move(Schemas));
+        NormContext.SetResourcesGuard(resourcesGuard);
+        std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), Schemas);
         NConveyor::TCompServiceOperator::SendTaskToExecute(task);
     }
 
@@ -49,13 +49,13 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
 template <class TConveyorTask>
 class TPortionsNormalizerTask : public INormalizerTask {
     typename TConveyorTask::TDataContainer Package;
-    THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
+    std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
 public:
     TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package)
         : Package(std::move(package))
     {}
 
-    TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const THashMap<ui64, ISnapshotSchema::TPtr>& schemas)
+    TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
         : Package(std::move(package))
         , Schemas(schemas)
     {}
@@ -71,7 +71,7 @@ class TPortionsNormalizerTask : public INormalizerTask {
         std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction};
         NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
             nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
-                std::make_shared<TReadPortionsTask<TConveyorTask>>( nCtx, actions, std::move(Package), std::move(Schemas) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
+                std::make_shared<TReadPortionsTask<TConveyorTask>>(nCtx, actions, std::move(Package), Schemas), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
     }
 };
 }
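The other half of the change is the guard handling in DoOnDataReady: instead of discarding resourcesGuard with Y_UNUSED, the read task now stores it in the normalization context, so the quota acquired through the resource subscription stays reserved until the conveyor task that actually processes the blobs has run. A generic RAII sketch of that lifetime, with simplified, assumed names rather than the YDB resource broker API:

// Why the guard is stashed in the context: the reservation is returned by the
// guard's destructor, so whoever holds the last reference controls how long
// the memory quota stays accounted.
#include <cstdio>
#include <memory>

class ResourcesGuard {                 // stand-in for TResourcesGuard
public:
    explicit ResourcesGuard(size_t bytes) : Bytes(bytes) {
        std::printf("acquired %zu bytes\n", Bytes);
    }
    ~ResourcesGuard() { std::printf("released %zu bytes\n", Bytes); }
private:
    size_t Bytes;
};

struct NormalizationContext {          // stand-in for TNormalizationContext
    std::shared_ptr<ResourcesGuard> Guard;
    void SetResourcesGuard(std::shared_ptr<ResourcesGuard> g) { Guard = std::move(g); }
};

struct ConveyorTask {                  // the follow-up processing step
    NormalizationContext Ctx;          // carries the guard, keeping quota held
    void Execute() { std::printf("processing under reserved quota\n"); }
};

int main() {
    auto guard = std::make_shared<ResourcesGuard>(1 << 20);
    NormalizationContext ctx;
    ctx.SetResourcesGuard(guard);      // keep the reservation with the context
    guard.reset();                     // the read task's local reference goes away

    ConveyorTask task{ctx};
    task.Execute();                    // quota is still reserved here
}   // last copy of the context is destroyed -> guard released -> quota returned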
