Skip to content

Commit

Permalink
fix SpillableIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
lianhuiwang committed Apr 21, 2016
1 parent e009d95 commit 7ea7274
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,13 +570,15 @@ class ExternalAppendOnlyMap[K, V, C](
private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
extends Iterator[(K, C)] {

private val SPILL_LOCK = new Object()

private var nextUpstream: Iterator[(K, C)] = null

private var cur: (K, C) = readNext()

private var hasSpilled: Boolean = false

def spill(): Boolean = synchronized {
def spill(): Boolean = SPILL_LOCK.synchronized {
if (hasSpilled) {
false
} else {
Expand All @@ -588,7 +590,7 @@ class ExternalAppendOnlyMap[K, V, C](
}
}

def readNext(): (K, C) = synchronized {
def readNext(): (K, C) = SPILL_LOCK.synchronized {
if (nextUpstream != null) {
upstream = nextUpstream
nextUpstream = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,15 @@ private[spark] class ExternalSorter[K, V, C](
private[this] class SpillableIterator(var upstream: Iterator[((Int, K), C)])
extends Iterator[((Int, K), C)] {

private val SPILL_LOCK = new Object()

private var nextUpstream: Iterator[((Int, K), C)] = null

private var cur: ((Int, K), C) = readNext()

private var hasSpilled: Boolean = false

def spill(): Boolean = synchronized {
def spill(): Boolean = SPILL_LOCK.synchronized {
if (hasSpilled) {
false
} else {
Expand Down Expand Up @@ -819,7 +821,7 @@ private[spark] class ExternalSorter[K, V, C](
}
}

def readNext(): ((Int, K), C) = synchronized {
def readNext(): ((Int, K), C) = SPILL_LOCK.synchronized {
if (nextUpstream != null) {
upstream = nextUpstream
nextUpstream = null
Expand Down

0 comments on commit 7ea7274

Please sign in to comment.