Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

accessors memory control for scan #11792

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ enum class EMemType {
};

enum class EStageFeaturesIndexes {
Filter = 0,
Fetching = 1,
Merge = 2
Accessors = 0,
Filter = 1,
Fetching = 2,
Merge = 3
};

class TIndexesSet {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con
if (source->NeedAccessorsFetching()) {
if (!AskAccumulatorsScript) {
AskAccumulatorsScript = std::make_shared<TFetchingScript>(*this);
AskAccumulatorsScript->AddStep(std::make_shared<TPortionAccessorFetchingStep>());
if (ui64 size = source->PredictAccessorsMemory()) {
AskAccumulatorsScript->AddStep<TAllocateMemoryStep>(size, EStageFeaturesIndexes::Accessors);
}
AskAccumulatorsScript->AddStep<TPortionAccessorFetchingStep>();
}
AskAccumulatorsScript->AddStep<TDetectInMem>(*FFColumns);
return AskAccumulatorsScript;
Expand All @@ -54,8 +57,8 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con
}
}
{
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0]
[useIndexes ? 1 : 0][needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
if (!result) {
TGuard<TMutex> wg(Mutex);
result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
Expand Down Expand Up @@ -274,11 +277,11 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c

TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
: CommonContext(commonContext) {

ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata());
Y_ABORT_UNLESS(ReadMetadata);
Y_ABORT_UNLESS(ReadMetadata->SelectInfo);

double kffAccessors = 0.01;
double kffFilter = 0.45;
double kffFetching = 0.45;
double kffMerge = 0.10;
Expand All @@ -287,15 +290,19 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
stagePrefix = "EF";
kffFilter = 0.7;
kffFetching = 0.15;
kffMerge = 0.15;
kffMerge = 0.14;
kffAccessors = 0.01;
} else {
stagePrefix = "FO";
kffFilter = 0.1;
kffFetching = 0.75;
kffMerge = 0.15;
kffMerge = 0.14;
kffAccessors = 0.01;
}

std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = {
std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = {
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit),
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit),
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
Expand All @@ -304,8 +311,8 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
};
ProcessMemoryGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages);
ProcessScopeGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());
ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(
CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());

auto readSchema = ReadMetadata->GetResultSchema();
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons
TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {

ui64 size = 0;
ui64 size = PredefinedSize.value_or(0);
for (auto&& i : Packs) {
ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType());
if (source->GetStageData().GetUseFilter() && source->GetContext()->GetReadMetadata()->Limit && i.GetMemType() != EMemType::Blob) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class TAllocateMemoryStep: public IFetchingStep {
std::vector<TColumnsPack> Packs;
THashMap<ui32, THashSet<EMemType>> Control;
const EStageFeaturesIndexes StageIndex;
std::optional<ui64> PredefinedSize;

protected:
class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation {
Expand All @@ -238,6 +239,7 @@ class TAllocateMemoryStep: public IFetchingStep {
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;

public:
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step);
};
Expand Down Expand Up @@ -266,6 +268,12 @@ class TAllocateMemoryStep: public IFetchingStep {
, StageIndex(stageIndex) {
AddAllocation(columns, memType);
}

TAllocateMemoryStep(const ui64 size, const EStageFeaturesIndexes stageIndex)
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
, StageIndex(stageIndex)
, PredefinedSize(size) {
}
};

class TDetectInMemStep: public IFetchingStep {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class IDataSource {
public:
virtual bool NeedAccessorsForRead() const = 0;
virtual bool NeedAccessorsFetching() const = 0;

virtual ui64 PredictAccessorsMemory() const = 0;
bool StartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {
return DoStartFetchingAccessor(sourcePtr, step);
}
Expand Down Expand Up @@ -318,6 +318,10 @@ class TPortionDataSource: public IDataSource {
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override;

public:
virtual ui64 PredictAccessorsMemory() const override {
return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord);
}

virtual bool NeedAccessorsForRead() const override {
return true;
}
Expand Down Expand Up @@ -430,6 +434,10 @@ class TCommittedDataSource: public IDataSource {
}

public:
virtual ui64 PredictAccessorsMemory() const override {
return 0;
}

virtual bool NeedAccessorsForRead() const override {
return false;
}
Expand Down
Loading