From 7ea727470735cb2a420bd5411af0202d264d9ec7 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 21 Apr 2016 10:52:51 +0800 Subject: [PATCH] fix SpillableIterator --- .../spark/util/collection/ExternalAppendOnlyMap.scala | 6 ++++-- .../org/apache/spark/util/collection/ExternalSorter.scala | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 601a765b014fa..fc71f8365cd18 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -570,13 +570,15 @@ class ExternalAppendOnlyMap[K, V, C]( private[this] class SpillableIterator(var upstream: Iterator[(K, C)]) extends Iterator[(K, C)] { + private val SPILL_LOCK = new Object() + private var nextUpstream: Iterator[(K, C)] = null private var cur: (K, C) = readNext() private var hasSpilled: Boolean = false - def spill(): Boolean = synchronized { + def spill(): Boolean = SPILL_LOCK.synchronized { if (hasSpilled) { false } else { @@ -588,7 +590,7 @@ class ExternalAppendOnlyMap[K, V, C]( } } - def readNext(): (K, C) = synchronized { + def readNext(): (K, C) = SPILL_LOCK.synchronized { if (nextUpstream != null) { upstream = nextUpstream nextUpstream = null diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 501ff58155c4a..4067acee738ed 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -783,13 +783,15 @@ private[spark] class ExternalSorter[K, V, C]( private[this] class SpillableIterator(var upstream: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] { + private val SPILL_LOCK = new Object() + private var nextUpstream: Iterator[((Int, K), C)] = null private var cur: ((Int, K), C) = readNext() private var hasSpilled: Boolean = false - def spill(): Boolean = synchronized { + def spill(): Boolean = SPILL_LOCK.synchronized { if (hasSpilled) { false } else { @@ -819,7 +821,7 @@ private[spark] class ExternalSorter[K, V, C]( } } - def readNext(): ((Int, K), C) = synchronized { + def readNext(): ((Int, K), C) = SPILL_LOCK.synchronized { if (nextUpstream != null) { upstream = nextUpstream nextUpstream = null