From eacc788346ccae232bd530dd880f801475a49734 Mon Sep 17 00:00:00 2001 From: Kenichi Maehashi Date: Wed, 19 Nov 2014 12:11:09 -0800 Subject: [PATCH 1/4] [SPARK-4470] Validate number of threads in local mode When running Spark locally, if number of threads is specified as 0 (e.g., `spark-submit --master local[0] ...`), the job got stuck and does not run at all. I think it's better to validate the parameter. Fix for [SPARK-4470](https://issues.apache.org/jira/browse/SPARK-4470). Author: Kenichi Maehashi Closes #3337 from kmaehashi/spark-4470 and squashes the following commits: 3ad76f3 [Kenichi Maehashi] fix code style 7716734 [Kenichi Maehashi] SPARK-4470: Validate number of threads in local mode --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 37013121c572a..ae8bbfb56f493 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1813,6 +1813,9 @@ object SparkContext extends Logging { def localCpuCount = Runtime.getRuntime.availableProcessors() // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads. val threadCount = if (threads == "*") localCpuCount else threads.toInt + if (threadCount <= 0) { + throw new SparkException(s"Asked to run locally with $threadCount threads") + } val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(scheduler, threadCount) scheduler.initialize(backend) From 22fc4e751c0a2f0ff39e42aa0a8fb9459d7412ec Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 19 Nov 2014 13:06:48 -0800 Subject: [PATCH 2/4] [SPARK-4482][Streaming] Disable ReceivedBlockTracker's write ahead log by default The write ahead log of ReceivedBlockTracker gets enabled as soon as checkpoint directory is set. This should not happen, as the WAL should be enabled only if the WAL is enabled in the Spark configuration. Author: Tathagata Das Closes #3358 from tdas/SPARK-4482 and squashes the following commits: b740136 [Tathagata Das] Fixed bug in ReceivedBlockTracker --- .../scheduler/ReceivedBlockTracker.scala | 37 +++++++++----- .../streaming/ReceivedBlockTrackerSuite.scala | 50 +++++++++++++------ 2 files changed, 61 insertions(+), 26 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 5f5e1909908d5..02758e0bca6c5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -70,18 +70,7 @@ private[streaming] class ReceivedBlockTracker( private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue] private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks] - - private val logManagerRollingIntervalSecs = conf.getInt( - "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60) - private val logManagerOption = checkpointDirOption.map { checkpointDir => - new WriteAheadLogManager( - ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir), - hadoopConf, - rollingIntervalSecs = logManagerRollingIntervalSecs, - callerName = "ReceivedBlockHandlerMaster", - clock = clock - ) - } + private val logManagerOption = createLogManager() private var lastAllocatedBatchTime: Time = null @@ -221,6 +210,30 @@ private[streaming] class ReceivedBlockTracker( private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = { streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue) } + + /** Optionally create the write ahead log manager only if the feature is enabled */ + private def createLogManager(): Option[WriteAheadLogManager] = { + if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) { + if (checkpointDirOption.isEmpty) { + throw new SparkException( + "Cannot enable receiver write-ahead log without checkpoint directory set. " + + "Please use streamingContext.checkpoint() to set the checkpoint directory. " + + "See documentation for more details.") + } + val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get) + val rollingIntervalSecs = conf.getInt( + "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60) + val logManager = new WriteAheadLogManager(logDir, hadoopConf, + rollingIntervalSecs = rollingIntervalSecs, clock = clock, + callerName = "ReceivedBlockHandlerMaster") + Some(logManager) + } else { + None + } + } + + /** Check if the log manager is enabled. This is only used for testing purposes. */ + private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty } private[streaming] object ReceivedBlockTracker { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index fd9c97f551c62..01a09b67b99dc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -41,17 +41,16 @@ import org.apache.spark.util.Utils class ReceivedBlockTrackerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { - val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite") - conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1") - val hadoopConf = new Configuration() val akkaTimeout = 10 seconds val streamId = 1 var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]() var checkpointDirectory: File = null + var conf: SparkConf = null before { + conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite") checkpointDirectory = Files.createTempDir() } @@ -64,7 +63,8 @@ class ReceivedBlockTrackerSuite } test("block addition, and block to batch allocation") { - val receivedBlockTracker = createTracker(enableCheckpoint = false) + val receivedBlockTracker = createTracker(setCheckpointDir = false) + receivedBlockTracker.isLogManagerEnabled should be (false) // should be disable by default receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty val blockInfos = generateBlockInfos() @@ -95,13 +95,11 @@ class ReceivedBlockTrackerSuite test("block addition, block to batch allocation and cleanup with write ahead log") { val manualClock = new ManualClock - conf.getInt( - "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1) - // Set the time increment level to twice the rotation interval so that every increment creates // a new log file - val timeIncrementMillis = 2000L + def incrementTime() { + val timeIncrementMillis = 2000L manualClock.addToTime(timeIncrementMillis) } @@ -121,7 +119,11 @@ class ReceivedBlockTrackerSuite } // Start tracker and add blocks - val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock) + conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") + conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1") + val tracker1 = createTracker(clock = manualClock) + tracker1.isLogManagerEnabled should be (true) + val blockInfos1 = addBlockInfos(tracker1) tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1 @@ -132,7 +134,7 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered list of unallocated blocks incrementTime() - val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock) + val tracker2 = createTracker(clock = manualClock) tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1 // Allocate blocks to batch and verify whether the unallocated blocks got allocated @@ -156,7 +158,7 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered state incrementTime() - val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock) + val tracker3 = createTracker(clock = manualClock) tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1 tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 tracker3.getUnallocatedBlocks(streamId) shouldBe empty @@ -179,18 +181,38 @@ class ReceivedBlockTrackerSuite // Restart tracker and verify recovered state, specifically whether info about the first // batch has been removed, but not the second batch incrementTime() - val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock) + val tracker4 = createTracker(clock = manualClock) tracker4.getUnallocatedBlocks(streamId) shouldBe empty tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 } + + test("enabling write ahead log but not setting checkpoint dir") { + conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") + intercept[SparkException] { + createTracker(setCheckpointDir = false) + } + } + + test("setting checkpoint dir but not enabling write ahead log") { + // When WAL config is not set, log manager should not be enabled + val tracker1 = createTracker(setCheckpointDir = true) + tracker1.isLogManagerEnabled should be (false) + + // When WAL is explicitly disabled, log manager should not be enabled + conf.set("spark.streaming.receiver.writeAheadLog.enable", "false") + val tracker2 = createTracker(setCheckpointDir = true) + tracker2.isLogManagerEnabled should be(false) + } /** * Create tracker object with the optional provided clock. Use fake clock if you * want to control time by manually incrementing it to test log cleanup. */ - def createTracker(enableCheckpoint: Boolean, clock: Clock = new SystemClock): ReceivedBlockTracker = { - val cpDirOption = if (enableCheckpoint) Some(checkpointDirectory.toString) else None + def createTracker( + setCheckpointDir: Boolean = true, + clock: Clock = new SystemClock): ReceivedBlockTracker = { + val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption) allReceivedBlockTrackers += tracker tracker From 3bf7ceebb10741a8b32e0c00f0edfd3a222ec5cd Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 19 Nov 2014 13:17:15 -0800 Subject: [PATCH 3/4] [SPARK-4481][Streaming][Doc] Fix the wrong description of updateFunc Removed `If `this` function returns None, then corresponding state key-value pair will be eliminated.` for the description of `updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]` Author: zsxwing Closes #3356 from zsxwing/SPARK-4481 and squashes the following commits: 76a9891 [zsxwing] Add a note that keys may be added or removed 0ebc42a [zsxwing] Fix the wrong description of updateFunc --- .../streaming/dstream/PairDStreamFunctions.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index b39f47f04a38b..3f03f42270252 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -398,10 +398,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * org.apache.spark.Partitioner is used to control the partitioning of each RDD. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. Note, that - * this function may generate a different a tuple with a different key - * than the input key. It is up to the developer to decide whether to + * @param updateFunc State update function. Note, that this function may generate a different + * tuple with a different key than the input key. Therefore keys may be removed + * or added in this way. It is up to the developer to decide whether to * remember the partitioner despite the key being changed. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new * DStream @@ -442,11 +441,10 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * org.apache.spark.Partitioner is used to control the partitioning of each RDD. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. Note, that - * this function may generate a different a tuple with a different key - * than the input key. It is up to the developer to decide whether to - * remember the partitioner despite the key being changed. + * @param updateFunc State update function. Note, that this function may generate a different + * tuple with a different key than the input key. Therefore keys may be removed + * or added in this way. It is up to the developer to decide whether to + * remember the partitioner despite the key being changed. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new * DStream * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. From 0df02ca463a4126e5437b37114c6759a57ab71ee Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 19 Nov 2014 14:03:44 -0800 Subject: [PATCH 4/4] [HOT FIX] MiMa tests are broken This is blocking #3353 and other patches. Author: Andrew Or Closes #3371 from andrewor14/mima-hot-fix and squashes the following commits: 842d059 [Andrew Or] Move excludes to the right section c4d4f4e [Andrew Or] MIMA hot fix --- project/MimaExcludes.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 6ab2d1b04cc87..94de14ddbd2bb 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -41,6 +41,12 @@ object MimaExcludes { // the maven-generated artifacts in the 1.2 build. MimaBuild.excludeSparkPackage("unused"), ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional") + ) ++ Seq( + // SPARK-2321 + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.SparkStageInfoImpl.this"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.SparkStageInfo.submissionTime") ) case v if v.startsWith("1.2") =>