Skip to content

Commit

Permalink
fix: Address ipprefix type incompatibility mismatch (facebookincubato…
Browse files Browse the repository at this point in the history
…r#12145)

Summary:
Pull Request resolved: facebookincubator#12145

Java represents ipprefix as 17 byte varchar with the first 16 bytes as BE form ipaddress, and the 17th byte as the prefix length.

In Velox, we represent ipprefix as a struct/row of <IPADRESS, TINYTINY> which is internally represented as a ROW<HUGEINT, TINYINT>. Natively, this is represented as int128_t for the ipaddress in LE form and int8_t for the prefix.

The mismatch in types causes serialization and deserialization issues because the encoding for java is VARIABLE_WIDTH encoding but Velox is sending ROW encoding.

To address this, let's transform the velox representation of IPPREFIX to match that of Java. In other words, transform ROW<IPADDRESS, TINYINT> -> VARIABLE_WIDTH (16 Byte BE form of ip, 1 byte tiny).

Reviewed By: Yuhta

Differential Revision: D68285595

fbshipit-source-id: dc441ba978ff6f3dd1c5d8f127d4d090c6f2eaa1
  • Loading branch information
yuandagits authored and facebook-github-bot committed Feb 6, 2025
1 parent c0d8954 commit 34cec02
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 5 deletions.
89 changes: 87 additions & 2 deletions velox/serializers/PrestoSerializerDeserializationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ std::pair<const uint64_t*, int32_t> getStructNulls(int64_t position) {
}

bool hasNestedStructs(const TypePtr& type) {
if (isIPPrefixType(type)) {
return false;
}
if (type->isRow()) {
return true;
}
Expand Down Expand Up @@ -261,7 +264,8 @@ void readStructNullsColumns(
source, columnType, useLosslessTimestamp, scratch);
} else {
checkTypeEncoding(encoding, columnType);
const auto it = readers.find(columnType->kind());
const auto it = readers.find(
isIPPrefixType(columnType) ? TypeKind::VARCHAR : columnType->kind());
VELOX_CHECK(
it != readers.end(),
"Column reader for type {} is missing",
Expand Down Expand Up @@ -423,6 +427,78 @@ int128_t readIpAddress(ByteInputStream* source) {
return reverseIpAddressByteOrder(beIpIntAddr);
}

void readIPPrefixValues(
ByteInputStream* source,
const TypePtr& type,
vector_size_t resultOffset,
const uint64_t* incomingNulls,
int32_t numIncomingNulls,
VectorPtr& result) {
VELOX_DCHECK(isIPPrefixType(type));

// Read # number of rows
const int32_t size = source->read<int32_t>();
const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls);

result->resize(resultOffset + numNewValues);

// Skip # of offsets since we expect IPPrefix to be fixed width of 17 bytes
source->skip(size * sizeof(int32_t));

// Read the null-byte and null-flag if present.
[[maybe_unused]] const auto numNulls = readNulls(
source, size, resultOffset, incomingNulls, numIncomingNulls, *result);

// Read total number of bytes of ipprefix
const int32_t ipprefixBytesSum = source->read<int32_t>();
if (ipprefixBytesSum == 0) {
return;
}

VELOX_DCHECK(
(ipprefixBytesSum % ipaddress::kIPPrefixBytes) == 0,
fmt::format(
"Total sum of ipprefix bytes:{} is not divisible by:{}. rows:{} numNulls:{} totalSize:{}",
ipprefixBytesSum,
ipaddress::kIPPrefixBytes,
size,
numNulls,
result->size()));

VELOX_DCHECK(
result->size() >= numNulls,
fmt::format(
"IPPrefix received more nulls:{} than total num of rows:{}.",
result->size(),
numNulls));

VELOX_DCHECK(
(ipprefixBytesSum == ((size - numNulls) * ipaddress::kIPPrefixBytes)),
fmt::format(
"IPPrefix received invalid number of non-null bytes. Got:{} Expected:{} rows:{} numNulls:{} totalSize:{} numIncomingNulls={} resultOffset={}.",
ipprefixBytesSum,
(size - numNulls) * ipaddress::kIPPrefixBytes,
size,
numNulls,
result->size(),
numIncomingNulls,
resultOffset));

auto row = result->asChecked<RowVector>();
auto ip = row->childAt(0)->asChecked<FlatVector<int128_t>>();
auto prefix = row->childAt(1)->asChecked<FlatVector<int8_t>>();

for (int32_t i = 0; i < numNewValues; ++i) {
if (row->isNullAt(resultOffset + i)) {
continue;
}
// Read 16 bytes and reverse the byte order
ip->set(resultOffset + i, readIpAddress(source));
// Read 1 byte for the prefix order
prefix->set(resultOffset + i, source->read<int8_t>());
}
}

void readIpAddressValues(
ByteInputStream* source,
vector_size_t size,
Expand Down Expand Up @@ -694,6 +770,10 @@ void read<StringView>(
velox::memory::MemoryPool* pool,
const PrestoVectorSerde::PrestoOptions& opts,
VectorPtr& result) {
if (isIPPrefixType(type)) {
return readIPPrefixValues(
source, type, resultOffset, incomingNulls, numIncomingNulls, result);
}
const int32_t size = source->read<int32_t>();
const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls);

Expand Down Expand Up @@ -1236,7 +1316,12 @@ void readColumns(
BaseVector::ensureWritable(
SelectivityVector::empty(), types[i], pool, columnResult);
}
const auto it = readers.find(columnType->kind());

// If the column is ipprefix, we need to force the reader to be
// varbinary so that we can properly deserialize the data from Java.
const auto it = readers.find(
isIPPrefixType(columnType) ? TypeKind::VARBINARY
: columnType->kind());
VELOX_CHECK(
it != readers.end(),
"Column reader for type {} is missing",
Expand Down
82 changes: 82 additions & 0 deletions velox/serializers/PrestoSerializerSerializationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,39 @@ void serializeBiasVectorRanges(
}
}

void serializeIPPrefixRanges(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
auto wrappedVector = BaseVector::wrappedVectorShared(vector);
auto rowVector = wrappedVector->asUnchecked<RowVector>();
auto ip = rowVector->childAt(0)->asUnchecked<FlatVector<int128_t>>();
auto prefix = rowVector->childAt(1)->asUnchecked<FlatVector<int8_t>>();

for (int32_t i = 0; i < ranges.size(); ++i) {
auto begin = ranges[i].begin;
auto end = begin + ranges[i].size;
for (auto offset = begin; offset < end; ++offset) {
if (vector->isNullAt(offset)) {
stream->appendNull();
continue;
}
stream->appendNonNull();
stream->appendLength(ipaddress::kIPPrefixBytes);
stream->appendOne(
toJavaIPPrefixType(ip->valueAt(offset), prefix->valueAt(offset)));
}
}
}

void serializeRowVectorRanges(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream,
Scratch& scratch) {
if (isIPPrefixType(vector->type())) {
return serializeIPPrefixRanges(vector, ranges, stream);
}
auto rowVector = vector->as<RowVector>();
std::vector<IndexRange> childRanges;
for (int32_t i = 0; i < ranges.size(); ++i) {
Expand Down Expand Up @@ -482,6 +510,47 @@ void appendStrings(
}
}

void serializeIPPrefix(
const VectorPtr& vector,
const folly::Range<const vector_size_t*>& rows,
VectorStream* stream) {
auto wrappedVector = BaseVector::wrappedVectorShared(vector);
if (!vector->mayHaveNulls()) {
// No nulls, and because ipprefix are fixed size, just append the fixed
// lengths of 17 bytes
stream->appendLengths(nullptr, rows, rows.size(), [&](auto /*row*/) {
return ipaddress::kIPPrefixBytes;
});

auto rowVector = wrappedVector->asChecked<RowVector>();
auto ip = rowVector->childAt(0)->asChecked<FlatVector<int128_t>>();
auto prefix = rowVector->childAt(1)->asChecked<FlatVector<int8_t>>();
for (auto i = 0; i < rows.size(); ++i) {
// Append the first 16 bytes in reverse order because Java always stores
// the ipaddress porition as big endian whereas Velox stores it as little
auto javaIPPrefix =
toJavaIPPrefixType(ip->valueAt(rows[i]), prefix->valueAt(rows[i]));
stream->values().appendStringView(std::string_view(
(const char*)javaIPPrefix.data(), javaIPPrefix.size()));
}
return;
}

auto rowVector = wrappedVector->asChecked<RowVector>();
auto ip = rowVector->childAt(0)->asChecked<FlatVector<int128_t>>();
auto prefix = rowVector->childAt(1)->asChecked<FlatVector<int8_t>>();
for (auto i = 0; i < rows.size(); ++i) {
if (vector->isNullAt(rows[i])) {
stream->appendNull();
continue;
}
stream->appendNonNull();
stream->appendLength(ipaddress::kIPPrefixBytes);
stream->appendOne(
toJavaIPPrefixType(ip->valueAt(rows[i]), prefix->valueAt(rows[i])));
}
}

template <typename T, typename Conv = folly::Identity>
void copyWords(
uint8_t* destination,
Expand Down Expand Up @@ -741,6 +810,9 @@ void serializeWrapped(
simd::transpose(indices, rows, innerRows);
numInner = numRows;
} else {
if (isIPPrefixType(vector->type())) {
return serializeIPPrefix(vector, rows, stream);
}
wrapped = &BaseVector::wrappedVectorShared(vector);
for (int32_t i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && vector->isNullAt(rows[i])) {
Expand Down Expand Up @@ -775,6 +847,10 @@ void serializeConstantVector(
const folly::Range<const vector_size_t*>& rows,
VectorStream* stream,
Scratch& scratch) {
if (isIPPrefixType(vector->type())) {
return serializeIPPrefix(vector, rows, stream);
}

using T = typename KindToFlatVector<kind>::WrapperType;
auto constVector = vector->as<ConstantVector<T>>();
if (constVector->valueVector()) {
Expand All @@ -801,6 +877,9 @@ void serializeRowVector(
const folly::Range<const vector_size_t*>& rows,
VectorStream* stream,
Scratch& scratch) {
if (isIPPrefixType(vector->type())) {
return serializeIPPrefix(vector, rows, stream);
}
auto rowVector = vector->as<RowVector>();
ScratchPtr<uint64_t, 4> nullsHolder(scratch);
ScratchPtr<vector_size_t, 64> innerRowsHolder(scratch);
Expand Down Expand Up @@ -1091,6 +1170,9 @@ std::string_view typeToEncodingName(const TypePtr& type) {
case TypeKind::MAP:
return kMap;
case TypeKind::ROW:
if (isIPPrefixType(type)) {
return kVariableWidth;
}
return kRow;
case TypeKind::UNKNOWN:
return kByteArray;
Expand Down
15 changes: 15 additions & 0 deletions velox/serializers/PrestoSerializerSerializationUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/memory/ByteStream.h"
#include "velox/functions/prestosql/types/IPAddressType.h"
#include "velox/functions/prestosql/types/IPPrefixType.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/DecimalUtil.h"
#include "velox/type/Type.h"
Expand Down Expand Up @@ -56,6 +57,20 @@ static inline const std::string_view kRLE{"RLE"};
static inline const std::string_view kDictionary{"DICTIONARY"};

void initBitsToMapOnce();

FOLLY_ALWAYS_INLINE std::array<int8_t, ipaddress::kIPPrefixBytes>
toJavaIPPrefixType(int128_t currentIpBytes, int8_t prefix) {
std::array<int8_t, ipaddress::kIPPrefixBytes> byteArray{{0}};
memcpy(&byteArray, &currentIpBytes, sizeof(currentIpBytes));
memcpy((byteArray.begin() + sizeof(currentIpBytes)), &prefix, sizeof(prefix));
if constexpr (folly::kIsLittleEndian) {
std::reverse(byteArray.begin(), byteArray.begin() + sizeof(currentIpBytes));
return byteArray;
} else {
return byteArray;
}
}

FOLLY_ALWAYS_INLINE int128_t
reverseIpAddressByteOrder(int128_t currentIpBytes) {
return DecimalUtil::bigEndian(currentIpBytes);
Expand Down
26 changes: 23 additions & 3 deletions velox/serializers/VectorStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ VectorStream::VectorStream(
isLongDecimal_(type_->isLongDecimal()),
isUuid_(isUuidType(type_)),
isIpAddress_(isIPAddressType(type_)),
isIpPrefix_(isIPPrefixType(type_)),
opts_(opts),
encoding_(getEncoding(encoding, vector)),
nulls_(streamArena, true, true),
lengths_(streamArena),
values_(streamArena) {
if (initialNumRows == 0) {
initializeHeader(typeToEncodingName(type), *streamArena);
if (type_->size() > 0) {
if (type_->size() > 0 && !isIpPrefix_) {
hasLengths_ = true;
children_.reserve(type_->size());
for (int32_t i = 0; i < type_->size(); ++i) {
Expand Down Expand Up @@ -231,6 +232,15 @@ void VectorStream::flush(OutputStream* out) {

switch (type_->kind()) {
case TypeKind::ROW:
if (isIPPrefixType(type_)) {
writeInt32(out, nullCount_ + nonNullCount_);
lengths_.flush(out);
flushNulls(out);
writeInt32(out, values_.size());
values_.flush(out);
break;
}

if (opts_.nullsFirst) {
writeInt32(out, nullCount_ + nonNullCount_);
flushNulls(out);
Expand Down Expand Up @@ -302,8 +312,8 @@ void VectorStream::clear() {
totalLength_ = 0;
if (hasLengths_) {
lengths_.startWrite(lengths_.size());
if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY ||
type_->kind() == TypeKind::MAP) {
if ((type_->kind() == TypeKind::ROW && !isIpPrefix_) ||
type_->kind() == TypeKind::ARRAY || type_->kind() == TypeKind::MAP) {
// The first element in the offsets in the wire format is always 0 for
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
Expand Down Expand Up @@ -373,6 +383,16 @@ void VectorStream::initializeFlatStream(
case TypeKind::ARRAY:
[[fallthrough]];
case TypeKind::MAP:
// Velox represents ipprefix as a row, but we need
// to serialize the data type as varbinary to be compatible with Java
if (isIpPrefix_) {
hasLengths_ = true;
lengths_.startWrite(0);
if (values_.ranges().empty()) {
values_.startWrite(0);
}
break;
}
hasLengths_ = true;
children_.reserve(type_->size());
for (int32_t i = 0; i < type_->size(); ++i) {
Expand Down
5 changes: 5 additions & 0 deletions velox/serializers/VectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ class VectorStream {
return isIpAddress_;
}

bool isIpPrefix() const {
return isIpPrefix_;
}

void clear();

private:
Expand All @@ -201,6 +205,7 @@ class VectorStream {
const bool isLongDecimal_;
const bool isUuid_;
const bool isIpAddress_;
const bool isIpPrefix_;
const PrestoVectorSerde::PrestoOptions opts_;
std::optional<VectorEncoding::Simple> encoding_;
int32_t nonNullCount_{0};
Expand Down
Loading

0 comments on commit 34cec02

Please sign in to comment.