diff --git a/velox/exec/SharedArbitrator.cpp b/velox/exec/SharedArbitrator.cpp
index 859fe284933e..34f3800de191 100644
--- a/velox/exec/SharedArbitrator.cpp
+++ b/velox/exec/SharedArbitrator.cpp
@@ -205,7 +205,10 @@ bool SharedArbitrator::growMemory(
                          << " capacity to "
                          << succinctBytes(requestor->capacity() + targetBytes)
                          << " which exceeds its max capacity "
-                         << succinctBytes(requestor->maxCapacity());
+                         << succinctBytes(requestor->maxCapacity())
+                         << ", current capacity "
+                         << succinctBytes(requestor->capacity()) << ", request "
+                         << succinctBytes(targetBytes);
     return false;
   }
 
diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp
index 9ec1f5ec1577..e0d0a2df40f1 100644
--- a/velox/exec/Task.cpp
+++ b/velox/exec/Task.cpp
@@ -2590,7 +2590,15 @@ uint64_t Task::MemoryReclaimer::reclaim(
   if (task->isCancelled()) {
     return 0;
   }
-  return memory::MemoryReclaimer::reclaim(pool, targetBytes, maxWaitMs, stats);
+  // Before reclaiming from its operators, first check if there is any free
+  // capacity in the task's root memory pool after stopping this task.
+  const uint64_t shrunkBytes = pool->shrink(targetBytes);
+  if (shrunkBytes >= targetBytes) {
+    return shrunkBytes;
+  }
+  return shrunkBytes +
+      memory::MemoryReclaimer::reclaim(
+          pool, targetBytes - shrunkBytes, maxWaitMs, stats);
 }
 
 void Task::MemoryReclaimer::abort(
diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp
index 1fbfb687b6c4..cda061f3db9c 100644
--- a/velox/exec/tests/AggregationTest.cpp
+++ b/velox/exec/tests/AggregationTest.cpp
@@ -3098,13 +3098,15 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyInput) {
   }
   auto* driver = values->testingOperatorCtx()->driver();
   auto task = values->testingOperatorCtx()->task();
+  // Shrink all the capacity before reclaim.
+  task->pool()->shrink();
   {
     MemoryReclaimer::Stats stats;
     SuspendedSection suspendedSection(driver);
     task->pool()->reclaim(kMaxBytes, 0, stats);
     ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
     ASSERT_GT(stats.reclaimExecTimeUs, 0);
-    ASSERT_GT(stats.reclaimedBytes, 0);
+    ASSERT_EQ(stats.reclaimedBytes, 0);
     ASSERT_GT(stats.reclaimWaitTimeUs, 0);
   }
   static_cast<memory::MemoryPoolImpl*>(task->pool())
@@ -3168,13 +3170,15 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) {
   }
   auto* driver = op->testingOperatorCtx()->driver();
   auto task = op->testingOperatorCtx()->task();
+  // Shrink all the capacity before reclaim.
+  task->pool()->shrink();
   {
     MemoryReclaimer::Stats stats;
     SuspendedSection suspendedSection(driver);
     task->pool()->reclaim(kMaxBytes, 0, stats);
     ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
     ASSERT_GT(stats.reclaimExecTimeUs, 0);
-    ASSERT_GT(stats.reclaimedBytes, 0);
+    ASSERT_EQ(stats.reclaimedBytes, 0);
     ASSERT_GT(stats.reclaimWaitTimeUs, 0);
   }
   // Sets back the memory capacity to proceed the test.
diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp
index 059356037c90..8d8ac92e2a67 100644
--- a/velox/exec/tests/SharedArbitratorTest.cpp
+++ b/velox/exec/tests/SharedArbitratorTest.cpp
@@ -639,6 +639,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
           AssertQueryBuilder(plan)
               .spillDirectory(spillDirectory->path)
               .config(core::QueryConfig::kSpillEnabled, "true")
+              .config(core::QueryConfig::kAggregationSpillEnabled, "false")
               .config(core::QueryConfig::kWriterSpillEnabled, "true")
               // Set 0 file writer flush threshold to always trigger flush in
               // test.
@@ -654,6 +655,11 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
                   connector::hive::HiveConfig::
                       kOrcWriterMaxDictionaryMemorySession,
                   "1GB")
+              .connectorSessionProperty(
+                  kHiveConnectorId,
+                  connector::hive::HiveConfig::
+                      kOrcWriterMaxDictionaryMemorySession,
+                  "1GB")
               .queryCtx(queryCtx)
               .maxDrivers(numDrivers)
               .copyResults(pool(), result.task);
@@ -2745,6 +2751,52 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, tableWriteReclaimOnClose) {
   waitForAllTasksToBeDeleted();
 }
 
+DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenWriterCloseAndTaskReclaim) {
+  const uint64_t memoryCapacity = 512 * MB;
+  setupMemory(memoryCapacity);
+  std::vector<RowVectorPtr> vectors = newVectors(1'000, memoryCapacity / 8);
+  const auto expectedResult = runWriteTask(vectors, nullptr, 1, false).data;
+
+  std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(memoryCapacity);
+
+  std::atomic_bool writerCloseWaitFlag{true};
+  folly::EventCount writerCloseWait;
+  std::atomic_bool taskReclaimWaitFlag{true};
+  folly::EventCount taskReclaimWait;
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::dwrf::Writer::flushStripe",
+      std::function<void(dwrf::Writer*)>(([&](dwrf::Writer* writer) {
+        writerCloseWaitFlag = false;
+        writerCloseWait.notifyAll();
+        taskReclaimWait.await([&]() { return !taskReclaimWaitFlag.load(); });
+      })));
+
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::Task::requestPauseLocked",
+      std::function<void(Task*)>(([&](Task* /*unused*/) {
+        taskReclaimWaitFlag = false;
+        taskReclaimWait.notifyAll();
+      })));
+
+  std::thread queryThread([&]() {
+    const auto result =
+        runWriteTask(vectors, queryCtx, 1, true, expectedResult);
+  });
+
+  writerCloseWait.await([&]() { return !writerCloseWaitFlag.load(); });
+
+  // Creates a fake pool to trigger memory arbitration.
+  auto fakePool = queryCtx->pool()->addLeafChild(
+      "fakePool", true, FakeMemoryReclaimer::create());
+  ASSERT_TRUE(memoryManager_->growPool(
+      fakePool.get(),
+      arbitrator_->stats().freeCapacityBytes +
+          queryCtx->pool()->capacity() / 2));
+
+  queryThread.join();
+  waitForAllTasksToBeDeleted();
+}
+
 DEBUG_ONLY_TEST_F(SharedArbitrationTest, tableFileWriteError) {
   const uint64_t memoryCapacity = 32 * MB;
   setupMemory(memoryCapacity);
@@ -3626,6 +3678,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenRaclaimAndJoinFinish) {
   // spill after hash table built.
   memory::MemoryReclaimer::Stats stats;
   const uint64_t oldCapacity = joinQueryCtx->pool()->capacity();
+  task.load()->pool()->shrink();
   task.load()->pool()->reclaim(1'000, 0, stats);
   // If the last build memory pool is first child of its parent memory pool,
   // then memory arbitration (or join node memory pool) will reclaim from the