Skip to content

Commit

Permalink
Allow TOwnedCellVec construction directly from cellvec data (ydb-plat…
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Dec 24, 2024
1 parent c594576 commit 7daac53
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 69 deletions.
279 changes: 214 additions & 65 deletions ydb/core/scheme/scheme_tablecell.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,110 @@

namespace NKikimr {

namespace {

// Fixed-size (4-byte) header stored in front of every serialized cell.
// Bit 31 of the raw word marks a NULL cell; the remaining 31 bits carry
// the size of the cell payload in bytes.
struct TCellHeader {
    // Mask selecting the NULL marker bit inside RawValue.
    static constexpr ui32 NullFlag = ui32(1) << 31;

    // The packed on-wire representation (size bits + NULL bit).
    ui32 RawValue = 0;

    TCellHeader() = default;

    // Wraps an already packed header word.
    TCellHeader(ui32 rawValue)
        : RawValue(rawValue)
    {}

    // Packs a payload size together with the NULL marker.
    TCellHeader(ui32 cellSize, bool isNull)
        : RawValue(isNull ? (cellSize | NullFlag) : cellSize)
    {}

    // Payload size in bytes, with the NULL marker bit masked out.
    ui32 CellSize() const {
        return RawValue & ~NullFlag;
    }

    // True when the NULL marker bit is set.
    bool IsNull() const {
        return (RawValue & NullFlag) != 0;
    }
};

// The header is copied to/from the wire as a raw 32-bit word.
static_assert(sizeof(TCellHeader) == sizeof(ui32));

// Bounds-checked forward cursor over a serialized cell buffer.
//
// The reader keeps only a pointer and the number of bytes still unread; it
// never copies or owns the buffer. Every consuming operation returns false
// (and leaves the cursor where it was) when fewer bytes remain than requested.
class TSerializedCellReader {
public:
    TSerializedCellReader(const char* p, size_t size) noexcept
        : Ptr(p)
        , Size(size)
    {}

    TSerializedCellReader(std::string_view data) noexcept
        : TSerializedCellReader(data.data(), data.size())
    {}

    // Returns the not yet consumed remainder of the buffer; feeding it back
    // into Reset() rewinds the reader to the current position.
    std::string_view Snapshot() const noexcept {
        return {Ptr, Size};
    }

    // Re-targets the reader at the given range.
    void Reset(std::string_view data) noexcept {
        Ptr = data.data();
        Size = data.size();
    }

    // Consumes `size` bytes without looking at them.
    bool Skip(size_t size) noexcept {
        if (Y_UNLIKELY(size > Size)) {
            return false;
        }

        Size -= size;
        Ptr += size;
        return true;
    }

    // Consumes `size` bytes and stores the address of the first consumed
    // byte in `*p`. `*p` is left untouched on failure.
    bool Skip(size_t size, const char** p) noexcept {
        const char* const start = Ptr;
        if (!Skip(size)) {
            return false;
        }

        *p = start;
        return true;
    }

    // Copies sizeof(T) bytes into `*dst` and advances past them. memcpy is
    // used so the source does not have to be aligned for T.
    template<class T>
    bool Read(T* dst) noexcept {
        const char* src;
        if (!Skip(sizeof(T), &src)) {
            return false;
        }

        ::memcpy(dst, src, sizeof(T));
        return true;
    }

    // Reads one cell header followed by its payload and placement-constructs
    // a TCell at `cell`: a default TCell for NULL headers, otherwise a TCell
    // built from a pointer to the payload inside the buffer being read.
    // When the header is readable but the payload is truncated, the header
    // stays consumed and false is returned.
    bool ReadNewCell(TCell* cell) noexcept {
        TCellHeader header;
        if (!Read(&header)) {
            return false;
        }

        const char* payload;
        if (!Skip(header.CellSize(), &payload)) {
            return false;
        }

        if (header.IsNull()) {
            new (cell) TCell();
            return true;
        }

        new (cell) TCell(payload, header.CellSize());
        return true;
    }

private:
    const char* Ptr;   // next unread byte
    size_t Size;       // number of unread bytes starting at Ptr
};

} // namespace

// Class-specific deallocation function for TOwnedCellVec::TData: returns the
// raw memory block to the C allocator with ::free.
// NOTE(review): presumably this pairs with the ::malloc + placement-new
// allocation done by the Allocate* helpers — confirm that no other allocator
// ever produces a TData block.
void TOwnedCellVec::TData::operator delete(void* mem) noexcept {
::free(mem);
}
Expand All @@ -28,7 +132,7 @@ TOwnedCellVec::TInit TOwnedCellVec::Allocate(TOwnedCellVec::TCellVec cells) {
for (auto& x : cells) {
if (!x.IsNull() && !x.IsInline()) {
const size_t xsz = x.Size();
size += AlignUp(xsz);
size += AlignUp(xsz, size_t(4));
}
}

Expand All @@ -53,7 +157,87 @@ TOwnedCellVec::TInit TOwnedCellVec::Allocate(TOwnedCellVec::TCellVec cells) {
::memcpy(ptrData, x.Data(), cellSize);
}
new (ptrCell) TCell(ptrData, cellSize);
ptrData += AlignUp(cellSize);
ptrData += AlignUp(cellSize, size_t(4));
}

++ptrCell;
}

return TInit {
cellvec,
new (mem) TData(),
size,
};
}

TOwnedCellVec::TInit TOwnedCellVec::AllocateFromSerialized(std::string_view data) {
if (data.empty()) {
// Leave the data field empty
return TInit{
TCellVec(),
nullptr,
0,
};
}

TSerializedCellReader reader(data);

ui16 cellCount;
if (!reader.Read(&cellCount)) {
throw std::invalid_argument("cannot deserialize cellvec header");
}

if (cellCount == 0) {
// Leave the data field empty
return TInit{
TCellVec(),
nullptr,
0,
};
}

size_t size = sizeof(TData) + sizeof(TCell) * cellCount;

auto snapshot = reader.Snapshot();
for (ui16 i = 0; i < cellCount; ++i) {
TCellHeader cellHeader;
if (!reader.Read(&cellHeader) || !reader.Skip(cellHeader.CellSize())) {
throw std::invalid_argument("cannot deserialize cell data");
}
size_t cellSize = cellHeader.CellSize();
if (!cellHeader.IsNull() && !TCell::CanInline(cellSize)) {
size += AlignUp(cellSize, size_t(4));
}
}

void* mem = ::malloc(size);
if (Y_UNLIKELY(!mem)) {
throw std::bad_alloc();
}

TCell* ptrCell = (TCell*)((TData*)mem + 1);
char* ptrData = (char*)(ptrCell + cellCount);

TConstArrayRef<TCell> cellvec(ptrCell, ptrCell + cellCount);

reader.Reset(snapshot);
for (ui16 i = 0; i < cellCount; ++i) {
TCellHeader cellHeader;
const char* src;
if (!reader.Read(&cellHeader) || !reader.Skip(cellHeader.CellSize(), &src)) {
Y_ABORT("Unexpected failure to deserialize cell data a second time");
}
size_t cellSize = cellHeader.CellSize();
if (cellHeader.IsNull()) {
new (ptrCell) TCell();
} else if (TCell::CanInline(cellSize)) {
new (ptrCell) TCell(src, cellSize);
} else {
if (Y_LIKELY(cellSize > 0)) {
::memcpy(ptrData, src, cellSize);
}
new (ptrCell) TCell(ptrData, cellSize);
ptrData += AlignUp(cellSize, size_t(4));
}

++ptrCell;
Expand Down Expand Up @@ -125,24 +309,6 @@ bool TCellVectorsEquals::operator() (TConstArrayRef<TCell> a, TConstArrayRef<TCe

namespace {

// Fixed-size (4-byte) header stored in front of every serialized cell.
// Bit 31 of the raw word marks a NULL cell; the remaining 31 bits carry
// the size of the cell payload in bytes.
struct TCellHeader {
    // The packed on-wire representation (size bits + NULL bit).
    ui32 RawValue = 0;

    TCellHeader() = default;

    // Wraps an already packed header word.
    TCellHeader(ui32 rawValue)
        : RawValue(rawValue)
    {}

    // Packs a payload size together with the NULL marker.
    TCellHeader(ui32 cellSize, bool isNull)
        : RawValue(isNull ? (cellSize | (ui32(1) << 31)) : cellSize)
    {}

    // Payload size in bytes, with the NULL marker bit masked out.
    ui32 CellSize() const {
        return RawValue & ~(ui32(1) << 31);
    }

    // True when the NULL marker bit is set.
    bool IsNull() const {
        return (RawValue >> 31) != 0;
    }
};

// The header is copied to/from the wire as a raw 32-bit word.
static_assert(sizeof(TCellHeader) == sizeof(ui32));

Y_FORCE_INLINE void SerializeCellVecBody(TConstArrayRef<TCell> cells, char* resultBufferData, TVector<TCell>* resultCells) {
if (resultCells)
resultCells->resize_uninitialized(cells.size());
Expand Down Expand Up @@ -216,30 +382,17 @@ namespace {
SerializeCellVecBody(cells, resultBufferData, resultCells);
}

Y_FORCE_INLINE bool TryDeserializeCellVecBody(const char* buf, const char* bufEnd, ui64 cellCount, TVector<TCell>& resultCells) {
Y_FORCE_INLINE bool TryDeserializeCellVecBody(std::string_view buf, ui64 cellCount, TVector<TCell>& resultCells) {
resultCells.resize_uninitialized(cellCount);
TCell* resultCellsData = resultCells.data();
TCell* dst = resultCells.data();

TSerializedCellReader reader(buf);
for (ui64 i = 0; i < cellCount; ++i) {
if (Y_UNLIKELY(bufEnd - buf < static_cast<ptrdiff_t>(sizeof(TCellHeader)))) {
resultCells.clear();
return false;
}

TCellHeader cellHeader = ReadUnaligned<TCellHeader>(buf);
buf += sizeof(cellHeader);

if (Y_UNLIKELY(bufEnd - buf < static_cast<ptrdiff_t>(cellHeader.CellSize()))) {
if (!reader.ReadNewCell(dst)) {
resultCells.clear();
return false;
}

if (cellHeader.IsNull())
new (resultCellsData + i) TCell();
else
new (resultCellsData + i) TCell(buf, cellHeader.CellSize());

buf += cellHeader.CellSize();
++dst;
}

return true;
Expand All @@ -249,49 +402,45 @@ namespace {
resultBuffer.clear();
resultCells.clear();

if (data.empty())
if (data.empty()) {
return true;
}

const char* buf = data.data();
const char* bufEnd = data.data() + data.size();
if (Y_UNLIKELY(bufEnd - buf < static_cast<ptrdiff_t>(sizeof(ui16))))
return false;
TSerializedCellReader reader(data);

ui16 cellCount = ReadUnaligned<ui16>(buf);
buf += sizeof(cellCount);
ui16 cellCount;
if (!reader.Read(&cellCount)) {
return false;
}

if (TryDeserializeCellVecBody(buf, bufEnd, cellCount, resultCells)) {
resultBuffer = data;
return true;
if (!TryDeserializeCellVecBody(reader.Snapshot(), cellCount, resultCells)) {
return false;
}

return false;
resultBuffer = data;
return true;
}

Y_FORCE_INLINE bool TryDeserializeCellMatrix(const TString& data, TString& resultBuffer, TVector<TCell>& resultCells, ui32& rowCount, ui16& colCount) {
resultBuffer.clear();
resultCells.clear();

if (data.empty())
if (data.empty()) {
return true;
}

const char* buf = data.data();
const char* bufEnd = data.data() + data.size();
if (Y_UNLIKELY(bufEnd - buf < static_cast<ptrdiff_t>(sizeof(ui16))))
TSerializedCellReader reader(data);
if (!reader.Read(&rowCount) || !reader.Read(&colCount)) {
return false;

rowCount = ReadUnaligned<ui32>(buf);
buf += sizeof(rowCount);
colCount = ReadUnaligned<ui16>(buf);
buf += sizeof(colCount);
}

ui64 cellCount = (ui64)rowCount * (ui64)colCount;
if (TryDeserializeCellVecBody(buf, bufEnd, cellCount, resultCells)) {
resultBuffer = data;
return true;
if (!TryDeserializeCellVecBody(reader.Snapshot(), cellCount, resultCells)) {
return false;
}

return false;
resultBuffer = data;
return true;
}
}

Expand Down Expand Up @@ -419,7 +568,7 @@ void TCellsStorage::Reset(TArrayRef<const TCell> cells)
for (size_t i = 0; i < cellsSize; ++i) {
const auto & cell = cells[i];
if (!cell.IsNull() && !cell.IsInline() && cell.Size() != 0) {
cellsDataSize += AlignUp(static_cast<size_t>(cell.Size()));
cellsDataSize += AlignUp(static_cast<size_t>(cell.Size()), size_t(4));
}
}

Expand All @@ -435,7 +584,7 @@ void TCellsStorage::Reset(TArrayRef<const TCell> cells)
if (!cell.IsNull() && !cell.IsInline() && cell.Size() != 0) {
memcpy(cellsData, cell.Data(), cell.Size());
Cells[i] = TCell(cellsData, cell.Size());
cellsData += AlignUp(static_cast<size_t>(cell.Size()));
cellsData += AlignUp(static_cast<size_t>(cell.Size()), size_t(4));
} else {
Cells[i] = cell;
}
Expand Down Expand Up @@ -473,7 +622,7 @@ size_t TOwnedCellVecBatch::Append(TConstArrayRef<TCell> cells) {
::memcpy(ptrData, cell.Data(), cellSize);
}
new (ptrCell) TCell(ptrData, cellSize);
ptrData += AlignUp(cellSize);
ptrData += AlignUp(cellSize, size_t(4));
}

++ptrCell;
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/scheme/scheme_tablecell.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ inline size_t EstimateSize(TCellsRef cells) {
for (auto& cell : cells) {
if (!cell.IsNull() && !cell.IsInline()) {
const size_t cellSize = cell.Size();
size += AlignUp(cellSize);
size += AlignUp(cellSize, size_t(4));
}
}

Expand Down Expand Up @@ -439,6 +439,8 @@ class TOwnedCellVec

static TInit Allocate(TCellVec cells);

static TInit AllocateFromSerialized(std::string_view data);

TCellVec& CellVec() {
return static_cast<TCellVec&>(*this);
}
Expand All @@ -457,6 +459,10 @@ class TOwnedCellVec
return TOwnedCellVec(Allocate(cells));
}

static TOwnedCellVec FromSerialized(std::string_view data) {
return TOwnedCellVec(AllocateFromSerialized(data));
}

TOwnedCellVec(const TOwnedCellVec& rhs) noexcept
: TCellVec(rhs)
, Data(rhs.Data)
Expand Down
Loading

0 comments on commit 7daac53

Please sign in to comment.