Skip to content

Commit

Permalink
fix compaction intervals construction (#7176)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 29, 2024
1 parent bd147a9 commit 123a335
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
36 changes: 28 additions & 8 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ class TSortableBatchPosition {
class TIntervalPosition {
private:
TSortableBatchPosition Position;
const bool LeftIntervalInclude;
bool LeftIntervalInclude;
public:
const TSortableBatchPosition& GetPosition() const {
return Position;
Expand Down Expand Up @@ -458,21 +458,41 @@ class TIntervalPositions {
return Positions.end();
}

void AddPosition(TSortableBatchPosition&& position, const bool includeLeftInterval) {
TIntervalPosition intervalPosition(std::move(position), includeLeftInterval);
if (Positions.size()) {
AFL_VERIFY(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson());
}
void InsertPosition(TIntervalPosition&& intervalPosition) {
Positions.emplace_back(std::move(intervalPosition));
ui32 index = Positions.size() - 1;
while (index >= 1 && Positions[index] < Positions[index - 1]) {
std::swap(Positions[index], Positions[index - 1]);
index = index - 1;
}
}

void InsertPosition(TSortableBatchPosition&& position, const bool includePositionToLeftInterval) {
TIntervalPosition intervalPosition(std::move(position), includePositionToLeftInterval);
InsertPosition(std::move(intervalPosition));
}

void AddPosition(const TSortableBatchPosition& position, const bool includeLeftInterval) {
TIntervalPosition intervalPosition(position, includeLeftInterval);
void InsertPosition(const TSortableBatchPosition& position, const bool includePositionToLeftInterval) {
TIntervalPosition intervalPosition(position, includePositionToLeftInterval);
InsertPosition(std::move(intervalPosition));
}

void AddPosition(TIntervalPosition&& intervalPosition) {
if (Positions.size()) {
AFL_VERIFY(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson());
}
Positions.emplace_back(std::move(intervalPosition));
}

void AddPosition(TSortableBatchPosition&& position, const bool includePositionToLeftInterval) {
TIntervalPosition intervalPosition(std::move(position), includePositionToLeftInterval);
AddPosition(std::move(intervalPosition));
}

void AddPosition(const TSortableBatchPosition& position, const bool includePositionToLeftInterval) {
TIntervalPosition intervalPosition(position, includePositionToLeftInterval);
AddPosition(std::move(intervalPosition));
}
};

class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter

void TGeneralCompactColumnEngineChanges::AddCheckPoint(
const NArrow::NMerger::TSortableBatchPosition& position, const bool include) {
CheckPoints.AddPosition(position, include);
CheckPoints.InsertPosition(position, include);
}

std::shared_ptr<TGeneralCompactColumnEngineChanges::IMemoryPredictor> TGeneralCompactColumnEngineChanges::BuildMemoryPredictor() {
Expand Down

0 comments on commit 123a335

Please sign in to comment.