Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-21090][core]Optimize the unified memory manager code #18296

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapMemory)
maxOffHeapStorageMemory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a bug in the old code. Good catch.

}
if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
Expand All @@ -171,7 +171,8 @@ private[spark] class UnifiedMemoryManager private[memory] (
if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
numBytes - storagePool.memoryFree)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes sense: if we hit this branch then we've failed to acquire enough memory to store a block. In particular, we might need M bytes of memory but only A are available in storage. In this case, we should only request the (M - A) bytes of extra memory that we need, but the old code was incorrectly requesting all M which would cause us to over-evict. Therefore, I think this change is correct.

executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
evictBlocksToFreeSpaceCalled.set(numBytesToFree)
if (numBytesToFree <= mm.storageMemoryUsed) {
// We can evict enough blocks to fulfill the request for space
mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP)
mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode)
evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))
numBytesToFree
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,36 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
mm.invokePrivate[Unit](assertInvariants())
}

test("not enough free memory in the storage pool --OFF_HEAP") {
val conf = new SparkConf()
.set("spark.memory.offHeap.size", "1000")
.set("spark.testing.memory", "1000")
.set("spark.memory.offHeap.enabled", "true")
val taskAttemptId = 0L
val mm = UnifiedMemoryManager(conf, numCores = 1)
val ms = makeMemoryStore(mm)
val memoryMode = MemoryMode.OFF_HEAP

assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 400L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 400L)

// Fail fast
assert(!mm.acquireStorageMemory(dummyBlock, 700L, memoryMode))
assert(mm.storageMemoryUsed === 0L)

assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
assert(mm.storageMemoryUsed === 100L)
assertEvictBlocksToFreeSpaceNotCalled(ms)

// Borrow 50 from execution memory
assert(mm.acquireStorageMemory(dummyBlock, 450L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 550L)

// Borrow 50 from execution memory and evict 50 to free space
assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
assertEvictBlocksToFreeSpaceCalled(ms, 50)
assert(mm.storageMemoryUsed === 600L)
}
}