Skip to content

Commit

Permalink
improve chunks splitter performance (#7665)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 13, 2024
1 parent 3734eda commit 555ebf3
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,25 +934,37 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std:
}
std::sort(positions.begin(), positions.end());
positions.erase(std::unique(positions.begin(), positions.end()), positions.end());

AFL_VERIFY(positions.size() > 1)("size", positions.size())("positions", JoinSeq(",", positions));
std::vector<std::vector<std::shared_ptr<arrow::Array>>> slicedData;
slicedData.resize(positions.size() - 1);
{
for (auto&& i : t->columns()) {
for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) {
auto slice = i->Slice(positions[idx], positions[idx + 1] - positions[idx]);
AFL_VERIFY(slice->num_chunks() == 1);
slicedData[idx].emplace_back(slice->chunks().front());
for (auto&& i : t->columns()) {
ui32 currentPosition = 0;
auto it = i->chunks().begin();
ui32 length = (*it)->length();
for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) {
auto chunk = (*it)->Slice(positions[idx] - currentPosition, positions[idx + 1] - positions[idx]);
AFL_VERIFY_DEBUG(chunk->length() == positions[idx + 1] - positions[idx])("length", chunk->length())(
"delta", positions[idx + 1] - positions[idx]);
AFL_VERIFY_DEBUG(chunk->length())("delta", positions[idx + 1] - positions[idx]);
if (positions[idx + 1] - currentPosition == length) {
if (++it != i->chunks().end()) {
length = (*it)->length();
}
currentPosition = positions[idx + 1];
}
slicedData[idx].emplace_back(chunk);
}
}
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
ui32 count = 0;
for (auto&& i : slicedData) {
AFL_VERIFY_DEBUG(i.size());
AFL_VERIFY_DEBUG(i.front()->length());
result.emplace_back(arrow::RecordBatch::Make(t->schema(), i.front()->length(), i));
count += result.back()->num_rows();
}
AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows());
AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows())("sd_size", slicedData.size())("columns", t->num_columns())(
"schema", t->schema()->ToString());
return result;
}

Expand Down

0 comments on commit 555ebf3

Please sign in to comment.