From 33bdce5dc112f30dc7f2a42f199097c335381ba0 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 29 Jul 2024 17:48:17 +0300 Subject: [PATCH] asymmetric access for sorted key on find position (#7177) --- .../formats/arrow/reader/batch_iterator.h | 3 +- ydb/core/formats/arrow/reader/position.cpp | 15 +++-- ydb/core/formats/arrow/reader/position.h | 64 ++++++++++++++----- 3 files changed, 59 insertions(+), 23 deletions(-) diff --git a/ydb/core/formats/arrow/reader/batch_iterator.h b/ydb/core/formats/arrow/reader/batch_iterator.h index 48497a53c452..d3bb365d5706 100644 --- a/ydb/core/formats/arrow/reader/batch_iterator.h +++ b/ydb/core/formats/arrow/reader/batch_iterator.h @@ -44,7 +44,8 @@ class TBatchIterator { TBatchIterator(TRWSortableBatchPosition&& keyColumns) : ControlPointFlag(true) - , KeyColumns(std::move(keyColumns)) { + , KeyColumns(std::move(keyColumns)) + { } diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp index a46caea384fd..2930d1139c28 100644 --- a/ydb/core/formats/arrow/reader/position.cpp +++ b/ydb/core/formats/arrow/reader/position.cpp @@ -22,8 +22,9 @@ std::optional TSortableBatchPosition::Fi const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) { ui64 posStart = posStartExt; ui64 posFinish = posFinishExt; + auto guard = position.CreateAsymmetricAccessGuard(); { - AFL_VERIFY(position.InitPosition(posStart)); + AFL_VERIFY(guard.InitSortingPosition(posStart)); auto cmp = position.Compare(forFound); if (cmp == std::partial_ordering::greater) { return TFoundPosition::Greater(posStart); @@ -32,7 +33,7 @@ std::optional TSortableBatchPosition::Fi } } { - AFL_VERIFY(position.InitPosition(posFinish)); + AFL_VERIFY(guard.InitSortingPosition(posFinish)); auto cmp = position.Compare(forFound); if (cmp == std::partial_ordering::less) { return TFoundPosition::Less(posFinish); @@ -41,7 +42,7 @@ std::optional TSortableBatchPosition::Fi } } while (posFinish > posStart + 1) { - Y_ABORT_UNLESS(position.InitPosition(0.5 * (posStart + posFinish))); + AFL_VERIFY(guard.InitSortingPosition(0.5 * (posStart + posFinish))); const auto comparision = position.Compare(forFound); if (comparision == std::partial_ordering::less) { posStart = position.Position; @@ -51,12 +52,12 @@ std::optional TSortableBatchPosition::Fi return TFoundPosition::Equal(position.Position); } } - Y_ABORT_UNLESS(posFinish != posStart); + AFL_VERIFY(posFinish != posStart); if (greater) { - Y_ABORT_UNLESS(position.InitPosition(posFinish)); + AFL_VERIFY(guard.InitSortingPosition(posFinish)); return TFoundPosition::Greater(posFinish); } else { - Y_ABORT_UNLESS(position.InitPosition(posStart)); + AFL_VERIFY(guard.InitSortingPosition(posStart)); return TFoundPosition::Less(posStart); } } @@ -176,7 +177,7 @@ void TSortableScanData::BuildPosition(const ui64 position) { bool TSortableScanData::InitPosition(const ui64 position) { AFL_VERIFY(position < RecordsCount); if (position < FinishPosition && StartPosition <= position) { - return false; + return true; } LastInit = position; ui32 idx = 0; diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h index 35dd1c0d986f..21c6dabfa6a6 100644 --- a/ydb/core/formats/arrow/reader/position.h +++ b/ydb/core/formats/arrow/reader/position.h @@ -152,7 +152,7 @@ class TSortableScanData { void AppendPositionTo(const std::vector>& builders, const ui64 position, ui64* recordSize) const; - bool InitPosition(const ui64 position); + [[nodiscard]] bool InitPosition(const ui64 position); std::shared_ptr Slice(const ui64 offset, const ui64 count) const { std::vector> slicedArrays; @@ -244,9 +244,8 @@ class TSortableBatchPosition { , RecordsCount(recordsCount) , ReverseSort(reverseSort) , Sorting(sorting) - , Data(data) - { - + , Data(data) { + AFL_VERIFY(IsAvailablePosition(Position)); } TSortableBatchPosition(const TRWSortableBatchPosition& source) = delete; @@ -315,7 +314,12 @@ class TSortableBatchPosition { } }; - static std::optional FindPosition(const std::shared_ptr& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional includedStartPosition); + [[nodiscard]] bool IsAvailablePosition(const i64 position) const { + return 0 <= position && position < RecordsCount; + } + + static std::optional FindPosition(const std::shared_ptr& batch, const TSortableBatchPosition& forFound, + const bool needGreater, const std::optional includedStartPosition); static std::optional FindPosition(TRWSortableBatchPosition& position, const ui64 posStart, const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater); const TSortableScanData& GetData() const { @@ -487,7 +491,7 @@ class TIntervalPositions { void AddPosition(TSortableBatchPosition&& position, const bool includePositionToLeftInterval) { TIntervalPosition intervalPosition(std::move(position), includePositionToLeftInterval); AddPosition(std::move(intervalPosition)); - } + } void AddPosition(const TSortableBatchPosition& position, const bool includePositionToLeftInterval) { TIntervalPosition intervalPosition(position, includePositionToLeftInterval); @@ -501,23 +505,53 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly public: using TBase::TBase; - bool NextPosition(const i64 delta) { + [[nodiscard]] bool NextPosition(const i64 delta) { return InitPosition(Position + delta); } - bool InitPosition(const i64 position) { - if (position < RecordsCount && position >= 0) { - Sorting->InitPosition(position); - if (Data) { - Data->InitPosition(position); + [[nodiscard]] bool InitPosition(const i64 position) { + if (!IsAvailablePosition(position)) { + return false; + } + AFL_VERIFY(Sorting->InitPosition(position))("pos", position)("count", RecordsCount); + if (Data) { + AFL_VERIFY(Data->InitPosition(position))("pos", position)("count", RecordsCount); + } + Position = position; + return true; + } + + class TAsymmetricPositionGuard: TNonCopyable { + private: + TRWSortableBatchPosition& Owner; + public: + TAsymmetricPositionGuard(TRWSortableBatchPosition& owner) + : Owner(owner) + { + } + + [[nodiscard]] bool InitSortingPosition(const i64 position) { + if (!Owner.IsAvailablePosition(position)) { + return false; } - Position = position; + AFL_VERIFY(Owner.Sorting->InitPosition(position)); + Owner.Position = position; return true; - } else { - return false; } + ~TAsymmetricPositionGuard() { + if (Owner.IsAvailablePosition(Owner.Position)) { + if (Owner.Data) { + AFL_VERIFY(Owner.Data->InitPosition(Owner.Position)); + } + } + } + }; + + TAsymmetricPositionGuard CreateAsymmetricAccessGuard() { + return TAsymmetricPositionGuard(*this); } + TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound); // (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf)