Skip to content

Commit

Permalink
Merge 7dd47e2 into ec89d22
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Feb 1, 2024
2 parents ec89d22 + 7dd47e2 commit 2a66675
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 27 deletions.
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/engines/predicate/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ bool TPKRangesFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInf
return SortedRanges.empty();
}

bool TPKRangesFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const {
for (auto&& i : SortedRanges) {
if (i.IsPortionInPartialUsage(start, end, indexInfo)) {
return true;
}
}
return false;
}

TPKRangesFilter::TPKRangesFilter(const bool reverse)
: ReverseFlag(reverse)
{
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/predicate/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TPKRangesFilter {
}

bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const;

NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;

Expand Down
31 changes: 31 additions & 0 deletions ydb/core/tx/columnshard/engines/predicate/range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,37 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo
return true;
}

bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const {
bool startUsage = false;
bool endUsage = false;
if (auto from = PredicateFrom.ExtractKey(indexInfo.GetPrimaryKey())) {
AFL_VERIFY(from->Size() <= start.Size());
if (PredicateFrom.IsInclude()) {
startUsage = std::is_lt(start.ComparePartNotNull(*from, from->Size()));
} else {
startUsage = std::is_lteq(start.ComparePartNotNull(*from, from->Size()));
}
} else {
startUsage = true;
}

if (auto to = PredicateTo.ExtractKey(indexInfo.GetPrimaryKey())) {
AFL_VERIFY(to->Size() <= end.Size());
if (PredicateTo.IsInclude()) {
endUsage = std::is_gt(end.ComparePartNotNull(*to, to->Size()));
} else {
endUsage = std::is_gteq(end.ComparePartNotNull(*to, to->Size()));
}
} else {
endUsage = true;
}

// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", start.DebugString())("end", end.DebugString())("from", PredicateFrom.DebugString())("to", PredicateTo.DebugString())
// ("start_usage", startUsage)("end_usage", endUsage);

return endUsage || startUsage;
}

std::optional<NKikimr::NOlap::TPKRangeFilter> TPKRangeFilter::Build(TPredicateContainer&& from, TPredicateContainer&& to) {
if (!from.CrossRanges(to)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot_build_predicate_range")("error", "predicates from/to not intersected");
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/predicate/range.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TPKRangeFilter {
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;

bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const;

std::set<ui32> GetColumnIds(const TIndexInfo& indexInfo) const;
TString DebugString() const;
Expand Down
41 changes: 31 additions & 10 deletions ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ ui64 TSpecialReadContext::GetMemoryForSources(const std::map<ui32, std::shared_p

std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const {
const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax();
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0];
const bool partialUsageByPK = ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey(), ReadMetadata->GetIndexInfo());
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0];
if (!result) {
return std::make_shared<TBuildFakeSpec>(source->GetRecordsCount(), "fake");
}
return result;
}

std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const {
std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt) const {
std::shared_ptr<IFetchingStep> result = std::make_shared<TFakeStep>();
std::shared_ptr<IFetchingStep> current = result;
const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount();
if (!!IndexChecker) {
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TIndexesSet>(IndexChecker->GetIndexIds())));
current = current->AttachNext(std::make_shared<TApplyIndexStep>(IndexChecker));
}
if (!EFColumns->GetColumnsCount()) {
if (!EFColumns->GetColumnsCount() && !partialUsageByPredicate) {
TColumnsSet columnsFetch = *FFColumns;
if (needSnapshots) {
columnsFetch = columnsFetch + *SpecColumns;
Expand All @@ -52,6 +54,9 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
if (needSnapshots || FFColumns->Contains(SpecColumns)) {
columnsFetch = columnsFetch + *SpecColumns;
}
if (partialUsageByPredicate) {
columnsFetch = columnsFetch + *PredicateColumns;
}
AFL_VERIFY(columnsFetch.GetColumnsCount());
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef"));

Expand All @@ -60,9 +65,13 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
columnsFetch = columnsFetch - *SpecColumns;
}
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
if (partialUsageByPredicate) {
current = current->AttachNext(std::make_shared<TAssemblerStep>(PredicateColumns));
current = current->AttachNext(std::make_shared<TPredicateFilter>());
columnsFetch = columnsFetch - *PredicateColumns;
}
if (columnsFetch.GetColumnsCount()) {
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
}
for (auto&& i : ReadMetadata->GetProgram().GetSteps()) {
if (!i->IsFilterOnly()) {
Expand All @@ -84,7 +93,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
}
current = current->AttachNext(std::make_shared<TAssemblerStep>(PKColumns));
if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
if (partialUsageByPredicate) {
current = current->AttachNext(std::make_shared<TPredicateFilter>());
}
const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns;
Expand Down Expand Up @@ -114,6 +123,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot());
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema);
IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
{
auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo());
if (predicateColumns.size()) {
PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, ReadMetadata->GetIndexInfo(), readSchema);
} else {
PredicateColumns = std::make_shared<TColumnsSet>();
}
}
{
auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
if (efColumns.size()) {
Expand Down Expand Up @@ -144,10 +161,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false);
CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true);
CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false);
CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true);
CacheFetchingScripts[0][0][0] = BuildColumnsFetchingPlan(false, false, false);
CacheFetchingScripts[0][1][0] = BuildColumnsFetchingPlan(false, true, false);
CacheFetchingScripts[1][0][0] = BuildColumnsFetchingPlan(true, false, false);
CacheFetchingScripts[1][1][0] = BuildColumnsFetchingPlan(true, true, false);
CacheFetchingScripts[0][0][1] = BuildColumnsFetchingPlan(false, false, true);
CacheFetchingScripts[0][1][1] = BuildColumnsFetchingPlan(false, true, true);
CacheFetchingScripts[1][0][1] = BuildColumnsFetchingPlan(true, false, true);
CacheFetchingScripts[1][1][1] = BuildColumnsFetchingPlan(true, true, true);
}

}
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TSpecialReadContext {
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, SpecColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, MergeColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, EFColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PredicateColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PKColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, ProgramInputColumns);
Expand All @@ -25,8 +26,8 @@ class TSpecialReadContext {
std::shared_ptr<TColumnsSet> PKFFColumns;
std::shared_ptr<TColumnsSet> EFPKColumns;
std::shared_ptr<TColumnsSet> FFMinusEFColumns;
std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const;
std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2> CacheFetchingScripts;
std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate) const;
std::array<std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2>, 2> CacheFetchingScripts;
public:
ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,10 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte
} else {
insertedPortionsBytes += (*itPortion)->BlobsBytes();
}
auto start = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyStart());
auto finish = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyEnd());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", start.DebugJson())("finish", finish.DebugJson());
sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, start, finish));
sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, (*itPortion)->IndexKeyStart(), (*itPortion)->IndexKeyEnd()));
++itPortion;
} else {
auto start = GetReadMetadata()->BuildSortedPosition(itCommitted->GetFirstVerified());
auto finish = GetReadMetadata()->BuildSortedPosition(itCommitted->GetLastVerified());
sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, start, finish));
sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, itCommitted->GetFirstVerified(), itCommitted->GetLastVerified()));
committedPortionsBytes += itCommitted->GetSize();
++itCommitted;
}
Expand Down
26 changes: 19 additions & 7 deletions ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class IDataSource {
YDB_READONLY(ui32, SourceIdx, 0);
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start);
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish);
NArrow::TReplaceKey StartReplaceKey;
NArrow::TReplaceKey FinishReplaceKey;
YDB_READONLY_DEF(std::shared_ptr<TSpecialReadContext>, Context);
YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero());
std::optional<ui32> RecordsCount;
Expand All @@ -52,6 +54,13 @@ class IDataSource {
virtual void DoAbort() = 0;
virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0;
public:
const NArrow::TReplaceKey& GetStartReplaceKey() const {
return StartReplaceKey;
}
const NArrow::TReplaceKey& GetFinishReplaceKey() const {
return FinishReplaceKey;
}

const TFetchedResult& GetStageResult() const {
AFL_VERIFY(!!StageResult);
return *StageResult;
Expand Down Expand Up @@ -147,16 +156,19 @@ class IDataSource {
void RegisterInterval(TFetchingInterval& interval);

IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context,
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish,
const TSnapshot& recordSnapshotMax, const std::optional<ui32> recordsCount
)
: SourceIdx(sourceIdx)
, Start(start)
, Finish(finish)
, Start(context->GetReadMetadata()->BuildSortedPosition(start))
, Finish(context->GetReadMetadata()->BuildSortedPosition(finish))
, StartReplaceKey(start)
, FinishReplaceKey(finish)
, Context(context)
, RecordSnapshotMax(recordSnapshotMax)
, RecordsCount(recordsCount)
{
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson());
if (Start.IsReverseSort()) {
std::swap(Start, Finish);
}
Expand Down Expand Up @@ -210,10 +222,10 @@ class TPortionDataSource: public IDataSource {
}

TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context,
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish)
: TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount())
, Portion(portion) {

, Portion(portion)
{
}
};

Expand Down Expand Up @@ -256,7 +268,7 @@ class TCommittedDataSource: public IDataSource {
}

TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context,
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish)
: TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {})
, CommittedBlob(committed) {

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto

std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
std::set<ui32> result = GetPKRangesFilter().GetColumnIds(indexInfo);
std::set<ui32> result;
for (auto&& i : GetProgram().GetEarlyFilterColumns()) {
auto id = indexInfo.GetColumnIdOptional(i);
if (id) {
Expand Down

0 comments on commit 2a66675

Please sign in to comment.