diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 37013121c572a..ae8bbfb56f493 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1813,6 +1813,9 @@ object SparkContext extends Logging {
         def localCpuCount = Runtime.getRuntime.availableProcessors()
         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
+        if (threadCount <= 0) {
+          throw new SparkException(s"Asked to run locally with $threadCount threads")
+        }
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6ab2d1b04cc87..94de14ddbd2bb 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -41,6 +41,12 @@ object MimaExcludes {
           // the maven-generated artifacts in the 1.2 build.
           MimaBuild.excludeSparkPackage("unused"),
           ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional")
+        ) ++ Seq(
+          // SPARK-2321
+          ProblemFilters.exclude[MissingMethodProblem](
+            "org.apache.spark.SparkStageInfoImpl.this"),
+          ProblemFilters.exclude[MissingMethodProblem](
+            "org.apache.spark.SparkStageInfo.submissionTime")
         )

       case v if v.startsWith("1.2") =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index b39f47f04a38b..3f03f42270252 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -398,10 +398,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of each key.
    * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
-   * @param updateFunc State update function. If `this` function returns None, then
-   *                   corresponding state key-value pair will be eliminated. Note, that
-   *                   this function may generate a different a tuple with a different key
-   *                   than the input key. It is up to the developer to decide whether to
+   * @param updateFunc State update function. Note, that this function may generate a different
+   *                   tuple with a different key than the input key. Therefore keys may be removed
+   *                   or added in this way. It is up to the developer to decide whether to
    *                   remember the partitioner despite the key being changed.
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
    *                    DStream
@@ -442,11 +441,10 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of each key.
    * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
-   * @param updateFunc State update function. If `this` function returns None, then
-   *                   corresponding state key-value pair will be eliminated. Note, that
-   *                   this function may generate a different a tuple with a different key
-   *                   than the input key. It is up to the developer to decide whether to
-   *                   remember the partitioner despite the key being changed.
+   * @param updateFunc State update function. Note, that this function may generate a different
+   *                   tuple with a different key than the input key. Therefore keys may be removed
+   *                   or added in this way. It is up to the developer to decide whether to
+   *                   remember the partitioner despite the key being changed.
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
    *                    DStream
    * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5f5e1909908d5..02758e0bca6c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -70,18 +70,7 @@ private[streaming] class ReceivedBlockTracker(

   private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
   private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
-
-  private val logManagerRollingIntervalSecs = conf.getInt(
-    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
-  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
-    new WriteAheadLogManager(
-      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
-      hadoopConf,
-      rollingIntervalSecs = logManagerRollingIntervalSecs,
-      callerName = "ReceivedBlockHandlerMaster",
-      clock = clock
-    )
-  }
+  private val logManagerOption = createLogManager()

   private var lastAllocatedBatchTime: Time = null

@@ -221,6 +210,30 @@ private[streaming] class ReceivedBlockTracker(
   private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
     streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
   }
+
+  /** Optionally create the write ahead log manager only if the feature is enabled */
+  private def createLogManager(): Option[WriteAheadLogManager] = {
+    if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+      if (checkpointDirOption.isEmpty) {
+        throw new SparkException(
+          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
+          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
+          "See documentation for more details.")
+      }
+      val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
+      val rollingIntervalSecs = conf.getInt(
+        "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
+      val logManager = new WriteAheadLogManager(logDir, hadoopConf,
+        rollingIntervalSecs = rollingIntervalSecs, clock = clock,
+        callerName = "ReceivedBlockHandlerMaster")
+      Some(logManager)
+    } else {
+      None
+    }
+  }
+
+  /** Check if the log manager is enabled. This is only used for testing purposes. */
+  private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
 }

 private[streaming] object ReceivedBlockTracker {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fd9c97f551c62..01a09b67b99dc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -41,17 +41,16 @@ import org.apache.spark.util.Utils
 class ReceivedBlockTrackerSuite
   extends FunSuite with BeforeAndAfter with Matchers with Logging {

-  val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
-  conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
-
   val hadoopConf = new Configuration()
   val akkaTimeout = 10 seconds
   val streamId = 1

   var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
   var checkpointDirectory: File = null
+  var conf: SparkConf = null

   before {
+    conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
     checkpointDirectory = Files.createTempDir()
   }

@@ -64,7 +63,8 @@ class ReceivedBlockTrackerSuite
   }

   test("block addition, and block to batch allocation") {
-    val receivedBlockTracker = createTracker(enableCheckpoint = false)
+    val receivedBlockTracker = createTracker(setCheckpointDir = false)
+    receivedBlockTracker.isLogManagerEnabled should be (false)  // should be disable by default
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty

     val blockInfos = generateBlockInfos()
@@ -95,13 +95,11 @@ class ReceivedBlockTrackerSuite

   test("block addition, block to batch allocation and cleanup with write ahead log") {
     val manualClock = new ManualClock
-    conf.getInt(
-      "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)
-
     // Set the time increment level to twice the rotation interval so that every increment creates
     // a new log file
-    val timeIncrementMillis = 2000L
+
     def incrementTime() {
+      val timeIncrementMillis = 2000L
       manualClock.addToTime(timeIncrementMillis)
     }

@@ -121,7 +119,11 @@ class ReceivedBlockTrackerSuite
     }

     // Start tracker and add blocks
-    val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
+    val tracker1 = createTracker(clock = manualClock)
+    tracker1.isLogManagerEnabled should be (true)
+
     val blockInfos1 = addBlockInfos(tracker1)
     tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1

@@ -132,7 +134,7 @@ class ReceivedBlockTrackerSuite

     // Restart tracker and verify recovered list of unallocated blocks
     incrementTime()
-    val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker2 = createTracker(clock = manualClock)
     tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1

     // Allocate blocks to batch and verify whether the unallocated blocks got allocated
@@ -156,7 +158,7 @@ class ReceivedBlockTrackerSuite

     // Restart tracker and verify recovered state
     incrementTime()
-    val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker3 = createTracker(clock = manualClock)
     tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
     tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
     tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@@ -179,18 +181,38 @@ class ReceivedBlockTrackerSuite
     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch
     incrementTime()
-    val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker4 = createTracker(clock = manualClock)
     tracker4.getUnallocatedBlocks(streamId) shouldBe empty
     tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  // should be cleaned
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
   }
+
+  test("enabling write ahead log but not setting checkpoint dir") {
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    intercept[SparkException] {
+      createTracker(setCheckpointDir = false)
+    }
+  }
+
+  test("setting checkpoint dir but not enabling write ahead log") {
+    // When WAL config is not set, log manager should not be enabled
+    val tracker1 = createTracker(setCheckpointDir = true)
+    tracker1.isLogManagerEnabled should be (false)
+
+    // When WAL is explicitly disabled, log manager should not be enabled
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
+    val tracker2 = createTracker(setCheckpointDir = true)
+    tracker2.isLogManagerEnabled should be(false)
+  }

   /**
    * Create tracker object with the optional provided clock. Use fake clock if you
    * want to control time by manually incrementing it to test log cleanup.
    */
-  def createTracker(enableCheckpoint: Boolean, clock: Clock = new SystemClock): ReceivedBlockTracker = {
-    val cpDirOption = if (enableCheckpoint) Some(checkpointDirectory.toString) else None
+  def createTracker(
+      setCheckpointDir: Boolean = true,
+      clock: Clock = new SystemClock): ReceivedBlockTracker = {
+    val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
     val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
     allReceivedBlockTrackers += tracker
     tracker
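Note on the SparkContext change (illustration only, not part of the patch): the new guard rejects a non-positive thread count in a local master string instead of silently building a backend with no executor threads. A minimal sketch of the observable behavior, with a placeholder app name:

import org.apache.spark.{SparkConf, SparkContext}

// "local[*]" and "local[2]" behave as before; "local[0]" now fails at startup with:
//   org.apache.spark.SparkException: Asked to run locally with 0 threads
val sc = new SparkContext(
  new SparkConf().setAppName("local-threads-check").setMaster("local[0]"))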
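Note on the PairDStreamFunctions doc change (illustration only): the Iterator-based updateStateByKey overload documented above allows the update function to emit tuples under keys that differ from the input keys. A minimal sketch under assumed types (a DStream[(String, Int)] of pairs; updateFunc and withState are hypothetical names); since keys can change, rememberPartitioner is passed as false here:

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// Sums new values into the running count and, to illustrate the re-keying the
// javadoc warns about, emits the state under a normalized (lower-cased) key.
def updateFunc(iter: Iterator[(String, Seq[Int], Option[Int])]): Iterator[(String, Int)] =
  iter.map { case (key, newValues, state) =>
    (key.toLowerCase, state.getOrElse(0) + newValues.sum)  // output key may differ from input key
  }

def withState(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
  pairs.updateStateByKey(updateFunc _, new HashPartitioner(4), false)  // rememberPartitioner = false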
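Note on the ReceivedBlockTracker change (illustration only): the receiver write-ahead log is now opt-in and requires a checkpoint directory, otherwise the tracker throws a SparkException at construction. A minimal driver-side sketch of enabling it; the app name, batch interval, and checkpoint path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The write-ahead log is disabled by default and must be enabled explicitly.
val conf = new SparkConf()
  .setAppName("wal-example")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))

// Required once the WAL is enabled; without this the ReceivedBlockTracker fails with
// "Cannot enable receiver write-ahead log without checkpoint directory set."
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")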