Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/spark into avoid-small-spills
Andrew Or committed Nov 19, 2014
2 parents 27d6966 + 0df02ca commit 49f380f
Showing 5 changed files with 77 additions and 35 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1813,6 +1813,9 @@ object SparkContext extends Logging {
def localCpuCount = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
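For context, a minimal sketch (not part of this commit) of what the new guard means for users: a local master string that resolves to zero threads, such as local[0], now fails fast with a SparkException during SparkContext construction instead of silently starting a scheduler with no worker threads. The app name below is hypothetical.

import org.apache.spark.{SparkConf, SparkContext, SparkException}

val conf = new SparkConf().setAppName("thread-count-check").setMaster("local[0]")
try {
  new SparkContext(conf)  // rejected by the new threadCount <= 0 check
} catch {
  case e: SparkException => println(s"Rejected as expected: ${e.getMessage}")
}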
6 changes: 6 additions & 0 deletions project/MimaExcludes.scala
@@ -41,6 +41,12 @@ object MimaExcludes {
// the maven-generated artifacts in the 1.2 build.
MimaBuild.excludeSparkPackage("unused"),
ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional")
) ++ Seq(
// SPARK-2321
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.SparkStageInfoImpl.this"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.SparkStageInfo.submissionTime")
)

case v if v.startsWith("1.2") =>
PairDStreamFunctions.scala
@@ -398,10 +398,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated. Note, that
* this function may generate a different a tuple with a different key
* than the input key. It is up to the developer to decide whether to
* @param updateFunc State update function. Note, that this function may generate a different
* tuple with a different key than the input key. Therefore keys may be removed
* or added in this way. It is up to the developer to decide whether to
* remember the partitioner despite the key being changed.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
* DStream
@@ -442,11 +441,10 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated. Note, that
* this function may generate a different a tuple with a different key
* than the input key. It is up to the developer to decide whether to
* remember the partitioner despite the key being changed.
* @param updateFunc State update function. Note, that this function may generate a different
* tuple with a different key than the input key. Therefore keys may be removed
* or added in this way. It is up to the developer to decide whether to
* remember the partitioner despite the key being changed.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
* DStream
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
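To make the revised Scaladoc concrete, here is a hedged usage sketch (not part of this commit) of the Iterator-based updateStateByKey variant documented above, whose update function may emit a tuple under a different key than the one it received; the input stream `words` is an assumption.

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits in Spark 1.x

// words: DStream[(String, Int)] is assumed to be defined elsewhere.
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
  iter.map { case (key, newValues, state) =>
    // The emitted key may differ from the input key (here it is lower-cased),
    // so keys can effectively be added or removed by the update function.
    (key.toLowerCase, newValues.sum + state.getOrElse(0))
  }
}
val stateStream = words.updateStateByKey[Int](
  updateFunc,
  new HashPartitioner(4),
  rememberPartitioner = false)  // the key may change, so the old partitioning is not reused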
ReceivedBlockTracker.scala
@@ -70,18 +70,7 @@ private[streaming] class ReceivedBlockTracker(

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]

private val logManagerRollingIntervalSecs = conf.getInt(
"spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
private val logManagerOption = checkpointDirOption.map { checkpointDir =>
new WriteAheadLogManager(
ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
hadoopConf,
rollingIntervalSecs = logManagerRollingIntervalSecs,
callerName = "ReceivedBlockHandlerMaster",
clock = clock
)
}
private val logManagerOption = createLogManager()

private var lastAllocatedBatchTime: Time = null

@@ -221,6 +210,30 @@ private[streaming] class ReceivedBlockTracker(
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}

/** Optionally create the write ahead log manager only if the feature is enabled */
private def createLogManager(): Option[WriteAheadLogManager] = {
if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
val rollingIntervalSecs = conf.getInt(
"spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
val logManager = new WriteAheadLogManager(logDir, hadoopConf,
rollingIntervalSecs = rollingIntervalSecs, clock = clock,
callerName = "ReceivedBlockHandlerMaster")
Some(logManager)
} else {
None
}
}

/** Check if the log manager is enabled. This is only used for testing purposes. */
private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
}

private[streaming] object ReceivedBlockTracker {
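The user-facing consequence of createLogManager() above is that the receiver write-ahead log is opt-in and requires a checkpoint directory; enabling the flag without one surfaces as the SparkException thrown above. A hedged configuration sketch, with hypothetical app name, master, and checkpoint path:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("wal-enabled-app")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
// Without a checkpoint directory, the ReceivedBlockTracker cannot create its
// WriteAheadLogManager and streaming fails when the receivers start.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")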
ReceivedBlockTrackerSuite.scala
@@ -41,17 +41,16 @@ import org.apache.spark.util.Utils
class ReceivedBlockTrackerSuite
extends FunSuite with BeforeAndAfter with Matchers with Logging {

val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")

val hadoopConf = new Configuration()
val akkaTimeout = 10 seconds
val streamId = 1

var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
var checkpointDirectory: File = null
var conf: SparkConf = null

before {
conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
checkpointDirectory = Files.createTempDir()
}

@@ -64,7 +63,8 @@ class ReceivedBlockTrackerSuite
}

test("block addition, and block to batch allocation") {
val receivedBlockTracker = createTracker(enableCheckpoint = false)
val receivedBlockTracker = createTracker(setCheckpointDir = false)
receivedBlockTracker.isLogManagerEnabled should be (false) // should be disable by default
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty

val blockInfos = generateBlockInfos()
@@ -95,13 +95,11 @@

test("block addition, block to batch allocation and cleanup with write ahead log") {
val manualClock = new ManualClock
conf.getInt(
"spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)

// Set the time increment level to twice the rotation interval so that every increment creates
// a new log file
val timeIncrementMillis = 2000L

def incrementTime() {
val timeIncrementMillis = 2000L
manualClock.addToTime(timeIncrementMillis)
}

@@ -121,7 +119,11 @@
}

// Start tracker and add blocks
val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
val tracker1 = createTracker(clock = manualClock)
tracker1.isLogManagerEnabled should be (true)

val blockInfos1 = addBlockInfos(tracker1)
tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1

@@ -132,7 +134,7 @@

// Restart tracker and verify recovered list of unallocated blocks
incrementTime()
val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
val tracker2 = createTracker(clock = manualClock)
tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1

// Allocate blocks to batch and verify whether the unallocated blocks got allocated
@@ -156,7 +158,7 @@

// Restart tracker and verify recovered state
incrementTime()
val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
val tracker3 = createTracker(clock = manualClock)
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@@ -179,18 +181,38 @@
// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
incrementTime()
val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock)
val tracker4 = createTracker(clock = manualClock)
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
}

test("enabling write ahead log but not setting checkpoint dir") {
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
intercept[SparkException] {
createTracker(setCheckpointDir = false)
}
}

test("setting checkpoint dir but not enabling write ahead log") {
// When WAL config is not set, log manager should not be enabled
val tracker1 = createTracker(setCheckpointDir = true)
tracker1.isLogManagerEnabled should be (false)

// When WAL is explicitly disabled, log manager should not be enabled
conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
val tracker2 = createTracker(setCheckpointDir = true)
tracker2.isLogManagerEnabled should be(false)
}

/**
* Create tracker object with the optional provided clock. Use fake clock if you
* want to control time by manually incrementing it to test log cleanup.
*/
def createTracker(enableCheckpoint: Boolean, clock: Clock = new SystemClock): ReceivedBlockTracker = {
val cpDirOption = if (enableCheckpoint) Some(checkpointDirectory.toString) else None
def createTracker(
setCheckpointDir: Boolean = true,
clock: Clock = new SystemClock): ReceivedBlockTracker = {
val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
allReceivedBlockTrackers += tracker
tracker
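A brief note on the refactored helper above: createTracker now defaults setCheckpointDir to true, so most tests can rely on the defaults, and only the write-ahead-log tests pass a clock or opt out of the checkpoint directory. A sketch of the resulting call sites, as used in the tests above:

val tracker = createTracker()                        // checkpoint dir set, system clock
val walTracker = createTracker(clock = manualClock)  // WAL tests drive log rotation manually
intercept[SparkException] {
  createTracker(setCheckpointDir = false)            // fails when the WAL flag is enabled in conf
}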
