From 123a335a017a116356e3d7e9cf7fb1118f71294b Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 29 Jul 2024 14:33:37 +0300 Subject: [PATCH] fix compaction intervals construction (#7176) --- ydb/core/formats/arrow/reader/position.h | 36 ++++++++++++++----- .../engines/changes/general_compaction.cpp | 2 +- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h index 2861296af13b..35dd1c0d986f 100644 --- a/ydb/core/formats/arrow/reader/position.h +++ b/ydb/core/formats/arrow/reader/position.h @@ -406,7 +406,7 @@ class TSortableBatchPosition { class TIntervalPosition { private: TSortableBatchPosition Position; - const bool LeftIntervalInclude; + bool LeftIntervalInclude; public: const TSortableBatchPosition& GetPosition() const { return Position; @@ -458,21 +458,41 @@ class TIntervalPositions { return Positions.end(); } - void AddPosition(TSortableBatchPosition&& position, const bool includeLeftInterval) { - TIntervalPosition intervalPosition(std::move(position), includeLeftInterval); - if (Positions.size()) { - AFL_VERIFY(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson()); - } + void InsertPosition(TIntervalPosition&& intervalPosition) { Positions.emplace_back(std::move(intervalPosition)); + ui32 index = Positions.size() - 1; + while (index >= 1 && Positions[index] < Positions[index - 1]) { + std::swap(Positions[index], Positions[index - 1]); + index = index - 1; + } + } + + void InsertPosition(TSortableBatchPosition&& position, const bool includePositionToLeftInterval) { + TIntervalPosition intervalPosition(std::move(position), includePositionToLeftInterval); + InsertPosition(std::move(intervalPosition)); } - void AddPosition(const TSortableBatchPosition& position, const bool includeLeftInterval) { - TIntervalPosition intervalPosition(position, includeLeftInterval); + void InsertPosition(const TSortableBatchPosition& position, const bool includePositionToLeftInterval) { + TIntervalPosition intervalPosition(position, includePositionToLeftInterval); + InsertPosition(std::move(intervalPosition)); + } + + void AddPosition(TIntervalPosition&& intervalPosition) { if (Positions.size()) { AFL_VERIFY(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson()); } Positions.emplace_back(std::move(intervalPosition)); } + + void AddPosition(TSortableBatchPosition&& position, const bool includePositionToLeftInterval) { + TIntervalPosition intervalPosition(std::move(position), includePositionToLeftInterval); + AddPosition(std::move(intervalPosition)); + } + + void AddPosition(const TSortableBatchPosition& position, const bool includePositionToLeftInterval) { + TIntervalPosition intervalPosition(position, includePositionToLeftInterval); + AddPosition(std::move(intervalPosition)); + } }; class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index e78f71919009..7776306349ee 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -212,7 +212,7 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter void TGeneralCompactColumnEngineChanges::AddCheckPoint( const NArrow::NMerger::TSortableBatchPosition& position, const bool include) { - CheckPoints.AddPosition(position, include); + CheckPoints.InsertPosition(position, include); } std::shared_ptr TGeneralCompactColumnEngineChanges::BuildMemoryPredictor() {