Skip to content

Commit

Permalink
Simply the memory pool arbitration process management
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Aug 10, 2024
1 parent 1629101 commit 95a65dc
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 132 deletions.
11 changes: 0 additions & 11 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,13 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
debugEnabled_(options.debugEnabled),
coreOnAllocationFailureEnabled_(options.coreOnAllocationFailureEnabled),
poolDestructionCb_([&](MemoryPool* pool) { dropPool(pool); }),
poolGrowCb_([&](MemoryPool* pool, uint64_t targetBytes) {
return growPool(pool, targetBytes);
}),
sysRoot_{std::make_shared<MemoryPoolImpl>(
this,
std::string(kSysRootName),
MemoryPool::Kind::kAggregate,
nullptr,
nullptr,
nullptr,
nullptr,
// NOTE: the default root memory pool has no capacity limit, and it is
// used for system usage in production such as disk spilling.
MemoryPool::Options{
Expand Down Expand Up @@ -268,7 +264,6 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
MemoryPool::Kind::kAggregate,
nullptr,
std::move(reclaimer),
poolGrowCb_,
poolDestructionCb_,
options);
pools_.emplace(poolName, pool);
Expand All @@ -290,12 +285,6 @@ std::shared_ptr<MemoryPool> MemoryManager::addLeafPool(
return sysRoot_->addLeafChild(poolName, threadSafe, nullptr);
}

bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
VELOX_CHECK_NOT_NULL(pool);
VELOX_CHECK_NE(pool->capacity(), kMaxMemory);
return arbitrator_->growCapacity(pool, incrementBytes);
}

uint64_t MemoryManager::shrinkPools(
uint64_t targetBytes,
bool allowSpill,
Expand Down
6 changes: 0 additions & 6 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,6 @@ class MemoryManager {
private:
void dropPool(MemoryPool* pool);

// Invoked to grow a memory pool's free capacity with at least
// 'incrementBytes'. The function returns true on success, otherwise false.
bool growPool(MemoryPool* pool, uint64_t incrementBytes);

// Returns the shared references to all the alive memory pools in 'pools_'.
std::vector<std::shared_ptr<MemoryPool>> getAlivePools() const;

Expand All @@ -328,8 +324,6 @@ class MemoryManager {
// tracked by 'pools_'. It is invoked on the root pool destruction and removes
// the pool from 'pools_'.
const MemoryPoolImpl::DestructionCallback poolDestructionCb_;
// Callback invoked by the root memory pool to request memory capacity growth.
const MemoryPoolImpl::GrowCapacityCallback poolGrowCb_;

const std::shared_ptr<MemoryPool> sysRoot_;
const std::shared_ptr<MemoryPool> spillPool_;
Expand Down
24 changes: 14 additions & 10 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,17 @@ const MemoryArbitrationContext* memoryArbitrationContext() {
return arbitrationCtx;
}

ScopedMemoryArbitration::ScopedMemoryArbitration(MemoryPool* pool)
: pool_(pool), ctx_{pool} {
VELOX_CHECK_NOT_NULL(pool_);
VELOX_CHECK(pool_->isLeaf());
pool_->enterArbitration();
}

ScopedMemoryArbitration::~ScopedMemoryArbitration() {
pool_->leaveArbitration();
}

bool underMemoryArbitration() {
return memoryArbitrationContext() != nullptr;
}
Expand All @@ -515,20 +526,13 @@ void testingRunArbitration(
MemoryPool* pool,
uint64_t targetBytes,
bool allowSpill) {
pool->enterArbitration();
// Seraliazes the testing arbitration injection to make sure that the previous
// op has left arbitration section before starting the next one. This is
// guaranteed by the production code for operation triggered arbitration.
static std::mutex lock;
{
std::lock_guard<std::mutex> l(lock);
ScopedMemoryArbitration scopedArbitration{pool};
static_cast<MemoryPoolImpl*>(pool)->testingManager()->shrinkPools(
targetBytes, allowSpill);
pool->leaveArbitration();
}
// This function is simulating an operator triggered arbitration which
// would check if the query has been aborted after finish arbitration by the
// memory pool capacity grow path.
// This function is simulating an arbitration triggered by growCapacity, which
// would check this.
static_cast<MemoryPoolImpl*>(pool)->testingCheckIfAborted();
}

Expand Down
20 changes: 16 additions & 4 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,10 @@ class ScopedMemoryArbitrationContext {
public:
explicit ScopedMemoryArbitrationContext(const MemoryPool* requestor);

// Can be used to restore a previously captured MemoryArbitrationContext.
// contextToRestore can be nullptr if there was no context at the time it was
// captured, in which case arbitrationCtx is unchanged upon
// contruction/destruction of this object.
/// Can be used to restore a previously captured MemoryArbitrationContext.
/// contextToRestore can be nullptr if there was no context at the time it was
/// captured, in which case arbitrationCtx is unchanged upon
/// contruction/destruction of this object.
explicit ScopedMemoryArbitrationContext(
const MemoryArbitrationContext* contextToRestore);

Expand All @@ -425,6 +425,18 @@ class ScopedMemoryArbitrationContext {
MemoryArbitrationContext currentArbitrationCtx_;
};

/// Object used to setup memory arbitration context for a leaf memory pool.
class ScopedMemoryArbitration {
public:
explicit ScopedMemoryArbitration(MemoryPool* pool);

~ScopedMemoryArbitration();

private:
MemoryPool* const pool_;
const ScopedMemoryArbitrationContext ctx_;
};

/// Returns the memory arbitration context set by a per-thread local variable if
/// the running thread is under memory arbitration processing.
const MemoryArbitrationContext* memoryArbitrationContext();
Expand Down
19 changes: 12 additions & 7 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,12 @@ MemoryPoolImpl::MemoryPoolImpl(
Kind kind,
std::shared_ptr<MemoryPool> parent,
std::unique_ptr<MemoryReclaimer> reclaimer,
GrowCapacityCallback growCapacityCb,
DestructionCallback destructionCb,
const Options& options)
: MemoryPool{name, kind, parent, options},
manager_{memoryManager},
allocator_{manager_->allocator()},
growCapacityCb_(std::move(growCapacityCb)),
arbitrator_{manager_->arbitrator()},
destructionCb_(std::move(destructionCb)),
debugPoolNameRegex_(debugEnabled_ ? *(debugPoolNameRegex().rlock()) : ""),
reclaimer_(std::move(reclaimer)),
Expand All @@ -428,8 +427,8 @@ MemoryPoolImpl::MemoryPoolImpl(
capacity_(parent_ != nullptr ? kMaxMemory : 0) {
VELOX_CHECK(options.threadSafe || isLeaf());
VELOX_CHECK(
isRoot() || (destructionCb_ == nullptr && growCapacityCb_ == nullptr),
"Only root memory pool allows to set destruction and capacity grow callbacks: {}",
isRoot() || destructionCb_ == nullptr,
"Only root memory pool allows to set destruction callbacks: {}",
name_);
}

Expand Down Expand Up @@ -733,7 +732,6 @@ std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
parent,
std::move(reclaimer),
nullptr,
nullptr,
Options{
.alignment = alignment_,
.trackUsage = trackUsage_,
Expand Down Expand Up @@ -842,8 +840,7 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(

VELOX_CHECK_NULL(parent_);

++numCapacityGrowths_;
if (growCapacityCb_(requestor, size)) {
if (growCapacity(requestor, size)) {
TestValue::adjust(
"facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe::AfterGrowCallback",
this);
Expand All @@ -865,6 +862,14 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(
treeMemoryUsage()));
}

bool MemoryPoolImpl::growCapacity(MemoryPool* requestor, uint64_t size) {
VELOX_CHECK(requestor->isLeaf());
++numCapacityGrowths_;

ScopedMemoryArbitration scopedArbitration{requestor};
return arbitrator_->growCapacity(this, size);
}

bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) {
std::lock_guard<std::mutex> l(mutex_);
if (isRoot()) {
Expand Down
42 changes: 21 additions & 21 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
/// Returns the memory reclaimer of this memory pool if not null.
virtual MemoryReclaimer* reclaimer() const = 0;

/// Invoked by the memory arbitrator to enter memory arbitration processing.
/// It is a noop if 'reclaimer' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void enterArbitration() = 0;

/// Invoked by the memory arbitrator to leave memory arbitration processing.
/// It is a noop if 'reclaimer' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void leaveArbitration() noexcept = 0;

/// Function estimates the number of reclaimable bytes and returns in
/// 'reclaimableBytes'. If the 'reclaimer' is not set, the function returns
/// std::nullopt. Otherwise, it will invoke the corresponding method of the
Expand Down Expand Up @@ -499,6 +489,16 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
protected:
static constexpr uint64_t kMB = 1 << 20;

/// Invoked by the memory arbitrator to enter memory arbitration processing.
/// It is a noop if 'reclaimer' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void enterArbitration() = 0;

/// Invoked by the memory arbitrator to leave memory arbitration processing.
/// It is a noop if 'reclaimer' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void leaveArbitration() noexcept = 0;

/// Invoked to free up to the specified amount of free memory by reducing
/// this memory pool's capacity without actually freeing any used memory. The
/// function returns the actually freed memory capacity in bytes. If
Expand Down Expand Up @@ -557,6 +557,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
friend class velox::exec::ParallelMemoryReclaimer;
friend class MemoryManager;
friend class MemoryArbitrator;
friend class ScopedMemoryArbitration;

VELOX_FRIEND_TEST(MemoryPoolTest, shrinkAndGrowAPIs);
VELOX_FRIEND_TEST(MemoryPoolTest, grow);
Expand All @@ -573,19 +574,13 @@ class MemoryPoolImpl : public MemoryPool {
/// The callback invoked on the root memory pool destruction. It is set by
/// memory manager to removes the pool from 'MemoryManager::pools_'.
using DestructionCallback = std::function<void(MemoryPool*)>;
/// The callback invoked when the used memory reservation of the root memory
/// pool exceed its capacity. It is set by memory manager to grow the memory
/// pool capacity. The callback returns true if the capacity growth succeeds,
/// otherwise false.
using GrowCapacityCallback = std::function<bool(MemoryPool*, uint64_t)>;

MemoryPoolImpl(
MemoryManager* manager,
const std::string& name,
Kind kind,
std::shared_ptr<MemoryPool> parent,
std::unique_ptr<MemoryReclaimer> reclaimer,
GrowCapacityCallback growCapacityCb,
DestructionCallback destructionCb,
const Options& options = Options{});

Expand Down Expand Up @@ -651,10 +646,6 @@ class MemoryPoolImpl : public MemoryPool {

MemoryReclaimer* reclaimer() const override;

void enterArbitration() override;

void leaveArbitration() noexcept override;

std::optional<uint64_t> reclaimableBytes() const override;

uint64_t reclaim(
Expand Down Expand Up @@ -731,6 +722,10 @@ class MemoryPoolImpl : public MemoryPool {
}

private:
void enterArbitration() override;

void leaveArbitration() noexcept override;

uint64_t shrink(uint64_t targetBytes = 0) override;

bool grow(uint64_t growBytes, uint64_t reservationBytes = 0) override;
Expand Down Expand Up @@ -872,6 +867,11 @@ class MemoryPoolImpl : public MemoryPool {

void releaseThreadSafe(uint64_t size, bool releaseOnly);

// Invoked to grow capacity of the root memory pool from the memory
// arbitrator. 'requestor' is the leaf memory pool that triggers the memory
// capacity growth. 'size' is the memory capacity growth in bytes.
bool growCapacity(MemoryPool* requestor, uint64_t size);

FOLLY_ALWAYS_INLINE void releaseNonThreadSafe(
uint64_t size,
bool releaseOnly) {
Expand Down Expand Up @@ -999,7 +999,7 @@ class MemoryPoolImpl : public MemoryPool {

MemoryManager* const manager_;
MemoryAllocator* const allocator_;
const GrowCapacityCallback growCapacityCb_;
MemoryArbitrator* const arbitrator_;
const DestructionCallback destructionCb_;

// Regex for filtering on 'name_' when debug mode is enabled. This allows us
Expand Down
Loading

0 comments on commit 95a65dc

Please sign in to comment.