Skip to content

Commit

Permalink
Avoid the deadlock when failed to create root memory pool (#11042)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11042

A recent code change put root memory pool creation under the memory manager's lock, which
could cause a deadlock.
This PR fixes the problem for the cases where either the arbitrator fails to add the pool or a duplicate root pool name is detected. We never
expect setting the destruction callback to fail.

Reviewed By: tanjialiang, oerling

Differential Revision: D63039106

fbshipit-source-id: 95faed6f93854a8854a9121cb9aa914ad1ad5350
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 20, 2024
1 parent 19d184e commit 9851a4e
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 33 deletions.
31 changes: 18 additions & 13 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
MemoryPool::Kind::kAggregate,
nullptr,
nullptr,
nullptr,
// NOTE: the default root memory pool has no capacity limit, and it is
// used for system usage in production such as disk spilling.
MemoryPool::Options{
Expand Down Expand Up @@ -247,7 +246,7 @@ uint16_t MemoryManager::alignment() const {
return alignment_;
}

std::shared_ptr<MemoryPool> MemoryManager::createRootPool(
std::shared_ptr<MemoryPoolImpl> MemoryManager::createRootPool(
std::string poolName,
std::unique_ptr<MemoryReclaimer>& reclaimer,
MemoryPool::Options& options) {
Expand All @@ -257,7 +256,6 @@ std::shared_ptr<MemoryPool> MemoryManager::createRootPool(
MemoryPool::Kind::kAggregate,
nullptr,
std::move(reclaimer),
poolDestructionCb_,
options);
VELOX_CHECK_EQ(pool->capacity(), 0);
arbitrator_->addPool(pool);
Expand All @@ -283,16 +281,23 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
options.debugEnabled = debugEnabled_;
options.coreOnAllocationFailureEnabled = coreOnAllocationFailureEnabled_;

if (disableMemoryPoolTracking_) {
return createRootPool(poolName, reclaimer, options);
}

std::unique_lock guard{mutex_};
if (pools_.find(poolName) != pools_.end()) {
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
}
auto pool = createRootPool(poolName, reclaimer, options);
pools_.emplace(poolName, pool);
if (!disableMemoryPoolTracking_) {
try {
std::unique_lock guard{mutex_};
if (pools_.find(poolName) != pools_.end()) {
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
}
pools_.emplace(poolName, pool);
} catch (const VeloxRuntimeError& ex) {
arbitrator_->removePool(pool.get());
throw;
}
}
// NOTE: we need to set destruction callback at the end to avoid potential
// deadlock or failure because of duplicate memory pool name or unexpected
// failure to add memory pool to the arbitrator.
pool->setDestructionCallback(poolDestructionCb_);
return pool;
}

Expand All @@ -315,12 +320,12 @@ uint64_t MemoryManager::shrinkPools(
}

void MemoryManager::dropPool(MemoryPool* pool) {
VELOX_CHECK_NOT_NULL(pool);
VELOX_DCHECK_EQ(pool->reservedBytes(), 0);
arbitrator_->removePool(pool);
if (disableMemoryPoolTracking_) {
return;
}
VELOX_CHECK_NOT_NULL(pool);
std::unique_lock guard{mutex_};
auto it = pools_.find(pool->name());
if (it == pools_.end()) {
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ class MemoryManager {
}

private:
std::shared_ptr<MemoryPool> createRootPool(
std::shared_ptr<MemoryPoolImpl> createRootPool(
std::string poolName,
std::unique_ptr<MemoryReclaimer>& reclaimer,
MemoryPool::Options& options);
Expand Down
19 changes: 12 additions & 7 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,23 +418,17 @@ MemoryPoolImpl::MemoryPoolImpl(
Kind kind,
std::shared_ptr<MemoryPool> parent,
std::unique_ptr<MemoryReclaimer> reclaimer,
DestructionCallback destructionCb,
const Options& options)
: MemoryPool{name, kind, parent, options},
manager_{memoryManager},
allocator_{manager_->allocator()},
arbitrator_{manager_->arbitrator()},
destructionCb_(std::move(destructionCb)),
debugPoolNameRegex_(debugEnabled_ ? *(debugPoolNameRegex().rlock()) : ""),
reclaimer_(std::move(reclaimer)),
// The memory manager sets the capacity through grow() according to the
// actually used memory arbitration policy.
capacity_(parent_ != nullptr ? kMaxMemory : 0) {
VELOX_CHECK(options.threadSafe || isLeaf());
VELOX_CHECK(
isRoot() || destructionCb_ == nullptr,
"Only root memory pool allows to set destruction callbacks: {}",
name_);
}

MemoryPoolImpl::~MemoryPoolImpl() {
Expand Down Expand Up @@ -736,7 +730,6 @@ std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
kind,
parent,
std::move(reclaimer),
nullptr,
Options{
.alignment = alignment_,
.trackUsage = trackUsage_,
Expand Down Expand Up @@ -1148,6 +1141,18 @@ void MemoryPoolImpl::checkIfAborted() const {
}
}

// Installs 'callback' as the destruction callback of this memory pool.
//
// The call fails if 'callback' is null, if this pool is not a root pool, or if
// a destruction callback has already been set on this pool. On success a copy
// of 'callback' is stored in 'destructionCb_'.
//
// NOTE(review): the callback is presumably invoked when the pool is destroyed;
// the invocation site is not visible in this hunk -- confirm against the
// destructor.
void MemoryPoolImpl::setDestructionCallback(
const DestructionCallback& callback) {
VELOX_CHECK_NOT_NULL(callback);
VELOX_CHECK(
isRoot(),
"Only root memory pool allows to set destruction callbacks: {}",
name_);
// Hold the pool mutex so that the "not yet set" check and the assignment below
// are performed together.
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK_NULL(destructionCb_);
destructionCb_ = callback;
}

void MemoryPoolImpl::testingSetCapacity(int64_t bytes) {
if (parent_ != nullptr) {
return toImpl(parent_)->testingSetCapacity(bytes);
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,6 @@ class MemoryPoolImpl : public MemoryPool {
Kind kind,
std::shared_ptr<MemoryPool> parent,
std::unique_ptr<MemoryReclaimer> reclaimer,
DestructionCallback destructionCb,
const Options& options = Options{});

~MemoryPoolImpl() override;
Expand Down Expand Up @@ -656,6 +655,8 @@ class MemoryPoolImpl : public MemoryPool {

void abort(const std::exception_ptr& error) override;

void setDestructionCallback(const DestructionCallback& callback);

std::string toString() const override {
std::lock_guard<std::mutex> l(mutex_);
return toStringLocked();
Expand Down Expand Up @@ -1001,7 +1002,6 @@ class MemoryPoolImpl : public MemoryPool {
MemoryManager* const manager_;
MemoryAllocator* const allocator_;
MemoryArbitrator* const arbitrator_;
const DestructionCallback destructionCb_;

// Regex for filtering on 'name_' when debug mode is enabled. This allows us
// to only track the callsites of memory allocations for memory pools whose
Expand All @@ -1015,6 +1015,8 @@ class MemoryPoolImpl : public MemoryPool {
// the same parent do not have to be serialized.
mutable std::mutex mutex_;

DestructionCallback destructionCb_;

// Used by memory arbitration to reclaim memory from the associated query
// object if not null. For example, a memory pool can reclaim the used memory
// from a spillable operator through disk spilling. If null, we can't reclaim
Expand Down
30 changes: 27 additions & 3 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,18 @@ TEST_F(MemoryManagerTest, ctor) {
namespace {
class FakeTestArbitrator : public MemoryArbitrator {
public:
explicit FakeTestArbitrator(const Config& config)
explicit FakeTestArbitrator(
const Config& config,
bool injectAddPoolFailure = false)
: MemoryArbitrator(
{.kind = config.kind,
.capacity = config.capacity,
.extraConfigs = config.extraConfigs}) {}
.extraConfigs = config.extraConfigs}),
injectAddPoolFailure_(injectAddPoolFailure) {}

void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {}
void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {
VELOX_CHECK(!injectAddPoolFailure_, "Failed to add pool");
}

void removePool(MemoryPool* /*unused*/) override {}

Expand Down Expand Up @@ -152,6 +157,9 @@ class FakeTestArbitrator : public MemoryArbitrator {
std::string kind() const override {
return "FAKE";
}

private:
const bool injectAddPoolFailure_{false};
};
} // namespace

Expand All @@ -173,6 +181,22 @@ TEST_F(MemoryManagerTest, createWithCustomArbitrator) {
ASSERT_EQ(manager.allocator()->capacity(), options.allocatorCapacity);
}

// Verifies that MemoryManager::addRootPool() surfaces the error raised by the
// arbitrator when adding the new root pool to the arbitrator fails.
TEST_F(MemoryManagerTest, addPoolFailure) {
const std::string kindString = "FAKE";
// Factory that builds a FakeTestArbitrator whose addPool() always fails with
// "Failed to add pool".
MemoryArbitrator::Factory factory =
[](const MemoryArbitrator::Config& config) {
return std::make_unique<FakeTestArbitrator>(
config, /*injectAddPoolFailure*/ true);
};
MemoryArbitrator::registerFactory(kindString, factory);
// Unregister the fake factory when the test scope exits so that the
// registration does not outlive this test.
auto guard = folly::makeGuard(
[&] { MemoryArbitrator::unregisterFactory(kindString); });
MemoryManagerOptions options;
options.arbitratorKind = kindString;
MemoryManager manager{options};
// Root pool creation is expected to throw with the injected failure message.
VELOX_ASSERT_THROW(manager.addRootPool(), "Failed to add pool");
}

TEST_F(MemoryManagerTest, addPool) {
MemoryManager manager{};

Expand Down
20 changes: 13 additions & 7 deletions velox/common/memory/tests/MemoryPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,21 @@ TEST_P(MemoryPoolTest, ctor) {
ASSERT_EQ(root->parent(), nullptr);
ASSERT_EQ(root->root(), root.get());
ASSERT_EQ(root->capacity(), capacity);
VELOX_ASSERT_THROW(
static_cast<MemoryPoolImpl*>(root.get())
->setDestructionCallback([](MemoryPool*) {}),
"");

{
auto fakeRoot = std::make_shared<MemoryPoolImpl>(
&manager,
"fake_root",
MemoryPool::Kind::kAggregate,
nullptr,
nullptr,
nullptr);
&manager, "fake_root", MemoryPool::Kind::kAggregate, nullptr, nullptr);
// We can't construct an aggregate memory pool with non-thread safe.
ASSERT_ANY_THROW(std::make_shared<MemoryPoolImpl>(
&manager,
"fake_root",
MemoryPool::Kind::kAggregate,
nullptr,
nullptr,
nullptr,
MemoryPool::Options{.threadSafe = false}));
ASSERT_EQ("fake_root", fakeRoot->name());
ASSERT_EQ(
Expand All @@ -182,6 +180,10 @@ TEST_P(MemoryPoolTest, ctor) {
ASSERT_EQ(child->parent(), root.get());
ASSERT_EQ(child->root(), root.get());
ASSERT_EQ(child->capacity(), capacity);
VELOX_ASSERT_THROW(
static_cast<MemoryPoolImpl*>(child.get())
->setDestructionCallback([](MemoryPool*) {}),
"");
auto& favoriteChild = dynamic_cast<MemoryPoolImpl&>(*child);
ASSERT_EQ("child", favoriteChild.name());
ASSERT_EQ(
Expand All @@ -194,6 +196,10 @@ TEST_P(MemoryPoolTest, ctor) {
ASSERT_EQ(aggregateChild->parent(), root.get());
ASSERT_EQ(aggregateChild->root(), root.get());
ASSERT_EQ(aggregateChild->capacity(), capacity);
VELOX_ASSERT_THROW(
static_cast<MemoryPoolImpl*>(aggregateChild.get())
->setDestructionCallback([](MemoryPool*) {}),
"");
auto grandChild = aggregateChild->addLeafChild("child", isLeafThreadSafe_);
ASSERT_EQ(grandChild->parent(), aggregateChild.get());
ASSERT_EQ(grandChild->root(), root.get());
Expand Down

0 comments on commit 9851a4e

Please sign in to comment.