").append(m.parseHTML(a)).find(d):a)}).complete(c&&function(a,b){g.each(c,e||[a.responseText,b,a])}),this},m.expr.filters.animated=function(a){return m.grep(m.timers,function(b){return a===b.elem}).length};var cd=a.document.documentElement;function dd(a){return m.isWindow(a)?a:9===a.nodeType?a.defaultView||a.parentWindow:!1}m.offset={setOffset:function(a,b,c){var d,e,f,g,h,i,j,k=m.css(a,"position"),l=m(a),n={};"static"===k&&(a.style.position="relative"),h=l.offset(),f=m.css(a,"top"),i=m.css(a,"left"),j=("absolute"===k||"fixed"===k)&&m.inArray("auto",[f,i])>-1,j?(d=l.position(),g=d.top,e=d.left):(g=parseFloat(f)||0,e=parseFloat(i)||0),m.isFunction(b)&&(b=b.call(a,c,h)),null!=b.top&&(n.top=b.top-h.top+g),null!=b.left&&(n.left=b.left-h.left+e),"using"in b?b.using.call(a,n):l.css(n)}},m.fn.extend({offset:function(a){if(arguments.length)return void 0===a?this:this.each(function(b){m.offset.setOffset(this,a,b)});var b,c,d={top:0,left:0},e=this[0],f=e&&e.ownerDocument;if(f)return b=f.documentElement,m.contains(b,e)?(typeof e.getBoundingClientRect!==K&&(d=e.getBoundingClientRect()),c=dd(f),{top:d.top+(c.pageYOffset||b.scrollTop)-(b.clientTop||0),left:d.left+(c.pageXOffset||b.scrollLeft)-(b.clientLeft||0)}):d},position:function(){if(this[0]){var a,b,c={top:0,left:0},d=this[0];return"fixed"===m.css(d,"position")?b=d.getBoundingClientRect():(a=this.offsetParent(),b=this.offset(),m.nodeName(a[0],"html")||(c=a.offset()),c.top+=m.css(a[0],"borderTopWidth",!0),c.left+=m.css(a[0],"borderLeftWidth",!0)),{top:b.top-c.top-m.css(d,"marginTop",!0),left:b.left-c.left-m.css(d,"marginLeft",!0)}}},offsetParent:function(){return this.map(function(){var a=this.offsetParent||cd;while(a&&!m.nodeName(a,"html")&&"static"===m.css(a,"position"))a=a.offsetParent;return a||cd})}}),m.each({scrollLeft:"pageXOffset",scrollTop:"pageYOffset"},function(a,b){var c=/Y/.test(b);m.fn[a]=function(d){return V(this,function(a,d,e){var f=dd(a);return void 0===e?f?b in f?f[b]:f.document.documentElement[d]:a[d]:void(f?f.scrollTo(c?m(f).scrollLeft():e,c?e:m(f).scrollTop()):a[d]=e)},a,d,arguments.length,null)}}),m.each(["top","left"],function(a,b){m.cssHooks[b]=Lb(k.pixelPosition,function(a,c){return c?(c=Jb(a,b),Hb.test(c)?m(a).position()[b]+"px":c):void 0})}),m.each({Height:"height",Width:"width"},function(a,b){m.each({padding:"inner"+a,content:b,"":"outer"+a},function(c,d){m.fn[d]=function(d,e){var f=arguments.length&&(c||"boolean"!=typeof d),g=c||(d===!0||e===!0?"margin":"border");return V(this,function(b,c,d){var e;return m.isWindow(b)?b.document.documentElement["client"+a]:9===b.nodeType?(e=b.documentElement,Math.max(b.body["scroll"+a],e["scroll"+a],b.body["offset"+a],e["offset"+a],e["client"+a])):void 0===d?m.css(b,c,g):m.style(b,c,d,g)},b,f?d:void 0,f,null)}})}),m.fn.size=function(){return this.length},m.fn.andSelf=m.fn.addBack,"function"==typeof define&&define.amd&&define("jquery",[],function(){return m});var ed=a.jQuery,fd=a.$;return m.noConflict=function(b){return a.$===m&&(a.$=fd),b&&a.jQuery===m&&(a.jQuery=ed),m},typeof b===K&&(a.jQuery=a.$=m),m});
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index a8bc141208a94..445110d63e184 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -81,7 +81,9 @@ table.sortable thead {
span.kill-link {
margin-right: 2px;
+ margin-left: 20px;
color: gray;
+ float: right;
}
span.kill-link a {
@@ -95,6 +97,10 @@ span.expand-details {
float: right;
}
+pre {
+ font-size: 0.8em;
+}
+
.stage-details {
max-height: 100px;
overflow-y: auto;
@@ -108,3 +114,8 @@ span.expand-details {
padding-bottom: 0;
border: none;
}
+
+.tooltip {
+ font-weight: normal;
+}
+
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index cdfd338081fa2..9c55bfbb47626 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -127,7 +127,7 @@ class Accumulable[R, T] (
Accumulators.register(this, false)
}
- override def toString = value_.toString
+ override def toString = if (value_ == null) "null" else value_.toString
}
/**
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 315ed91f81df3..8f867686a0443 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,106 +19,57 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}
+import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}
+import org.apache.spark.storage._
/**
- * Spark class responsible for passing RDDs split contents to the BlockManager and making
+ * Spark class responsible for passing RDDs partition contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
- /** Keys of RDD splits that are being computed/loaded. */
+ /** Keys of RDD partitions that are being computed/loaded. */
private val loading = new HashSet[RDDBlockId]()
- /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
+ /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
- split: Partition,
+ partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
- val key = RDDBlockId(rdd.id, split.index)
+ val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
- case Some(values) =>
+ case Some(blockResult) =>
// Partition is already materialized, so just return its values
- new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+ new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None =>
- // Mark the split as loading (unless someone else marks it first)
- loading.synchronized {
- if (loading.contains(key)) {
- logInfo(s"Another thread is loading $key, waiting for it to finish...")
- while (loading.contains(key)) {
- try {
- loading.wait()
- } catch {
- case e: Exception =>
- logWarning(s"Got an exception while waiting for another thread to load $key", e)
- }
- }
- logInfo(s"Finished waiting for $key")
- /* See whether someone else has successfully loaded it. The main way this would fail
- * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
- * partition but we didn't want to make space for it. However, that case is unlikely
- * because it's unlikely that two threads would work on the same RDD partition. One
- * downside of the current code is that threads wait serially if this does happen. */
- blockManager.get(key) match {
- case Some(values) =>
- return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
- case None =>
- logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
- loading.add(key)
- }
- } else {
- loading.add(key)
- }
+ // Acquire a lock for loading this partition
+ // If another thread already holds the lock, wait for it to finish and return its results
+ val storedValues = acquireLockForPartition[T](key)
+ if (storedValues.isDefined) {
+ return new InterruptibleIterator[T](context, storedValues.get)
}
+
+ // Otherwise, we have to load the partition ourselves
try {
- // If we got here, we have to load the split
logInfo(s"Partition $key not found, computing it")
- val computedValues = rdd.computeOrReadCheckpoint(split, context)
+ val computedValues = rdd.computeOrReadCheckpoint(partition, context)
- // Persist the result, so long as the task is not running locally
+ // If the task is running locally, do not persist the result
if (context.runningLocally) {
return computedValues
}
- // Keep track of blocks with updated statuses
- var updatedBlocks = Seq[(BlockId, BlockStatus)]()
- val returnValue: Iterator[T] = {
- if (storageLevel.useDisk && !storageLevel.useMemory) {
- /* In the case that this RDD is to be persisted using DISK_ONLY
- * the iterator will be passed directly to the blockManager (rather then
- * caching it to an ArrayBuffer first), then the resulting block data iterator
- * will be passed back to the user. If the iterator generates a lot of data,
- * this means that it doesn't all have to be held in memory at one time.
- * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
- * blocks aren't dropped by the block store before enabling that. */
- updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
- blockManager.get(key) match {
- case Some(values) =>
- values.asInstanceOf[Iterator[T]]
- case None =>
- logInfo(s"Failure to store $key")
- throw new SparkException("Block manager failed to return persisted value")
- }
- } else {
- // In this case the RDD is cached to an array buffer. This will save the results
- // if we're dealing with a 'one-time' iterator
- val elements = new ArrayBuffer[Any]
- elements ++= computedValues
- updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
- elements.iterator.asInstanceOf[Iterator[T]]
- }
- }
-
- // Update task metrics to include any blocks whose storage status is updated
- val metrics = context.taskMetrics
- metrics.updatedBlocks = Some(updatedBlocks)
-
- new InterruptibleIterator(context, returnValue)
+ // Otherwise, cache the values and keep track of any updates in block statuses
+ val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
+ context.taskMetrics.updatedBlocks = Some(updatedBlocks)
+ new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
@@ -128,4 +79,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
}
}
+
+ /**
+ * Acquire a loading lock for the partition identified by the given block ID.
+ *
+ * If the lock is free, just acquire it and return None. Otherwise, another thread is already
+ * loading the partition, so we wait for it to finish and return the values loaded by the thread.
+ */
+ private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
+ loading.synchronized {
+ if (!loading.contains(id)) {
+ // If the partition is free, acquire its lock to compute its value
+ loading.add(id)
+ None
+ } else {
+ // Otherwise, wait for another thread to finish and return its result
+ logInfo(s"Another thread is loading $id, waiting for it to finish...")
+ while (loading.contains(id)) {
+ try {
+ loading.wait()
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception while waiting for another thread to load $id", e)
+ }
+ }
+ logInfo(s"Finished waiting for $id")
+ val values = blockManager.get(id)
+ if (!values.isDefined) {
+ /* The block is not guaranteed to exist even after the other thread has finished.
+ * For instance, the block could be evicted after it was put, but before our get.
+ * In this case, we still need to load the partition ourselves. */
+ logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
+ loading.add(id)
+ }
+ values.map(_.data.asInstanceOf[Iterator[T]])
+ }
+ }
+ }
+
+ /**
+ * Cache the values of a partition, keeping track of any updates in the storage statuses
+ * of other blocks along the way.
+ */
+ private def putInBlockManager[T](
+ key: BlockId,
+ values: Iterator[T],
+ storageLevel: StorageLevel,
+ updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
+
+ if (!storageLevel.useMemory) {
+ /* This RDD is not to be cached in memory, so we can just pass the computed values
+ * as an iterator directly to the BlockManager, rather than first fully unrolling
+ * it in memory. The latter option potentially uses much more memory and risks OOM
+ * exceptions that can be avoided. */
+ updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+ blockManager.get(key) match {
+ case Some(v) => v.data.asInstanceOf[Iterator[T]]
+ case None =>
+ logInfo(s"Failure to store $key")
+ throw new BlockException(key, s"Block manager failed to return cached value for $key!")
+ }
+ } else {
+ /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+ * to the BlockManager as an iterator and expect to read it back later. This is because
+ * we may end up dropping a partition from memory store before getting it back, e.g.
+ * when the entirety of the RDD does not fit in memory. */
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
+ elements.iterator.asInstanceOf[Iterator[T]]
+ }
+ }
+
}
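
A minimal usage sketch of the code path above, exercised through the public RDD API (the local SparkContext and the DISK_ONLY level are illustrative assumptions, not part of the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheManagerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-sketch").setMaster("local[2]"))

    // DISK_ONLY has useMemory == false, so under putInBlockManager the computed iterator
    // should be handed to the BlockManager directly instead of being unrolled into an
    // ArrayBuffer first.
    val rdd = sc.parallelize(1 to 100000).persist(StorageLevel.DISK_ONLY)

    rdd.count()  // first action: getOrCompute misses, computes and stores the partition
    rdd.count()  // second action: getOrCompute finds the block and reads it back

    sc.stop()
  }
}
```
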
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index c8c194a111aac..09a60571238ea 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -61,7 +61,8 @@ class ShuffleDependency[K, V, C](
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
- val aggregator: Option[Aggregator[K, V, C]] = None)
+ val aggregator: Option[Aggregator[K, V, C]] = None,
+ val mapSideCombine: Boolean = false)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
val shuffleId: Int = rdd.context.newShuffleId()
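
The new flag only shows up when a ShuffleDependency is built. A sketch of a direct construction follows; normally this happens inside ShuffledRDD and the shuffle-based pair-RDD operations, so the explicit call below is purely illustrative:

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

object ShuffleDepSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dep-sketch").setMaster("local"))
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2))

    // The default of false keeps existing call sites unchanged; shuffle code that
    // pre-aggregates on the map side is expected to set the flag to true.
    val dep = new ShuffleDependency[String, Int, Int](
      pairs,
      new HashPartitioner(4),
      mapSideCombine = false)

    println(s"shuffleId = ${dep.shuffleId}, mapSideCombine = ${dep.mapSideCombine}")
    sc.stop()
  }
}
```
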
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ee82d9fa7874b..894091761485d 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -25,7 +25,9 @@ import scala.concurrent.Await
import akka.actor._
import akka.pattern.ask
+
import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util._
@@ -105,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
Await.result(future, timeout)
} catch {
case e: Exception =>
+ logError("Error communicating with MapOutputTracker", e)
throw new SparkException("Error communicating with MapOutputTracker", e)
}
}
/** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
protected def sendTracker(message: Any) {
- if (askTracker(message) != true) {
- throw new SparkException("Error reply received from MapOutputTracker")
+ val response = askTracker(message)
+ if (response != true) {
+ throw new SparkException(
+ "Error reply received from MapOutputTracker. Expecting true, got " + response.toString)
}
}
@@ -168,8 +173,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing all output locations for shuffle " + shuffleId))
+ throw new MetadataFetchFailedException(
+ shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses.synchronized {
@@ -364,15 +369,15 @@ private[spark] object MapOutputTracker {
// any of the statuses is null (indicating a missing location due to a failed mapper),
// throw a FetchFailedException.
private def convertMapStatuses(
- shuffleId: Int,
- reduceId: Int,
- statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+ shuffleId: Int,
+ reduceId: Int,
+ statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
assert (statuses != null)
statuses.map {
status =>
if (status == null) {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing an output location for shuffle " + shuffleId))
+ throw new MetadataFetchFailedException(
+ shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
}
@@ -401,7 +406,7 @@ private[spark] object MapOutputTracker {
if (compressedSize == 0) {
0
} else {
- math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
+ math.pow(LOG_BASE, compressedSize & 0xFF).toLong
}
}
}
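
decompressSize above is the inverse of a one-byte, log-scale encoding of map output sizes (LOG_BASE is 1.1 in this file). A self-contained sketch of the round trip; the shape of compressSize shown here is an assumption about the companion encoder, not taken from the patch:

```scala
object MapOutputSizeSketch {
  private val LOG_BASE = 1.1

  // Assumed shape of the encoder: squeeze a size into one byte as ceil(log_1.1(size)),
  // capped at 255, with 0 reserved for empty outputs.
  def compressSize(size: Long): Byte =
    if (size == 0) 0 else math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte

  // Mirrors the decoder in MapOutputTracker above.
  def decompressSize(compressedSize: Byte): Long =
    if (compressedSize == 0) 0 else math.pow(LOG_BASE, compressedSize & 0xFF).toLong

  def main(args: Array[String]): Unit = {
    val original = 123456L
    val decoded = decompressSize(compressSize(original))
    // The decoded size overestimates the original by at most roughly 10%.
    println(s"$original -> ${compressSize(original)} -> $decoded")
  }
}
```
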
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index e7f75481939a8..ec99648a8488a 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -17,11 +17,13 @@
package org.apache.spark
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.CollectionsUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{CollectionsUtils, Utils}
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
@@ -96,15 +98,15 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
- partitions: Int,
+ @transient partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
- private val ascending: Boolean = true)
+ private var ascending: Boolean = true)
extends Partitioner {
- private val ordering = implicitly[Ordering[K]]
+ private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
- private val rangeBounds: Array[K] = {
+ private var rangeBounds: Array[K] = {
if (partitions == 1) {
Array()
} else {
@@ -127,7 +129,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
def numPartitions = rangeBounds.length + 1
- private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
+ private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
@@ -173,4 +175,40 @@ class RangePartitioner[K : Ordering : ClassTag, V](
result = prime * result + ascending.hashCode
result
}
+
+ @throws(classOf[IOException])
+ private def writeObject(out: ObjectOutputStream) {
+ val sfactory = SparkEnv.get.serializer
+ sfactory match {
+ case js: JavaSerializer => out.defaultWriteObject()
+ case _ =>
+ out.writeBoolean(ascending)
+ out.writeObject(ordering)
+ out.writeObject(binarySearch)
+
+ val ser = sfactory.newInstance()
+ Utils.serializeViaNestedStream(out, ser) { stream =>
+ stream.writeObject(scala.reflect.classTag[Array[K]])
+ stream.writeObject(rangeBounds)
+ }
+ }
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(in: ObjectInputStream) {
+ val sfactory = SparkEnv.get.serializer
+ sfactory match {
+ case js: JavaSerializer => in.defaultReadObject()
+ case _ =>
+ ascending = in.readBoolean()
+ ordering = in.readObject().asInstanceOf[Ordering[K]]
+ binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]
+
+ val ser = sfactory.newInstance()
+ Utils.deserializeViaNestedStream(in, ser) { ds =>
+ implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
+ rangeBounds = ds.readObject[Array[K]]()
+ }
+ }
+ }
}
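
The writeObject/readObject pair above matters because a RangePartitioner (and its rangeBounds array) rides along in task closures; with a non-Java spark.serializer configured, the bounds are now written through that serializer via a nested stream. A small usage sketch built on the public API (the local master and sample data are assumptions):

```scala
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD and ordered-RDD implicits in the 1.x API

object RangePartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("range-sketch").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq("b" -> 2, "c" -> 3, "a" -> 1))

    // sortByKey builds a RangePartitioner internally; the partitioner is shipped to
    // executors, which is what exercises the custom serialization above.
    println(pairs.sortByKey().collect().toSeq)

    // The partitioner can also be built and reused directly.
    val byRange = new RangePartitioner(2, pairs)
    println(pairs.partitionBy(byRange).glom().map(_.length).collect().toSeq)

    sc.stop()
  }
}
```
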
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0678bdd02110e..8819e73d17fb2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -224,7 +224,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
- val env = SparkEnv.get
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
@@ -1204,9 +1203,17 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Clean a closure to make it ready to be serialized and sent to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
+ * If `checkSerializable` is set, `clean` will also proactively
+ * check to see if `f` is serializable and throw a `SparkException` if not.
+ *
+ * @param f the closure to clean
+ * @param checkSerializable whether or not to immediately check `f` for serializability
+ * @throws `SparkException` if `checkSerializable` is set but `f` is not serializable
*/
- private[spark] def clean[F <: AnyRef](f: F): F = {
- ClosureCleaner.clean(f)
+ private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+ ClosureCleaner.clean(f, checkSerializable)
f
}
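
Since clean() is private[spark], the new checkSerializable behavior is only observable indirectly: capturing something unserializable in a closure should now fail at the transformation that cleans it rather than when tasks are shipped. A hedged sketch (the exception type and timing are assumptions about ClosureCleaner's check):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object CleanCheckSketch {
  class Unserializable { val payload = new Object }  // deliberately not Serializable

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("clean-sketch").setMaster("local"))
    val handle = new Unserializable

    try {
      // map() cleans its closure; with checkSerializable defaulting to true, capturing
      // `handle` is expected to be rejected here instead of at task-serialization time.
      sc.parallelize(1 to 10).map(i => (i, handle.payload.hashCode)).count()
    } catch {
      case e: SparkException => println(s"Caught early: ${e.getMessage}")
    } finally {
      sc.stop()
    }
  }
}
```
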
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8dfa8cc4b5b3f..8f70744d804d9 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -79,7 +79,7 @@ class SparkEnv (
private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
- httpFileServer.stop()
+ Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
@@ -183,6 +183,7 @@ object SparkEnv extends Logging {
val serializer = instantiateClass[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+ logDebug(s"Using serializer: ${serializer.getClass}")
val closureSerializer = instantiateClass[Serializer](
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
@@ -227,9 +228,15 @@ object SparkEnv extends Logging {
val cacheManager = new CacheManager(blockManager)
- val httpFileServer = new HttpFileServer(securityManager)
- httpFileServer.initialize()
- conf.set("spark.fileserver.uri", httpFileServer.serverUri)
+ val httpFileServer =
+ if (isDriver) {
+ val server = new HttpFileServer(securityManager)
+ server.initialize()
+ conf.set("spark.fileserver.uri", server.serverUri)
+ server
+ } else {
+ null
+ }
val metricsSystem = if (isDriver) {
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 4351ed74b67fc..2ebd7a7151a59 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -22,3 +22,11 @@ class SparkException(message: String, cause: Throwable)
def this(message: String) = this(message, null)
}
+
+/**
+ * Exception thrown when execution of some user code in the driver process fails, e.g.
+ * accumulator update fails or failure in takeOrdered (user supplies an Ordering implementation
+ * that may misbehave).
+ */
+private[spark] class SparkDriverExecutionException(cause: Throwable)
+ extends SparkException("Execution error", cause)
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index a3074916d13e7..df42d679b4699 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -30,27 +30,69 @@ import org.apache.spark.storage.BlockManagerId
@DeveloperApi
sealed trait TaskEndReason
+/**
+ * :: DeveloperApi ::
+ * Task succeeded.
+ */
@DeveloperApi
case object Success extends TaskEndReason
+/**
+ * :: DeveloperApi ::
+ * Various possible reasons why a task failed.
+ */
+@DeveloperApi
+sealed trait TaskFailedReason extends TaskEndReason {
+ /** Error message displayed in the web UI. */
+ def toErrorString: String
+}
+
+/**
+ * :: DeveloperApi ::
+ * A [[org.apache.spark.scheduler.ShuffleMapTask]] that completed successfully earlier, but we
+ * lost the executor before the stage completed. This means Spark needs to reschedule the task
+ * to be re-executed on a different executor.
+ */
@DeveloperApi
-case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
+case object Resubmitted extends TaskFailedReason {
+ override def toErrorString: String = "Resubmitted (resubmitted due to lost executor)"
+}
+/**
+ * :: DeveloperApi ::
+ * Task failed to fetch shuffle data from a remote node. Probably means we have lost the remote
+ * executors the task is trying to fetch from, and thus need to rerun the previous stage.
+ */
@DeveloperApi
case class FetchFailed(
- bmAddress: BlockManagerId,
+ bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
reduceId: Int)
- extends TaskEndReason
+ extends TaskFailedReason {
+ override def toErrorString: String = {
+ val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
+ s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+ }
+}
+/**
+ * :: DeveloperApi ::
+ * Task failed due to a runtime exception. This is the most common failure case and also captures
+ * user program exceptions.
+ */
@DeveloperApi
case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
metrics: Option[TaskMetrics])
- extends TaskEndReason
+ extends TaskFailedReason {
+ override def toErrorString: String = {
+ val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
+ s"$className ($description}\n$stackTraceString"
+ }
+}
/**
* :: DeveloperApi ::
@@ -58,10 +100,18 @@ case class ExceptionFailure(
* it was fetched.
*/
@DeveloperApi
-case object TaskResultLost extends TaskEndReason
+case object TaskResultLost extends TaskFailedReason {
+ override def toErrorString: String = "TaskResultLost (result lost from block manager)"
+}
+/**
+ * :: DeveloperApi ::
+ * Task was killed intentionally and needs to be rescheduled.
+ */
@DeveloperApi
-case object TaskKilled extends TaskEndReason
+case object TaskKilled extends TaskFailedReason {
+ override def toErrorString: String = "TaskKilled (killed intentionally)"
+}
/**
* :: DeveloperApi ::
@@ -69,7 +119,9 @@ case object TaskKilled extends TaskEndReason
* the task crashed the JVM.
*/
@DeveloperApi
-case object ExecutorLostFailure extends TaskEndReason
+case object ExecutorLostFailure extends TaskFailedReason {
+ override def toErrorString: String = "ExecutorLostFailure (executor lost)"
+}
/**
* :: DeveloperApi ::
@@ -77,4 +129,6 @@ case object ExecutorLostFailure extends TaskEndReason
* deserializing the task result.
*/
@DeveloperApi
-case object UnknownReason extends TaskEndReason
+case object UnknownReason extends TaskFailedReason {
+ override def toErrorString: String = "UnknownReason"
+}
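
One intended consumer of the new TaskFailedReason trait is listener and UI code that only needs a display string. A small sketch of such a listener (the registration call is shown as a comment; sc is assumed to be an existing SparkContext):

```scala
import org.apache.spark.{Success, TaskFailedReason}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Logs a one-line, human-readable error for every failed task using the new toErrorString.
class TaskFailureLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    case Success => // nothing to report
    case failure: TaskFailedReason =>
      println(s"Task in stage ${taskEnd.stageId} failed: ${failure.toErrorString}")
  }
}

// Registered as usual:
// sc.addSparkListener(new TaskFailureLogger)
```
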
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 14fa9d8135afe..4f3081433a542 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -543,6 +543,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
+ /**
+ * For each key k in `this` or `other1` or `other2` or `other3`,
+ * return a resulting RDD that contains a tuple with the list of values
+ * for that key in `this`, `other1`, `other2` and `other3`.
+ */
+ def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
+ other2: JavaPairRDD[K, W2],
+ other3: JavaPairRDD[K, W3],
+ partitioner: Partitioner)
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
+ fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, partitioner)))
+
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
@@ -558,6 +570,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
+ /**
+ * For each key k in `this` or `other1` or `other2` or `other3`,
+ * return a resulting RDD that contains a tuple with the list of values
+ * for that key in `this`, `other1`, `other2` and `other3`.
+ */
+ def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
+ other2: JavaPairRDD[K, W2],
+ other3: JavaPairRDD[K, W3])
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
+ fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3)))
+
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
@@ -574,6 +597,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
+ /**
+ * For each key k in `this` or `other1` or `other2` or `other3`,
+ * return a resulting RDD that contains a tuple with the list of values
+ * for that key in `this`, `other1`, `other2` and `other3`.
+ */
+ def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1],
+ other2: JavaPairRDD[K, W2],
+ other3: JavaPairRDD[K, W3],
+ numPartitions: Int)
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
+ fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, numPartitions)))
+
/** Alias for cogroup. */
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
@@ -583,6 +618,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
+ /** Alias for cogroup. */
+ def groupWith[W1, W2, W3](other1: JavaPairRDD[K, W1],
+ other2: JavaPairRDD[K, W2],
+ other3: JavaPairRDD[K, W3])
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])] =
+ fromRDD(cogroupResult3ToJava(rdd.groupWith(other1, other2, other3)))
+
/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
@@ -786,6 +828,15 @@ object JavaPairRDD {
.mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
}
+ private[spark]
+ def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
+ rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
+ : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
+ rddToPairRDDFunctions(rdd)
+ .mapValues(x =>
+ (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4)))
+ }
+
def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
new JavaPairRDD[K, V](rdd)
}
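
The new Java overloads delegate to the Scala-side four-way cogroup, so the shape of the result is easiest to see there. A short sketch (the local master and toy data are assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits in the 1.x API

object CogroupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cogroup-sketch").setMaster("local"))

    val a = sc.parallelize(Seq(1 -> "a1", 2 -> "a2"))
    val b = sc.parallelize(Seq(1 -> "b1"))
    val c = sc.parallelize(Seq(2 -> "c1"))
    val d = sc.parallelize(Seq(1 -> "d1", 3 -> "d2"))

    // For every key present in any of the four inputs, cogroup yields the (possibly empty)
    // collections of values from each input.
    a.cogroup(b, c, d).collect().foreach { case (k, (as, bs, cs, ds)) =>
      println(s"$k -> ${as.toList} ${bs.toList} ${cs.toList} ${ds.toList}")
    }

    sc.stop()
  }
}
```
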
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 330569a8d8837..f917cfd1419ec 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -43,8 +43,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def rdd: RDD[T]
- /** Set of partitions in this RDD. */
+ @deprecated("Use partitions() instead.", "1.1.0")
def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
+
+ /** Set of partitions in this RDD. */
+ def partitions: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
/** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
def context: SparkContext = rdd.context
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f6570d335757a..462e09466bfa6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
} else {
// This happens on the master, where we pass the updates to Python through a socket
val socket = new Socket(serverHost, serverPort)
+ // SPARK-2282: Immediately reuse closed sockets because we create one per task.
+ socket.setReuseAddress(true)
val in = socket.getInputStream
val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
out.writeInt(val2.size)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 5da9615c9e9af..39150deab863c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -21,6 +21,8 @@ import scala.collection.mutable.ListBuffer
import org.apache.log4j.Level
+import org.apache.spark.util.MemoryParam
+
/**
* Command-line parser for the driver client.
*/
@@ -51,8 +53,8 @@ private[spark] class ClientArguments(args: Array[String]) {
cores = value.toInt
parse(tail)
- case ("--memory" | "-m") :: value :: tail =>
- memory = value.toInt
+ case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
+ memory = value
parse(tail)
case ("--supervise" | "-s") :: tail =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7e9a9344e61f9..b050dccb6d57f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
import java.io.{File, PrintStream}
import java.lang.reflect.InvocationTargetException
-import java.net.{URI, URL}
+import java.net.URL
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -117,14 +117,25 @@ object SparkSubmit {
val isPython = args.isPython
val isYarnCluster = clusterManager == YARN && deployOnCluster
+ // For mesos, only client mode is supported
if (clusterManager == MESOS && deployOnCluster) {
- printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
+ printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+ }
+
+ // For standalone, only client mode is supported
+ if (clusterManager == STANDALONE && deployOnCluster) {
+ printErrorAndExit("Cluster deploy mode is currently not supported for standalone clusters.")
+ }
+
+ // For shells, only client mode is applicable
+ if (isShell(args.primaryResource) && deployOnCluster) {
+ printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
}
// If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
- printErrorAndExit("Cannot currently run Python driver programs on cluster")
+ printErrorAndExit("Cluster deploy mode is currently not supported for python.")
}
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1032ea8dbada..57655aa4c32b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -338,8 +338,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
"""Usage: spark-submit [options] [app options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
- | --deploy-mode DEPLOY_MODE Where to run the driver program: either "client" to run
- | on the local machine, or "cluster" to run inside cluster.
+ | --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
+ | on one of the worker machines inside the cluster ("cluster")
+ | (Default: client).
| --class CLASS_NAME Your application's main class (for Java / Scala apps).
| --name NAME A name of your application.
| --jars JARS Comma-separated list of local jars to include on the driver
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
new file mode 100644
index 0000000000000..a0e8bd403a41d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import org.apache.spark.ui.SparkUI
+
+private[spark] case class ApplicationHistoryInfo(
+ id: String,
+ name: String,
+ startTime: Long,
+ endTime: Long,
+ lastUpdated: Long,
+ sparkUser: String)
+
+private[spark] abstract class ApplicationHistoryProvider {
+
+ /**
+ * Returns a list of applications available for the history server to show.
+ *
+ * @return List of all known applications.
+ */
+ def getListing(): Seq[ApplicationHistoryInfo]
+
+ /**
+ * Returns the Spark UI for a specific application.
+ *
+ * @param appId The application ID.
+ * @return The application's UI, or null if application is not found.
+ */
+ def getAppUI(appId: String): SparkUI
+
+ /**
+ * Called when the server is shutting down.
+ */
+ def stop(): Unit = { }
+
+ /**
+ * Returns configuration data to be shown in the History Server home page.
+ *
+ * @return A map with the configuration data. Data is shown in the order returned by the map.
+ */
+ def getConfig(): Map[String, String] = Map()
+
+}
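
Because the class is private[spark], any concrete provider has to live under org.apache.spark.deploy.history (or inside Spark itself). A minimal in-memory sketch, with entirely made-up data, just to show the contract:

```scala
package org.apache.spark.deploy.history

import org.apache.spark.ui.SparkUI

// Sketch of a provider backed by a fixed, in-memory listing.
private[spark] class StaticHistoryProvider extends ApplicationHistoryProvider {

  private val apps = Seq(
    ApplicationHistoryInfo(
      id = "app-0001",
      name = "Example App",
      startTime = 1L,
      endTime = 2L,
      lastUpdated = 2L,
      sparkUser = "nobody"))

  override def getListing(): Seq[ApplicationHistoryInfo] = apps

  // No stored event logs, so there is never a UI to reconstruct.
  override def getAppUI(appId: String): SparkUI = null

  override def getConfig(): Map[String, String] = Map("Provider" -> "static (example)")
}
```
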
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
new file mode 100644
index 0000000000000..a8c9ac072449f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+
+private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
+ with Logging {
+
+ // Interval between each check for event log updates
+ private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
+ conf.getInt("spark.history.updateInterval", 10)) * 1000
+
+ private val logDir = conf.get("spark.history.fs.logDirectory", null)
+ if (logDir == null) {
+ throw new IllegalArgumentException("Logging directory must be specified.")
+ }
+
+ private val fs = Utils.getHadoopFileSystem(logDir)
+
+ // A timestamp of when the disk was last accessed to check for log updates
+ private var lastLogCheckTimeMs = -1L
+
+ // List of applications, in order from newest to oldest.
+ @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+
+ /**
+ * A background thread that periodically checks for event log updates on disk.
+ *
+ * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
+ * time at which it performs the next log check to maintain the same period as before.
+ *
+ * TODO: Add a mechanism to update manually.
+ */
+ private val logCheckingThread = new Thread("LogCheckingThread") {
+ override def run() = Utils.logUncaughtExceptions {
+ while (true) {
+ val now = getMonotonicTimeMs()
+ if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
+ Thread.sleep(UPDATE_INTERVAL_MS)
+ } else {
+ // If the user has manually checked for logs recently, wait until
+ // UPDATE_INTERVAL_MS after the last check time
+ Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
+ }
+ checkForLogs()
+ }
+ }
+ }
+
+ initialize()
+
+ private def initialize() {
+ // Validate the log directory.
+ val path = new Path(logDir)
+ if (!fs.exists(path)) {
+ throw new IllegalArgumentException(
+ "Logging directory specified does not exist: %s".format(logDir))
+ }
+ if (!fs.getFileStatus(path).isDir) {
+ throw new IllegalArgumentException(
+ "Logging directory specified is not a directory: %s".format(logDir))
+ }
+
+ checkForLogs()
+ logCheckingThread.setDaemon(true)
+ logCheckingThread.start()
+ }
+
+ override def getListing() = appList
+
+ override def getAppUI(appId: String): SparkUI = {
+ try {
+ val appLogDir = fs.getFileStatus(new Path(logDir, appId))
+ loadAppInfo(appLogDir, true)._2
+ } catch {
+ case e: FileNotFoundException => null
+ }
+ }
+
+ override def getConfig(): Map[String, String] =
+ Map(("Event Log Location" -> logDir))
+
+ /**
+ * Builds the application list based on the current contents of the log directory.
+ * Tries to reuse as much of the data already in memory as possible, by not reading
+ * applications that haven't been updated since last time the logs were checked.
+ */
+ private def checkForLogs() = {
+ lastLogCheckTimeMs = getMonotonicTimeMs()
+ logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
+ try {
+ val logStatus = fs.listStatus(new Path(logDir))
+ val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+ val logInfos = logDirs.filter {
+ dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
+ }
+
+ val currentApps = Map[String, ApplicationHistoryInfo](
+ appList.map(app => (app.id -> app)):_*)
+
+ // For any application that either (i) is not listed or (ii) has changed since the last time
+ // the listing was created (defined by the log dir's modification time), load the app's info.
+ // Otherwise just reuse what's already in memory.
+ val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
+ for (dir <- logInfos) {
+ val curr = currentApps.getOrElse(dir.getPath().getName(), null)
+ if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+ try {
+ newApps += loadAppInfo(dir, false)._1
+ } catch {
+ case e: Exception => logError(s"Failed to load app info from directory $dir.")
+ }
+ } else {
+ newApps += curr
+ }
+ }
+
+ appList = newApps.sortBy { info => -info.endTime }
+ } catch {
+ case t: Throwable => logError("Exception in checking for event log updates", t)
+ }
+ }
+
+ /**
+ * Parse the application's logs to find out the information we need to build the
+ * listing page.
+ *
+ * When creating the listing of available apps, there is no need to load the whole UI for the
+ * application. The UI is requested by the HistoryServer (by calling getAppUI()) when the user
+ * clicks on a specific application.
+ *
+ * @param logDir Directory with application's log files.
+ * @param renderUI Whether to create the SparkUI for the application.
+ * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
+ */
+ private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
+ val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
+ val path = logDir.getPath
+ val appId = path.getName
+ val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
+ val appListener = new ApplicationEventListener
+ replayBus.addListener(appListener)
+
+ val ui: SparkUI = if (renderUI) {
+ val conf = this.conf.clone()
+ val appSecManager = new SecurityManager(conf)
+ new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+ // Do not call ui.bind() to avoid creating a new server for each application
+ } else {
+ null
+ }
+
+ replayBus.replay()
+ val appInfo = ApplicationHistoryInfo(
+ appId,
+ appListener.appName,
+ appListener.startTime,
+ appListener.endTime,
+ getModificationTime(logDir),
+ appListener.sparkUser)
+
+ if (ui != null) {
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setUIAcls(uiAclsEnabled)
+ ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
+ }
+ (appInfo, ui)
+ }
+
+ /** Return when this directory was last modified. */
+ private def getModificationTime(dir: FileStatus): Long = {
+ try {
+ val logFiles = fs.listStatus(dir.getPath)
+ if (logFiles != null && !logFiles.isEmpty) {
+ logFiles.map(_.getModificationTime).max
+ } else {
+ dir.getModificationTime
+ }
+ } catch {
+ case t: Throwable =>
+ logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+ -1L
+ }
+ }
+
+ /** Returns the system's monotonically increasing time. */
+ private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
+
+}
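
The provider above is driven purely by configuration; a sketch of the relevant keys as they appear in this file (the directory URI and interval value are example placeholders):

```scala
import org.apache.spark.SparkConf

object HistoryConfSketch {
  val conf = new SparkConf()
    // Required: where EventLoggingListener wrote the per-application log directories.
    .set("spark.history.fs.logDirectory", "hdfs://namenode:8020/user/spark/eventLogs")
    // Optional: seconds between scans of that directory; falls back to
    // spark.history.updateInterval and then to 10 seconds, per the code above.
    .set("spark.history.fs.updateInterval", "30")
}
```
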
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 180c853ce3096..a958c837c2ff6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -25,20 +25,36 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
+ private val pageSize = 20
+
def render(request: HttpServletRequest): Seq[Node] = {
- val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
- val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
+ val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
+ val requestedFirst = (requestedPage - 1) * pageSize
+
+ val allApps = parent.getApplicationList()
+ val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
+ val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
+
+ val actualPage = (actualFirst / pageSize) + 1
+ val last = Math.min(actualFirst + pageSize, allApps.size) - 1
+ val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+
+ val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+ val providerConfig = parent.getProviderConfig()
val content =
- <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
+ { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
{
- if (parent.appIdToInfo.size > 0) {
+ if (allApps.size > 0) {
<h4>
- Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
- Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+ Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+ <span style="float: right">
+ {if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
+ {if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
+ </span>
</h4> ++
appTable
} else {
@@ -56,26 +72,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
- "Log Directory",
"Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val appName = if (info.started) info.name else info.logDirPath.getName
- val uiAddress = parent.getAddress + info.ui.basePath
- val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
- val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
- val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
- val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
- val sparkUser = if (info.started) info.sparkUser else "Unknown user"
- val logDirectory = info.logDirPath.getName
+ val uiAddress = "/history/" + info.id
+ val startTime = UIUtils.formatDate(info.startTime)
+ val endTime = UIUtils.formatDate(info.endTime)
+ val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
- <td><a href={uiAddress}>{appName}</a></td>
+ <td><a href={uiAddress}>{info.name}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
- <td>{sparkUser}</td>
- <td>{logDirectory}</td>
+ <td>{info.sparkUser}</td>
<td>{lastUpdated}</td>
}
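
The paging math in render() is compact; the following is a standalone sketch of the same arithmetic with two worked cases (the function and object names here are made up):

```scala
object PaginationSketch {
  val pageSize = 20

  // Mirrors render(): clamp the requested page into range, then derive the slice
  // bounds and the total page count.
  def paginate(requestedPage: Int, totalApps: Int): (Int, Int, Int, Int) = {
    val requestedFirst = (requestedPage - 1) * pageSize
    val actualFirst = if (requestedFirst < totalApps) requestedFirst else 0
    val actualPage = (actualFirst / pageSize) + 1
    val last = math.min(actualFirst + pageSize, totalApps) - 1
    val pageCount = totalApps / pageSize + (if (totalApps % pageSize > 0) 1 else 0)
    (actualFirst, last, actualPage, pageCount)
  }

  def main(args: Array[String]): Unit = {
    // 45 applications, page 3 requested: shows entries 41-45 of 45, page 3 of 3.
    println(paginate(requestedPage = 3, totalApps = 45))  // (40, 44, 3, 3)
    // An out-of-range request falls back to the first page.
    println(paginate(requestedPage = 9, totalApps = 45))  // (0, 19, 1, 3)
  }
}
```
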
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a9c11dca5678e..56b38ddfc9313 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,16 +17,17 @@
package org.apache.spark.deploy.history
-import scala.collection.mutable
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.cache._
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler._
-import org.apache.spark.ui.{WebUI, SparkUI}
+import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SignalLogger, Utils}
/**
* A web server that renders SparkUIs of completed applications.
@@ -38,56 +39,68 @@ import org.apache.spark.util.Utils
* application's event logs are maintained in the application's own sub-directory. This
* is the same structure as maintained in the event log write code path in
* EventLoggingListener.
- *
- * @param baseLogDir The base directory in which event logs are found
*/
class HistoryServer(
- val baseLogDir: String,
+ conf: SparkConf,
+ provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
- conf: SparkConf)
- extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
-
- import HistoryServer._
+ port: Int)
+ extends WebUI(securityManager, port, conf) with Logging {
- private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
- private val localHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+ // How many applications to retain
+ private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
- // A timestamp of when the disk was last accessed to check for log updates
- private var lastLogCheckTime = -1L
+ private val appLoader = new CacheLoader[String, SparkUI] {
+ override def load(key: String): SparkUI = {
+ val ui = provider.getAppUI(key)
+ if (ui == null) {
+ throw new NoSuchElementException()
+ }
+ attachSparkUI(ui)
+ ui
+ }
+ }
- // Number of completed applications found in this directory
- private var numCompletedApplications = 0
+ private val appCache = CacheBuilder.newBuilder()
+ .maximumSize(retainedApplications)
+ .removalListener(new RemovalListener[String, SparkUI] {
+ override def onRemoval(rm: RemovalNotification[String, SparkUI]) = {
+ detachSparkUI(rm.getValue())
+ }
+ })
+ .build(appLoader)
+
+ private val loaderServlet = new HttpServlet {
+ protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ val parts = Option(req.getPathInfo()).getOrElse("").split("/")
+ if (parts.length < 2) {
+ res.sendError(HttpServletResponse.SC_BAD_REQUEST,
+ s"Unexpected path info in request (URI = ${req.getRequestURI()}")
+ return
+ }
- @volatile private var stopped = false
+ val appId = parts(1)
- /**
- * A background thread that periodically checks for event log updates on disk.
- *
- * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
- * time at which it performs the next log check to maintain the same period as before.
- *
- * TODO: Add a mechanism to update manually.
- */
- private val logCheckingThread = new Thread {
- override def run(): Unit = Utils.logUncaughtExceptions {
- while (!stopped) {
- val now = System.currentTimeMillis
- if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
- checkForLogs()
- Thread.sleep(UPDATE_INTERVAL_MS)
- } else {
- // If the user has manually checked for logs recently, wait until
- // UPDATE_INTERVAL_MS after the last check time
- Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
+ // Note we don't use the UI retrieved from the cache; the cache loader above will register
+ // the app's UI, and all we need to do is redirect the user to the same URI that was
+ // requested, and the proper data should be served at that point.
+ try {
+ appCache.get(appId)
+ res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+ } catch {
+ case e: Exception => e.getCause() match {
+ case nsee: NoSuchElementException =>
+ val msg = <div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND)
+ UIUtils.basicSparkPage(msg, "Not Found").foreach(
+ n => res.getWriter().write(n.toString))
+
+ case cause: Exception => throw cause
}
}
}
}
- // A mapping of application ID to its history information, which includes the rendered UI
- val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
-
initialize()
/**
@@ -98,108 +111,23 @@ class HistoryServer(
*/
def initialize() {
attachPage(new HistoryPage(this))
- attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+
+ val contextHandler = new ServletContextHandler
+ contextHandler.setContextPath("/history")
+ contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
+ attachHandler(contextHandler)
}
/** Bind to the HTTP server behind this web interface. */
override def bind() {
super.bind()
- logCheckingThread.start()
- }
-
- /**
- * Check for any updates to event logs in the base directory. This is only effective once
- * the server has been bound.
- *
- * If a new completed application is found, the server renders the associated SparkUI
- * from the application's event logs, attaches this UI to itself, and stores metadata
- * information for this application.
- *
- * If the logs for an existing completed application are no longer found, the server
- * removes all associated information and detaches the SparkUI.
- */
- def checkForLogs() = synchronized {
- if (serverInfo.isDefined) {
- lastLogCheckTime = System.currentTimeMillis
- logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
- try {
- val logStatus = fileSystem.listStatus(new Path(baseLogDir))
- val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs
- .sortBy { dir => getModificationTime(dir) }
- .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
- .filter { case (dir, info) => info.applicationComplete }
-
- // Logging information for applications that should be retained
- val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
- val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }
-
- // Remove any applications that should no longer be retained
- appIdToInfo.foreach { case (appId, info) =>
- if (!retainedAppIds.contains(appId)) {
- detachSparkUI(info.ui)
- appIdToInfo.remove(appId)
- }
- }
-
- // Render the application's UI if it is not already there
- retainedLogInfos.foreach { case (dir, info) =>
- val appId = dir.getPath.getName
- if (!appIdToInfo.contains(appId)) {
- renderSparkUI(dir, info)
- }
- }
-
- // Track the total number of completed applications observed this round
- numCompletedApplications = logInfos.size
-
- } catch {
- case e: Exception => logError("Exception in checking for event log updates", e)
- }
- } else {
- logWarning("Attempted to check for event log updates before binding the server.")
- }
- }
-
- /**
- * Render a new SparkUI from the event logs if the associated application is completed.
- *
- * HistoryServer looks for a special file that indicates application completion in the given
- * directory. If this file exists, the associated application is regarded to be completed, in
- * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
- */
- private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) {
- val path = logDir.getPath
- val appId = path.getName
- val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec)
- val appListener = new ApplicationEventListener
- replayBus.addListener(appListener)
- val appConf = conf.clone()
- val appSecManager = new SecurityManager(appConf)
- val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
-
- // Do not call ui.bind() to avoid creating a new server for each application
- replayBus.replay()
- if (appListener.applicationStarted) {
- appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED)
- appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
- attachSparkUI(ui)
- val appName = appListener.appName
- val sparkUser = appListener.sparkUser
- val startTime = appListener.startTime
- val endTime = appListener.endTime
- val lastUpdated = getModificationTime(logDir)
- ui.setAppName(appName + " (completed)")
- appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
- lastUpdated, sparkUser, path, ui)
- }
}
/** Stop the server and close the file system. */
override def stop() {
super.stop()
- stopped = true
- fileSystem.close()
+ provider.stop()
}
/** Attach a reconstructed UI to this server. Only valid after bind(). */
@@ -215,27 +143,20 @@ class HistoryServer(
ui.getHandlers.foreach(detachHandler)
}
- /** Return the address of this server. */
- def getAddress: String = "http://" + publicHost + ":" + boundPort
+ /**
+ * Returns a list of available applications, in descending order according to their end time.
+ *
+ * @return List of all known applications.
+ */
+ def getApplicationList() = provider.getListing()
- /** Return the number of completed applications found, whether or not the UI is rendered. */
- def getNumApplications: Int = numCompletedApplications
+ /**
+ * Returns the provider configuration to show in the listing page.
+ *
+ * @return A map with the provider's configuration.
+ */
+ def getProviderConfig() = provider.getConfig()
- /** Return when this directory was last modified. */
- private def getModificationTime(dir: FileStatus): Long = {
- try {
- val logFiles = fileSystem.listStatus(dir.getPath)
- if (logFiles != null && !logFiles.isEmpty) {
- logFiles.map(_.getModificationTime).max
- } else {
- dir.getModificationTime
- }
- } catch {
- case e: Exception =>
- logError("Exception in accessing modification time of %s".format(dir.getPath), e)
- -1L
- }
- }
}
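
The getApplicationList(), getProviderConfig() and provider.stop() calls above go through a pluggable provider whose definition lives in a separate file not included in this excerpt. A sketch of the contract those calls imply (method names are taken from the diff; visibility, defaults and the slimmed-down ApplicationHistoryInfo, which presumably moves out of the HistoryServer file removed further down, are assumptions):

package org.apache.spark.deploy.history

import org.apache.spark.ui.SparkUI

// Sketch only: the UI is no longer stored with the application metadata; it is rebuilt
// on demand via getAppUI and cached by the server.
private[history] case class ApplicationHistoryInfo(
    id: String,
    name: String,
    startTime: Long,
    endTime: Long,
    lastUpdated: Long,
    sparkUser: String)

private[history] abstract class ApplicationHistoryProvider {
  /** All known applications, expected in descending order of end time. */
  def getListing(): Seq[ApplicationHistoryInfo]

  /** A rebuilt SparkUI for the given application, if its event logs can still be read. */
  def getAppUI(appId: String): Option[SparkUI]

  /** Provider-specific configuration to display on the listing page. */
  def getConfig(): Map[String, String] = Map()

  /** Release any resources held by the provider (threads, file systems, ...). */
  def stop(): Unit = { }
}
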
/**
@@ -248,33 +169,35 @@ class HistoryServer(
*
* This launches the HistoryServer as a Spark daemon.
*/
-object HistoryServer {
+object HistoryServer extends Logging {
private val conf = new SparkConf
- // Interval between each check for event log updates
- val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
-
- // How many applications to retain
- val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250)
-
- // The port to which the web UI is bound
- val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
-
- // set whether to enable or disable view acls for all applications
- val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false)
-
- val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
-
def main(argStrings: Array[String]) {
+ SignalLogger.register(log)
initSecurity()
- val args = new HistoryServerArguments(argStrings)
+ val args = new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
- val server = new HistoryServer(args.logDir, securityManager, conf)
+
+ val providerName = conf.getOption("spark.history.provider")
+ .getOrElse(classOf[FsHistoryProvider].getName())
+ val provider = Class.forName(providerName)
+ .getConstructor(classOf[SparkConf])
+ .newInstance(conf)
+ .asInstanceOf[ApplicationHistoryProvider]
+
+ val port = conf.getInt("spark.history.ui.port", 18080)
+
+ val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
+ Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
+ override def run() = {
+ server.stop()
+ }
+ })
+
// Wait until the end of the world... or until the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
- server.stop()
}
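
Because main() resolves spark.history.provider by reflection and only requires a one-argument SparkConf constructor, alternative providers can be dropped in without touching the server. A hypothetical example against the provider contract sketched earlier (class name and behaviour are made up for illustration):

package org.apache.spark.deploy.history  // same package, since the sketched trait is package-private

import org.apache.spark.SparkConf
import org.apache.spark.ui.SparkUI

// Hypothetical provider that advertises no applications; it exists only to show the
// constructor shape the reflective lookup in main() expects.
class EmptyHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider {
  override def getListing(): Seq[ApplicationHistoryInfo] = Seq.empty
  override def getAppUI(appId: String): Option[SparkUI] = None
}

// Selected at start-up with, for example:
//   -Dspark.history.provider=org.apache.spark.deploy.history.EmptyHistoryProvider
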
def initSecurity() {
@@ -291,17 +214,3 @@ object HistoryServer {
}
}
-
-
-private[spark] case class ApplicationHistoryInfo(
- id: String,
- name: String,
- startTime: Long,
- endTime: Long,
- lastUpdated: Long,
- sparkUser: String,
- logDirPath: Path,
- ui: SparkUI) {
- def started = startTime != -1
- def completed = endTime != -1
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 943c061743dbd..be9361b754fc3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,17 +17,14 @@
package org.apache.spark.deploy.history
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-
+import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
/**
* Command-line parser for the history server.
*/
-private[spark] class HistoryServerArguments(args: Array[String]) {
- var logDir = ""
+private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
+ private var logDir: String = null
parse(args.toList)
@@ -45,32 +42,36 @@ private[spark] class HistoryServerArguments(args: Array[String]) {
case _ =>
printUsageAndExit(1)
}
- validateLogDir()
- }
-
- private def validateLogDir() {
- if (logDir == "") {
- System.err.println("Logging directory must be specified.")
- printUsageAndExit(1)
- }
- val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
- val path = new Path(logDir)
- if (!fileSystem.exists(path)) {
- System.err.println("Logging directory specified does not exist: %s".format(logDir))
- printUsageAndExit(1)
- }
- if (!fileSystem.getFileStatus(path).isDir) {
- System.err.println("Logging directory specified is not a directory: %s".format(logDir))
- printUsageAndExit(1)
+ if (logDir != null) {
+ conf.set("spark.history.fs.logDirectory", logDir)
}
}
private def printUsageAndExit(exitCode: Int) {
System.err.println(
- "Usage: HistoryServer [options]\n" +
- "\n" +
- "Options:\n" +
- " -d DIR, --dir DIR Location of event log files")
+ """
+ |Usage: HistoryServer
+ |
+ |Configuration options can be set by setting the corresponding JVM system property.
+ |History Server options are always available; additional options depend on the provider.
+ |
+ |History Server options:
+ |
+ | spark.history.ui.port Port where server will listen for connections
+ | (default 18080)
+ | spark.history.acls.enable Whether to enable view acls for all applications
+ | (default false)
+ | spark.history.provider Name of history provider class (defaults to
+ | file system-based provider)
+ | spark.history.retainedApplications Max number of application UIs to keep loaded in memory
+ | (default 50)
+      |
+      |FsHistoryProvider options:
+ |
+ | spark.history.fs.logDirectory Directory where app logs are stored (required)
+ | spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
+ | default 10)
+ |""".stripMargin)
System.exit(exitCode)
}
+
}
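
With validation gone, the parser's only job is to translate the legacy -d/--dir flag into the new configuration key, which the file-system provider is then expected to check. A small sketch of the intended behaviour (the directory value is an example, and the call assumes code inside the org.apache.spark packages since the class is private[spark]):

import org.apache.spark.SparkConf

val conf = new SparkConf()
// Passing the old-style flag should end up as a plain configuration entry.
new HistoryServerArguments(conf, Array("--dir", "/tmp/spark-events"))
assert(conf.get("spark.history.fs.logDirectory") == "/tmp/spark-events")
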
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 33ffcbd216954..a304102a49086 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -41,7 +41,7 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
private[spark] class Master(
host: String,
@@ -481,7 +481,7 @@ private[spark] class Master(
// First schedule drivers, they take strict precedence over applications
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
- for (driver <- waitingDrivers) {
+ for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
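
The switch to List(waitingDrivers: _*) matters because the loop body removes elements from waitingDrivers while it runs. A standalone illustration (not Spark code) of why iterating over a copy is the safe choice:

import scala.collection.mutable.ArrayBuffer

val waiting = ArrayBuffer(1, 2, 3, 4)

// Iterate over an immutable snapshot so that removing from the buffer inside the loop
// cannot skip elements (or, on newer Scala versions, fail with a concurrent modification error).
for (x <- List(waiting: _*)) {
  if (x % 2 == 0) {
    waiting -= x
  }
}

assert(waiting == ArrayBuffer(1, 3))
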
@@ -755,12 +755,13 @@ private[spark] class Master(
}
}
-private[spark] object Master {
+private[spark] object Master extends Logging {
val systemName = "sparkMaster"
private val actorName = "Master"
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
+ SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 6433aac1c23e0..467317dd9b44c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -77,6 +77,7 @@ private[spark] class ExecutorRunner(
* @param message the exception message which caused the executor's death
*/
private def killProcess(message: Option[String]) {
+ var exitCode: Option[Int] = None
if (process != null) {
logInfo("Killing process!")
process.destroy()
@@ -87,9 +88,9 @@ private[spark] class ExecutorRunner(
if (stderrAppender != null) {
stderrAppender.stop()
}
- val exitCode = process.waitFor()
- worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
+ exitCode = Some(process.waitFor())
}
+ worker ! ExecutorStateChanged(appId, execId, state, message, exitCode)
}
/** Stop this executor runner, including killing the process it launched */
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a0ecaf709f8e2..ce425443051b0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -365,8 +365,9 @@ private[spark] class Worker(
}
}
-private[spark] object Worker {
+private[spark] object Worker extends Logging {
def main(argStrings: Array[String]) {
+ SignalLogger.register(log)
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index 6a5ffb1b71bfb..b389cb546de6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -120,7 +120,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w