
Commit

Merge branch 'master' into SPARK-4194
Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
Marcelo Vanzin committed Apr 3, 2015
2 parents 6b73fcb + 14632b7 commit 2621609
Showing 13 changed files with 271 additions and 140 deletions.
32 changes: 25 additions & 7 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -37,16 +37,24 @@ private[spark] case class Heartbeat(
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

/**
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
* created.
*/
private[spark] case object TaskSchedulerIsSet

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors.
*/
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
private[spark] class HeartbeatReceiver(sc: SparkContext)
extends Actor with ActorLogReceive with Logging {

private var scheduler: TaskScheduler = null

// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]

@@ -71,12 +79,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
}

override def receiveWithLogging: PartialFunction[Any, Unit] = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
case TaskSchedulerIsSet =>
scheduler = sc.taskScheduler
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
if (scheduler != null) {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
// register itself again.
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
sender ! HeartbeatResponse(reregisterBlockManager = true)
}
case ExpireDeadHosts =>
expireDeadHosts()
}
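The net effect of the HeartbeatReceiver change above: the actor no longer takes the TaskScheduler in its constructor. SparkContext creates the actor first, later announces the scheduler with a TaskSchedulerIsSet message, and any Heartbeat that arrives before that message is answered with reregisterBlockManager = true. A minimal sketch of the driver-side wiring, assuming an existing SparkContext sc and an Akka ActorSystem actorSystem (illustrative only, not the exact SparkContext source):

import akka.actor.Props

// Create the receiver before the TaskScheduler exists.
val heartbeatReceiver = actorSystem.actorOf(
  Props(new HeartbeatReceiver(sc)), "HeartbeatReceiver")

// ... construct the TaskScheduler and assign it to sc.taskScheduler ...

// Announce that the scheduler is ready. Until this message is processed,
// incoming Heartbeats are dropped with a warning and the executor is asked
// to re-register its block manager.
heartbeatReceiver ! TaskSchedulerIsSet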
94 changes: 48 additions & 46 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -23,7 +23,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.UUID.randomUUID

import scala.collection.{Map, Set}
@@ -97,10 +97,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

val startTime = System.currentTimeMillis()

@volatile private var stopped: Boolean = false
private val stopped: AtomicBoolean = new AtomicBoolean(false)

private def assertNotStopped(): Unit = {
if (stopped) {
if (stopped.get()) {
throw new IllegalStateException("Cannot call methods on a stopped SparkContext")
}
}
@@ -203,7 +203,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* ------------------------------------------------------------------------------------- */

private var _conf: SparkConf = _
private var _eventLogDir: Option[String] = None
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _env: SparkEnv = _
private var _metadataCleaner: MetadataCleaner = _
@@ -244,7 +244,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def appName: String = _conf.get("spark.app.name")

private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
private[spark] def eventLogDir: Option[String] = _eventLogDir
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

// Generate the random name for a temp folder in Tachyon
@@ -370,7 +370,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

_eventLogDir =
if (isEventLogEnabled) {
Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
@@ -456,7 +458,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_schedulerBackend = sched
_taskScheduler = ts
_heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")
_dagScheduler = new DAGScheduler(this)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
@@ -1466,46 +1468,46 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
addedJars.clear()
}

/** Shut down the SparkContext. */
// Shut down the SparkContext.
def stop() {
SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (!stopped) {
stopped = true
postApplicationEnd()
_ui.foreach(_.stop())
if (env != null) {
env.metricsSystem.report()
}
if (metadataCleaner != null) {
metadataCleaner.cancel()
}
_cleaner.foreach(_.stop())
_executorAllocationManager.foreach(_.stop())
if (_dagScheduler != null) {
_dagScheduler.stop()
_dagScheduler = null
}
if (_listenerBusStarted) {
listenerBus.stop()
_listenerBusStarted = false
}
_eventLogger.foreach(_.stop())
if (env != null) {
env.actorSystem.stop(_heartbeatReceiver)
}
_progressBar.foreach(_.stop())
_taskScheduler = null
// TODO: Cache.stop()?
if (_env != null) {
_env.stop()
SparkEnv.set(null)
}
logInfo("Successfully stopped SparkContext")
SparkContext.clearActiveContext()
} else {
logInfo("SparkContext already stopped")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
if (!stopped.compareAndSet(false, true)) {
logInfo("SparkContext already stopped.")
return
}

postApplicationEnd()
_ui.foreach(_.stop())
if (env != null) {
env.metricsSystem.report()
}
if (metadataCleaner != null) {
metadataCleaner.cancel()
}
_cleaner.foreach(_.stop())
_executorAllocationManager.foreach(_.stop())
if (_dagScheduler != null) {
_dagScheduler.stop()
_dagScheduler = null
}
if (_listenerBusStarted) {
listenerBus.stop()
_listenerBusStarted = false
}
_eventLogger.foreach(_.stop())
if (env != null) {
env.actorSystem.stop(_heartbeatReceiver)
}
_progressBar.foreach(_.stop())
_taskScheduler = null
// TODO: Cache.stop()?
if (_env != null) {
_env.stop()
SparkEnv.set(null)
}
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
}


@@ -1567,7 +1569,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped) {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
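The rewritten stop() drops the SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized block (the deadlock fixed by SPARK-6492, per the MimaExcludes entry later in this diff) and instead makes the method idempotent with a single compareAndSet on an AtomicBoolean: the first caller flips the flag and performs the shutdown, later callers log and return, and no lock is held while the subsystems are being torn down. A minimal sketch of the same pattern outside Spark (class and resource names are illustrative):

import java.util.concurrent.atomic.AtomicBoolean

class ShutdownOnce {
  private val stopped = new AtomicBoolean(false)

  def stop(): Unit = {
    // Only the first caller wins the compare-and-set; everyone else returns.
    if (!stopped.compareAndSet(false, true)) {
      println("already stopped")
      return
    }
    // Release resources here. This branch runs at most once and holds no lock,
    // so shutdown hooks or listeners that re-enter stop() cannot deadlock.
  }

  def isStopped: Boolean = stopped.get()
}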
ApplicationDescription.scala
@@ -17,13 +17,15 @@

package org.apache.spark.deploy

import java.net.URI

private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[String] = None,
val eventLogDir: Option[URI] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
val eventLogCodec: Option[String] = None)
extends Serializable {
@@ -36,7 +38,7 @@ private[spark] class ApplicationDescription(
memoryPerSlave: Int = memoryPerSlave,
command: Command = command,
appUiUrl: String = appUiUrl,
eventLogDir: Option[String] = eventLogDir,
eventLogDir: Option[URI] = eventLogDir,
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
new ApplicationDescription(
name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
EventLoggingListener.scala
@@ -47,21 +47,21 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
logBaseDir: String,
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))

private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
private val compressionCodec =
if (shouldCompress) {
Some(CompressionCodec.createCodec(sparkConf))
@@ -259,13 +259,13 @@ private[spark] object EventLoggingListener extends Logging {
* @return A path which consists of file-system-safe characters.
*/
def getLogPath(
logBaseDir: String,
logBaseDir: URI,
appId: String,
compressionCodecName: Option[String] = None): String = {
val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
// e.g. app_123, app_123.lzf
val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
logBaseDir.toString.stripSuffix("/") + "/" + logName
}

/**
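getLogPath now takes the base directory as an already-resolved URI (SparkContext resolves spark.eventLog.dir with Utils.resolveURI up front), sanitizes the application ID by turning " :/" into "-" and ".${}'\"" into "_", lowercases it, and appends the optional codec short name. A worked example of the resulting path, assuming Utils.resolveURI turns a bare local path into a file: URI as the tests below expect (both APIs are private[spark], so this only compiles inside the Spark codebase):

val base = Utils.resolveURI("/base-dir")              // file:/base-dir
val path = EventLoggingListener.getLogPath(
  base, "a fine:mind$dollar{bills}.1", Some("lzf"))
// path == "file:/base-dir/a-fine-mind_dollar_bills__1.lzf"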
FsHistoryProviderSuite.scala
@@ -50,7 +50,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
inProgress: Boolean,
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId)
val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
val logPath = new URI(logUri).getPath + ip
new File(logPath)
}
EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
val eventLogger = new EventLoggingListener("test", testDirPath.toUri().toString(), conf)
val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
eventLogger.start()

val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
}

test("Log overwriting") {
val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
val logPath = new URI(logUri).getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
@@ -107,16 +107,19 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef

test("Event log name") {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
Utils.resolveURI("/base-dir"), "app1"))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1"))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1", Some("lz4")))
}

/* ----------------- *
@@ -137,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
val eventLogger = new EventLoggingListener(logName, testDirPath.toUri().toString(), conf)
val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey")
@@ -173,12 +176,15 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
* This runs a simple Spark job and asserts that the expected events are logged when expected.
*/
private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
// Set defaultFS to something that would cause an exception, to make sure we don't run
// into SPARK-6688.
val conf = getLoggingConf(testDirPath, compressionCodec)
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val eventLogPath = eventLogger.logPath
val expectedLogDir = testDir.toURI().toString()
val expectedLogDir = testDir.toURI()
assert(eventLogPath === EventLoggingListener.getLogPath(
expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))

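Besides switching the suite's helpers over to URIs, testApplicationEventLogging now points fs.defaultFS at an unusable scheme so that any code path which falls back to the default filesystem instead of the event log URI's own scheme (SPARK-6688) fails with an exception. A hedged sketch of that configuration, assuming a local test directory testDir; the exact getLoggingConf helper is not shown here, and spark.eventLog.enabled is an assumed setting:

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", testDir.toURI.toString)
  // Deliberately unusable default filesystem: anything that resolves paths
  // against it rather than the explicit event log URI will throw.
  .set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)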
ReplayListenerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import java.io.{File, PrintWriter}
import java.net.URI

import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -145,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* log the events.
*/
private class EventMonster(conf: SparkConf)
extends EventLoggingListener("test", "testdir", conf) {
extends EventLoggingListener("test", new URI("testdir"), conf) {

override def start() { }

4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
@@ -60,6 +60,10 @@ object MimaExcludes {
) ++ Seq(
// SPARK-6510 Add a Graph#minus method acting as Set#difference
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
) ++ Seq(
// SPARK-6492 Fix deadlock in SparkContext.stop()
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.org$" +
"apache$spark$SparkContext$$SPARK_CONTEXT_CONSTRUCTOR_LOCK")
)

case v if v.startsWith("1.3") =>