From 064cece558359e654fc05a6b61d9250e5d6e601a Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 1 Feb 2024 12:44:01 +0300 Subject: [PATCH 1/2] dont use predicate columns fetching and filter usage in case whole portion matched in predicates ranges --- .../columnshard/engines/predicate/filter.cpp | 9 ++++ .../tx/columnshard/engines/predicate/filter.h | 1 + .../columnshard/engines/predicate/range.cpp | 31 ++++++++++++++ .../tx/columnshard/engines/predicate/range.h | 1 + .../engines/reader/plain_reader/context.cpp | 41 ++++++++++++++----- .../engines/reader/plain_reader/context.h | 5 ++- .../reader/plain_reader/plain_read_data.cpp | 9 +--- .../engines/reader/plain_reader/source.h | 26 ++++++++---- .../engines/reader/read_metadata.cpp | 2 +- 9 files changed, 98 insertions(+), 27 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.cpp b/ydb/core/tx/columnshard/engines/predicate/filter.cpp index 77816507e0c1..10d66a832c1a 100644 --- a/ydb/core/tx/columnshard/engines/predicate/filter.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/filter.cpp @@ -84,6 +84,15 @@ bool TPKRangesFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInf return SortedRanges.empty(); } +bool TPKRangesFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const { + for (auto&& i : SortedRanges) { + if (i.IsPortionInPartialUsage(start, end, indexInfo)) { + return true; + } + } + return false; +} + TPKRangesFilter::TPKRangesFilter(const bool reverse) : ReverseFlag(reverse) { diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.h b/ydb/core/tx/columnshard/engines/predicate/filter.h index d9448d438081..6d30adfdc227 100644 --- a/ydb/core/tx/columnshard/engines/predicate/filter.h +++ b/ydb/core/tx/columnshard/engines/predicate/filter.h @@ -38,6 +38,7 @@ class TPKRangesFilter { } bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const; + bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const; NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const; diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp index 53e10174b82d..a107c3c5158a 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp @@ -59,6 +59,37 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo return true; } +bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const { + bool startUsage = false; + bool endUsage = false; + if (auto from = PredicateFrom.ExtractKey(indexInfo.GetPrimaryKey())) { + AFL_VERIFY(from->Size() <= start.Size()); + if (PredicateFrom.IsInclude()) { + startUsage = std::is_gteq(start.ComparePartNotNull(*from, from->Size())); + } else { + startUsage = std::is_gt(start.ComparePartNotNull(*from, from->Size())); + } + } else { + startUsage = true; + } + + if (auto to = PredicateTo.ExtractKey(indexInfo.GetPrimaryKey())) { + AFL_VERIFY(to->Size() <= end.Size()); + if (PredicateTo.IsInclude()) { + endUsage = std::is_lteq(end.ComparePartNotNull(*to, to->Size())); + } else { + endUsage = std::is_lt(end.ComparePartNotNull(*to, to->Size())); + } + } else { + endUsage = true; + } + +// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", start.DebugString())("end", end.DebugString())("from", PredicateFrom.DebugString())("to", PredicateTo.DebugString()) +// ("start_usage", startUsage)("end_usage", endUsage); + + return endUsage ^ startUsage; +} + std::optional TPKRangeFilter::Build(TPredicateContainer&& from, TPredicateContainer&& to) { if (!from.CrossRanges(to)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot_build_predicate_range")("error", "predicates from/to not intersected"); diff --git a/ydb/core/tx/columnshard/engines/predicate/range.h b/ydb/core/tx/columnshard/engines/predicate/range.h index 7e23da4f13d6..ff84f35408a3 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.h +++ b/ydb/core/tx/columnshard/engines/predicate/range.h @@ -41,6 +41,7 @@ class TPKRangeFilter { NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const; bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const; + bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const; std::set GetColumnIds(const TIndexInfo& indexInfo) const; TString DebugString() const; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp index cf07daf49f8f..e436096cafb2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp @@ -19,21 +19,23 @@ ui64 TSpecialReadContext::GetMemoryForSources(const std::map TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr& source, const bool exclusiveSource) const { const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax(); - auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0]; + const bool partialUsageByPK = ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey(), ReadMetadata->GetIndexInfo()); + auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0]; if (!result) { return std::make_shared(source->GetRecordsCount(), "fake"); } return result; } -std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const { +std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt) const { std::shared_ptr result = std::make_shared(); std::shared_ptr current = result; + const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount(); if (!!IndexChecker) { current = current->AttachNext(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); current = current->AttachNext(std::make_shared(IndexChecker)); } - if (!EFColumns->GetColumnsCount()) { + if (!EFColumns->GetColumnsCount() && !partialUsageByPredicate) { TColumnsSet columnsFetch = *FFColumns; if (needSnapshots) { columnsFetch = columnsFetch + *SpecColumns; @@ -52,6 +54,9 @@ std::shared_ptr TSpecialReadContext if (needSnapshots || FFColumns->Contains(SpecColumns)) { columnsFetch = columnsFetch + *SpecColumns; } + if (partialUsageByPredicate) { + columnsFetch = columnsFetch + *PredicateColumns; + } AFL_VERIFY(columnsFetch.GetColumnsCount()); current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch), "ef")); @@ -60,9 +65,13 @@ std::shared_ptr TSpecialReadContext current = current->AttachNext(std::make_shared()); columnsFetch = columnsFetch - *SpecColumns; } - current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch))); - if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { + if (partialUsageByPredicate) { + current = current->AttachNext(std::make_shared(PredicateColumns)); current = current->AttachNext(std::make_shared()); + columnsFetch = columnsFetch - *PredicateColumns; + } + if (columnsFetch.GetColumnsCount()) { + current = current->AttachNext(std::make_shared(std::make_shared(columnsFetch))); } for (auto&& i : ReadMetadata->GetProgram().GetSteps()) { if (!i->IsFilterOnly()) { @@ -84,7 +93,7 @@ std::shared_ptr TSpecialReadContext current = current->AttachNext(std::make_shared()); } current = current->AttachNext(std::make_shared(PKColumns)); - if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) { + if (partialUsageByPredicate) { current = current->AttachNext(std::make_shared()); } const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns; @@ -114,6 +123,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot()); SpecColumns = std::make_shared(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema); IndexChecker = ReadMetadata->GetProgram().GetIndexChecker(); + { + auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo()); + if (predicateColumns.size()) { + PredicateColumns = std::make_shared(predicateColumns, ReadMetadata->GetIndexInfo(), readSchema); + } else { + PredicateColumns = std::make_shared(); + } + } { auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); if (efColumns.size()) { @@ -144,10 +161,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co MergeColumns = std::make_shared(*PKColumns + *SpecColumns); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString()); - CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false); - CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true); - CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false); - CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true); + CacheFetchingScripts[0][0][0] = BuildColumnsFetchingPlan(false, false, false); + CacheFetchingScripts[0][1][0] = BuildColumnsFetchingPlan(false, true, false); + CacheFetchingScripts[1][0][0] = BuildColumnsFetchingPlan(true, false, false); + CacheFetchingScripts[1][1][0] = BuildColumnsFetchingPlan(true, true, false); + CacheFetchingScripts[0][0][1] = BuildColumnsFetchingPlan(false, false, true); + CacheFetchingScripts[0][1][1] = BuildColumnsFetchingPlan(false, true, true); + CacheFetchingScripts[1][0][1] = BuildColumnsFetchingPlan(true, false, true); + CacheFetchingScripts[1][1][1] = BuildColumnsFetchingPlan(true, true, true); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h index a8b727a5385a..17d52413e39c 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h @@ -15,6 +15,7 @@ class TSpecialReadContext { YDB_READONLY_DEF(std::shared_ptr, SpecColumns); YDB_READONLY_DEF(std::shared_ptr, MergeColumns); YDB_READONLY_DEF(std::shared_ptr, EFColumns); + YDB_READONLY_DEF(std::shared_ptr, PredicateColumns); YDB_READONLY_DEF(std::shared_ptr, PKColumns); YDB_READONLY_DEF(std::shared_ptr, FFColumns); YDB_READONLY_DEF(std::shared_ptr, ProgramInputColumns); @@ -25,8 +26,8 @@ class TSpecialReadContext { std::shared_ptr PKFFColumns; std::shared_ptr EFPKColumns; std::shared_ptr FFMinusEFColumns; - std::shared_ptr BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const; - std::array, 2>, 2> CacheFetchingScripts; + std::shared_ptr BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate) const; + std::array, 2>, 2>, 2> CacheFetchingScripts; public: ui64 GetMemoryForSources(const std::map>& sources, const bool isExclusive); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp index 0720416cddb2..26b397fe2e36 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp @@ -33,15 +33,10 @@ TPlainReadData::TPlainReadData(const std::shared_ptr& conte } else { insertedPortionsBytes += (*itPortion)->BlobsBytes(); } - auto start = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyStart()); - auto finish = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyEnd()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", start.DebugJson())("finish", finish.DebugJson()); - sources.emplace_back(std::make_shared(sourceIdx++, *itPortion, SpecialReadContext, start, finish)); + sources.emplace_back(std::make_shared(sourceIdx++, *itPortion, SpecialReadContext, (*itPortion)->IndexKeyStart(), (*itPortion)->IndexKeyEnd())); ++itPortion; } else { - auto start = GetReadMetadata()->BuildSortedPosition(itCommitted->GetFirstVerified()); - auto finish = GetReadMetadata()->BuildSortedPosition(itCommitted->GetLastVerified()); - sources.emplace_back(std::make_shared(sourceIdx++, *itCommitted, SpecialReadContext, start, finish)); + sources.emplace_back(std::make_shared(sourceIdx++, *itCommitted, SpecialReadContext, itCommitted->GetFirstVerified(), itCommitted->GetLastVerified())); committedPortionsBytes += itCommitted->GetSize(); ++itCommitted; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h index f339312ecfd7..3cce8e16eef7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.h @@ -26,6 +26,8 @@ class IDataSource { YDB_READONLY(ui32, SourceIdx, 0); YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start); YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish); + NArrow::TReplaceKey StartReplaceKey; + NArrow::TReplaceKey FinishReplaceKey; YDB_READONLY_DEF(std::shared_ptr, Context); YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero()); std::optional RecordsCount; @@ -52,6 +54,13 @@ class IDataSource { virtual void DoAbort() = 0; virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0; public: + const NArrow::TReplaceKey& GetStartReplaceKey() const { + return StartReplaceKey; + } + const NArrow::TReplaceKey& GetFinishReplaceKey() const { + return FinishReplaceKey; + } + const TFetchedResult& GetStageResult() const { AFL_VERIFY(!!StageResult); return *StageResult; @@ -147,16 +156,19 @@ class IDataSource { void RegisterInterval(TFetchingInterval& interval); IDataSource(const ui32 sourceIdx, const std::shared_ptr& context, - const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish, + const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish, const TSnapshot& recordSnapshotMax, const std::optional recordsCount ) : SourceIdx(sourceIdx) - , Start(start) - , Finish(finish) + , Start(context->GetReadMetadata()->BuildSortedPosition(start)) + , Finish(context->GetReadMetadata()->BuildSortedPosition(finish)) + , StartReplaceKey(start) + , FinishReplaceKey(finish) , Context(context) , RecordSnapshotMax(recordSnapshotMax) , RecordsCount(recordsCount) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson()); if (Start.IsReverseSort()) { std::swap(Start, Finish); } @@ -210,10 +222,10 @@ class TPortionDataSource: public IDataSource { } TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr& portion, const std::shared_ptr& context, - const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) + const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish) : TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount()) - , Portion(portion) { - + , Portion(portion) + { } }; @@ -256,7 +268,7 @@ class TCommittedDataSource: public IDataSource { } TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr& context, - const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish) + const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish) : TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {}) , CommittedBlob(committed) { diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 988d6553b327..e9a9e804977c 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -48,7 +48,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto std::set TReadMetadata::GetEarlyFilterColumnIds() const { auto& indexInfo = ResultIndexSchema->GetIndexInfo(); - std::set result = GetPKRangesFilter().GetColumnIds(indexInfo); + std::set result; for (auto&& i : GetProgram().GetEarlyFilterColumns()) { auto id = indexInfo.GetColumnIdOptional(i); if (id) { From 7dd47e2c02132f310c69164d292b5ab1f5e4c853 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 1 Feb 2024 12:52:12 +0300 Subject: [PATCH 2/2] fix --- ydb/core/tx/columnshard/engines/predicate/range.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/predicate/range.cpp b/ydb/core/tx/columnshard/engines/predicate/range.cpp index a107c3c5158a..1025fa055ca5 100644 --- a/ydb/core/tx/columnshard/engines/predicate/range.cpp +++ b/ydb/core/tx/columnshard/engines/predicate/range.cpp @@ -65,9 +65,9 @@ bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, c if (auto from = PredicateFrom.ExtractKey(indexInfo.GetPrimaryKey())) { AFL_VERIFY(from->Size() <= start.Size()); if (PredicateFrom.IsInclude()) { - startUsage = std::is_gteq(start.ComparePartNotNull(*from, from->Size())); + startUsage = std::is_lt(start.ComparePartNotNull(*from, from->Size())); } else { - startUsage = std::is_gt(start.ComparePartNotNull(*from, from->Size())); + startUsage = std::is_lteq(start.ComparePartNotNull(*from, from->Size())); } } else { startUsage = true; @@ -76,9 +76,9 @@ bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, c if (auto to = PredicateTo.ExtractKey(indexInfo.GetPrimaryKey())) { AFL_VERIFY(to->Size() <= end.Size()); if (PredicateTo.IsInclude()) { - endUsage = std::is_lteq(end.ComparePartNotNull(*to, to->Size())); + endUsage = std::is_gt(end.ComparePartNotNull(*to, to->Size())); } else { - endUsage = std::is_lt(end.ComparePartNotNull(*to, to->Size())); + endUsage = std::is_gteq(end.ComparePartNotNull(*to, to->Size())); } } else { endUsage = true; @@ -87,7 +87,7 @@ bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, c // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", start.DebugString())("end", end.DebugString())("from", PredicateFrom.DebugString())("to", PredicateTo.DebugString()) // ("start_usage", startUsage)("end_usage", endUsage); - return endUsage ^ startUsage; + return endUsage || startUsage; } std::optional TPKRangeFilter::Build(TPredicateContainer&& from, TPredicateContainer&& to) {