Skip to content

Commit

Permalink
asymmetric access for sorted key on find position (#7177)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 29, 2024
1 parent 13b5991 commit 33bdce5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
3 changes: 2 additions & 1 deletion ydb/core/formats/arrow/reader/batch_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class TBatchIterator {

TBatchIterator(TRWSortableBatchPosition&& keyColumns)
: ControlPointFlag(true)
, KeyColumns(std::move(keyColumns)) {
, KeyColumns(std::move(keyColumns))
{

}

Expand Down
15 changes: 8 additions & 7 deletions ydb/core/formats/arrow/reader/position.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) {
ui64 posStart = posStartExt;
ui64 posFinish = posFinishExt;
auto guard = position.CreateAsymmetricAccessGuard();
{
AFL_VERIFY(position.InitPosition(posStart));
AFL_VERIFY(guard.InitSortingPosition(posStart));
auto cmp = position.Compare(forFound);
if (cmp == std::partial_ordering::greater) {
return TFoundPosition::Greater(posStart);
Expand All @@ -32,7 +33,7 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
}
}
{
AFL_VERIFY(position.InitPosition(posFinish));
AFL_VERIFY(guard.InitSortingPosition(posFinish));
auto cmp = position.Compare(forFound);
if (cmp == std::partial_ordering::less) {
return TFoundPosition::Less(posFinish);
Expand All @@ -41,7 +42,7 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
}
}
while (posFinish > posStart + 1) {
Y_ABORT_UNLESS(position.InitPosition(0.5 * (posStart + posFinish)));
AFL_VERIFY(guard.InitSortingPosition(0.5 * (posStart + posFinish)));
const auto comparision = position.Compare(forFound);
if (comparision == std::partial_ordering::less) {
posStart = position.Position;
Expand All @@ -51,12 +52,12 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
return TFoundPosition::Equal(position.Position);
}
}
Y_ABORT_UNLESS(posFinish != posStart);
AFL_VERIFY(posFinish != posStart);
if (greater) {
Y_ABORT_UNLESS(position.InitPosition(posFinish));
AFL_VERIFY(guard.InitSortingPosition(posFinish));
return TFoundPosition::Greater(posFinish);
} else {
Y_ABORT_UNLESS(position.InitPosition(posStart));
AFL_VERIFY(guard.InitSortingPosition(posStart));
return TFoundPosition::Less(posStart);
}
}
Expand Down Expand Up @@ -176,7 +177,7 @@ void TSortableScanData::BuildPosition(const ui64 position) {
bool TSortableScanData::InitPosition(const ui64 position) {
AFL_VERIFY(position < RecordsCount);
if (position < FinishPosition && StartPosition <= position) {
return false;
return true;
}
LastInit = position;
ui32 idx = 0;
Expand Down
64 changes: 49 additions & 15 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class TSortableScanData {

void AppendPositionTo(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const ui64 position, ui64* recordSize) const;

bool InitPosition(const ui64 position);
[[nodiscard]] bool InitPosition(const ui64 position);

std::shared_ptr<arrow::Table> Slice(const ui64 offset, const ui64 count) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> slicedArrays;
Expand Down Expand Up @@ -244,9 +244,8 @@ class TSortableBatchPosition {
, RecordsCount(recordsCount)
, ReverseSort(reverseSort)
, Sorting(sorting)
, Data(data)
{

, Data(data) {
AFL_VERIFY(IsAvailablePosition(Position));
}

TSortableBatchPosition(const TRWSortableBatchPosition& source) = delete;
Expand Down Expand Up @@ -315,7 +314,12 @@ class TSortableBatchPosition {
}
};

static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition);
// Tells whether `position` lies in the half-open range [0, RecordsCount).
// Negative values are rejected before the upper bound is examined.
[[nodiscard]] bool IsAvailablePosition(const i64 position) const {
    if (position < 0) {
        return false;
    }
    return position < RecordsCount;
}

static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound,
const bool needGreater, const std::optional<ui32> includedStartPosition);
static std::optional<TSortableBatchPosition::TFoundPosition> FindPosition(TRWSortableBatchPosition& position, const ui64 posStart, const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater);

const TSortableScanData& GetData() const {
Expand Down Expand Up @@ -487,7 +491,7 @@ class TIntervalPositions {
void AddPosition(TSortableBatchPosition&& position, const bool includePositionToLeftInterval) {
TIntervalPosition intervalPosition(std::move(position), includePositionToLeftInterval);
AddPosition(std::move(intervalPosition));
}
}

void AddPosition(const TSortableBatchPosition& position, const bool includePositionToLeftInterval) {
TIntervalPosition intervalPosition(position, includePositionToLeftInterval);
Expand All @@ -501,23 +505,53 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly
public:
using TBase::TBase;

bool NextPosition(const i64 delta) {
// Shifts the cursor by `delta` rows relative to the current Position.
// The result is whatever InitPosition reports for the computed index, so the
// caller must check it ([[nodiscard]]).
[[nodiscard]] bool NextPosition(const i64 delta) {
    const auto target = Position + delta;
    return InitPosition(target);
}

bool InitPosition(const i64 position) {
if (position < RecordsCount && position >= 0) {
Sorting->InitPosition(position);
if (Data) {
Data->InitPosition(position);
// Moves this position to row `position`: the Sorting cursor first, then the Data cursor
// when Data is set, and finally the Position field itself.
// Returns false without modifying anything when `position` fails IsAvailablePosition;
// returns true once Position has been updated.
[[nodiscard]] bool InitPosition(const i64 position) {
if (!IsAvailablePosition(position)) {
return false;
}
// The index was range-checked above, so a failed sub-cursor init is not reported to the caller.
// NOTE(review): AFL_VERIFY is presumably a fatal assertion that attaches "pos"/"count" as
// diagnostic fields — confirm against the macro's definition.
AFL_VERIFY(Sorting->InitPosition(position))("pos", position)("count", RecordsCount);
// Data is optional; it is repositioned only when present.
if (Data) {
AFL_VERIFY(Data->InitPosition(position))("pos", position)("count", RecordsCount);
}
Position = position;
return true;
}

class TAsymmetricPositionGuard: TNonCopyable {
private:
TRWSortableBatchPosition& Owner;
public:
// Binds the guard to `owner`. Only a reference is stored, so the guard must not
// outlive the position object it was created from.
TAsymmetricPositionGuard(TRWSortableBatchPosition& owner)
    : Owner(owner) {
}

[[nodiscard]] bool InitSortingPosition(const i64 position) {
if (!Owner.IsAvailablePosition(position)) {
return false;
}
Position = position;
AFL_VERIFY(Owner.Sorting->InitPosition(position));
Owner.Position = position;
return true;
} else {
return false;
}

// On destruction, re-initialises the owner's Data cursor at the owner's current Position.
// Nothing is done when that Position is not available or when the owner has no Data.
~TAsymmetricPositionGuard() {
    if (!Owner.IsAvailablePosition(Owner.Position) || !Owner.Data) {
        return;
    }
    AFL_VERIFY(Owner.Data->InitPosition(Owner.Position));
}
};

// Creates a guard bound to this position object.
// While the guard is alive, callers can move only the Sorting cursor through
// InitSortingPosition; when the guard is destroyed it re-initialises Data at the
// owner's final Position (if that position is available and Data is set).
// The guard stores a reference to *this and therefore must not outlive this object.
// NOTE(review): the guard type derives from TNonCopyable, so returning it by value
// presumably relies on the temporary being elided (C++17) — keep the return
// expression a temporary rather than a named local.
TAsymmetricPositionGuard CreateAsymmetricAccessGuard() {
return TAsymmetricPositionGuard(*this);
}

TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound);

// (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf)
Expand Down

0 comments on commit 33bdce5

Please sign in to comment.