Skip to content

Commit

Permalink
Add RowContainer::extractSerializedRows and storeSerializedRow APIs (#…
Browse files Browse the repository at this point in the history
…7519)

Summary:
Add APIs to RowContainer to extract rows in serialized format. Will be used in
spilling, initially in spilling of aggregation over sorted inputs.

Part of #7455

Pull Request resolved: #7519

Reviewed By: xiaoxmeng

Differential Revision: D51213589

Pulled By: mbasmanova

fbshipit-source-id: 6b0d5fc03b7bb301ae229af509143d2a1c14ab55
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Nov 13, 2023
1 parent b417572 commit 2578952
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 8 deletions.
214 changes: 206 additions & 8 deletions velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,195 @@ void RowContainer::prepareRead(
HashStringAllocator::headerOf(view->data()), stream);
}

int32_t RowContainer::variableSizeAt(const char* row, column_index_t column) {
const auto rowColumn = rowColumns_[column];

if (isNullAt(row, rowColumn)) {
return 0;
}

const auto typeKind = typeKinds_[column];
if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
return reinterpret_cast<const StringView*>(row + rowColumn.offset())
->size();
} else {
return reinterpret_cast<const std::string_view*>(row + rowColumn.offset())
->size();
}
}

int32_t RowContainer::extractVariableSizeAt(
const char* row,
column_index_t column,
char* output) {
const auto rowColumn = rowColumns_[column];

// 4 bytes for size + N bytes for data.
if (isNullAt(row, rowColumn)) {
memset(output, 0, 4);
return 4;
}

const auto typeKind = typeKinds_[column];
if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
const auto value = valueAt<StringView>(row, rowColumn.offset());
const auto size = value.size();
memcpy(output, &size, 4);

if (value.isInline() ||
reinterpret_cast<const HashStringAllocator::Header*>(value.data())[-1]
.size() >= value.size()) {
memcpy(output + 4, value.data(), size);
} else {
ByteStream stream;
HashStringAllocator::prepareRead(
HashStringAllocator::headerOf(value.data()), stream);
stream.readBytes(output + 4, size);
}
return 4 + size;
}

const auto value = valueAt<std::string_view>(row, rowColumn.offset());
const auto size = value.size();

ByteStream stream;
prepareRead(row, rowColumn.offset(), stream);

memcpy(output, &size, 4);
stream.readBytes(output + 4, size);

return 4 + size;
}

int32_t RowContainer::storeVariableSizeAt(
const char* data,
char* row,
column_index_t column) {
const auto typeKind = typeKinds_[column];
const auto rowColumn = rowColumns_[column];

// First 4 bytes is the size of the data.
const auto size = *reinterpret_cast<const int32_t*>(data);

if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
valueAt<StringView>(row, rowColumn.offset()) = StringView(data + 4, size);
if (size > 0) {
stringAllocator_->copyMultipart(row, rowColumn.offset());
}
} else {
if (size > 0) {
ByteStream stream(stringAllocator_.get(), false, false);
const auto position = stringAllocator_->newWrite(stream);
stream.appendStringPiece(folly::StringPiece(data + 4, size));
stringAllocator_->finishWrite(stream, 0);
valueAt<std::string_view>(row, rowColumn.offset()) =
std::string_view(reinterpret_cast<char*>(position.position), size);
} else {
valueAt<std::string_view>(row, rowColumn.offset()) = std::string_view();
}
}

return 4 + size;
}

void RowContainer::extractSerializedRows(
folly::Range<char**> rows,
const VectorPtr& result) {
// The format of the extracted row is: null bytes followed by keys and
// dependent columns. Fixed-width columns are serialized into fixed number of
// bytes (see typeKindSize). Variable-width columns are serialized as 4 bytes
// of size followed by that many bytes.

const int32_t nullBytes = bits::nbytes(nullOffsets_.size());

// First, calculate total number of bytes needed to serialize all rows.

size_t fixedWidthRowSize = 0;
bool hasVariableWidth = false;
for (auto i = 0; i < types_.size(); ++i) {
const auto& type = types_[i];
if (type->isFixedWidth()) {
fixedWidthRowSize += typeKindSize(type->kind());
} else {
hasVariableWidth = true;
}
}

size_t totalBytes = nullBytes * rows.size() + fixedWidthRowSize * rows.size();
if (hasVariableWidth) {
for (const char* row : rows) {
for (auto i = 0; i < types_.size(); ++i) {
const auto& type = types_[i];
if (!type->isFixedWidth()) {
// 4 bytes for size + N bytes for data.
totalBytes += 4 + variableSizeAt(row, i);
}
}
}
}

// Allocate sufficient buffer.
auto* flatResult = result->as<FlatVector<StringView>>();
flatResult->resize(rows.size());
auto* rawBuffer = flatResult->getRawStringBufferWithSpace(totalBytes, true);

// Write serialized data.
size_t totalWritten = 0;
for (auto i = 0; i < rows.size(); ++i) {
auto* row = rows[i];
size_t offset = 0;

// Copy nulls.
memcpy(rawBuffer + offset, row + rowColumns_[0].nullByte(), nullBytes);
offset += nullBytes;

// Copy values.
for (auto j = 0; j < types_.size(); ++j) {
const auto& type = types_[j];
if (type->isFixedWidth()) {
const auto size = typeKindSize(type->kind());
memcpy(rawBuffer + offset, row + rowColumns_[j].offset(), size);
offset += size;
} else {
auto size = extractVariableSizeAt(row, j, rawBuffer + offset);
offset += size;
}
}

flatResult->setNoCopy(i, StringView(rawBuffer, offset));
rawBuffer += offset;
totalWritten += offset;
}

VELOX_CHECK_EQ(totalWritten, totalBytes);
}

void RowContainer::storeSerializedRow(
const FlatVector<StringView>& vector,
vector_size_t index,
char* row) {
VELOX_CHECK(!vector.isNullAt(index));
auto serialized = vector.valueAt(index);
size_t offset = 0;

const int32_t nullBytes = bits::nbytes(nullOffsets_.size());
memcpy(row + rowColumns_[0].nullByte(), serialized.data(), nullBytes);
offset += nullBytes;

RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_);
for (auto i = 0; i < types_.size(); ++i) {
const auto& type = types_[i];
if (type->isFixedWidth()) {
const auto size = typeKindSize(type->kind());
memcpy(row + rowColumns_[i].offset(), serialized.data() + offset, size);
offset += size;
} else {
const auto size = storeVariableSizeAt(serialized.data() + offset, row, i);
offset += size;
}
}
}

void RowContainer::extractString(
StringView value,
FlatVector<StringView>* values,
Expand Down Expand Up @@ -477,8 +666,20 @@ void RowContainer::storeComplexType(
auto position = stringAllocator_->newWrite(stream);
ContainerRowSerde::serialize(*decoded.base(), decoded.index(index), stream);
stringAllocator_->finishWrite(stream, 0);

valueAt<std::string_view>(row, offset) = std::string_view(
reinterpret_cast<char*>(position.position), stream.size());

// TODO Fix ByteStream::size() API. @oerling is looking into that.
// Fix the 'size' of the std::string_view.
// stream.size() is the capacity
// stream.size() - stream.remainingSize() is the size of the data + size of
// 'next' links (8 bytes per link).
ByteStream readStream;
prepareRead(row, offset, readStream);
const auto size = readStream.size();
valueAt<std::string_view>(row, offset) =
std::string_view(reinterpret_cast<char*>(position.position), size);
}

// static
Expand Down Expand Up @@ -546,14 +747,13 @@ void RowContainer::hashTyped(
bool mix,
uint64_t* result) {
using T = typename KindToFlatVector<Kind>::HashRowType;
auto nullByte = column.nullByte();
auto nullMask = column.nullMask();

auto offset = column.offset();
std::string storage;
auto numRows = rows.size();
for (int32_t i = 0; i < numRows; ++i) {
char* row = rows[i];
if (nullable && isNullAt(row, nullByte, nullMask)) {
if (nullable && isNullAt(row, column)) {
result[i] = mix ? bits::hashMix(result[i], BaseVector::kNullHash)
: BaseVector::kNullHash;
} else {
Expand Down Expand Up @@ -649,9 +849,7 @@ void RowContainer::extractProbedFlags(
bool nullResult = false;
if (setNullForNullKeysRow && nullableKeys_) {
for (auto c = 0; c < keyTypes_.size(); ++c) {
bool isNull =
isNullAt(rows[i], columnAt(c).nullByte(), columnAt(c).nullMask());
if (isNull) {
if (isNullAt(rows[i], columnAt(c))) {
nullResult = true;
break;
}
Expand Down Expand Up @@ -806,8 +1004,8 @@ int32_t RowContainer::listPartitionRows(
bits &= bits - 1;
}
startRow += kBatch;
// The last batch of 32 bytes may have been partly filled. If so, we could
// have skipped past end.
// The last batch of 32 bytes may have been partly filled. If so, we
// could have skipped past end.
if (atEnd) {
iter.rowNumber = numRows_;
return numResults;
Expand Down
43 changes: 43 additions & 0 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,24 @@ class RowContainer {
return numRows_;
}

/// Copy key and dependent columns into a flat VARBINARY vector. All columns
/// of a row are copied into a single buffer. The format of that buffer is an
/// implementation detail. The data can be loaded back into the RowContainer
/// using 'storeSerializedRow'.
///
/// Used for spilling as it is more efficient than converting from row to
/// columnar format.
void extractSerializedRows(
folly::Range<char**> rows,
const VectorPtr& result);

/// Copies serialized row produced by 'extractSerializedRow' into the
/// container.
void storeSerializedRow(
const FlatVector<StringView>& vector,
vector_size_t index,
char* row);

/// Copies the values at 'col' into 'result' (starting at 'resultOffset')
/// for the 'numRows' rows pointed to by 'rows'. If a 'row' is null, sets
/// corresponding row in 'result' to null.
Expand Down Expand Up @@ -662,6 +680,10 @@ class RowContainer {
return (row[nullByte] & nullMask) != 0;
}

static inline bool isNullAt(const char* row, RowColumn rowColumn) {
return (row[rowColumn.nullByte()] & rowColumn.nullMask()) != 0;
}

/// Creates a container to store a partition number for each row in this row
/// container. This is used by parallel join build which is responsible for
/// filling this. This function also marks this row container as immutable
Expand Down Expand Up @@ -714,6 +736,27 @@ class RowContainer {
return *reinterpret_cast<T*>(group + offset);
}

/// Returns the size of a string or complex types value stored in the
/// specified row and column.
int32_t variableSizeAt(const char* row, column_index_t column);

/// Copies a string or complex type value from the specified row and column
/// into provided buffer. Stored the size of the data in the first 4 bytes of
/// the buffer. If the value is null, writes zero into the first 4 bytes of
/// destination and returns.
/// @return The number of bytes written to 'destination' including the 4 bytes
/// of the size.
int32_t
extractVariableSizeAt(const char* row, column_index_t column, char* output);

/// Copies a string or complex type value from 'data' into the specified row
/// and column. Expects first 4 bytes in 'data' to contain the size of the
/// string or complex value.
/// @return The number of bytes read from 'data': 4 bytes for size + that many
/// bytes.
int32_t
storeVariableSizeAt(const char* data, char* row, column_index_t column);

template <TypeKind Kind>
static void extractColumnTyped(
const char* FOLLY_NONNULL const* FOLLY_NONNULL rows,
Expand Down
Loading

0 comments on commit 2578952

Please sign in to comment.