Skip to content

Commit

Permalink
Arrow memory tracking (ydb-platform#6731)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitstn authored Jul 16, 2024
1 parent 5e926e1 commit 06f4c88
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 11 deletions.
1 change: 1 addition & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class TApplyWrapper: public TMutableCodegeneratorPtrNode<TApplyWrapper> {
, ValueBuilder(HolderFactory, NUdf::EValidatePolicy::Exception)
, Args(argsCount)
{
Alloc.Ref().EnableArrowTracking = false;
Alloc.Release();
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_scalar_apply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class TScalarApplyWrapper : public TMutableComputationNode<TScalarApplyWrapper>
nullptr, Alloc.Ref(), *RandomProvider, *TimeProvider, NUdf::EValidatePolicy::Exception, nullptr),
originalContext.Mutables, *NYql::NUdf::GetYqlMemoryPool())
{
Alloc.Ref().EnableArrowTracking = false;
Alloc.Release();
}

Expand Down
13 changes: 9 additions & 4 deletions ydb/library/yql/minikql/computation/mkql_block_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ namespace NKikimr::NMiniKQL {

namespace {

// Wraps [data, data+size) of the given Arrow buffer in a read-only rope,
// first removing the buffer's backing allocation from the allocator's
// Arrow memory tracking via MKQLArrowUntrack.
TRope MakeReadOnlyRopeAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
    const auto* backing = owner->data();
    MKQLArrowUntrack(backing);
    return NYql::MakeReadOnlyRope(owner, data, size);
}

class TOwnedArrowBuffer : public arrow::Buffer {
public:
TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr<const void>& owner)
Expand Down Expand Up @@ -76,7 +81,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
YQL_ENSURE(desiredOffset <= (size_t)data.offset);
YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[0], nulls, nullBytes));
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[0], nulls, nullBytes));
}

void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) {
Expand Down Expand Up @@ -205,7 +210,7 @@ class TFixedSizeBlockSerializer final : public IBlockSerializer {
const ui64 desiredOffset = data.offset % 8;
const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize;
size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes));
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], buf, dataBytes));
}
};

Expand Down Expand Up @@ -277,11 +282,11 @@ class TStringBlockSerializer final : public IBlockSerializer {
const ui64 desiredOffset = data.offset % 8;
const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset);
size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize));
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], offsets, offsetsSize));

const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data());
size_t mainSize = data.buffers[2]->size();
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[2], mainData, mainSize));
dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[2], mainData, mainSize));
}
};

Expand Down
64 changes: 58 additions & 6 deletions ydb/library/yql/minikql/mkql_alloc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept {

// Removes this entry from its doubly-linked list and resets its links to
// the "not in any list" state (see IsUnlinked()).
void TAllocState::TListEntry::Unlink() noexcept {
    // Splice neighbors together: Left <-> Right, bypassing this entry.
    std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right);
    // Clear() nulls both links; the redundant manual null assignment left
    // over from the previous revision is removed.
    Clear();
}

TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters &counters, bool supportsSizedAllocators)
Expand All @@ -31,20 +31,33 @@ TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAligne
GetRoot()->InitLinks();
OffloadedBlocksRoot.InitLinks();
GlobalPAllocList.InitLinks();
ArrowBlocksRoot.InitLinks();
}

// Frees every PAlloc block linked into the list anchored at root, then
// re-initializes the list head so it is empty but usable again.
// The duplicated `fullSize` declaration (a stray pre-change diff line that
// redeclared the same variable) is removed.
void TAllocState::CleanupPAllocList(TListEntry* root) {
    for (auto curr = root->Right; curr != root;) {
        // Capture the successor first: freeing `curr` invalidates its links.
        auto next = curr->Right;
        auto size = ((TMkqlPAllocHeader*)curr)->Size;
        auto fullSize = size + sizeof(TMkqlPAllocHeader);
        MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
        curr = next;
    }

    root->InitLinks();
}

// Releases every tracked Arrow block linked into the list anchored at root
// and leaves the list head re-initialized (empty, self-linked).
void TAllocState::CleanupArrowList(TListEntry* root) {
    auto* entry = root->Right;
    while (entry != root) {
        // Remember the successor before the entry's page is released.
        auto* following = entry->Right;
        auto* header = (TMkqlArrowHeader*)entry;
        ReleaseAlignedPage(header, header->Size + sizeof(TMkqlArrowHeader));
        entry = following;
    }

    root->InitLinks();
}

void TAllocState::KillAllBoxed() {
{
const auto root = GetRoot();
Expand Down Expand Up @@ -72,6 +85,8 @@ void TAllocState::KillAllBoxed() {
OffloadedBlocksRoot.InitLinks();
}

CleanupArrowList(&ArrowBlocksRoot);

#ifndef NDEBUG
ActiveMemInfo.clear();
#endif
Expand Down Expand Up @@ -230,18 +245,55 @@ void TPagedArena::Clear() noexcept {
}

// Allocates `size` bytes for an Arrow buffer, prefixed by a TMkqlArrowHeader
// used for per-allocator memory tracking. Returns a pointer just past the
// header. The stray pre-change line `return GetAlignedPage(size);` (a diff
// artifact that made the tracking code unreachable) is removed.
void* MKQLArrowAllocate(ui64 size) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    auto fullSize = size + sizeof(TMkqlArrowHeader);
    // Evaluate the flag once; it controls both accounting and list linkage.
    const bool track = state->EnableArrowTracking;
    if (track) {
        state->OffloadAlloc(fullSize);
    }

    auto ptr = GetAlignedPage(fullSize);
    auto header = (TMkqlArrowHeader*)ptr;
    if (track) {
        header->Entry.Link(&state->ArrowBlocksRoot);
    } else {
        // Untracked blocks carry null links so MKQLArrowFree can tell them apart.
        header->Entry.Clear();
    }

    header->Size = size;
    return header + 1;
}

// Grows or shrinks an Arrow allocation: allocates a new tracked block,
// copies the overlapping prefix, and frees the old block. The stray
// pre-change lines (duplicate `res` declaration via GetAlignedPage and the
// raw ReleaseAlignedPage call, both diff artifacts) are removed so the
// header-aware allocate/free pair is used consistently.
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
    auto res = MKQLArrowAllocate(size);
    memcpy(res, mem, Min(prevSize, size));
    MKQLArrowFree(mem, prevSize);
    return res;
}

// Frees an Arrow allocation made by MKQLArrowAllocate. If the block is still
// linked into an allocator's tracking list, it is unlinked and the offloaded
// accounting is reverted first. The stray pre-change line
// `ReleaseAlignedPage(const_cast<void*>(mem), size);` (a diff artifact) is
// removed — it released the payload pointer with the wrong size and would
// double-free together with the header-aware release below.
void MKQLArrowFree(const void* mem, ui64 size) {
    auto fullSize = size + sizeof(TMkqlArrowHeader);
    // The header lives immediately before the payload returned to the caller.
    auto header = ((TMkqlArrowHeader*)mem) - 1;
    if (!header->Entry.IsUnlinked()) {
        TAllocState* state = TlsAllocState;
        Y_ENSURE(state);
        state->OffloadFree(fullSize);
        header->Entry.Unlink();
    }

    // Catch size mismatches between allocate and free.
    Y_ENSURE(size == header->Size);
    ReleaseAlignedPage(header, fullSize);
}

// Detaches an Arrow allocation from the current allocator's tracking list
// and reverts the memory accounting performed by MKQLArrowAllocate.
// Safe to call on blocks that were never tracked (no-op).
void MKQLArrowUntrack(const void* mem) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    auto* header = ((TMkqlArrowHeader*)mem) - 1;
    if (header->Entry.IsUnlinked()) {
        return; // already untracked — nothing to account for
    }

    header->Entry.Unlink();
    state->OffloadFree(header->Size + sizeof(TMkqlArrowHeader));
}

} // NMiniKQL
Expand Down
17 changes: 16 additions & 1 deletion ydb/library/yql/minikql/mkql_alloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ struct TAllocState : public TAlignedPagePool
// Inserts this entry into the doubly-linked list anchored at root.
void Link(TListEntry* root) noexcept;
// Removes this entry from its list and clears both links.
void Unlink() noexcept;
// Turns this entry into an empty, self-linked list head.
void InitLinks() noexcept { Left = Right = this; }
// Marks the entry as belonging to no list (both links null).
void Clear() noexcept { Left = Right = nullptr; }
// True when the entry is not linked into any list.
bool IsUnlinked() const noexcept { return !Left && !Right; }
};

#ifndef NDEBUG
Expand All @@ -74,7 +76,9 @@ struct TAllocState : public TAlignedPagePool
TListEntry OffloadedBlocksRoot;
TListEntry GlobalPAllocList;
TListEntry* CurrentPAllocList;
std::shared_ptr<std::atomic<size_t>> ArrowMemoryUsage = std::make_shared<std::atomic<size_t>>();
TListEntry ArrowBlocksRoot;
bool EnableArrowTracking = true;

void* MainContext = nullptr;
void* CurrentContext = nullptr;

Expand All @@ -97,6 +101,7 @@ struct TAllocState : public TAlignedPagePool
void InvalidateMemInfo();
size_t GetDeallocatedInPages() const;
static void CleanupPAllocList(TListEntry* root);
static void CleanupArrowList(TListEntry* root);

void LockObject(::NKikimr::NUdf::TUnboxedValuePod value);
void UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value);
Expand Down Expand Up @@ -163,6 +168,15 @@ static_assert(sizeof(TMkqlPAllocHeader) ==
sizeof(TAllocState::TListEntry) +
sizeof(void*), "Padding is not allowed");

// Header size/alignment unit for Arrow allocations; the header is padded to
// exactly this many bytes (enforced by the static_assert below).
constexpr size_t ArrowAlignment = 64;
// Tracking header placed immediately before every MKQLArrowAllocate payload.
struct TMkqlArrowHeader {
TAllocState::TListEntry Entry; // links the block into TAllocState::ArrowBlocksRoot when tracked
ui64 Size; // payload size requested by the caller (excludes this header)
char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
};

static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);

class TScopedAlloc {
public:
explicit TScopedAlloc(const TSourceLocation& location,
Expand Down Expand Up @@ -410,6 +424,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
// Allocation routines for Arrow buffers. Each allocation carries a
// TMkqlArrowHeader and, when tracking is enabled on the thread's TAllocState,
// is linked into that state's ArrowBlocksRoot list.
void* MKQLArrowAllocate(ui64 size);
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
void MKQLArrowFree(const void* mem, ui64 size);
// Detaches an allocation from the tracking list and reverts its accounting.
void MKQLArrowUntrack(const void* mem);

template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
struct TWithMiniKQLAlloc {
Expand Down

0 comments on commit 06f4c88

Please sign in to comment.