Skip to content

Commit

Permalink
speed up merging with correct pointers operation (ydb-platform#7230)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Sep 9, 2024
1 parent 49e39b3 commit ade28b5
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 23 deletions.
9 changes: 6 additions & 3 deletions ydb/core/formats/arrow/reader/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

namespace NKikimr::NArrow::NMerger {

void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point) {
void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy) {
AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
Y_ABORT_UNLESS(point.IsReverseSort() == Reverse);
Y_ABORT_UNLESS(++ControlPoints == 1);

SortHeap.Push(TBatchIterator(point.BuildRWPosition()));
SortHeap.Push(TBatchIterator(point.BuildRWPosition(false, deepCopy)));
}

void TMergePartialStream::RemoveControlPoint() {
Expand Down Expand Up @@ -65,7 +65,7 @@ bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, cons
}

bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
PutControlPoint(readTo);
PutControlPoint(readTo, false);
return DrainToControlPoint(builder, includeFinish, lastResultPosition);
}

Expand Down Expand Up @@ -191,6 +191,9 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
for (auto&& i : positions) {
TRecordBatchBuilder indexesBuilder(resultFields);
if (SortHeap.Empty() || i.GetPosition().Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::less) {
continue;
}
DrainCurrentTo(indexesBuilder, i.GetPosition(), i.IsIncludedToLeftInterval());
result.emplace_back(indexesBuilder.Finalize());
if (result.back()->num_rows() == 0) {
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/formats/arrow/reader/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class TMergePartialStream {
void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);

void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
std::optional<TCursor>* lastResultPosition = nullptr);

public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
: SortSchema(sortSchema)
Expand All @@ -49,6 +52,7 @@ class TMergePartialStream {
Y_ABORT_UNLESS(!DataSchema || DataSchema->num_fields());
}

void PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy);
void SkipToLowerBound(const TSortableBatchPosition& pos, const bool include);

void SetPossibleSameVersion(const bool value) {
Expand All @@ -67,8 +71,6 @@ class TMergePartialStream {
return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson();
}

void PutControlPoint(const TSortableBatchPosition& point);

void RemoveControlPoint();

bool ControlPointEnriched() const {
Expand All @@ -92,7 +94,6 @@ class TMergePartialStream {

void DrainAll(TRecordBatchBuilder& builder);
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const TIntervalPositions& positions,
const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/formats/arrow/reader/position.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
return FindPosition(position, posStart, posFinish, forFound, greater);
}

NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition() const {
return TRWSortableBatchPosition(
Position, RecordsCount, ReverseSort, Sorting->BuildCopy(Position), Data ? Data->BuildCopy(Position) : nullptr);
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(const bool needData, const bool deepCopy) const {
return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort,
deepCopy ? Sorting->BuildCopy(Position) : Sorting,
(needData && Data) ? (deepCopy ? Data->BuildCopy(Position) : Data) : nullptr);
}

NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(
Expand Down
26 changes: 14 additions & 12 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class TSortableScanData {
}

std::shared_ptr<TSortableScanData> BuildCopy(const ui64 position) const {
return std::make_shared<TSortableScanData>(position, RecordsCount, Columns, Fields);
return std::make_shared<TSortableScanData>(*this);
}

TCursor BuildCursor(const ui64 position) const {
Expand Down Expand Up @@ -209,6 +209,17 @@ class TSortableBatchPosition {
bool ReverseSort = false;
std::shared_ptr<TSortableScanData> Sorting;
std::shared_ptr<TSortableScanData> Data;

TSortableBatchPosition(const i64 position, const i64 recordsCount, const bool reverseSort, const std::shared_ptr<TSortableScanData>& sorting,
const std::shared_ptr<TSortableScanData>& data)
: Position(position)
, RecordsCount(recordsCount)
, ReverseSort(reverseSort)
, Sorting(sorting)
, Data(data) {
AFL_VERIFY(IsAvailablePosition(Position));
}

public:
TSortableBatchPosition() = default;

Expand All @@ -220,7 +231,7 @@ class TSortableBatchPosition {
return RecordsCount;
}

std::shared_ptr<TSortableScanData> GetSorting() const {
const std::shared_ptr<TSortableScanData>& GetSorting() const {
return Sorting;
}

Expand All @@ -239,15 +250,6 @@ class TSortableBatchPosition {
return Sorting->GetFields();
}

TSortableBatchPosition(const i64 position, const i64 recordsCount, const bool reverseSort, const std::shared_ptr<TSortableScanData>& sorting, const std::shared_ptr<TSortableScanData>& data)
: Position(position)
, RecordsCount(recordsCount)
, ReverseSort(reverseSort)
, Sorting(sorting)
, Data(data) {
AFL_VERIFY(IsAvailablePosition(Position));
}

TSortableBatchPosition(const TRWSortableBatchPosition& source) = delete;
TSortableBatchPosition(TRWSortableBatchPosition& source) = delete;
TSortableBatchPosition(TRWSortableBatchPosition&& source) = delete;
Expand All @@ -256,7 +258,7 @@ class TSortableBatchPosition {
TSortableBatchPosition operator= (TRWSortableBatchPosition& source) = delete;
TSortableBatchPosition operator= (TRWSortableBatchPosition&& source) = delete;

TRWSortableBatchPosition BuildRWPosition() const;
TRWSortableBatchPosition BuildRWPosition(const bool needData, const bool deepCopy) const;

std::shared_ptr<arrow::Table> SliceData(const ui64 offset, const ui64 count) const {
AFL_VERIFY(Data);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/reader/result_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ std::shared_ptr<arrow::RecordBatch> TRecordBatchBuilder::Finalize() {
for (auto&& i : Builders) {
columns.emplace_back(NArrow::TStatusValidator::GetValid(i->Finish()));
}
auto result = arrow::RecordBatch::Make(schema, columns.front()->length(), columns);
auto result = arrow::RecordBatch::Make(schema, columns.front()->length(), std::move(columns));
#ifndef NDEBUG
NArrow::TStatusValidator::Validate(result->ValidateFull());
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
return TConclusionStatus::Success();
}
}
Merger->PutControlPoint(MergingContext->GetFinish());
Merger->PutControlPoint(MergingContext->GetFinish(), false);
Merger->SkipToLowerBound(MergingContext->GetStart(), MergingContext->GetIncludeStart());
const ui32 originalSourcesCount = Sources.size();
Sources.clear();
Expand Down

0 comments on commit ade28b5

Please sign in to comment.