From cfa0654b38da186ee4095b08dc873c1edd74c5db Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Mon, 13 Jan 2025 10:18:18 -0800 Subject: [PATCH] feat: Optimize PrestoBatchVectorSerializer [3/7]: Serialize ArrayVectors (#12062) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12062 Context: This is a series of diffs in which I reimplement PrestoBatchVectorSerializer to write directly to the output stream, rather than the indirect route it currently uses via VectorStreams. Reusing VectorStreams and much of the code for PrestoIterativeVectorSerializer prevented us from capturing all of the performance benefits of writing data in batches rather than row by row. These changes combined will speed up PrestoBatchVectorSerializer 2-3x (as measured in Presto queries and other use cases). In the final diff I will integrate the new serialization functions into PrestoBatchVectorSerializer's serialize function which will switch it to the new optimized writing path, therefore I will land these changes as a stack. In this diff: I provide the implementations for serializing ArrayVectors. Differential Revision: D68047629 --- .../PrestoBatchVectorSerializer.cpp | 129 ++++++++++++++++++ .../serializers/PrestoBatchVectorSerializer.h | 8 +- 2 files changed, 136 insertions(+), 1 deletion(-) diff --git a/velox/serializers/PrestoBatchVectorSerializer.cpp b/velox/serializers/PrestoBatchVectorSerializer.cpp index 1b693cb29c062..67a5d727dbe92 100644 --- a/velox/serializers/PrestoBatchVectorSerializer.cpp +++ b/velox/serializers/PrestoBatchVectorSerializer.cpp @@ -21,6 +21,76 @@ #include "velox/serializers/VectorStream.h" namespace facebook::velox::serializer::presto::detail { +namespace { +// Populates mutableOffsets with the starting offset of each collection and the +// total size of all collections. +// +// Populates mutableSelectedRanges with the ranges to write from the children. +// +// Populates rangeIndex with the number of ranges to write from the children. +template +void computeCollectionRangesAndOffsets( + const VectorType* vector, + const folly::Range& ranges, + bool hasNulls, + int32_t* mutableOffsets, + IndexRange* mutableSelectedRanges, + size_t& rangeIndex) { + auto* rawSizes = vector->rawSizes(); + auto* rawOffsets = vector->rawOffsets(); + + // The first offset is always 0. + mutableOffsets[0] = 0; + // The index of the next offset to write in mutableOffsets. + size_t offsetsIndex = 1; + // The length all the collections in ranges seen so far. This is the offset to + // write for the next collection. + int32_t totalLength = 0; + if (hasNulls) { + for (const auto& range : ranges) { + if constexpr (std::is_same_v) { + if (range.isNull) { + std::fill_n(&mutableOffsets[offsetsIndex], range.size, totalLength); + offsetsIndex += range.size; + + continue; + } + } + + for (int32_t i = range.begin; i < range.begin + range.size; ++i) { + if (vector->isNullAt(i)) { + mutableOffsets[offsetsIndex++] = totalLength; + } else { + auto length = rawSizes[i]; + totalLength += length; + mutableOffsets[offsetsIndex++] = totalLength; + + // We only have to write anything from the children if the collection + // is non-empty. + if (length > 0) { + mutableSelectedRanges[rangeIndex++] = {rawOffsets[i], rawSizes[i]}; + } + } + } + } + } else { + for (const auto& range : ranges) { + for (auto i = range.begin; i < range.begin + range.size; ++i) { + auto length = rawSizes[i]; + totalLength += length; + mutableOffsets[offsetsIndex++] = totalLength; + + // We only have to write anything from the children if the collection is + // non-empty. + if (length > 0) { + mutableSelectedRanges[rangeIndex++] = {rawOffsets[i], length}; + } + } + } + } +} +} // namespace + void PrestoBatchVectorSerializer::serialize( const RowVectorPtr& vector, const folly::Range& ranges, @@ -369,4 +439,63 @@ void PrestoBatchVectorSerializer::serializeRowVector( writeNullsSegment(hasNulls, vector, ranges, numRows, stream); } } + +template <> +void PrestoBatchVectorSerializer::serializeArrayVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream); + +template <> +void PrestoBatchVectorSerializer::serializeArrayVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream); + +template +void PrestoBatchVectorSerializer::serializeArrayVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream) { + const auto* arrayVector = vector->as(); + const auto numRows = rangesTotalSize(ranges); + + // Write out the header. + writeHeader(vector->type(), stream); + + const bool hasNulls = this->hasNulls(vector, ranges); + + // This is used to hold the ranges of the elements Vector to write out. + ScratchPtr selectedRangesHolder(scratch_); + IndexRange* mutableSelectedRanges = selectedRangesHolder.get(numRows); + // This is used to hold the offsets of the arrays which we write out towards + // the end. + ScratchPtr offsetsHolder(scratch_); + int32_t* mutableOffsets = offsetsHolder.get(numRows + 1); + // The number of ranges to write out from the elements Vector. This is equal + // to the number of non-empty, non-null arrays in ranges. + size_t rangesSize = 0; + computeCollectionRangesAndOffsets( + arrayVector, + ranges, + hasNulls, + mutableOffsets, + mutableSelectedRanges, + rangesSize); + + // Write out the elements. + serializeColumn( + arrayVector->elements(), + folly::Range(mutableSelectedRanges, rangesSize), + stream); + + // Write out the number of rows. + writeInt32(stream, numRows); + // Write out the offsets. + stream->write( + reinterpret_cast(mutableOffsets), (numRows + 1) * sizeof(int32_t)); + + // Write out the hasNull and isNUll flags. + writeNullsSegment(hasNulls, vector, ranges, numRows, stream); +} } // namespace facebook::velox::serializer::presto::detail diff --git a/velox/serializers/PrestoBatchVectorSerializer.h b/velox/serializers/PrestoBatchVectorSerializer.h index 488b4b1bd8a28..39a65cb8f8266 100644 --- a/velox/serializers/PrestoBatchVectorSerializer.h +++ b/velox/serializers/PrestoBatchVectorSerializer.h @@ -122,7 +122,7 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { serializeRowVector(vector, ranges, stream); break; case VectorEncoding::Simple::ARRAY: - // serializeArrayVector(vector, ranges, stream); + serializeArrayVector(vector, ranges, stream); break; case VectorEncoding::Simple::MAP: // serializeMapVector(vector, ranges, stream); @@ -547,6 +547,12 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { const folly::Range& ranges, BufferedOutputStream* stream); + template + void serializeArrayVector( + const VectorPtr& vector, + const folly::Range& ranges, + BufferedOutputStream* stream); + StreamArena arena_; const std::unique_ptr codec_; const PrestoVectorSerde::PrestoOptions opts_;