Skip to content

Commit

Permalink
speed up merging for sparsed columns (ydb-platform#8365)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Sep 9, 2024
1 parent 57833ee commit 3ed7f54
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace NKikimr::NOlap::NCompaction {

void IColumnMerger::Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
void IColumnMerger::Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input, TMergingContext& mergeContext) {
AFL_VERIFY(!Started);
Started = true;
for (auto&& i : input) {
Expand All @@ -12,7 +12,20 @@ void IColumnMerger::Start(const std::vector<std::shared_ptr<NArrow::NAccessor::I
AFL_VERIFY(i->GetDataType()->id() == Context.GetResultField()->type()->id())("input", i->GetDataType()->ToString())(
"result", Context.GetResultField()->ToString());
}
return DoStart(input);
return DoStart(input, mergeContext);
}

// Builds typed views over the two address columns of a merged batch: the column
// named IColumnMerger::PortionIdFieldName (UInt16) and the column named
// IColumnMerger::PortionRecordIndexFieldName (UInt32).
// Aborts when the batch is null, when either column is missing or has another
// type, or when a column length differs from the batch row count.
TMergingChunkContext::TMergingChunkContext(const std::shared_ptr<arrow::RecordBatch>& pkAndAddresses) {
    // Fail with a diagnostic instead of dereferencing a null batch below.
    AFL_VERIFY(!!pkAndAddresses);

    auto columnPortionIdx = pkAndAddresses->GetColumnByName(IColumnMerger::PortionIdFieldName);
    auto columnPortionRecordIdx = pkAndAddresses->GetColumnByName(IColumnMerger::PortionRecordIndexFieldName);
    Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx);

    // The casts below are unchecked, so the physical types are validated first.
    Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
    Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);

    // Qualified explicitly: an unqualified call with explicit template arguments
    // is only found through ADL starting with C++20.
    IdxArray = std::static_pointer_cast<arrow::UInt16Array>(columnPortionIdx);
    RecordIdxArray = std::static_pointer_cast<arrow::UInt32Array>(columnPortionRecordIdx);

    AFL_VERIFY(pkAndAddresses->num_rows() == IdxArray->length());
    AFL_VERIFY(pkAndAddresses->num_rows() == RecordIdxArray->length());
}

} // namespace NKikimr::NOlap::NCompaction
134 changes: 112 additions & 22 deletions ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,121 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/result.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/context.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/result.h>

namespace NKikimr::NOlap::NCompaction {

// Holds the two typed address columns of one merged batch and exposes them
// by const reference. Both arrays are assigned by the constructor.
class TMergingChunkContext {
public:
    // Takes the batch that carries the address columns; defined out of line.
    TMergingChunkContext(const std::shared_ptr<arrow::RecordBatch>& pkAndAddresses);

    // UInt16 address column.
    const arrow::UInt16Array& GetIdxArray() const {
        return *IdxArray;
    }

    // UInt32 address column.
    const arrow::UInt32Array& GetRecordIdxArray() const {
        return *RecordIdxArray;
    }

private:
    std::shared_ptr<arrow::UInt16Array> IdxArray;
    std::shared_ptr<arrow::UInt32Array> RecordIdxArray;
};

// State shared by all column mergers of one merge run: the address columns of
// every result batch (Chunks) and the input containers. On first request it also
// builds a reverse mapping from "input container index + record index" to the
// place where that record appears in the result chunks.
class TMergingContext {
public:
    // Place of an input record in the merged output. Both fields stay at -1
    // until the record is met in some result chunk.
    class TAddress {
    private:
        // Index of the result chunk that contains the record.
        YDB_ACCESSOR(i32, ChunkIdx, -1);
        // Filled with the row index of the record inside that result chunk.
        YDB_ACCESSOR(i32, GlobalPosition, -1);

    public:
        TAddress() = default;

        // Orders by chunk index first, then by position.
        bool operator<(const TAddress& item) const {
            if (ChunkIdx != item.ChunkIdx) {
                return ChunkIdx < item.ChunkIdx;
            }
            return GlobalPosition < item.GlobalPosition;
        }

        // True once both fields have been assigned non-negative values.
        bool IsValid() const {
            return ChunkIdx >= 0 && GlobalPosition >= 0;
        }
    };

private:
    YDB_READONLY_DEF(std::vector<TMergingChunkContext>, Chunks);
    std::vector<std::shared_ptr<NArrow::TGeneralContainer>> InputContainers;

    // Built lazily by GetRemapPortionIndexToResultIndex().
    std::optional<std::vector<std::vector<TAddress>>> RemapPortionIndexToResultIndex;

public:
    // Returns the address columns of result chunk `idx`; aborts when out of range.
    const TMergingChunkContext& GetChunk(const ui32 idx) const {
        AFL_VERIFY(idx < Chunks.size());
        return Chunks[idx];
    }

    // True when the remap table for input `idx` is non-empty, i.e. the input
    // container is present and reports at least one record.
    bool HasRemapInfo(const ui32 idx) {
        return !GetRemapPortionIndexToResultIndex(idx).empty();
    }

    // Returns the full remap table, building it on the first call.
    // result[input][record] is the TAddress of that record in the merged output;
    // inputs with a null container or zero records keep an empty vector.
    const std::vector<std::vector<TAddress>>& GetRemapPortionIndexToResultIndex() {
        if (!RemapPortionIndexToResultIndex) {
            std::vector<std::vector<TAddress>> result;
            result.resize(InputContainers.size());
            for (ui32 inputIdx = 0; inputIdx < InputContainers.size(); ++inputIdx) {
                const auto& container = InputContainers[inputIdx];
                if (container) {
                    result[inputIdx].resize(container->GetRecordsCount());
                }
            }

            ui32 chunkIdx = 0;
            for (auto&& chunk : Chunks) {
                const auto& pIdxArray = chunk.GetIdxArray();
                const auto& pRecordIdxArray = chunk.GetRecordIdxArray();
                const auto rowsCount = pIdxArray.length();
                for (ui32 recordIdx = 0; recordIdx < rowsCount; ++recordIdx) {
                    // Addresses come from data; reject out-of-range values
                    // instead of writing outside the tables.
                    const ui32 portionIdx = pIdxArray.Value(recordIdx);
                    AFL_VERIFY(portionIdx < result.size())("portion_idx", portionIdx)("inputs_count", result.size());
                    auto& sourceRemap = result[portionIdx];
                    if (sourceRemap.empty()) {
                        // No remap is kept for this input.
                        continue;
                    }
                    const ui32 portionRecordIdx = pRecordIdxArray.Value(recordIdx);
                    AFL_VERIFY(portionRecordIdx < sourceRemap.size())("record_idx", portionRecordIdx)(
                        "records_count", sourceRemap.size());
                    TAddress& address = sourceRemap[portionRecordIdx];
                    address.SetChunkIdx(chunkIdx);
                    address.SetGlobalPosition(recordIdx);
                }
                ++chunkIdx;
            }
            RemapPortionIndexToResultIndex = std::move(result);
        }
        return *RemapPortionIndexToResultIndex;
    }

    // Returns the remap table of input `idx`; aborts when out of range.
    const std::vector<TAddress>& GetRemapPortionIndexToResultIndex(const ui32 idx) {
        auto& result = GetRemapPortionIndexToResultIndex();
        AFL_VERIFY(idx < result.size());
        return result[idx];
    }

    // Wraps every batch of `pkAndAddresses` into a TMergingChunkContext, in order,
    // and keeps a copy of the input container list.
    TMergingContext(const std::vector<std::shared_ptr<arrow::RecordBatch>>& pkAndAddresses,
        const std::vector<std::shared_ptr<NArrow::TGeneralContainer>>& inputContainers)
        : InputContainers(inputContainers)
    {
        Chunks.reserve(pkAndAddresses.size());
        for (const auto& batch : pkAndAddresses) {
            Chunks.emplace_back(batch);
        }
    }
};

class IColumnMerger {
public:
using TFactory = NObjectFactory::TParametrizedObjectFactory<IColumnMerger, TString, const TColumnMergeContext&>;

private:
bool Started = false;

virtual std::vector<TColumnPortionResult> DoExecute(
const TChunkMergeContext& context, const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) = 0;
virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) = 0;
virtual std::vector<TColumnPortionResult> DoExecute(const TChunkMergeContext& context, TMergingContext& mergeContext) = 0;
virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input, TMergingContext& mergeContext) = 0;

protected:
const TColumnMergeContext& Context;
Expand All @@ -25,28 +129,14 @@ class IColumnMerger {
std::make_shared<arrow::Field>(PortionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());

IColumnMerger(const TColumnMergeContext& context)
: Context(context)
{

: Context(context) {
}
virtual ~IColumnMerger() = default;

void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input);

std::vector<TColumnPortionResult> Execute(const TChunkMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) {

auto columnPortionIdx = remap->GetColumnByName(IColumnMerger::PortionIdFieldName);
auto columnPortionRecordIdx = remap->GetColumnByName(IColumnMerger::PortionRecordIndexFieldName);
Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx);
Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);

AFL_VERIFY(remap->num_rows() == pIdxArray.length());
AFL_VERIFY(remap->num_rows() == pRecordIdxArray.length());
void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input, TMergingContext& mergeContext);

return DoExecute(context, pIdxArray, pRecordIdxArray);
std::vector<TColumnPortionResult> Execute(const TChunkMergeContext& context, TMergingContext& mergeContext) {
return DoExecute(context, mergeContext);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,17 @@ class TColumnMergeContext {
class TChunkMergeContext {
private:
YDB_READONLY(ui32, PortionRowsCountLimit, 10000);
YDB_READONLY(ui32, BatchIdx, 0);
YDB_READONLY(ui32, RecordsCount, 0);

public:
TChunkMergeContext(const ui32 portionRowsCountLimit)
: PortionRowsCountLimit(portionRowsCountLimit) {
Y_ABORT_UNLESS(PortionRowsCountLimit);
TChunkMergeContext(const ui32 portionRowsCountLimit, const ui32 batchIdx, const ui32 recordsCount)
: PortionRowsCountLimit(portionRowsCountLimit)
, BatchIdx(batchIdx)
, RecordsCount(recordsCount)
{
AFL_VERIFY(RecordsCount);
AFL_VERIFY(PortionRowsCountLimit);
}
};
} // namespace NKikimr::NOlap::NCompaction
9 changes: 5 additions & 4 deletions ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
}
}

TMergingContext mergingContext(batchResults, Batches);

for (auto&& [columnId, columnData] : columnsData) {
const TString& columnName = resultFiltered->GetIndexInfo().GetColumnName(columnId);
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", columnName));
Expand All @@ -91,16 +93,15 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
IColumnMerger::TFactory::MakeHolder(commonContext.GetLoader()->GetAccessorConstructor().GetClassName(), commonContext);
AFL_VERIFY(!!merger)("problem", "cannot create merger")(
"class_name", commonContext.GetLoader()->GetAccessorConstructor().GetClassName());
merger->Start(columnData, mergingContext);

merger->Start(columnData);
ui32 batchIdx = 0;
for (auto&& batchResult : batchResults) {
const ui32 portionRecordsCountLimit =
batchResult->num_rows() / (batchResult->num_rows() / NSplitter::TSplitSettings().GetExpectedRecordsCountOnPage() + 1) + 1;

TChunkMergeContext context(portionRecordsCountLimit);

chunkGroups[batchIdx][columnId] = merger->Execute(context, batchResult);
TChunkMergeContext context(portionRecordsCountLimit, batchIdx, batchResult->num_rows());
chunkGroups[batchIdx][columnId] = merger->Execute(context, mergingContext);
++batchIdx;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace NKikimr::NOlap::NCompaction {

void TPlainMerger::DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
void TPlainMerger::DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input, TMergingContext& /*mContext*/) {
for (auto&& p : input) {
if (p) {
Cursors.emplace_back(NCompaction::TPortionColumnCursor(p));
Expand All @@ -14,25 +14,25 @@ void TPlainMerger::DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::
}
}

std::vector<NKikimr::NOlap::NCompaction::TColumnPortionResult> TPlainMerger::DoExecute(const TChunkMergeContext& chunkContext,
const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) {
std::vector<NKikimr::NOlap::NCompaction::TColumnPortionResult> TPlainMerger::DoExecute(
const TChunkMergeContext& chunkContext, TMergingContext& mContext) {
NCompaction::TMergedColumn mColumn(Context, chunkContext);

auto& chunkInfo = mContext.GetChunk(chunkContext.GetBatchIdx());
std::optional<ui16> predPortionIdx;
for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
const ui16 portionIdx = pIdxArray.Value(idx);
const ui32 portionRecordIdx = pRecordIdxArray.Value(idx);
for (ui32 idx = 0; idx < chunkInfo.GetIdxArray().length(); ++idx) {
const ui16 portionIdx = chunkInfo.GetIdxArray().Value(idx);
const ui32 portionRecordIdx = chunkInfo.GetRecordIdxArray().Value(idx);
auto& cursor = Cursors[portionIdx];
cursor.Next(portionRecordIdx, mColumn);
if (predPortionIdx && portionIdx != *predPortionIdx) {
Cursors[*predPortionIdx].Fetch(mColumn);
}
if (idx + 1 == pIdxArray.length()) {
if (idx + 1 == chunkInfo.GetIdxArray().length()) {
cursor.Fetch(mColumn);
}
predPortionIdx = portionIdx;
}
AFL_VERIFY(pIdxArray.length() == mColumn.GetRecordsCount());
AFL_VERIFY(chunkInfo.GetIdxArray().length() == mColumn.GetRecordsCount());
return mColumn.BuildResult();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ class TPlainMerger: public IColumnMerger {
static inline auto Registrator = TFactory::TRegistrator<TPlainMerger>(NArrow::NAccessor::TGlobalConst::PlainDataAccessorName);
using TBase = IColumnMerger;
std::vector<NCompaction::TPortionColumnCursor> Cursors;
virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) override;
virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input, TMergingContext& mergeContext) override;

virtual std::vector<TColumnPortionResult> DoExecute(
const TChunkMergeContext& context, const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) override;
virtual std::vector<TColumnPortionResult> DoExecute(const TChunkMergeContext& context, TMergingContext& mergeContext) override;

public:
using TBase::TBase;
Expand Down
Loading

0 comments on commit 3ed7f54

Please sign in to comment.