Commit 6724482: init commit

lianhuiwang committed May 10, 2016
1 parent 1678bff

Showing 8 changed files with 323 additions and 46 deletions.
19 changes: 18 additions & 1 deletion core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -45,7 +45,7 @@ protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
   /**
    * Returns the size of used memory in bytes.
    */
-  long getUsed() {
+  protected long getUsed() {
     return used;
   }

@@ -130,4 +130,21 @@ protected void freePage(MemoryBlock page) {
     used -= page.size();
     taskMemoryManager.freePage(page, this);
   }
+
+  /**
+   * Allocates `size` bytes of on-heap execution memory; returns the number of
+   * bytes actually granted, which may be less than requested.
+   */
+  public long acquireOnHeapMemory(long size) {
+    long granted = taskMemoryManager.acquireExecutionMemory(size, MemoryMode.ON_HEAP, this);
+    used += granted;
+    return granted;
+  }
+
+  /**
+   * Releases `size` bytes of on-heap execution memory.
+   */
+  public void freeOnHeapMemory(long size) {
+    taskMemoryManager.releaseExecutionMemory(size, MemoryMode.ON_HEAP, this);
+    used -= size;
+  }
 }
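
Note: a minimal sketch of how a consumer might use the two helpers added above (illustration only, not part of this commit; SketchConsumer and reserve are hypothetical names):

import java.io.IOException

import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}

// Hypothetical consumer: reserves a budget of on-heap execution memory and
// hands all of it back when the TaskMemoryManager asks it to spill.
private class SketchConsumer(tmm: TaskMemoryManager) extends MemoryConsumer(tmm) {
  private var reserved = 0L

  def reserve(bytes: Long): Long = {
    // acquireOnHeapMemory may grant fewer bytes than requested.
    val granted = acquireOnHeapMemory(bytes)
    reserved += granted
    granted
  }

  @throws[IOException]
  override def spill(size: Long, trigger: MemoryConsumer): Long = {
    val freed = reserved
    freeOnHeapMemory(freed) // gives the bytes back and decrements `used`
    reserved = 0L
    freed
  }
}

Because these helpers only book-keep bytes rather than allocate pages, they suit consumers whose data lives in ordinary JVM objects, which is what the ExternalAppendOnlyMap change below relies on.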
core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -408,4 +408,11 @@ public long cleanUpAllAllocatedMemory() {
   public long getMemoryConsumptionForThisTask() {
     return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
   }
+
+  /**
+   * Returns the Tungsten memory mode.
+   */
+  public MemoryMode getTungstenMemoryMode() {
+    return tungstenMemoryMode;
+  }
 }
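
Note: a short sketch of what the new accessor enables (an assumption for illustration, not code from this commit; MemoryModeCheck and acquireIfOnHeap are made-up names):

import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}

object MemoryModeCheck {
  // Only request on-heap bytes when the task's Tungsten memory mode is
  // ON_HEAP; otherwise report that nothing was granted.
  def acquireIfOnHeap(consumer: MemoryConsumer, tmm: TaskMemoryManager, bytes: Long): Long = {
    if (tmm.getTungstenMemoryMode == MemoryMode.ON_HEAP) {
      consumer.acquireOnHeapMemory(bytes)
    } else {
      0L
    }
  }
}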
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -59,10 +59,10 @@ class ExternalAppendOnlyMap[K, V, C](
     serializer: Serializer = SparkEnv.get.serializer,
     blockManager: BlockManager = SparkEnv.get.blockManager,
     context: TaskContext = TaskContext.get())
-  extends Iterable[(K, C)]
+  extends Spillable[SizeTracker](context.taskMemoryManager())
   with Serializable
   with Logging
-  with Spillable[SizeTracker] {
+  with Iterable[(K, C)] {
 
   if (context == null) {
     throw new IllegalStateException(
@@ -79,9 +79,7 @@
     this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
   }
 
-  override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
-
-  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+  @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
@@ -115,6 +113,8 @@ class ExternalAppendOnlyMap[K, V, C](
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
 
+  @volatile private var readingIterator: SpillableIterator = null
+
   /**
    * Number of files this map has spilled so far.
    * Exposed for testing.
@@ -180,6 +180,29 @@
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
   override protected[this] def spill(collection: SizeTracker): Unit = {
+    val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator)
+    val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator)
+    spilledMaps.append(diskMapIterator)
+  }
+
+  /**
+   * Force spilling the current in-memory collection to disk to release memory.
+   * It will be called by TaskMemoryManager when there is not enough memory for the task.
+   */
+  override protected[this] def forceSpill(): Boolean = {
+    assert(readingIterator != null)
+    val isSpilled = readingIterator.spill()
+    if (isSpilled) {
+      currentMap = null
+    }
+    isSpilled
+  }
+
+  /**
+   * Spill the in-memory iterator to a temporary file on disk.
+   */
+  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
+      : DiskMapIterator = {
     val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -200,9 +223,8 @@
 
     var success = false
     try {
-      val it = currentMap.destructiveSortedIterator(keyComparator)
-      while (it.hasNext) {
-        val kv = it.next()
+      while (inMemoryIterator.hasNext) {
+        val kv = inMemoryIterator.next()
         writer.write(kv._1, kv._2)
         objectsWritten += 1
 
@@ -235,7 +257,17 @@
       }
     }
 
-    spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
+    new DiskMapIterator(file, blockId, batchSizes)
   }
 
+  /**
+   * Returns a destructive iterator for iterating over the entries of this map.
+   * If this iterator is force-spilled to disk to release memory when there is
+   * not enough memory, it returns pairs from an on-disk map.
+   */
+  def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
+    readingIterator = new SpillableIterator(inMemoryIterator)
+    readingIterator
+  }
+
   /**
@@ -248,15 +280,18 @@
         "ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
     }
     if (spilledMaps.isEmpty) {
-      CompletionIterator[(K, C), Iterator[(K, C)]](currentMap.iterator, freeCurrentMap())
+      CompletionIterator[(K, C), Iterator[(K, C)]](
+        destructiveIterator(currentMap.iterator), freeCurrentMap())
     } else {
       new ExternalIterator()
     }
   }
 
   private def freeCurrentMap(): Unit = {
-    currentMap = null // So that the memory can be garbage-collected
-    releaseMemory()
+    if (currentMap != null) {
+      currentMap = null // So that the memory can be garbage-collected
+      releaseMemory()
+    }
   }
 
   /**
@@ -270,8 +305,8 @@
 
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
-    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](
-      currentMap.destructiveSortedIterator(keyComparator), freeCurrentMap())
+    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
+      currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
     private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
@@ -530,8 +565,56 @@ class ExternalAppendOnlyMap[K, V, C](
     context.addTaskCompletionListener(context => cleanup())
   }
 
+  private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
+    extends Iterator[(K, C)] {
+
+    private val SPILL_LOCK = new Object()
+
+    private var nextUpstream: Iterator[(K, C)] = null
+
+    private var cur: (K, C) = readNext()
+
+    private var hasSpilled: Boolean = false
+
+    def spill(): Boolean = SPILL_LOCK.synchronized {
+      if (hasSpilled) {
+        false
+      } else {
+        logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+          s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+        nextUpstream = spillMemoryIteratorToDisk(upstream)
+        hasSpilled = true
+        true
+      }
+    }
+
+    def readNext(): (K, C) = SPILL_LOCK.synchronized {
+      if (nextUpstream != null) {
+        upstream = nextUpstream
+        nextUpstream = null
+      }
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
+      }
+    }
+
+    override def hasNext: Boolean = cur != null
+
+    override def next(): (K, C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
+  }
+
   /** Convenience function to hash the given (K, C) pair by the key. */
   private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
+
+  override def toString(): String = {
+    this.getClass.getName + "@" + java.lang.Integer.toHexString(this.hashCode())
+  }
 }

private[spark] object ExternalAppendOnlyMap {
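
Note: taken together, the ExternalAppendOnlyMap changes work as follows. iterator() now wraps the in-memory data in a SpillableIterator via destructiveIterator(...); when the TaskMemoryManager runs short of memory it calls forceSpill(), which makes that iterator write its remaining entries to disk through spillMemoryIteratorToDisk and swap itself over to the returned DiskMapIterator, so readers never notice the switch. A minimal usage sketch (hypothetical: ExternalAppendOnlyMap is private[spark] and its constructor needs a live SparkEnv and TaskContext, so code like this could only run inside Spark itself, e.g. in a task or a test):

import org.apache.spark.util.collection.ExternalAppendOnlyMap

// Word-count style combiners: K = String, V = Int, C = Int.
val map = new ExternalAppendOnlyMap[String, Int, Int](
  createCombiner = (v: Int) => v,
  mergeValue = (c: Int, v: Int) => c + v,
  mergeCombiners = (c1: Int, c2: Int) => c1 + c2)

map.insertAll(Iterator(("a", 1), ("b", 2), ("a", 3)))

// iterator() routes through destructiveIterator(...); if another consumer in
// the same task triggers forceSpill() mid-iteration, the remaining pairs are
// spilled to disk and iteration continues from the DiskMapIterator.
for ((k, c) <- map.iterator) {
  println(s"$k -> $c")
}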