From 97fd17483fe2efebd64a1a57dfe40aa16a46f625 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Thu, 21 Apr 2016 10:11:54 +0800
Subject: [PATCH] fix thread safe

---
 .../spark/util/collection/ExternalAppendOnlyMap.scala     | 4 ++--
 .../org/apache/spark/util/collection/ExternalSorter.scala | 8 ++++----
 .../org/apache/spark/util/collection/Spillable.scala      | 2 +-
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index a78bf4241f6f2..601a765b014fa 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -81,7 +81,7 @@ class ExternalAppendOnlyMap[K, V, C](
     this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
   }
 
-  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+  @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
@@ -115,7 +115,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
 
-  private var readingIterator: SpillableIterator = null
+  @volatile private var readingIterator: SpillableIterator = null
 
   /**
    * Number of files this map has spilled so far.
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 8bc0861ef9fee..501ff58155c4a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -124,8 +124,8 @@ private[spark] class ExternalSorter[K, V, C](
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
   // store them in an array buffer.
-  private var map = new PartitionedAppendOnlyMap[K, C]
-  private var buffer = new PartitionedPairBuffer[K, C]
+  @volatile private var map = new PartitionedAppendOnlyMap[K, C]
+  @volatile private var buffer = new PartitionedPairBuffer[K, C]
 
   // Total spilling statistics
   private var _diskBytesSpilled = 0L
@@ -135,9 +135,9 @@ private[spark] class ExternalSorter[K, V, C](
   private var _peakMemoryUsedBytes: Long = 0L
   def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes
 
-  private var isShuffleSort: Boolean = true
+  @volatile private var isShuffleSort: Boolean = true
   private val forceSpillFiles = new ArrayBuffer[SpilledFile]
-  private var readingIterator: SpillableIterator = null
+  @volatile private var readingIterator: SpillableIterator = null
 
   // A comparator for keys K that orders them within a partition to allow aggregation or sorting.
   // Can be a partial ordering by hash code if a total ordering is not provided through by the
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index e71689a4eca9a..27b40e016da09 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -58,7 +58,7 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
     SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MaxValue)
 
   // Threshold for this collection's size in bytes before we start tracking its memory usage
-  // To avoid memory leak for rdd.first(), initialize this to a value orders of magnitude > 0
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
   private[this] var myMemoryThreshold = initialMemoryThreshold
 
   // Number of elements read from input since last spill
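These fields are written by the thread that performs a forced spill (for example, when the
memory manager asks a task to release memory) while the task thread is still inserting into
or iterating over the collection. Without @volatile, the JVM memory model does not guarantee
that the reading thread ever observes the update, so it can keep using an in-memory structure
that has already been spilled and nulled out. Below is a minimal, self-contained sketch of the
visibility pattern these annotations rely on; SpillableMap, forceSpill and lookup are
illustrative names, not Spark's API.

import scala.collection.mutable

object VolatileSpillSketch {
  class SpillableMap {
    // Mirrors fields such as currentMap / readingIterator in the patch:
    // written by the spilling thread, read by the task thread.
    @volatile private var current: mutable.Map[String, Int] = mutable.Map("a" -> 1)

    // Invoked from another thread under memory pressure.
    def forceSpill(): Unit = {
      val snapshot = current
      // ... the real code would write `snapshot` to a spill file here ...
      current = null // this write must become visible to the task thread
    }

    // Invoked from the task thread on every lookup.
    def lookup(key: String): Option[Int] = {
      val map = current
      if (map == null) None // the real code would read from the spill file instead
      else map.get(key)
    }
  }

  def main(args: Array[String]): Unit = {
    val m = new SpillableMap
    val spiller = new Thread(new Runnable { def run(): Unit = m.forceSpill() })
    spiller.start()
    spiller.join()
    // @volatile guarantees the task thread sees current == null after the spill
    // and falls back to the (hypothetical) on-disk data.
    println(m.lookup("a")) // None
  }
}

Note that @volatile only makes reads and writes of the field reference itself visible across
threads; it does not make compound updates to the underlying collection atomic.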