From 06f4c88632e1d6ec7b99a868bf884317eed40fa6 Mon Sep 17 00:00:00 2001 From: Vitaly Stoyan Date: Wed, 17 Jul 2024 02:10:02 +0300 Subject: [PATCH] Arrow memory tracking (#6731) --- .../yql/minikql/comp_nodes/mkql_apply.cpp | 1 + .../minikql/comp_nodes/mkql_scalar_apply.cpp | 1 + .../computation/mkql_block_transport.cpp | 13 ++-- ydb/library/yql/minikql/mkql_alloc.cpp | 64 +++++++++++++++++-- ydb/library/yql/minikql/mkql_alloc.h | 17 ++++- 5 files changed, 85 insertions(+), 11 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp index 5f24fe2ccc15..fdfe012349f7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_apply.cpp @@ -24,6 +24,7 @@ class TApplyWrapper: public TMutableCodegeneratorPtrNode { , ValueBuilder(HolderFactory, NUdf::EValidatePolicy::Exception) , Args(argsCount) { + Alloc.Ref().EnableArrowTracking = false; Alloc.Release(); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_scalar_apply.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_scalar_apply.cpp index d33890850e25..c440051218e6 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_scalar_apply.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_scalar_apply.cpp @@ -55,6 +55,7 @@ class TScalarApplyWrapper : public TMutableComputationNode nullptr, Alloc.Ref(), *RandomProvider, *TimeProvider, NUdf::EValidatePolicy::Exception, nullptr), originalContext.Mutables, *NYql::NUdf::GetYqlMemoryPool()) { + Alloc.Ref().EnableArrowTracking = false; Alloc.Release(); } diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp index e9700ed43995..87082c098e0f 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -10,6 +10,11 @@ namespace NKikimr::NMiniKQL { namespace { +TRope MakeReadOnlyRopeAndUntrack(const std::shared_ptr& owner, const char* data, size_t size) { + MKQLArrowUntrack(owner->data()); + return NYql::MakeReadOnlyRope(owner, data, size); +} + class TOwnedArrowBuffer : public arrow::Buffer { public: TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr& owner) @@ -76,7 +81,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) { YQL_ENSURE(desiredOffset <= (size_t)data.offset); YQL_ENSURE((data.offset - desiredOffset) % 8 == 0); const char* nulls = data.GetValues(0, 0) + (data.offset - desiredOffset) / 8; - dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[0], nulls, nullBytes)); + dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[0], nulls, nullBytes)); } void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe& result) { @@ -205,7 +210,7 @@ class TFixedSizeBlockSerializer final : public IBlockSerializer { const ui64 desiredOffset = data.offset % 8; const char* buf = reinterpret_cast(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize; size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize; - dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes)); + dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], buf, dataBytes)); } }; @@ -277,11 +282,11 @@ class TStringBlockSerializer final : public IBlockSerializer { const ui64 desiredOffset = data.offset % 8; const char* offsets = reinterpret_cast(data.GetValues(1) - desiredOffset); size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset); - dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize)); + dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], offsets, offsetsSize)); const char* mainData = reinterpret_cast(data.buffers[2]->data()); size_t mainSize = data.buffers[2]->size(); - dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[2], mainData, mainSize)); + dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[2], mainData, mainSize)); } }; diff --git a/ydb/library/yql/minikql/mkql_alloc.cpp b/ydb/library/yql/minikql/mkql_alloc.cpp index 3d6d9186d84b..9e134bbbd34b 100644 --- a/ydb/library/yql/minikql/mkql_alloc.cpp +++ b/ydb/library/yql/minikql/mkql_alloc.cpp @@ -20,7 +20,7 @@ void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept { void TAllocState::TListEntry::Unlink() noexcept { std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right); - Left = Right = nullptr; + Clear(); } TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters &counters, bool supportsSizedAllocators) @@ -31,13 +31,14 @@ TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAligne GetRoot()->InitLinks(); OffloadedBlocksRoot.InitLinks(); GlobalPAllocList.InitLinks(); + ArrowBlocksRoot.InitLinks(); } void TAllocState::CleanupPAllocList(TListEntry* root) { for (auto curr = root->Right; curr != root; ) { auto next = curr->Right; auto size = ((TMkqlPAllocHeader*)curr)->Size; - auto fullSize = size + sizeof(TMkqlPAllocHeader); + auto fullSize = size + sizeof(TMkqlPAllocHeader); MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot curr = next; } @@ -45,6 +46,18 @@ void TAllocState::CleanupPAllocList(TListEntry* root) { root->InitLinks(); } +void TAllocState::CleanupArrowList(TListEntry* root) { + for (auto curr = root->Right; curr != root; ) { + auto next = curr->Right; + auto size = ((TMkqlArrowHeader*)curr)->Size; + auto fullSize = size + sizeof(TMkqlArrowHeader); + ReleaseAlignedPage(curr, fullSize); + curr = next; + } + + root->InitLinks(); +} + void TAllocState::KillAllBoxed() { { const auto root = GetRoot(); @@ -72,6 +85,8 @@ void TAllocState::KillAllBoxed() { OffloadedBlocksRoot.InitLinks(); } + CleanupArrowList(&ArrowBlocksRoot); + #ifndef NDEBUG ActiveMemInfo.clear(); #endif @@ -230,18 +245,55 @@ void TPagedArena::Clear() noexcept { } void* MKQLArrowAllocate(ui64 size) { - return GetAlignedPage(size); + TAllocState* state = TlsAllocState; + Y_ENSURE(state); + auto fullSize = size + sizeof(TMkqlArrowHeader); + if (state->EnableArrowTracking) { + state->OffloadAlloc(fullSize); + } + + auto ptr = GetAlignedPage(fullSize); + auto header = (TMkqlArrowHeader*)ptr; + if (state->EnableArrowTracking) { + header->Entry.Link(&state->ArrowBlocksRoot); + } else { + header->Entry.Clear(); + } + + header->Size = size; + return header + 1; } void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) { - auto res = GetAlignedPage(size); + auto res = MKQLArrowAllocate(size); memcpy(res, mem, Min(prevSize, size)); - ReleaseAlignedPage(const_cast(mem), prevSize); + MKQLArrowFree(mem, prevSize); return res; } void MKQLArrowFree(const void* mem, ui64 size) { - ReleaseAlignedPage(const_cast(mem), size); + auto fullSize = size + sizeof(TMkqlArrowHeader); + auto header = ((TMkqlArrowHeader*)mem) - 1; + if (!header->Entry.IsUnlinked()) { + TAllocState* state = TlsAllocState; + Y_ENSURE(state); + state->OffloadFree(fullSize); + header->Entry.Unlink(); + } + + Y_ENSURE(size == header->Size); + ReleaseAlignedPage(header, fullSize); +} + +void MKQLArrowUntrack(const void* mem) { + TAllocState* state = TlsAllocState; + Y_ENSURE(state); + auto header = ((TMkqlArrowHeader*)mem) - 1; + if (!header->Entry.IsUnlinked()) { + header->Entry.Unlink(); + auto fullSize = header->Size + sizeof(TMkqlArrowHeader); + state->OffloadFree(fullSize); + } } } // NMiniKQL diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index 1656f2db6dff..a61a39e34fe7 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -50,6 +50,8 @@ struct TAllocState : public TAlignedPagePool void Link(TListEntry* root) noexcept; void Unlink() noexcept; void InitLinks() noexcept { Left = Right = this; } + void Clear() noexcept { Left = Right = nullptr; } + bool IsUnlinked() const noexcept { return !Left && !Right; } }; #ifndef NDEBUG @@ -74,7 +76,9 @@ struct TAllocState : public TAlignedPagePool TListEntry OffloadedBlocksRoot; TListEntry GlobalPAllocList; TListEntry* CurrentPAllocList; - std::shared_ptr> ArrowMemoryUsage = std::make_shared>(); + TListEntry ArrowBlocksRoot; + bool EnableArrowTracking = true; + void* MainContext = nullptr; void* CurrentContext = nullptr; @@ -97,6 +101,7 @@ struct TAllocState : public TAlignedPagePool void InvalidateMemInfo(); size_t GetDeallocatedInPages() const; static void CleanupPAllocList(TListEntry* root); + static void CleanupArrowList(TListEntry* root); void LockObject(::NKikimr::NUdf::TUnboxedValuePod value); void UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value); @@ -163,6 +168,15 @@ static_assert(sizeof(TMkqlPAllocHeader) == sizeof(TAllocState::TListEntry) + sizeof(void*), "Padding is not allowed"); +constexpr size_t ArrowAlignment = 64; +struct TMkqlArrowHeader { + TAllocState::TListEntry Entry; + ui64 Size; + char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)]; +}; + +static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment); + class TScopedAlloc { public: explicit TScopedAlloc(const TSourceLocation& location, @@ -410,6 +424,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept { void* MKQLArrowAllocate(ui64 size); void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size); void MKQLArrowFree(const void* mem, ui64 size); +void MKQLArrowUntrack(const void* mem); template struct TWithMiniKQLAlloc {