[SPARK-4467] fix elements read count for ExternalSorter
The elementsRead variable should be reset to 0 after each spill (a short sketch of why this matters appears just before the diffs below).

Author: Tianshuo Deng <tdeng@twitter.com>

Closes #3302 from tsdeng/fix_external_sorter_record_count and squashes the following commits:

7b56ca0 [Tianshuo Deng] fix method signature
782c7de [Tianshuo Deng] make elementsRead private, fix comment
bb7ff28 [Tianshuo Deng] update elementsRead through addElementsRead method
74ca246 [Tianshuo Deng] fix elements read count
tsdeng authored and Andrew Or committed Nov 19, 2014
1 parent 5f5ac2d · commit d75579d
Showing 3 changed files with 12 additions and 14 deletions.
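Why the reset matters, shown as a minimal self-contained Scala sketch (the object, method, and threshold names below are illustrative stand-ins, not Spark's actual Spillable code): the spill check only samples the size estimate every 32 reads, so a counter that is never cleared after a spill stops meaning "reads since the last spill" and skews that sampling.

// Hypothetical sketch; names and thresholds are illustrative, not Spark's real values.
object SpillCounterSketch {
  private var elementsRead = 0L    // records inserted since the last spill
  private var estimatedBytes = 0L  // stand-in for the collection's size estimate
  private val memoryLimit = 1000L  // pretend per-collection memory budget

  // Loosely mirrors the shape of a maybeSpill check: the size test is only
  // sampled every 32 reads, so the counter must count reads since the last spill.
  private def maybeSpill(): Unit = {
    if (elementsRead % 32 == 0 && estimatedBytes > memoryLimit) {
      println(s"spilling after $elementsRead reads, ~$estimatedBytes bytes")
      estimatedBytes = 0L
      elementsRead = 0L  // the fix in this commit: reset the per-spill counter
    }
  }

  def insert(recordBytes: Long): Unit = {
    elementsRead += 1    // analogous to what addElementsRead() does in the trait
    estimatedBytes += recordBytes
    maybeSpill()
  }

  def main(args: Array[String]): Unit = {
    (1 to 200).foreach(_ => insert(recordBytes = 40L))
  }
}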
ExternalAppendOnlyMap.scala

@@ -76,10 +76,6 @@ class ExternalAppendOnlyMap[K, V, C](
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager

-  // Number of pairs inserted since last spill; note that we count them even if a value is merged
-  // with a previous key in case we're doing something like groupBy where the result grows
-  protected[this] var elementsRead = 0L
-
   /**
    * Size of object batches when reading/writing from serializers.
    *
@@ -132,7 +128,7 @@ class ExternalAppendOnlyMap[K, V, C](
         currentMap = new SizeTrackingAppendOnlyMap[K, C]
       }
       currentMap.changeValue(curEntry._1, update)
-      elementsRead += 1
+      addElementsRead()
     }
   }

@@ -209,8 +205,6 @@ class ExternalAppendOnlyMap[K, V, C](
     }

     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
-
-    elementsRead = 0
   }

   def diskBytesSpilled: Long = _diskBytesSpilled
ExternalSorter.scala

@@ -119,10 +119,6 @@ private[spark] class ExternalSorter[K, V, C](
   private var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
   private var buffer = new SizeTrackingPairBuffer[(Int, K), C]

-  // Number of pairs read from input since last spill; note that we count them even if a value is
-  // merged with a previous key in case we're doing something like groupBy where the result grows
-  protected[this] var elementsRead = 0L
-
   // Total spilling statistics
   private var _diskBytesSpilled = 0L

@@ -204,15 +200,15 @@ private[spark] class ExternalSorter[K, V, C](
         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
       }
       while (records.hasNext) {
-        elementsRead += 1
+        addElementsRead()
         kv = records.next()
         map.changeValue((getPartition(kv._1), kv._1), update)
         maybeSpillCollection(usingMap = true)
       }
     } else {
       // Stick values into our buffer
       while (records.hasNext) {
-        elementsRead += 1
+        addElementsRead()
         val kv = records.next()
         buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
         maybeSpillCollection(usingMap = false)
Spillable.scala

@@ -36,14 +36,21 @@ private[spark] trait Spillable[C] {
   protected def spill(collection: C): Unit

   // Number of elements read from input since last spill
-  protected var elementsRead: Long
+  protected def elementsRead: Long = _elementsRead
+
+  // Called by subclasses every time a record is read
+  // It's used for checking spilling frequency
+  protected def addElementsRead(): Unit = { _elementsRead += 1 }

   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager

   // What threshold of elementsRead we start estimating collection size at
   private[this] val trackMemoryThreshold = 1000

+  // Number of elements read from input since last spill
+  private[this] var _elementsRead = 0L
+
   // How much of the shared memory pool this collection has claimed
   private[this] var myMemoryThreshold = 0L

@@ -76,6 +83,7 @@

     spill(collection)

+    _elementsRead = 0
     // Keep track of spills, and release memory
     _memoryBytesSpilled += currentMemory
     releaseMemoryForThisThread()
