From fa6de06c826084ce5d938c71b7450c59204c864d Mon Sep 17 00:00:00 2001
From: xiaoxmeng
Date: Mon, 18 Dec 2023 18:36:52 -0800
Subject: [PATCH] Fix the race between memory release and task reclaim (#8086)

Summary:
There is a race between memory release and task reclaim that causes an
unnecessary local OOM in a Meta internal shadowing test:

T1. memory arbitration is triggered from a query
T2. the memory arbitrator finds that the memory grow request exceeds the
    query's memory capacity limit, so it starts to reclaim from this query
T3. one or more operators from this query release a large chunk of memory,
    which is sufficient to satisfy the memory arbitration request
T4. the memory arbitration waits for the task to pause and then reclaims from
    each of its operators
T5. however, the operators have no memory left to reclaim (T3 has already
    freed all the reclaimable memory)
T6. the memory arbitrator finds that nothing has been reclaimed from this
    query and fails the memory arbitration request, even though there is free
    memory after T3

This PR fixes the issue by shrinking the query memory pool after task reclaim
has paused the task. Verified with the table writer, which is the case found
in the Meta shadowing test. Also improve the memory arbitration logging a bit.

Pull Request resolved: https://github.com/facebookincubator/velox/pull/8086

Reviewed By: mbasmanova

Differential Revision: D52244993

Pulled By: xiaoxmeng

fbshipit-source-id: 846a02ba0580eeff89f2a6b6ffdba82a307b51f0
---
 velox/exec/SharedArbitrator.cpp           |  5 ++-
 velox/exec/Task.cpp                       | 10 ++++-
 velox/exec/tests/AggregationTest.cpp      |  8 +++-
 velox/exec/tests/SharedArbitratorTest.cpp | 53 +++++++++++++++++++++++
 4 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/velox/exec/SharedArbitrator.cpp b/velox/exec/SharedArbitrator.cpp
index 859fe284933e..34f3800de191 100644
--- a/velox/exec/SharedArbitrator.cpp
+++ b/velox/exec/SharedArbitrator.cpp
@@ -205,7 +205,10 @@ bool SharedArbitrator::growMemory(
         << " capacity to "
         << succinctBytes(requestor->capacity() + targetBytes)
         << " which exceeds its max capacity "
-        << succinctBytes(requestor->maxCapacity());
+        << succinctBytes(requestor->maxCapacity())
+        << ", current capacity "
+        << succinctBytes(requestor->capacity()) << ", request "
+        << succinctBytes(targetBytes);
     return false;
   }
 
diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp
index 9ec1f5ec1577..e0d0a2df40f1 100644
--- a/velox/exec/Task.cpp
+++ b/velox/exec/Task.cpp
@@ -2590,7 +2590,15 @@ uint64_t Task::MemoryReclaimer::reclaim(
   if (task->isCancelled()) {
     return 0;
   }
-  return memory::MemoryReclaimer::reclaim(pool, targetBytes, maxWaitMs, stats);
+  // Before reclaiming from its operators, first check if there is any free
+  // capacity in the root after stopping this task.
+  const uint64_t shrunkBytes = pool->shrink(targetBytes);
+  if (shrunkBytes >= targetBytes) {
+    return shrunkBytes;
+  }
+  return shrunkBytes +
+      memory::MemoryReclaimer::reclaim(
+          pool, targetBytes - shrunkBytes, maxWaitMs, stats);
 }
 
 void Task::MemoryReclaimer::abort(
diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp
index 1fbfb687b6c4..cda061f3db9c 100644
--- a/velox/exec/tests/AggregationTest.cpp
+++ b/velox/exec/tests/AggregationTest.cpp
@@ -3098,13 +3098,15 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyInput) {
   }
   auto* driver = values->testingOperatorCtx()->driver();
   auto task = values->testingOperatorCtx()->task();
+  // Shrink all the capacity before reclaim.
+  task->pool()->shrink();
   {
     MemoryReclaimer::Stats stats;
     SuspendedSection suspendedSection(driver);
     task->pool()->reclaim(kMaxBytes, 0, stats);
     ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
     ASSERT_GT(stats.reclaimExecTimeUs, 0);
-    ASSERT_GT(stats.reclaimedBytes, 0);
+    ASSERT_EQ(stats.reclaimedBytes, 0);
     ASSERT_GT(stats.reclaimWaitTimeUs, 0);
   }
   static_cast(task->pool())
@@ -3168,13 +3170,15 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) {
   }
   auto* driver = op->testingOperatorCtx()->driver();
   auto task = op->testingOperatorCtx()->task();
+  // Shrink all the capacity before reclaim.
+  task->pool()->shrink();
   {
     MemoryReclaimer::Stats stats;
     SuspendedSection suspendedSection(driver);
     task->pool()->reclaim(kMaxBytes, 0, stats);
     ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
     ASSERT_GT(stats.reclaimExecTimeUs, 0);
-    ASSERT_GT(stats.reclaimedBytes, 0);
+    ASSERT_EQ(stats.reclaimedBytes, 0);
     ASSERT_GT(stats.reclaimWaitTimeUs, 0);
   }
   // Sets back the memory capacity to proceed the test.
diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp
index 059356037c90..8d8ac92e2a67 100644
--- a/velox/exec/tests/SharedArbitratorTest.cpp
+++ b/velox/exec/tests/SharedArbitratorTest.cpp
@@ -639,6 +639,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
     AssertQueryBuilder(plan)
         .spillDirectory(spillDirectory->path)
         .config(core::QueryConfig::kSpillEnabled, "true")
+        .config(core::QueryConfig::kAggregationSpillEnabled, "false")
         .config(core::QueryConfig::kWriterSpillEnabled, "true")
         // Set 0 file writer flush threshold to always trigger flush in
         // test.
@@ -654,6 +655,11 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
             connector::hive::HiveConfig::
                 kOrcWriterMaxDictionaryMemorySession,
             "1GB")
+        .connectorSessionProperty(
+            kHiveConnectorId,
+            connector::hive::HiveConfig::
+                kOrcWriterMaxDictionaryMemorySession,
+            "1GB")
         .queryCtx(queryCtx)
         .maxDrivers(numDrivers)
         .copyResults(pool(), result.task);
@@ -2745,6 +2751,52 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, tableWriteReclaimOnClose) {
   waitForAllTasksToBeDeleted();
 }
 
+DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenWriterCloseAndTaskReclaim) {
+  const uint64_t memoryCapacity = 512 * MB;
+  setupMemory(memoryCapacity);
+  std::vector<RowVectorPtr> vectors = newVectors(1'000, memoryCapacity / 8);
+  const auto expectedResult = runWriteTask(vectors, nullptr, 1, false).data;
+
+  std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(memoryCapacity);
+
+  std::atomic_bool writerCloseWaitFlag{true};
+  folly::EventCount writerCloseWait;
+  std::atomic_bool taskReclaimWaitFlag{true};
+  folly::EventCount taskReclaimWait;
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::dwrf::Writer::flushStripe",
+      std::function<void(dwrf::Writer*)>(([&](dwrf::Writer* writer) {
+        writerCloseWaitFlag = false;
+        writerCloseWait.notifyAll();
+        taskReclaimWait.await([&]() { return !taskReclaimWaitFlag.load(); });
+      })));
+
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::Task::requestPauseLocked",
+      std::function<void(Task*)>(([&](Task* /*unused*/) {
+        taskReclaimWaitFlag = false;
+        taskReclaimWait.notifyAll();
+      })));
+
+  std::thread queryThread([&]() {
+    const auto result =
+        runWriteTask(vectors, queryCtx, 1, true, expectedResult);
+  });
+
+  writerCloseWait.await([&]() { return !writerCloseWaitFlag.load(); });
+
+  // Creates a fake pool to trigger memory arbitration.
+  auto fakePool = queryCtx->pool()->addLeafChild(
+      "fakePool", true, FakeMemoryReclaimer::create());
+  ASSERT_TRUE(memoryManager_->growPool(
+      fakePool.get(),
+      arbitrator_->stats().freeCapacityBytes +
+          queryCtx->pool()->capacity() / 2));
+
+  queryThread.join();
+  waitForAllTasksToBeDeleted();
+}
+
 DEBUG_ONLY_TEST_F(SharedArbitrationTest, tableFileWriteError) {
   const uint64_t memoryCapacity = 32 * MB;
   setupMemory(memoryCapacity);
@@ -3626,6 +3678,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenRaclaimAndJoinFinish) {
   // spill after hash table built.
   memory::MemoryReclaimer::Stats stats;
   const uint64_t oldCapacity = joinQueryCtx->pool()->capacity();
+  task.load()->pool()->shrink();
   task.load()->pool()->reclaim(1'000, 0, stats);
   // If the last build memory pool is first child of its parent memory pool,
   // then memory arbitration (or join node memory pool) will reclaim from the
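
The Task.cpp change follows a shrink-then-reclaim pattern: capacity that the
operators have already released (step T3 above) is returned first, and the
operators are only asked to reclaim the remainder. The standalone C++ sketch
below illustrates the idea under simplified assumptions; PoolSketch and
reclaimFromOperators are placeholder names for illustration, not Velox APIs.

#include <algorithm>
#include <cstdint>

// Placeholder pool type: tracks how much unused capacity could be returned.
struct PoolSketch {
  uint64_t freeCapacityBytes{0};

  // Returns up to 'targetBytes' of unused capacity back to the arbitrator.
  uint64_t shrink(uint64_t targetBytes) {
    const uint64_t released = std::min(freeCapacityBytes, targetBytes);
    freeCapacityBytes -= released;
    return released;
  }
};

// Placeholder for the expensive path that spills/reclaims from operators.
uint64_t reclaimFromOperators(PoolSketch& /*pool*/, uint64_t targetBytes) {
  return targetBytes;
}

// Shrink-then-reclaim: memory already freed by operators satisfies the
// request first; operators are reclaimed from only for the remainder.
uint64_t reclaimWithShrink(PoolSketch& pool, uint64_t targetBytes) {
  const uint64_t shrunkBytes = pool.shrink(targetBytes);
  if (shrunkBytes >= targetBytes) {
    return shrunkBytes;
  }
  return shrunkBytes + reclaimFromOperators(pool, targetBytes - shrunkBytes);
}

In the patch itself, MemoryPool::shrink() plays the role of
PoolSketch::shrink() and memory::MemoryReclaimer::reclaim() is the
operator-reclaim path.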