fix Mima & minor bug
lianhuiwang committed Apr 19, 2016
1 parent 70bcffa commit b84ad96
Showing 5 changed files with 44 additions and 19 deletions.
@@ -572,32 +572,41 @@ class ExternalAppendOnlyMap[K, V, C](
 
     private var nextUpstream: Iterator[(K, C)] = null
 
-    private var cur: (K, C) = null
+    private var cur: (K, C) = readNext()
 
+    private var hasSpilled: Boolean = false
+
     def spill(): Boolean = synchronized {
-      if (upstream == null || nextUpstream != null) {
+      if (hasSpilled) {
         false
       } else {
         logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
           s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
         nextUpstream = spillMemoryIteratorToDisk(upstream)
+        hasSpilled = true
         true
       }
     }
 
-    override def hasNext: Boolean = synchronized {
+    def readNext(): (K, C) = synchronized {
       if (nextUpstream != null) {
         upstream = nextUpstream
         nextUpstream = null
       }
-      val r = upstream.hasNext
-      if (r) {
-        cur = upstream.next()
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
       }
-      r
     }
 
-    override def next(): (K, C) = cur
+    override def hasNext(): Boolean = cur != null
+
+    override def next(): (K, C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
   }
 
   /** Convenience function to hash the given (K, C) pair by the key. */
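
The hunk above rewrites ExternalAppendOnlyMap's SpillableIterator so that the next element is pre-read into cur and hasNext becomes a pure null check, while a hasSpilled flag replaces the old upstream/nextUpstream guard in spill(). The following is a self-contained sketch of that pattern, not the actual Spark code: the class name is made up, logging is dropped, and a toArray-based stand-in replaces spillMemoryIteratorToDisk.

// Illustrative sketch of the iterator pattern introduced above.
class SpillableIteratorSketch[K, C](private var upstream: Iterator[(K, C)])
  extends Iterator[(K, C)] {

  private var nextUpstream: Iterator[(K, C)] = null

  // Pre-read the first element so hasNext never has to advance upstream.
  private var cur: (K, C) = readNext()

  private var hasSpilled: Boolean = false

  def spill(): Boolean = synchronized {
    if (hasSpilled) {
      false                                   // at most one forced spill
    } else {
      // Stand-in for spillMemoryIteratorToDisk: freeze the remaining elements.
      nextUpstream = upstream.toArray.iterator
      hasSpilled = true
      true
    }
  }

  private def readNext(): (K, C) = synchronized {
    if (nextUpstream != null) {               // switch to the spilled data once
      upstream = nextUpstream
      nextUpstream = null
    }
    if (upstream.hasNext) upstream.next() else null
  }

  override def hasNext: Boolean = cur != null // side-effect free

  override def next(): (K, C) = {
    val r = cur
    cur = readNext()
    r
  }
}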
@@ -785,10 +785,12 @@ private[spark] class ExternalSorter[K, V, C](
 
     private var nextUpstream: Iterator[((Int, K), C)] = null
 
-    private var cur: ((Int, K), C) = null
+    private var cur: ((Int, K), C) = readNext()
 
+    private var hasSpilled: Boolean = false
+
     def spill(): Boolean = synchronized {
-      if (upstream == null || nextUpstream != null) {
+      if (hasSpilled) {
         false
       } else {
         val inMemoryIterator = new WritablePartitionedIterator {
@@ -812,22 +814,29 @@
           val iterator = spillReader.readNextPartition()
           iterator.map(cur => ((p, cur._1), cur._2))
         }
+        hasSpilled = true
         true
       }
     }
 
-    override def hasNext: Boolean = synchronized {
+    def readNext(): ((Int, K), C) = synchronized {
       if (nextUpstream != null) {
         upstream = nextUpstream
         nextUpstream = null
       }
-      val r = upstream.hasNext
-      if (r) {
-        cur = upstream.next()
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
      }
-      r
     }
 
-    override def next(): ((Int, K), C) = cur
+    override def hasNext(): Boolean = cur != null
+
+    override def next(): ((Int, K), C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
   }
 }
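
ExternalSorter's SpillableIterator gets the same treatment for ((Int, K), C) pairs. A plausible reading of the "minor bug" in the commit title is that the old hasNext assigned cur from upstream.next() as a side effect, so calling hasNext more than once between next() calls could silently drop elements; pre-reading cur makes hasNext idempotent. Reusing the SpillableIteratorSketch from the sketch above, the intended contract looks roughly like this (hypothetical script-style usage, not a test from this commit):

val it = new SpillableIteratorSketch(Iterator((1, "a"), (2, "b"), (3, "c")))

assert(it.hasNext && it.hasNext)     // repeated hasNext no longer skips elements
assert(it.next() == ((1, "a")))

assert(it.spill())                   // first forced spill succeeds
assert(!it.spill())                  // hasSpilled guard: further calls are no-ops

assert(it.toList == List((2, "b"), (3, "c")))   // iteration continues seamlessly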
@@ -422,9 +422,10 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
     val conf = createSparkConf(loadDefaults = false)
       .set("spark.shuffle.memoryFraction", "0.01")
       .set("spark.memory.useLegacyMode", "true")
+      .set("spark.testing.memory", "100000000")
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
-    val N = 2e6.toInt
+    val N = 2e5.toInt
     sc.parallelize(1 to N, 10)
       .map { i => (i, i) }
       .groupByKey()
@@ -609,13 +609,14 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("force to spill for sorting") {
+  test("force to spill for external sorter") {
     val conf = createSparkConf(loadDefaults = false, kryo = false)
       .set("spark.shuffle.memoryFraction", "0.01")
       .set("spark.memory.useLegacyMode", "true")
+      .set("spark.testing.memory", "100000000")
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
-    val N = 2e6.toInt
+    val N = 2e5.toInt
     val p = new org.apache.spark.HashPartitioner(10)
     val p2 = new org.apache.spark.HashPartitioner(5)
     sc.parallelize(1 to N, 10)
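
Both test suites now pin spark.testing.memory at 100000000 bytes and shrink N from 2e6 to 2e5. Assuming legacy-mode shuffle sizing of system memory × spark.shuffle.memoryFraction × spark.shuffle.safetyFraction with the default 0.8 safety fraction (an assumption about Spark 1.x-style memory management, not something stated in this diff), a rough check shows why spilling is still forced with the smaller input:

// Rough sizing check under the assumptions stated above.
val testingMemory  = 100000000L    // spark.testing.memory (bytes)
val memoryFraction = 0.01          // spark.shuffle.memoryFraction
val safetyFraction = 0.8           // assumed default spark.shuffle.safetyFraction
val shuffleMemory  = (testingMemory * memoryFraction * safetyFraction).toLong
// shuffleMemory is roughly 800000 bytes (~800 KB)

val n = 2e5.toInt
// 200,000 (Int, Int) records plus hash-map/buffer overhead comfortably exceed
// ~800 KB, so ExternalAppendOnlyMap / ExternalSorter must still spill to disk.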
5 changes: 5 additions & 0 deletions project/MimaExcludes.scala
@@ -634,6 +634,11 @@ object MimaExcludes {
        // [SPARK-14628] Simplify task metrics by always tracking read/write metrics
        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.readMethod"),
        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.writeMethod")
+      ) ++ Seq(
+        // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.collection.ExternalAppendOnlyMap"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.collection.ExternalSorter"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.collection.Spillable")
       )
     case v if v.startsWith("1.6") =>
       Seq(
