Merge branch 'master' of github.com:apache/spark into standalone-cluster
Conflicts:
	core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
andrewor14 committed Jul 25, 2014
2 parents b890949 + a45d548 commit 8e105e1
Showing 90 changed files with 1,795 additions and 755 deletions.
3 changes: 2 additions & 1 deletion assembly/pom.xml
@@ -39,6 +39,7 @@
<deb.pkg.name>spark</deb.pkg.name>
<deb.install.path>/usr/share/spark</deb.install.path>
<deb.user>root</deb.user>
+<deb.bin.filemode>744</deb.bin.filemode>
</properties>

<dependencies>
@@ -276,7 +277,7 @@
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/bin</prefix>
-<filemode>744</filemode>
+<filemode>${deb.bin.filemode}</filemode>
</mapper>
</data>
<data>
5 changes: 5 additions & 0 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -72,6 +72,7 @@ object Bagel extends Logging {
var verts = vertices
var msgs = messages
var noActivity = false
+var lastRDD: RDD[(K, (V, Array[M]))] = null
do {
logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
val (processed, numMsgs, numActiveVerts) =
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
+if (lastRDD != null) {
+  lastRDD.unpersist(false)
+}
+lastRDD = processed

val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -267,6 +267,9 @@ object SparkSubmit {
sysProps.getOrElseUpdate(k, v)
}

+// Spark properties included on command line take precedence
+sysProps ++= args.sparkProperties
+
(childArgs, childClasspath, sysProps, childMainClass)
}
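Resolution order after this change: properties from dedicated flags are set first, the defaults file fills only missing keys via getOrElseUpdate, and --conf entries are merged last so they win. A small standalone sketch of that precedence (keys and values are made up):

```scala
import scala.collection.mutable.HashMap

object PrecedenceSketch extends App {
  val sysProps = HashMap("spark.app.name" -> "MyApp")        // from dedicated flags
  val defaults = Seq("spark.master" -> "spark://host:7077",  // from spark-defaults.conf
                     "spark.eventLog.enabled" -> "true")
  for ((k, v) <- defaults) sysProps.getOrElseUpdate(k, v)    // fills gaps only
  sysProps ++= Map("spark.eventLog.enabled" -> "false")      // --conf values overwrite
  assert(sysProps("spark.eventLog.enabled") == "false")
}
```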

@@ -55,6 +55,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
var verbose: Boolean = false
var isPython: Boolean = false
var pyFiles: String = null
+val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

parseOpts(args.toList)
loadDefaults()
@@ -177,6 +178,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| executorCores $executorCores
| totalExecutorCores $totalExecutorCores
| propertiesFile $propertiesFile
+| extraSparkProperties $sparkProperties
| driverMemory $driverMemory
| driverCores $driverCores
| driverExtraClassPath $driverExtraClassPath
@@ -290,6 +292,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
jars = Utils.resolveURIs(value)
parse(tail)

case ("--conf" | "-c") :: value :: tail =>
value.split("=", 2).toSeq match {
case Seq(k, v) => sparkProperties(k) = v
case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
}
parse(tail)

case ("--help" | "-h") :: tail =>
printUsageAndExit(0)

@@ -349,6 +358,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
+|
+| --conf PROP=VALUE Arbitrary Spark configuration property.
| --properties-file FILE Path to a file from which to load extra properties. If not
| specified, this will look for conf/spark-defaults.conf.
|
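Note that `split("=", 2)` in the `--conf` case splits on the first '=' only, so property values may themselves contain '='; input with no '=' at all yields a one-element result and hits the error branch. A quick REPL-style check:

```scala
val withEquals = "spark.executor.extraJavaOptions=-Dkey=value".split("=", 2).toSeq
// Seq("spark.executor.extraJavaOptions", "-Dkey=value"): matches case Seq(k, v)

val withoutEquals = "spark.executor.memory".split("=", 2).toSeq
// Seq("spark.executor.memory"): falls through to the "Spark config without '='" error
```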
@@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val ui: SparkUI = if (renderUI) {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
-new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+new SparkUI(conf, appSecManager, replayBus, appId,
+  HistoryServer.UI_PATH_PREFIX + s"/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
} else {
null
@@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = "/history/" + info.id
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
val endTime = UIUtils.formatDate(info.endTime)
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
@@ -114,7 +114,7 @@ class HistoryServer(
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

val contextHandler = new ServletContextHandler
-contextHandler.setContextPath("/history")
+contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
attachHandler(contextHandler)
}
@@ -172,6 +172,8 @@
object HistoryServer extends Logging {
private val conf = new SparkConf

+val UI_PATH_PREFIX = "/history"
+
def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -35,6 +35,7 @@ import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -664,9 +665,10 @@ private[spark] class Master(
*/
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
+val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
val eventLogDir = app.desc.eventLogDir.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = "/history/not-found"
app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
@@ -681,13 +683,14 @@
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = s"/history/not-found?msg=$msg&title=$title"
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}

try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
-val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
+val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
+  HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
replayBus.replay()
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)
@@ -702,7 +705,7 @@
var msg = s"Exception in replaying log for application $appName!"
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = s"/history/not-found?msg=$msg&exception=$exception&title=$title"
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
false
}
}
@@ -39,16 +39,20 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
*
* @param prev RDD to be sampled
* @param sampler a random sampler
+* @param preservesPartitioning whether the sampler preserves the partitioner of the parent RDD
* @param seed random seed
* @tparam T input RDD item type
* @tparam U sampled RDD item type
*/
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
+@transient preservesPartitioning: Boolean,
@transient seed: Long = Utils.random.nextLong)
extends RDD[U](prev) {

+@transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
+
override def getPartitions: Array[Partition] = {
val random = new Random(seed)
firstParent[T].partitions.map(x => new PartitionwiseSampledRDDPartition(x, random.nextLong()))
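Per-partition sampling filters elements in place and never rewrites keys, so a sampled pair RDD may keep its parent's partitioner. A hypothetical check of the new behavior (master, names, and numbers are placeholders; the extra import is the Spark 1.x way to get pair-RDD methods):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair RDD implicits in Spark 1.x

object SamplePartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sample-sketch").setMaster("local[2]"))
    val pairs = sc.parallelize(1 to 100).map(x => (x, x)).partitionBy(new HashPartitioner(4))
    val sampled = pairs.sample(withReplacement = false, fraction = 0.5)
    println(sampled.partitioner)  // now Some(HashPartitioner), so a later reduceByKey can skip the shuffle
    sc.stop()
  }
}
```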
17 changes: 13 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -356,9 +356,9 @@ abstract class RDD[T: ClassTag](
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0.0, "Invalid fraction value: " + fraction)
if (withReplacement) {
-new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed)
+new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
-new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), seed)
+new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}

@@ -374,7 +374,7 @@
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
-new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), seed)
+new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
}.toArray
}

@@ -586,6 +586,9 @@

/**
* Return a new RDD by applying a function to each partition of this RDD.
+*
+* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -596,6 +599,9 @@
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
+*
+* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -607,6 +613,9 @@
* :: DeveloperApi ::
* Return a new RDD by applying a function to each partition of this RDD. This is a variant of
* mapPartitions that also passes the TaskContext into the closure.
+*
+* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
@DeveloperApi
def mapPartitionsWithContext[U: ClassTag](
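A hedged example of the contract the new scaladoc spells out: the closure below rewrites values but leaves keys untouched, so passing preservesPartitioning = true keeps the existing partitioner (all names here are illustrative):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair RDD implicits in Spark 1.x

object MapPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("mp-sketch").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).partitionBy(new HashPartitioner(2))
    val scaled = pairs.mapPartitions(
      iter => iter.map { case (k, v) => (k, v * 10) },  // keys are not modified
      preservesPartitioning = true)
    println(scaled.partitioner)  // Some(HashPartitioner) is retained
    sc.stop()
  }
}
```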
@@ -689,7 +698,7 @@
* a map on the other).
*/
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
-zipPartitions(other, true) { (thisIter, otherIter) =>
+zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
new Iterator[(T, U)] {
def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
case (true, true) => true
@@ -63,6 +63,13 @@ private[spark] class EventLoggingListener(
// For testing. Keep track of all JSON serialized events that have been logged.
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

+/**
+ * Return only the unique application directory without the base directory.
+ */
+def getApplicationLogDir(): String = {
+  name
+}
+
/**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
@@ -57,7 +57,7 @@ private[spark] class LocalActor(
case StatusUpdate(taskId, state, serializedData) =>
scheduler.statusUpdate(taskId, state, serializedData)
if (TaskState.isFinished(state)) {
-freeCores += 1
+freeCores += scheduler.CPUS_PER_TASK
reviveOffers()
}

@@ -68,7 +68,7 @@
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
-freeCores -= 1
+freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, task.taskId, task.name, task.serializedTask)
}
}
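CPUS_PER_TASK is derived from spark.task.cpus; charging and releasing that amount keeps local mode from over-committing cores when each task reserves more than one. A hypothetical configuration exercising the fix:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CpusPerTaskSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("cpus-sketch")
      .set("spark.task.cpus", "2")  // each task reserves two cores
    val sc = new SparkContext(conf)
    // With 4 local cores and 2 cpus per task, at most 2 tasks should run concurrently.
    println(sc.parallelize(1 to 8, 8).map(_ * 2).count())
    sc.stop()
  }
}
```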
@@ -48,13 +48,15 @@ class KryoSerializer(conf: SparkConf)

private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
+private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val registrator = conf.getOption("spark.kryo.registrator")

def newKryoOutput() = new KryoOutput(bufferSize)

def newKryo(): Kryo = {
val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()
+kryo.setRegistrationRequired(registrationRequired)
val classLoader = Thread.currentThread.getContextClassLoader

// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
@@ -185,7 +187,8 @@ private[serializer] object KryoSerializer {
classOf[MapStatus],
classOf[BlockManagerId],
classOf[Array[Byte]],
-classOf[BoundedPriorityQueue[_]]
+classOf[BoundedPriorityQueue[_]],
+classOf[SparkConf]
)
}
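With the new spark.kryo.registrationRequired flag set, Kryo throws on any class that was not explicitly registered instead of silently writing fully-qualified class names into every record. A hypothetical setup (Point and MyRegistrator are illustrative, not part of the commit):

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

case class Point(x: Double, y: Double)

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Point])  // unregistered classes now fail fast
  }
}

object KryoSketch {
  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "MyRegistrator")
    .set("spark.kryo.registrationRequired", "true")
}
```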

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -149,7 +149,7 @@ private[spark] object UIUtils extends Logging {

def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource

-val commonHeaderNodes = {
+def commonHeaderNodes = {
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
<link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
type="text/css" />
20 changes: 16 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -18,6 +18,7 @@
package org.apache.spark.ui.jobs

import scala.xml.Node
+import scala.xml.Text

import java.util.Date

@@ -99,19 +100,30 @@ private[ui] class StageTableBase(
{s.name}
</a>

+val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
val details = if (s.details.nonEmpty) {
<span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
class="expand-details">
-  +show details
-</span>
-<pre class="stage-details collapsed">{s.details}</pre>
+  +details
+</span> ++
+<div class="stage-details collapsed">
+  {if (cachedRddInfos.nonEmpty) {
+    Text("RDD: ") ++
+    // scalastyle:off
+    cachedRddInfos.map { i =>
+      <a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(basePath), i.id)}>{i.name}</a>
+    }
+    // scalastyle:on
+  }}
+  <pre>{s.details}</pre>
+</div>
}

val stageDataOption = listener.stageIdToData.get(s.stageId)
// Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
val desc = stageDataOption.get.description
-<div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
+<div><em>{desc}</em></div><div>{killLink} {nameLink} {details}</div>
} else {
<div>{killLink} {nameLink} {details}</div>
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -257,7 +257,8 @@ private[spark] object JsonProtocol {
val reason = Utils.getFormattedClassName(taskEndReason)
val json = taskEndReason match {
case fetchFailed: FetchFailed =>
-val blockManagerAddress = blockManagerIdToJson(fetchFailed.bmAddress)
+val blockManagerAddress = Option(fetchFailed.bmAddress).
+  map(blockManagerIdToJson).getOrElse(JNothing)
("Block Manager Address" -> blockManagerAddress) ~
("Shuffle ID" -> fetchFailed.shuffleId) ~
("Map ID" -> fetchFailed.mapId) ~
@@ -106,6 +106,7 @@ class ExternalAppendOnlyMap[K, V, C](
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
+private val threadId = Thread.currentThread().getId

/**
* Insert the given key and value into the map.
@@ -128,7 +129,6 @@
// Atomically check whether there is sufficient memory in the global pool for
// this map to grow and, if possible, allocate the required amount
shuffleMemoryMap.synchronized {
-val threadId = Thread.currentThread().getId
val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
val availableMemory = maxMemoryThreshold -
(shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
@@ -153,8 +153,8 @@
*/
private def spill(mapSize: Long) {
spillCount += 1
logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
logWarning("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
.format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
var objectsWritten = 0
(remaining changed files not shown)
