Skip to content

Commit

Permalink
speed up compilation for request processing (#7900)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 16, 2024
1 parent bcac782 commit f6476d4
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 117 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#pragma once
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>

#include <ydb/library/accessor/accessor.h>

#include <util/string/join.h>

namespace NKikimr::NOlap::NReader::NPlain {
Expand All @@ -16,6 +18,7 @@ class TIndexesSet {
private:
YDB_READONLY_DEF(std::vector<ui32>, IndexIds);
YDB_READONLY_DEF(std::set<ui32>, IndexIdsSet);

public:
TIndexesSet(const std::set<ui32>& indexIds)
: IndexIds(indexIds.begin(), indexIds.end())
Expand All @@ -24,8 +27,8 @@ class TIndexesSet {
}

TIndexesSet(const ui32& indexId)
: IndexIds({indexId})
, IndexIdsSet({indexId}) {
: IndexIds({ indexId })
, IndexIdsSet({ indexId }) {
}

ui32 GetIndexesCount() const {
Expand All @@ -37,84 +40,70 @@ class TIndexesSet {
}
};

class TColumnsSet {
private:
YDB_READONLY_DEF(std::set<ui32>, ColumnIds);
YDB_READONLY_DEF(std::set<TString>, ColumnNames);
std::vector<TString> ColumnNamesVector;
YDB_READONLY_DEF(std::shared_ptr<arrow::Schema>, Schema);
ISnapshotSchema::TPtr FullReadSchema;
YDB_READONLY_DEF(ISnapshotSchema::TPtr, FilteredSchema);

void Rebuild();
class TColumnsSetIds {
protected:
std::set<ui32> ColumnIds;

public:
TColumnsSet() = default;
bool IsEmpty() const {
return ColumnIds.empty();
const std::set<ui32>& GetColumnIds() const {
return ColumnIds;
}

bool operator!() const {
return IsEmpty();
TString DebugString() const {
return JoinSeq(",", ColumnIds);
}

const std::vector<TString>& GetColumnNamesVector() const {
return ColumnNamesVector;
TColumnsSetIds(const std::set<ui32>& ids)
: ColumnIds(ids) {
}

ui32 GetColumnsCount() const {
return ColumnIds.size();
TColumnsSetIds() = default;
TColumnsSetIds(std::set<ui32>&& ids)
: ColumnIds(std::move(ids)) {
}

bool ColumnsOnly(const std::vector<std::string>& fieldNames) const;

std::shared_ptr<TColumnsSet> BuildSamePtr(const std::set<ui32>& columnIds) const {
return std::make_shared<TColumnsSet>(columnIds, FullReadSchema);
TColumnsSetIds(const std::vector<ui32>& ids)
: ColumnIds(ids.begin(), ids.end()) {
}

TColumnsSet(const std::set<ui32>& columnIds, const ISnapshotSchema::TPtr& fullReadSchema)
: ColumnIds(columnIds)
, FullReadSchema(fullReadSchema)
{
AFL_VERIFY(!!FullReadSchema);
Schema = FullReadSchema->GetIndexInfo().GetColumnsSchema(ColumnIds);
Rebuild();
TColumnsSetIds operator+(const TColumnsSetIds& external) const {
TColumnsSetIds result = *this;
result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
return result;
}

TColumnsSet(const std::vector<ui32>& columnIds, const ISnapshotSchema::TPtr& fullReadSchema)
: ColumnIds(columnIds.begin(), columnIds.end())
, FullReadSchema(fullReadSchema)
{
AFL_VERIFY(!!FullReadSchema);
Schema = FullReadSchema->GetIndexInfo().GetColumnsSchema(ColumnIds);
Rebuild();
TColumnsSetIds operator-(const TColumnsSetIds& external) const {
TColumnsSetIds result = *this;
for (auto&& i : external.ColumnIds) {
result.ColumnIds.erase(i);
}
return result;
}

const ISnapshotSchema& GetFilteredSchemaVerified() const {
AFL_VERIFY(FilteredSchema);
return *FilteredSchema;
bool IsEmpty() const {
return ColumnIds.empty();
}

const std::shared_ptr<ISnapshotSchema>& GetFilteredSchemaPtrVerified() const {
AFL_VERIFY(FilteredSchema);
return FilteredSchema;
bool operator!() const {
return IsEmpty();
}
ui32 GetColumnsCount() const {
return ColumnIds.size();
}

bool Contains(const std::shared_ptr<TColumnsSet>& columnsSet) const {
bool Contains(const std::shared_ptr<TColumnsSetIds>& columnsSet) const {
if (!columnsSet) {
return true;
}
return Contains(*columnsSet);
}

bool IsEqual(const std::shared_ptr<TColumnsSet>& columnsSet) const {
bool IsEqual(const std::shared_ptr<TColumnsSetIds>& columnsSet) const {
if (!columnsSet) {
return false;
}
return IsEqual(*columnsSet);
}

bool Contains(const TColumnsSet& columnsSet) const {
bool Contains(const TColumnsSetIds& columnsSet) const {
for (auto&& i : columnsSet.ColumnIds) {
if (!ColumnIds.contains(i)) {
return false;
Expand All @@ -123,7 +112,7 @@ class TColumnsSet {
return true;
}

bool Cross(const TColumnsSet& columnsSet) const {
bool Cross(const TColumnsSetIds& columnsSet) const {
for (auto&& i : columnsSet.ColumnIds) {
if (ColumnIds.contains(i)) {
return true;
Expand All @@ -132,7 +121,7 @@ class TColumnsSet {
return false;
}

std::set<ui32> Intersect(const TColumnsSet& columnsSet) const {
std::set<ui32> Intersect(const TColumnsSetIds& columnsSet) const {
std::set<ui32> result;
for (auto&& i : columnsSet.ColumnIds) {
if (ColumnIds.contains(i)) {
Expand All @@ -142,7 +131,7 @@ class TColumnsSet {
return result;
}

bool IsEqual(const TColumnsSet& columnsSet) const {
bool IsEqual(const TColumnsSetIds& columnsSet) const {
if (columnsSet.GetColumnIds().size() != ColumnIds.size()) {
return false;
}
Expand All @@ -157,6 +146,56 @@ class TColumnsSet {
}
return true;
}
};

class TColumnsSet: public TColumnsSetIds {
private:
using TBase = TColumnsSetIds;
YDB_READONLY_DEF(std::set<TString>, ColumnNames);
std::vector<TString> ColumnNamesVector;
YDB_READONLY_DEF(std::shared_ptr<arrow::Schema>, Schema);
ISnapshotSchema::TPtr FullReadSchema;
YDB_READONLY_DEF(ISnapshotSchema::TPtr, FilteredSchema);

void Rebuild();

public:
TColumnsSet() = default;
const std::vector<TString>& GetColumnNamesVector() const {
return ColumnNamesVector;
}

bool ColumnsOnly(const std::vector<std::string>& fieldNames) const;

std::shared_ptr<TColumnsSet> BuildSamePtr(const std::set<ui32>& columnIds) const {
return std::make_shared<TColumnsSet>(columnIds, FullReadSchema);
}

TColumnsSet(const std::set<ui32>& columnIds, const ISnapshotSchema::TPtr& fullReadSchema)
: TBase(columnIds)
, FullReadSchema(fullReadSchema) {
AFL_VERIFY(!!FullReadSchema);
Schema = FullReadSchema->GetIndexInfo().GetColumnsSchema(ColumnIds);
Rebuild();
}

TColumnsSet(const std::vector<ui32>& columnIds, const ISnapshotSchema::TPtr& fullReadSchema)
: TBase(columnIds)
, FullReadSchema(fullReadSchema) {
AFL_VERIFY(!!FullReadSchema);
Schema = FullReadSchema->GetIndexInfo().GetColumnsSchema(ColumnIds);
Rebuild();
}

const ISnapshotSchema& GetFilteredSchemaVerified() const {
AFL_VERIFY(FilteredSchema);
return *FilteredSchema;
}

const std::shared_ptr<ISnapshotSchema>& GetFilteredSchemaPtrVerified() const {
AFL_VERIFY(FilteredSchema);
return FilteredSchema;
}

TString DebugString() const;

Expand All @@ -165,4 +204,4 @@ class TColumnsSet {
TColumnsSet operator-(const TColumnsSet& external) const;
};

}
} // namespace NKikimr::NOlap::NReader::NPlain
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ ui64 TSpecialReadContext::GetMemoryForSources(const THashMap<ui32, std::shared_p
return result;
}

std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) const {
std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) {
const bool needSnapshots = !source->GetExclusiveIntervalOnly() || ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax() ||
!source->IsSourceInMemory();
bool partialUsageByPK = false;
Expand Down Expand Up @@ -58,13 +58,17 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con
needShardingFilter = true;
}
}
if (auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0]) {
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("SS", needSnapshots)("PK", partialUsageByPK)("IDX", useIndexes)("SHARDING", needShardingFilter)
// ("EXCL", source->GetExclusiveIntervalOnly())("MEM", source->IsSourceInMemory())("result", result->DebugString());
return result;
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
if (!result) {
result = BuildColumnsFetchingPlan(needSnapshots, isWholeExclusiveSource, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions);
CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0] = result;
}
{
AFL_VERIFY(result);
if (*result) {
return *result;
} else {
std::shared_ptr<TFetchingScript> result = std::make_shared<TFetchingScript>(*this);
result->SetBranchName("FAKE");
result->AddStep(std::make_shared<TBuildFakeSpec>(source->GetRecordsCount()));
Expand All @@ -74,31 +78,32 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(con

class TColumnsAccumulator {
private:
TColumnsSet FetchingReadyColumns;
TColumnsSet AssemblerReadyColumns;
std::shared_ptr<TColumnsSet> GuaranteeNotOptional;
TColumnsSetIds FetchingReadyColumns;
TColumnsSetIds AssemblerReadyColumns;
ISnapshotSchema::TPtr FullSchema;
std::shared_ptr<TColumnsSetIds> GuaranteeNotOptional;

public:
TColumnsAccumulator(const std::shared_ptr<TColumnsSet>& guaranteeNotOptional)
: GuaranteeNotOptional(guaranteeNotOptional) {
TColumnsAccumulator(const std::shared_ptr<TColumnsSetIds>& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema)
: FullSchema(fullSchema)
, GuaranteeNotOptional(guaranteeNotOptional) {
}

bool AddFetchingStep(TFetchingScript& script, const TColumnsSet& columns, const EStageFeaturesIndexes& stage) {
auto actualColumns = columns - FetchingReadyColumns;
FetchingReadyColumns = FetchingReadyColumns + columns;
bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes& stage) {
auto actualColumns = (TColumnsSetIds)columns - FetchingReadyColumns;
FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns;
if (!actualColumns.IsEmpty()) {
auto actualSet = std::make_shared<TColumnsSet>(actualColumns);
script.AddStep(std::make_shared<TAllocateMemoryStep>(actualSet, stage));
script.AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualSet));
script.AddStep(std::make_shared<TAllocateMemoryStep>(actualColumns, stage));
script.AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualColumns));
return true;
}
return false;
}
bool AddAssembleStep(TFetchingScript& script, const TColumnsSet& columns, const TString& purposeId, const bool optional) {
auto actualColumns = columns - AssemblerReadyColumns;
bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const bool optional) {
auto actualColumns = (TColumnsSetIds)columns - AssemblerReadyColumns;
AssemblerReadyColumns = AssemblerReadyColumns + columns;
if (!actualColumns.IsEmpty()) {
auto actualSet = std::make_shared<TColumnsSet>(actualColumns);
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
if (optional) {
const auto notOptionalColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
if (notOptionalColumnIds.size()) {
Expand Down Expand Up @@ -127,10 +132,10 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
result->AddStep(std::make_shared<TApplyIndexStep>(IndexChecker));
}
bool hasFilterSharding = false;
TColumnsAccumulator acc(MergeColumns);
TColumnsAccumulator acc(MergeColumns, ReadMetadata->GetResultSchema());
if (needFilterSharding && !ShardingColumns->IsEmpty()) {
hasFilterSharding = true;
TColumnsSet columnsFetch = *ShardingColumns;
TColumnsSetIds columnsFetch = *ShardingColumns;
if (!exclusiveSource) {
columnsFetch = columnsFetch + *PKColumns + *SpecColumns;
}
Expand All @@ -140,7 +145,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
}
if (!EFColumns->GetColumnsCount() && !partialUsageByPredicate) {
result->SetBranchName("simple");
TColumnsSet columnsFetch = *FFColumns;
TColumnsSetIds columnsFetch = *FFColumns;
if (needFilterDeletion) {
columnsFetch = columnsFetch + *DeletionColumns;
}
Expand Down Expand Up @@ -334,15 +339,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), readSchema);
MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);

const auto GetBit = [](const ui32 val, const ui32 pos) -> ui32 {
return (val & (1 << pos)) ? 1 : 0;
};

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
for (ui32 i = 0; i < (1 << 6); ++i) {
CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)] =
BuildColumnsFetchingPlan(GetBit(i, 0), GetBit(i, 1), GetBit(i, 2), GetBit(i, 3), GetBit(i, 4), GetBit(i, 5));
}
}

TString TSpecialReadContext::DebugString() const {
Expand All @@ -363,8 +360,8 @@ TString TSpecialReadContext::ProfileDebugString() const {

for (ui32 i = 0; i < (1 << 6); ++i) {
auto script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)];
if (script) {
sb << script->DebugString() << ";";
if (script && *script) {
sb << (*script)->DebugString() << ";";
}
}
return sb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class TSpecialReadContext {
std::shared_ptr<TColumnsSet> EmptyColumns = std::make_shared<TColumnsSet>();
std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource,
const bool partialUsageByPredicate, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const;
std::array<std::array<std::array<std::array<std::array<std::array<std::shared_ptr<TFetchingScript>, 2>, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts;
std::array<std::array<std::array<std::array<std::array<std::array<std::optional<std::shared_ptr<TFetchingScript>>, 2>, 2>, 2>, 2>, 2>, 2>
CacheFetchingScripts;

public:
static const inline ui64 DefaultRejectMemoryIntervalLimit = TGlobalLimits::DefaultRejectMemoryIntervalLimit;
Expand Down Expand Up @@ -80,7 +81,7 @@ class TSpecialReadContext {

TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext);

std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) const;
std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source);
};

}
Loading

0 comments on commit f6476d4

Please sign in to comment.