diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp
index 15ad1dc7f651..5945788820ce 100644
--- a/velox/exec/RowContainer.cpp
+++ b/velox/exec/RowContainer.cpp
@@ -441,6 +441,205 @@ void RowContainer::prepareRead(
       HashStringAllocator::headerOf(view->data()), stream);
 }
 
+int32_t RowContainer::variableSizeAt(const char* row, column_index_t column) {
+  const auto rowColumn = rowColumns_[column];
+
+  if (isNullAt(row, rowColumn)) {
+    return 0;
+  }
+
+  const auto typeKind = typeKinds_[column];
+  if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
+    return reinterpret_cast<const StringView*>(row + rowColumn.offset())
+        ->size();
+  } else {
+    return reinterpret_cast<const std::string_view*>(row + rowColumn.offset())
+        ->size();
+  }
+}
+
+int32_t RowContainer::extractVariableSizeAt(
+    const char* row,
+    column_index_t column,
+    char* output) {
+  const auto rowColumn = rowColumns_[column];
+
+  // 4 bytes for size + N bytes for data.
+  if (isNullAt(row, rowColumn)) {
+    memset(output, 0, 4);
+    return 4;
+  }
+
+  const auto typeKind = typeKinds_[column];
+  if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
+    const auto value = valueAt<StringView>(row, rowColumn.offset());
+    const auto size = value.size();
+    memcpy(output, &size, 4);
+
+    // Copy directly when the value is inline or when the header stored right
+    // before the data reports at least 'size' bytes; otherwise read the value
+    // through a ByteStream.
+    if (value.isInline() ||
+        reinterpret_cast<const HashStringAllocator::Header*>(value.data())[-1]
+                .size() >= value.size()) {
+      memcpy(output + 4, value.data(), size);
+    } else {
+      ByteStream stream;
+      HashStringAllocator::prepareRead(
+          HashStringAllocator::headerOf(value.data()), stream);
+      stream.readBytes(output + 4, size);
+    }
+    return 4 + size;
+  }
+
+  const auto value = valueAt<std::string_view>(row, rowColumn.offset());
+  // Narrow to 32 bits explicitly so that the 4-byte length prefix written
+  // below does not depend on the byte order of size_t.
+  const auto size = static_cast<int32_t>(value.size());
+
+  ByteStream stream;
+  prepareRead(row, rowColumn.offset(), stream);
+
+  memcpy(output, &size, 4);
+  stream.readBytes(output + 4, size);
+
+  return 4 + size;
+}
+
+int32_t RowContainer::storeVariableSizeAt(
+    const char* data,
+    char* row,
+    column_index_t column) {
+  const auto typeKind = typeKinds_[column];
+  const auto rowColumn = rowColumns_[column];
+
+  // First 4 bytes is the size of the data. 'data' points into a packed buffer
+  // and may not be 4-byte aligned, hence memcpy instead of a pointer cast.
+  int32_t size;
+  memcpy(&size, data, sizeof(size));
+
+  if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
+    valueAt<StringView>(row, rowColumn.offset()) = StringView(data + 4, size);
+    if (size > 0) {
+      stringAllocator_->copyMultipart(row, rowColumn.offset());
+    }
+  } else {
+    if (size > 0) {
+      ByteStream stream(stringAllocator_.get(), false, false);
+      const auto position = stringAllocator_->newWrite(stream);
+      stream.appendStringPiece(folly::StringPiece(data + 4, size));
+      stringAllocator_->finishWrite(stream, 0);
+      valueAt<std::string_view>(row, rowColumn.offset()) =
+          std::string_view(reinterpret_cast<char*>(position.position), size);
+    } else {
+      valueAt<std::string_view>(row, rowColumn.offset()) = std::string_view();
+    }
+  }
+
+  return 4 + size;
+}
+
+void RowContainer::extractSerializedRows(
+    folly::Range<char**> rows,
+    const VectorPtr& result) {
+  // The format of the extracted row is: null bytes followed by keys and
+  // dependent columns. Fixed-width columns are serialized into fixed number of
+  // bytes (see typeKindSize). Variable-width columns are serialized as 4 bytes
+  // of size followed by that many bytes.
+
+  const int32_t nullBytes = bits::nbytes(nullOffsets_.size());
+
+  // First, calculate total number of bytes needed to serialize all rows.
+
+  size_t fixedWidthRowSize = 0;
+  bool hasVariableWidth = false;
+  for (auto i = 0; i < types_.size(); ++i) {
+    const auto& type = types_[i];
+    if (type->isFixedWidth()) {
+      fixedWidthRowSize += typeKindSize(type->kind());
+    } else {
+      hasVariableWidth = true;
+    }
+  }
+
+  size_t totalBytes = nullBytes * rows.size() + fixedWidthRowSize * rows.size();
+  if (hasVariableWidth) {
+    for (const char* row : rows) {
+      for (auto i = 0; i < types_.size(); ++i) {
+        const auto& type = types_[i];
+        if (!type->isFixedWidth()) {
+          // 4 bytes for size + N bytes for data.
+          totalBytes += 4 + variableSizeAt(row, i);
+        }
+      }
+    }
+  }
+
+  // Allocate sufficient buffer.
+  auto* flatResult = result->as<FlatVector<StringView>>();
+  flatResult->resize(rows.size());
+  auto* rawBuffer = flatResult->getRawStringBufferWithSpace(totalBytes, true);
+
+  // Write serialized data.
+  size_t totalWritten = 0;
+  for (auto i = 0; i < rows.size(); ++i) {
+    auto* row = rows[i];
+    size_t offset = 0;
+
+    // Copy nulls.
+    memcpy(rawBuffer + offset, row + rowColumns_[0].nullByte(), nullBytes);
+    offset += nullBytes;
+
+    // Copy values.
+    for (auto j = 0; j < types_.size(); ++j) {
+      const auto& type = types_[j];
+      if (type->isFixedWidth()) {
+        const auto size = typeKindSize(type->kind());
+        memcpy(rawBuffer + offset, row + rowColumns_[j].offset(), size);
+        offset += size;
+      } else {
+        auto size = extractVariableSizeAt(row, j, rawBuffer + offset);
+        offset += size;
+      }
+    }
+
+    flatResult->setNoCopy(i, StringView(rawBuffer, offset));
+    rawBuffer += offset;
+    totalWritten += offset;
+  }
+
+  VELOX_CHECK_EQ(totalWritten, totalBytes);
+}
+
+void RowContainer::storeSerializedRow(
+    const FlatVector<StringView>& vector,
+    vector_size_t index,
+    char* row) {
+  VELOX_CHECK(!vector.isNullAt(index));
+  auto serialized = vector.valueAt(index);
+  size_t offset = 0;
+
+  const int32_t nullBytes = bits::nbytes(nullOffsets_.size());
+  memcpy(row + rowColumns_[0].nullByte(), serialized.data(), nullBytes);
+  offset += nullBytes;
+
+  RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_);
+  for (auto i = 0; i < types_.size(); ++i) {
+    const auto& type = types_[i];
+    if (type->isFixedWidth()) {
+      const auto size = typeKindSize(type->kind());
+      memcpy(row + rowColumns_[i].offset(), serialized.data() + offset, size);
+      offset += size;
+    } else {
+      const auto size = storeVariableSizeAt(serialized.data() + offset, row, i);
+      offset += size;
+    }
+  }
+
+  // Every byte produced by extractSerializedRows must have been consumed.
+  VELOX_CHECK_EQ(offset, serialized.size());
+}
+
 void RowContainer::extractString(
     StringView value,
     FlatVector<StringView>* values,
@@ -477,8 +676,20 @@ void RowContainer::storeComplexType(
   auto position = stringAllocator_->newWrite(stream);
   ContainerRowSerde::serialize(*decoded.base(), decoded.index(index), stream);
   stringAllocator_->finishWrite(stream, 0);
+
   valueAt<std::string_view>(row, offset) = std::string_view(
       reinterpret_cast<char*>(position.position), stream.size());
+
+  // TODO Fix ByteStream::size() API. @oerling is looking into that.
+  // Fix the 'size' of the std::string_view.
+  // stream.size() is the capacity
+  // stream.size() - stream.remainingSize() is the size of the data + size of
+  // 'next' links (8 bytes per link).
+  ByteStream readStream;
+  prepareRead(row, offset, readStream);
+  const auto size = readStream.size();
+  valueAt<std::string_view>(row, offset) =
+      std::string_view(reinterpret_cast<char*>(position.position), size);
 }
 
 // static
@@ -546,14 +757,13 @@ void RowContainer::hashTyped(
     bool mix,
     uint64_t* result) {
   using T = typename KindToFlatVector<Kind>::HashRowType;
-  auto nullByte = column.nullByte();
-  auto nullMask = column.nullMask();
+
   auto offset = column.offset();
   std::string storage;
   auto numRows = rows.size();
   for (int32_t i = 0; i < numRows; ++i) {
     char* row = rows[i];
-    if (nullable && isNullAt(row, nullByte, nullMask)) {
+    if (nullable && isNullAt(row, column)) {
       result[i] = mix ? bits::hashMix(result[i], BaseVector::kNullHash)
                       : BaseVector::kNullHash;
     } else {
@@ -649,9 +859,7 @@ void RowContainer::extractProbedFlags(
     bool nullResult = false;
     if (setNullForNullKeysRow && nullableKeys_) {
       for (auto c = 0; c < keyTypes_.size(); ++c) {
-        bool isNull =
-            isNullAt(rows[i], columnAt(c).nullByte(), columnAt(c).nullMask());
-        if (isNull) {
+        if (isNullAt(rows[i], columnAt(c))) {
           nullResult = true;
           break;
         }
@@ -806,8 +1014,8 @@ int32_t RowContainer::listPartitionRows(
         bits &= bits - 1;
       }
       startRow += kBatch;
-      // The last batch of 32 bytes may have been partly filled. If so, we could
-      // have skipped past end.
+      // The last batch of 32 bytes may have been partly filled. If so, we
+      // could have skipped past end.
       if (atEnd) {
         iter.rowNumber = numRows_;
         return numResults;
diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h
index a78b0b3083bd..15aee6433594 100644
--- a/velox/exec/RowContainer.h
+++ b/velox/exec/RowContainer.h
@@ -293,6 +293,24 @@ class RowContainer {
     return numRows_;
   }
 
+  /// Copy key and dependent columns into a flat VARBINARY vector. All columns
+  /// of a row are copied into a single buffer. The format of that buffer is an
+  /// implementation detail. The data can be loaded back into the RowContainer
+  /// using 'storeSerializedRow'.
+  ///
+  /// Used for spilling as it is more efficient than converting from row to
+  /// columnar format.
+  void extractSerializedRows(
+      folly::Range<char**> rows,
+      const VectorPtr& result);
+
+  /// Copies serialized row produced by 'extractSerializedRows' into the
+  /// container.
+  void storeSerializedRow(
+      const FlatVector<StringView>& vector,
+      vector_size_t index,
+      char* row);
+
   /// Copies the values at 'col' into 'result' (starting at 'resultOffset')
   /// for the 'numRows' rows pointed to by 'rows'. If a 'row' is null, sets
   /// corresponding row in 'result' to null.
@@ -662,6 +680,10 @@ class RowContainer {
     return (row[nullByte] & nullMask) != 0;
   }
 
+  static inline bool isNullAt(const char* row, RowColumn rowColumn) {
+    return (row[rowColumn.nullByte()] & rowColumn.nullMask()) != 0;
+  }
+
   /// Creates a container to store a partition number for each row in this row
   /// container. This is used by parallel join build which is responsible for
   /// filling this. This function also marks this row container as immutable
@@ -714,6 +736,27 @@ class RowContainer {
     return *reinterpret_cast<T*>(group + offset);
   }
 
+  /// Returns the size of a string or complex types value stored in the
+  /// specified row and column.
+  int32_t variableSizeAt(const char* row, column_index_t column);
+
+  /// Copies a string or complex type value from the specified row and column
+  /// into provided buffer. Stores the size of the data in the first 4 bytes of
+  /// the buffer. If the value is null, writes zero into the first 4 bytes of
+  /// 'output' and returns.
+  /// @return The number of bytes written to 'output' including the 4 bytes
+  /// of the size.
+  int32_t
+  extractVariableSizeAt(const char* row, column_index_t column, char* output);
+
+  /// Copies a string or complex type value from 'data' into the specified row
+  /// and column. Expects first 4 bytes in 'data' to contain the size of the
+  /// string or complex value.
+  /// @return The number of bytes read from 'data': 4 bytes for size + that many
+  /// bytes.
+  int32_t
+  storeVariableSizeAt(const char* data, char* row, column_index_t column);
+
   template <TypeKind Kind>
   static void extractColumnTyped(
       const char* FOLLY_NONNULL const* FOLLY_NONNULL rows,
diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp
index fcb2ecc8ddc1..cfe6cb86aad3 100644
--- a/velox/exec/tests/RowContainerTest.cpp
+++ b/velox/exec/tests/RowContainerTest.cpp
@@ -18,6 +18,7 @@
 #include "velox/exec/VectorHasher.h"
 #include "velox/exec/tests/utils/RowContainerTestBase.h"
 #include "velox/expression/VectorReaders.h"
+#include "velox/vector/fuzzer/VectorFuzzer.h"
 
 using namespace facebook::velox;
 using namespace facebook::velox::exec;
@@ -257,6 +258,27 @@ class RowContainerTest : public exec::test::RowContainerTestBase {
     EXPECT_EQ(usage, sum);
   }
 
+  std::vector<char*> store(
+      RowContainer& rowContainer,
+      const RowVectorPtr& data) {
+    std::vector<DecodedVector> decodedVectors;
+    for (auto& vector : data->children()) {
+      decodedVectors.emplace_back(*vector);
+    }
+
+    std::vector<char*> rows;
+    for (auto i = 0; i < data->size(); ++i) {
+      auto* row = rowContainer.newRow();
+      rows.push_back(row);
+
+      for (auto j = 0; j < decodedVectors.size(); ++j) {
+        rowContainer.store(decodedVectors[j], i, row, j);
+      }
+    }
+
+    return rows;
+  }
+
   std::vector<char*> store(
       RowContainer& rowContainer,
       DecodedVector& decodedVector,
@@ -1586,3 +1608,49 @@ TEST_F(RowContainerTest, partialWriteComplexTypedRow) {
   rowContainer->initializeFields(row);
   rowContainer->eraseRows(folly::Range<char**>(&row, 1));
 }
+
+TEST_F(RowContainerTest, extractSerializedRow) {
+  VectorFuzzer fuzzer(
+      {
+          .vectorSize = 100,
+          .nullRatio = 0.1,
+      },
+      pool());
+
+  for (auto i = 0; i < 100; ++i) {
+    SCOPED_TRACE(fmt::format("Iteration #: {}", i));
+
+    auto rowType = fuzzer.randRowType();
+    auto data = fuzzer.fuzzInputRow(rowType);
+
+    SCOPED_TRACE(data->toString());
+
+    RowContainer rowContainer{rowType->children(), pool()};
+
+    auto rows = store(rowContainer, data);
+
+    // Extract serialized rows.
+    auto serialized = BaseVector::create<FlatVector<StringView>>(
+        VARBINARY(), data->size(), pool());
+    rowContainer.extractSerializedRows(
+        folly::Range<char**>(rows.data(), rows.size()), serialized);
+
+    rowContainer.clear();
+    rows.clear();
+
+    // Load serialized rows back.
+    for (auto rowIndex = 0; rowIndex < data->size(); ++rowIndex) {
+      rows.push_back(rowContainer.newRow());
+      rowContainer.storeSerializedRow(*serialized, rowIndex, rows.back());
+    }
+
+    // Extract into regular vector.
+    auto copy = BaseVector::create<RowVector>(rowType, data->size(), pool());
+    for (auto column = 0; column < copy->childrenSize(); ++column) {
+      rowContainer.extractColumn(
+          rows.data(), copy->size(), column, copy->childAt(column));
+    }
+
+    assertEqualVectors(data, copy);
+  }
+}