Skip to content

Commit

Permalink
Merge d3d334d into 739d073
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 20, 2024
2 parents 739d073 + d3d334d commit 828b6cf
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ enum class EMemType {
};

enum class EStageFeaturesIndexes {
Filter = 0,
Fetching = 1,
Merge = 2
Accessors = 0,
Filter = 1,
Fetching = 2,
Merge = 3
};

class TIndexesSet {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con
if (source->NeedAccessorsFetching()) {
if (!AskAccumulatorsScript) {
AskAccumulatorsScript = std::make_shared<TFetchingScript>(*this);
AskAccumulatorsScript->AddStep(std::make_shared<TPortionAccessorFetchingStep>());
AskAccumulatorsScript->AddStep<TAllocateMemoryStep>();
AskAccumulatorsScript->AddStep<TPortionAccessorFetchingStep>();
}
AskAccumulatorsScript->AddStep<TDetectInMem>(*FFColumns);
return AskAccumulatorsScript;
Expand All @@ -54,8 +55,8 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con
}
}
{
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0]
[useIndexes ? 1 : 0][needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
if (!result) {
TGuard<TMutex> wg(Mutex);
result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
Expand Down Expand Up @@ -274,11 +275,11 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c

TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
: CommonContext(commonContext) {

ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata());
Y_ABORT_UNLESS(ReadMetadata);
Y_ABORT_UNLESS(ReadMetadata->SelectInfo);

double kffAccessors = 0.01;
double kffFilter = 0.45;
double kffFetching = 0.45;
double kffMerge = 0.10;
Expand All @@ -287,15 +288,19 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
stagePrefix = "EF";
kffFilter = 0.7;
kffFetching = 0.15;
kffMerge = 0.15;
kffMerge = 0.14;
kffAccessors = 0.01;
} else {
stagePrefix = "FO";
kffFilter = 0.1;
kffFetching = 0.75;
kffMerge = 0.15;
kffMerge = 0.14;
kffAccessors = 0.01;
}

std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = {
std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = {
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit),
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit),
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
Expand All @@ -304,8 +309,8 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
};
ProcessMemoryGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages);
ProcessScopeGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());
ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(
CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());

auto readSchema = ReadMetadata->GetResultSchema();
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons
TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {

ui64 size = 0;
ui64 size = PredefinedSize.value_or(0);
for (auto&& i : Packs) {
ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType());
if (source->GetStageData().GetUseFilter() && source->GetContext()->GetReadMetadata()->Limit && i.GetMemType() != EMemType::Blob) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class TAllocateMemoryStep: public IFetchingStep {
std::vector<TColumnsPack> Packs;
THashMap<ui32, THashSet<EMemType>> Control;
const EStageFeaturesIndexes StageIndex;
std::optional<ui64> PredefinedSize;

protected:
class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation {
Expand All @@ -238,6 +239,7 @@ class TAllocateMemoryStep: public IFetchingStep {
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;

public:
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step);
};
Expand Down Expand Up @@ -266,6 +268,12 @@ class TAllocateMemoryStep: public IFetchingStep {
, StageIndex(stageIndex) {
AddAllocation(columns, memType);
}

TAllocateMemoryStep(const ui64 size, const EStageFeaturesIndexes stageIndex)
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
, StageIndex(stageIndex)
, PredefinedSize(size) {
}
};

class TDetectInMemStep: public IFetchingStep {
Expand Down

0 comments on commit 828b6cf

Please sign in to comment.