Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RowContainer::extractSerializedRows and storeSerializedRow APIs #7519

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 206 additions & 8 deletions velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,195 @@ void RowContainer::prepareRead(
HashStringAllocator::headerOf(view->data()), stream);
}

int32_t RowContainer::variableSizeAt(const char* row, column_index_t column) {
const auto rowColumn = rowColumns_[column];

if (isNullAt(row, rowColumn)) {
return 0;
}

const auto typeKind = typeKinds_[column];
if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
return reinterpret_cast<const StringView*>(row + rowColumn.offset())
->size();
} else {
return reinterpret_cast<const std::string_view*>(row + rowColumn.offset())
->size();
}
}

int32_t RowContainer::extractVariableSizeAt(
const char* row,
column_index_t column,
char* output) {
const auto rowColumn = rowColumns_[column];

// 4 bytes for size + N bytes for data.
if (isNullAt(row, rowColumn)) {
memset(output, 0, 4);
return 4;
}

const auto typeKind = typeKinds_[column];
if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
const auto value = valueAt<StringView>(row, rowColumn.offset());
const auto size = value.size();
memcpy(output, &size, 4);

if (value.isInline() ||
reinterpret_cast<const HashStringAllocator::Header*>(value.data())[-1]
.size() >= value.size()) {
memcpy(output + 4, value.data(), size);
} else {
ByteStream stream;
HashStringAllocator::prepareRead(
HashStringAllocator::headerOf(value.data()), stream);
stream.readBytes(output + 4, size);
}
return 4 + size;
}

const auto value = valueAt<std::string_view>(row, rowColumn.offset());
const auto size = value.size();

ByteStream stream;
prepareRead(row, rowColumn.offset(), stream);

memcpy(output, &size, 4);
stream.readBytes(output + 4, size);

return 4 + size;
}

int32_t RowContainer::storeVariableSizeAt(
const char* data,
char* row,
column_index_t column) {
const auto typeKind = typeKinds_[column];
const auto rowColumn = rowColumns_[column];

// First 4 bytes is the size of the data.
const auto size = *reinterpret_cast<const int32_t*>(data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we assume we always do serialization/deserialization from the same machine. So there is no byte endian issue? thanks!


if (typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
valueAt<StringView>(row, rowColumn.offset()) = StringView(data + 4, size);
if (size > 0) {
stringAllocator_->copyMultipart(row, rowColumn.offset());
}
} else {
if (size > 0) {
ByteStream stream(stringAllocator_.get(), false, false);
const auto position = stringAllocator_->newWrite(stream);
stream.appendStringPiece(folly::StringPiece(data + 4, size));
stringAllocator_->finishWrite(stream, 0);
valueAt<std::string_view>(row, rowColumn.offset()) =
std::string_view(reinterpret_cast<char*>(position.position), size);
} else {
valueAt<std::string_view>(row, rowColumn.offset()) = std::string_view();
}
}

return 4 + size;
}

void RowContainer::extractSerializedRows(
folly::Range<char**> rows,
const VectorPtr& result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we drop const as result is changed inside this function?

// The format of the extracted row is: null bytes followed by keys and
// dependent columns. Fixed-width columns are serialized into fixed number of
// bytes (see typeKindSize). Variable-width columns are serialized as 4 bytes
// of size followed by that many bytes.

const int32_t nullBytes = bits::nbytes(nullOffsets_.size());

// First, calculate total number of bytes needed to serialize all rows.

size_t fixedWidthRowSize = 0;
bool hasVariableWidth = false;
for (auto i = 0; i < types_.size(); ++i) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this into a common utility? Having seen similar function in some other code base.

const auto& type = types_[i];
if (type->isFixedWidth()) {
fixedWidthRowSize += typeKindSize(type->kind());
} else {
hasVariableWidth = true;
}
}

size_t totalBytes = nullBytes * rows.size() + fixedWidthRowSize * rows.size();
if (hasVariableWidth) {
for (const char* row : rows) {
for (auto i = 0; i < types_.size(); ++i) {
const auto& type = types_[i];
if (!type->isFixedWidth()) {
// 4 bytes for size + N bytes for data.
totalBytes += 4 + variableSizeAt(row, i);
}
}
}
}

// Allocate sufficient buffer.
auto* flatResult = result->as<FlatVector<StringView>>();
flatResult->resize(rows.size());
auto* rawBuffer = flatResult->getRawStringBufferWithSpace(totalBytes, true);

// Write serialized data.
size_t totalWritten = 0;
for (auto i = 0; i < rows.size(); ++i) {
auto* row = rows[i];
size_t offset = 0;

// Copy nulls.
memcpy(rawBuffer + offset, row + rowColumns_[0].nullByte(), nullBytes);
offset += nullBytes;

// Copy values.
for (auto j = 0; j < types_.size(); ++j) {
const auto& type = types_[j];
if (type->isFixedWidth()) {
const auto size = typeKindSize(type->kind());
memcpy(rawBuffer + offset, row + rowColumns_[j].offset(), size);
offset += size;
} else {
auto size = extractVariableSizeAt(row, j, rawBuffer + offset);
offset += size;
}
}

flatResult->setNoCopy(i, StringView(rawBuffer, offset));
rawBuffer += offset;
totalWritten += offset;
}

VELOX_CHECK_EQ(totalWritten, totalBytes);
}

void RowContainer::storeSerializedRow(
const FlatVector<StringView>& vector,
vector_size_t index,
char* row) {
VELOX_CHECK(!vector.isNullAt(index));
auto serialized = vector.valueAt(index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: const auto serialized

size_t offset = 0;

const int32_t nullBytes = bits::nbytes(nullOffsets_.size());
memcpy(row + rowColumns_[0].nullByte(), serialized.data(), nullBytes);
offset += nullBytes;

RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_);
for (auto i = 0; i < types_.size(); ++i) {
const auto& type = types_[i];
if (type->isFixedWidth()) {
const auto size = typeKindSize(type->kind());
memcpy(row + rowColumns_[i].offset(), serialized.data() + offset, size);
offset += size;
} else {
const auto size = storeVariableSizeAt(serialized.data() + offset, row, i);
offset += size;
}
}
}

void RowContainer::extractString(
StringView value,
FlatVector<StringView>* values,
Expand Down Expand Up @@ -477,8 +666,20 @@ void RowContainer::storeComplexType(
auto position = stringAllocator_->newWrite(stream);
ContainerRowSerde::serialize(*decoded.base(), decoded.index(index), stream);
stringAllocator_->finishWrite(stream, 0);

valueAt<std::string_view>(row, offset) = std::string_view(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this?

reinterpret_cast<char*>(position.position), stream.size());

// TODO Fix ByteStream::size() API. @oerling is looking into that.
// Fix the 'size' of the std::string_view.
// stream.size() is the capacity
// stream.size() - stream.remainingSize() is the size of the data + size of
// 'next' links (8 bytes per link).
ByteStream readStream;
prepareRead(row, offset, readStream);
const auto size = readStream.size();
valueAt<std::string_view>(row, offset) =
std::string_view(reinterpret_cast<char*>(position.position), size);
}

// static
Expand Down Expand Up @@ -546,14 +747,13 @@ void RowContainer::hashTyped(
bool mix,
uint64_t* result) {
using T = typename KindToFlatVector<Kind>::HashRowType;
auto nullByte = column.nullByte();
auto nullMask = column.nullMask();

auto offset = column.offset();
std::string storage;
auto numRows = rows.size();
for (int32_t i = 0; i < numRows; ++i) {
char* row = rows[i];
if (nullable && isNullAt(row, nullByte, nullMask)) {
if (nullable && isNullAt(row, column)) {
result[i] = mix ? bits::hashMix(result[i], BaseVector::kNullHash)
: BaseVector::kNullHash;
} else {
Expand Down Expand Up @@ -649,9 +849,7 @@ void RowContainer::extractProbedFlags(
bool nullResult = false;
if (setNullForNullKeysRow && nullableKeys_) {
for (auto c = 0; c < keyTypes_.size(); ++c) {
bool isNull =
isNullAt(rows[i], columnAt(c).nullByte(), columnAt(c).nullMask());
if (isNull) {
if (isNullAt(rows[i], columnAt(c))) {
nullResult = true;
break;
}
Expand Down Expand Up @@ -806,8 +1004,8 @@ int32_t RowContainer::listPartitionRows(
bits &= bits - 1;
}
startRow += kBatch;
// The last batch of 32 bytes may have been partly filled. If so, we could
// have skipped past end.
// The last batch of 32 bytes may have been partly filled. If so, we
// could have skipped past end.
if (atEnd) {
iter.rowNumber = numRows_;
return numResults;
Expand Down
43 changes: 43 additions & 0 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,24 @@ class RowContainer {
return numRows_;
}

/// Copy key and dependent columns into a flat VARBINARY vector. All columns
/// of a row are copied into a single buffer. The format of that buffer is an
/// implementation detail. The data can be loaded back into the RowContainer
/// using 'storeSerializedRow'.
///
/// Used for spilling as it is more efficient than converting from row to
/// columnar format.
void extractSerializedRows(
folly::Range<char**> rows,
const VectorPtr& result);

/// Copies serialized row produced by 'extractSerializedRow' into the
/// container.
void storeSerializedRow(
const FlatVector<StringView>& vector,
vector_size_t index,
char* row);

/// Copies the values at 'col' into 'result' (starting at 'resultOffset')
/// for the 'numRows' rows pointed to by 'rows'. If a 'row' is null, sets
/// corresponding row in 'result' to null.
Expand Down Expand Up @@ -662,6 +680,10 @@ class RowContainer {
return (row[nullByte] & nullMask) != 0;
}

static inline bool isNullAt(const char* row, RowColumn rowColumn) {
return (row[rowColumn.nullByte()] & rowColumn.nullMask()) != 0;
}

/// Creates a container to store a partition number for each row in this row
/// container. This is used by parallel join build which is responsible for
/// filling this. This function also marks this row container as immutable
Expand Down Expand Up @@ -714,6 +736,27 @@ class RowContainer {
return *reinterpret_cast<T*>(group + offset);
}

/// Returns the size of a string or complex types value stored in the
/// specified row and column.
int32_t variableSizeAt(const char* row, column_index_t column);

/// Copies a string or complex type value from the specified row and column
/// into provided buffer. Stored the size of the data in the first 4 bytes of
/// the buffer. If the value is null, writes zero into the first 4 bytes of
/// destination and returns.
/// @return The number of bytes written to 'destination' including the 4 bytes
/// of the size.
int32_t
extractVariableSizeAt(const char* row, column_index_t column, char* output);

/// Copies a string or complex type value from 'data' into the specified row
/// and column. Expects first 4 bytes in 'data' to contain the size of the
/// string or complex value.
/// @return The number of bytes read from 'data': 4 bytes for size + that many
/// bytes.
int32_t
storeVariableSizeAt(const char* data, char* row, column_index_t column);

template <TypeKind Kind>
static void extractColumnTyped(
const char* FOLLY_NONNULL const* FOLLY_NONNULL rows,
Expand Down
Loading