Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimize PrestoBatchVectorSerializer [3/7]: Serialize ArrayVectors #12062

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 121 additions & 1 deletion velox/serializers/PrestoBatchVectorSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,78 @@
#include "velox/serializers/VectorStream.h"

namespace facebook::velox::serializer::presto::detail {
namespace {
// Populates mutableOffsets with the starting offset of each collection and the
// total size of all collections.
//
// Populates mutableSelectedRanges with the ranges to write from the children.
//
// Populates numSelectedRanges with the number of ranges to write from the
// children.
template <typename VectorType, typename RangeType>
void computeCollectionRangesAndOffsets(
const VectorType* vector,
const folly::Range<const RangeType*>& ranges,
bool hasNulls,
int32_t* mutableOffsets,
IndexRange* mutableSelectedRanges,
size_t& numSelectedRanges) {
auto* rawSizes = vector->rawSizes();
auto* rawOffsets = vector->rawOffsets();

// The first offset is always 0.
mutableOffsets[0] = 0;
// The index of the next offset to write in mutableOffsets.
size_t offsetsIndex = 1;
// The length all the collections in ranges seen so far. This is the offset to
// write for the next collection.
int32_t totalLength = 0;
if (hasNulls) {
for (const auto& range : ranges) {
if constexpr (std::is_same_v<RangeType, IndexRangeWithNulls>) {
if (range.isNull) {
std::fill_n(&mutableOffsets[offsetsIndex], range.size, totalLength);
offsetsIndex += range.size;

continue;
}
}

for (int32_t i = range.begin; i < range.begin + range.size; ++i) {
if (vector->isNullAt(i)) {
mutableOffsets[offsetsIndex++] = totalLength;
} else {
auto length = rawSizes[i];
totalLength += length;
mutableOffsets[offsetsIndex++] = totalLength;

// We only have to write anything from the children if the collection
// is non-empty.
if (length > 0) {
mutableSelectedRanges[numSelectedRanges++] = {
rawOffsets[i], rawSizes[i]};
}
}
}
}
} else {
for (const auto& range : ranges) {
for (auto i = range.begin; i < range.begin + range.size; ++i) {
auto length = rawSizes[i];
totalLength += length;
mutableOffsets[offsetsIndex++] = totalLength;

// We only have to write anything from the children if the collection is
// non-empty.
if (length > 0) {
mutableSelectedRanges[numSelectedRanges++] = {rawOffsets[i], length};
}
}
}
}
}
} // namespace

void PrestoBatchVectorSerializer::serialize(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Expand Down Expand Up @@ -442,7 +514,7 @@ void PrestoBatchVectorSerializer::serializeColumn(
serializeRowVector(stream, vector, ranges);
break;
case VectorEncoding::Simple::ARRAY:
// serializeArrayVector(stream, vector, ranges);
serializeArrayVector(stream, vector, ranges);
break;
case VectorEncoding::Simple::MAP:
// serializeMapVector(stream, vector, ranges);
Expand All @@ -454,4 +526,52 @@ void PrestoBatchVectorSerializer::serializeColumn(
VELOX_UNSUPPORTED();
}
}

template <typename RangeType>
void PrestoBatchVectorSerializer::serializeArrayVector(
BufferedOutputStream* stream,
const VectorPtr& vector,
const folly::Range<const RangeType*>& ranges) {
const auto* arrayVector = vector->as<ArrayVector>();
const auto numRows = rangesTotalSize(ranges);

// Write out the header.
writeHeader(stream, vector->type());

const bool hasNulls = this->hasNulls(vector, ranges);

// This is used to hold the ranges of the elements Vector to write out.
ScratchPtr<IndexRange, 64> selectedRangesHolder(scratch_);
IndexRange* mutableSelectedRanges = selectedRangesHolder.get(numRows);
// This is used to hold the offsets of the arrays which we write out towards
// the end.
ScratchPtr<int32_t, 64> offsetsHolder(scratch_);
int32_t* mutableOffsets = offsetsHolder.get(numRows + 1);
// The number of ranges to write out from the elements Vector. This is equal
// to the number of non-empty, non-null arrays in ranges.
size_t numSelectedRanges = 0;
computeCollectionRangesAndOffsets(
arrayVector,
ranges,
hasNulls,
mutableOffsets,
mutableSelectedRanges,
numSelectedRanges);

// Write out the elements.
serializeColumn(
stream,
arrayVector->elements(),
folly::Range<const IndexRange*>(
mutableSelectedRanges, numSelectedRanges));

// Write out the number of rows.
writeInt32(stream, numRows);
// Write out the offsets.
stream->write(
reinterpret_cast<char*>(mutableOffsets), (numRows + 1) * sizeof(int32_t));

// Write out the hasNull and isNUll flags.
writeNullsSegment(stream, hasNulls, vector, ranges, numRows);
}
} // namespace facebook::velox::serializer::presto::detail
6 changes: 6 additions & 0 deletions velox/serializers/PrestoBatchVectorSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,12 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
const VectorPtr& vector,
const folly::Range<const RangeType*>& ranges);

template <typename RangeType>
void serializeArrayVector(
BufferedOutputStream* stream,
const VectorPtr& vector,
const folly::Range<const RangeType*>& ranges);

const std::unique_ptr<folly::io::Codec> codec_;
const PrestoVectorSerde::PrestoOptions opts_;
StreamArena arena_;
Expand Down
Loading