
Commit

Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
Conflicts:
	sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
yhuai committed Jul 25, 2014
2 parents d48fc7b + eff9714 commit 1db9531
Showing 58 changed files with 1,708 additions and 505 deletions.
3 changes: 2 additions & 1 deletion assembly/pom.xml
@@ -39,6 +39,7 @@
<deb.pkg.name>spark</deb.pkg.name>
<deb.install.path>/usr/share/spark</deb.install.path>
<deb.user>root</deb.user>
<deb.bin.filemode>744</deb.bin.filemode>
</properties>

<dependencies>
@@ -276,7 +277,7 @@
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/bin</prefix>
<filemode>744</filemode>
<filemode>${deb.bin.filemode}</filemode>
</mapper>
</data>
<data>
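Note on this hunk: the Debian binary file mode is now read from the deb.bin.filemode property instead of a hard-coded 744, so packagers can override it at build time (e.g. mvn -Ddeb.bin.filemode=755, assuming the usual Maven property-override mechanism) without patching the POM.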
5 changes: 5 additions & 0 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -72,6 +72,7 @@ object Bagel extends Logging {
var verts = vertices
var msgs = messages
var noActivity = false
var lastRDD: RDD[(K, (V, Array[M]))] = null
do {
logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
val (processed, numMsgs, numActiveVerts) =
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
if (lastRDD != null) {
lastRDD.unpersist(false)
}
lastRDD = processed

val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
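The Bagel change keeps at most one superstep's cached RDD alive: the previous superstep's RDD is unpersisted only after its successor has been computed. A minimal standalone sketch of the same pattern, assuming a hypothetical iterative job on the plain RDD API:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnpersistLastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("unpersist-sketch").setMaster("local[2]"))
    var current: RDD[Int] = sc.parallelize(1 to 1000).cache()
    var lastRDD: RDD[Int] = null   // the previous iteration's cached RDD, if any
    for (step <- 1 to 10) {
      val next = current.map(_ + 1).cache()
      next.count()                 // materialize the new RDD first
      if (lastRDD != null) {
        lastRDD.unpersist(false)   // non-blocking, as in the Bagel change
      }
      lastRDD = current
      current = next
    }
    sc.stop()
  }
}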
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -269,6 +269,9 @@ object SparkSubmit {
sysProps.getOrElseUpdate(k, v)
}

// Spark properties included on command line take precedence
sysProps ++= args.sparkProperties

(childArgs, childClasspath, sysProps, childMainClass)
}

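The ordering above is what makes the comment true: defaults are merged with getOrElseUpdate, which never overwrites an existing key, while the --conf properties are merged last with ++=, which always overwrites. A small self-contained sketch of those two semantics:

import scala.collection.mutable

val sysProps = mutable.HashMap("spark.app.name" -> "fromCli")
// Defaults must not clobber anything already set:
for ((k, v) <- Seq("spark.app.name" -> "fromDefaultsFile", "spark.master" -> "local")) {
  sysProps.getOrElseUpdate(k, v)
}
// Command-line Spark properties are applied last and win unconditionally:
sysProps ++= Map("spark.shuffle.spill" -> "false")
assert(sysProps("spark.app.name") == "fromCli")
assert(sysProps("spark.master") == "local")
assert(sysProps("spark.shuffle.spill") == "false")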
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -55,6 +55,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
var verbose: Boolean = false
var isPython: Boolean = false
var pyFiles: String = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

parseOpts(args.toList)
loadDefaults()
@@ -177,6 +178,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| executorCores $executorCores
| totalExecutorCores $totalExecutorCores
| propertiesFile $propertiesFile
| extraSparkProperties $sparkProperties
| driverMemory $driverMemory
| driverCores $driverCores
| driverExtraClassPath $driverExtraClassPath
@@ -290,6 +292,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
jars = Utils.resolveURIs(value)
parse(tail)

case ("--conf" | "-c") :: value :: tail =>
value.split("=", 2).toSeq match {
case Seq(k, v) => sparkProperties(k) = v
case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
}
parse(tail)

case ("--help" | "-h") :: tail =>
printUsageAndExit(0)

@@ -349,6 +358,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
|
| --conf PROP=VALUE Arbitrary Spark configuration property.
| --properties-file FILE Path to a file from which to load extra properties. If not
| specified, this will look for conf/spark-defaults.conf.
|
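The --conf handler splits on the first '=' only (split with limit 2), so values containing '=' survive intact. A sketch of the parse, with sys.error standing in for SparkSubmit.printErrorAndExit:

def parseSparkConf(value: String): (String, String) =
  value.split("=", 2) match {
    case Array(k, v) => (k, v)
    case _           => sys.error(s"Spark config without '=': $value")
  }

assert(parseSparkConf("spark.executor.extraJavaOptions=-Dlog=debug") ==
  ("spark.executor.extraJavaOptions", "-Dlog=debug"))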
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val ui: SparkUI = if (renderUI) {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
new SparkUI(conf, appSecManager, replayBus, appId,
HistoryServer.UI_PATH_PREFIX + s"/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
} else {
null
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = "/history/" + info.id
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
val endTime = UIUtils.formatDate(info.endTime)
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -114,7 +114,7 @@ class HistoryServer(
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

val contextHandler = new ServletContextHandler
contextHandler.setContextPath("/history")
contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
attachHandler(contextHandler)
}
@@ -172,6 +172,8 @@
object HistoryServer extends Logging {
private val conf = new SparkConf

val UI_PATH_PREFIX = "/history"

def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -35,6 +35,7 @@ import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -664,9 +665,10 @@ private[spark] class Master(
*/
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
val eventLogDir = app.desc.eventLogDir.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = "/history/not-found"
app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
@@ -681,13 +683,14 @@
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = s"/history/not-found?msg=$msg&title=$title"
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}

try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
replayBus.replay()
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)
@@ -702,7 +705,7 @@
var msg = s"Exception in replaying log for application $appName!"
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = s"/history/not-found?msg=$msg&exception=$exception&title=$title"
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
false
}
}
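This hunk routes every history URL the master builds through HistoryServer.UI_PATH_PREFIX, so the prefix can no longer drift between the two daemons; the message is still URL-encoded before it joins the query string. A sketch of the resulting URL construction, with sample message text assumed:

import java.net.URLEncoder

val uiPathPrefix = "/history"   // mirrors HistoryServer.UI_PATH_PREFIX
val notFoundBasePath = uiPathPrefix + "/not-found"
val title = URLEncoder.encode("Application history not found", "UTF-8")
val msg = URLEncoder.encode(
  "No event logs found for application demo. Did you specify the correct logging directory?",
  "UTF-8")
val appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
// Spaces and '?' in the message can no longer corrupt the query string.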
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -63,6 +63,13 @@ private[spark] class EventLoggingListener(
// For testing. Keep track of all JSON serialized events that have been logged.
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

/**
* Return only the unique application directory without the base directory.
*/
def getApplicationLogDir(): String = {
name
}

/**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -57,7 +57,7 @@ private[spark] class LocalActor(
case StatusUpdate(taskId, state, serializedData) =>
scheduler.statusUpdate(taskId, state, serializedData)
if (TaskState.isFinished(state)) {
freeCores += 1
freeCores += scheduler.CPUS_PER_TASK
reviveOffers()
}

@@ -68,7 +68,7 @@
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= 1
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, task.taskId, task.name, task.serializedTask)
}
}
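With spark.task.cpus greater than one, charging or crediting a flat single core per task lets local mode drift toward running more concurrent tasks than the configured cores allow. The fix charges both sides of the ledger at the same per-task rate; a self-contained sketch of that invariant:

class CoreLedger(totalCores: Int, cpusPerTask: Int) {
  private var freeCores = totalCores
  // A launch succeeds only if a whole task's worth of cores is free.
  def tryLaunch(): Boolean =
    if (freeCores >= cpusPerTask) { freeCores -= cpusPerTask; true } else false
  // Completion must credit exactly what the launch charged.
  def finished(): Unit = freeCores += cpusPerTask
}

val ledger = new CoreLedger(totalCores = 4, cpusPerTask = 2)
assert(ledger.tryLaunch())    // 2 cores left
assert(ledger.tryLaunch())    // 0 cores left
assert(!ledger.tryLaunch())   // a third 2-core task must wait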
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -48,13 +48,15 @@ class KryoSerializer(conf: SparkConf)

private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val registrator = conf.getOption("spark.kryo.registrator")

def newKryoOutput() = new KryoOutput(bufferSize)

def newKryo(): Kryo = {
val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()
kryo.setRegistrationRequired(registrationRequired)
val classLoader = Thread.currentThread.getContextClassLoader

// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
@@ -185,7 +187,8 @@ private[serializer] object KryoSerializer {
classOf[MapStatus],
classOf[BlockManagerId],
classOf[Array[Byte]],
classOf[BoundedPriorityQueue[_]]
classOf[BoundedPriorityQueue[_]],
classOf[SparkConf]
)
}

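spark.kryo.registrationRequired=true makes Kryo fail fast on any unregistered class instead of silently writing fully-qualified class names into every record, which is presumably why SparkConf joins the built-in registration list above. A sketch against the Kryo API directly:

import com.esotericsoftware.kryo.Kryo

val kryo = new Kryo()
kryo.setRegistrationRequired(true)
kryo.register(classOf[Array[Byte]])
// Attempting to serialize any class that was not registered now throws
// IllegalArgumentException ("Class is not registered") up front, rather
// than quietly inflating the serialized output.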
20 changes: 16 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -18,6 +18,7 @@
package org.apache.spark.ui.jobs

import scala.xml.Node
import scala.xml.Text

import java.util.Date

@@ -99,19 +100,30 @@ private[ui] class StageTableBase(
{s.name}
</a>

val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
val details = if (s.details.nonEmpty) {
<span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
class="expand-details">
+show details
</span>
<pre class="stage-details collapsed">{s.details}</pre>
+details
</span> ++
<div class="stage-details collapsed">
{if (cachedRddInfos.nonEmpty) {
Text("RDD: ") ++
// scalastyle:off
cachedRddInfos.map { i =>
<a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(basePath), i.id)}>{i.name}</a>
}
// scalastyle:on
}}
<pre>{s.details}</pre>
</div>
}

val stageDataOption = listener.stageIdToData.get(s.stageId)
// Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
val desc = stageDataOption.get.description
<div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
<div><em>{desc}</em></div><div>{killLink} {nameLink} {details}</div>
} else {
<div>{killLink} {nameLink} {details}</div>
}
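The rebuilt details cell leans on Scala's XML literals: Text(...) lifts a plain string into a node sequence and ++ concatenates sequences, which is how the "RDD:" label, the storage-page links, and the <pre> block are stitched together. A reduced sketch with hypothetical RDD names, where basePath stands in for UIUtils.prependBaseUri(basePath):

import scala.xml.{NodeSeq, Text}

val basePath = ""
val cachedRdds = Seq(0 -> "myCachedRdd", 3 -> "lookupTable")
val links: NodeSeq = cachedRdds.map { case (id, name) =>
  <a href={s"$basePath/storage/rdd?id=$id"}>{name}</a>
}
val detailsCell: NodeSeq =
  Text("RDD: ") ++ links ++ <pre>{"stage details go here"}</pre>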
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -257,7 +257,8 @@ private[spark] object JsonProtocol {
val reason = Utils.getFormattedClassName(taskEndReason)
val json = taskEndReason match {
case fetchFailed: FetchFailed =>
val blockManagerAddress = blockManagerIdToJson(fetchFailed.bmAddress)
val blockManagerAddress = Option(fetchFailed.bmAddress).
map(blockManagerIdToJson).getOrElse(JNothing)
("Block Manager Address" -> blockManagerAddress) ~
("Shuffle ID" -> fetchFailed.shuffleId) ~
("Map ID" -> fetchFailed.mapId) ~
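FetchFailed.bmAddress can legitimately be null (the failing block manager may be unknown), and Option(...) turns that case into JNothing, which json4s drops at render time, instead of a NullPointerException inside the serializer. A condensed sketch of the pattern, with string rendering standing in for blockManagerIdToJson:

import org.json4s.JsonAST.{JNothing, JString, JValue}

def addressToJson(bmAddress: AnyRef): JValue =
  Option(bmAddress).map(a => JString(a.toString)).getOrElse(JNothing)

assert(addressToJson(null) == JNothing)               // absent, not an NPE
assert(addressToJson("host:7077") == JString("host:7077"))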
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -120,6 +120,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
"--archives", "archive1.txt,archive2.txt",
"--num-executors", "6",
"--name", "beauty",
"--conf", "spark.shuffle.spill=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -139,6 +140,7 @@
mainClass should be ("org.apache.spark.deploy.yarn.Client")
classpath should have length (0)
sysProps("spark.app.name") should be ("beauty")
sysProps("spark.shuffle.spill") should be ("false")
sysProps("SPARK_SUBMIT") should be ("true")
}

@@ -156,6 +158,7 @@
"--archives", "archive1.txt,archive2.txt",
"--num-executors", "6",
"--name", "trill",
"--conf", "spark.shuffle.spill=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -176,6 +179,7 @@
sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps("spark.shuffle.spill") should be ("false")
}

test("handles standalone cluster mode") {
@@ -186,6 +190,7 @@
"--supervise",
"--driver-memory", "4g",
"--driver-cores", "5",
"--conf", "spark.shuffle.spill=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -195,9 +200,10 @@
childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
mainClass should be ("org.apache.spark.deploy.Client")
classpath should have size (0)
sysProps should have size (2)
sysProps should have size (3)
sysProps.keys should contain ("spark.jars")
sysProps.keys should contain ("SPARK_SUBMIT")
sysProps("spark.shuffle.spill") should be ("false")
}

test("handles standalone client mode") {
@@ -208,6 +214,7 @@
"--total-executor-cores", "5",
"--class", "org.SomeClass",
"--driver-memory", "4g",
"--conf", "spark.shuffle.spill=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -218,6 +225,7 @@
classpath(0) should endWith ("thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
sysProps("spark.shuffle.spill") should be ("false")
}

test("handles mesos client mode") {
@@ -228,6 +236,7 @@
"--total-executor-cores", "5",
"--class", "org.SomeClass",
"--driver-memory", "4g",
"--conf", "spark.shuffle.spill=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -238,6 +247,7 @@
classpath(0) should endWith ("thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
sysProps("spark.shuffle.spill") should be ("false")
}

test("launch simple application with spark-submit") {
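Every deploy-mode test above gains the same two lines: pass --conf spark.shuffle.spill=false on the command line and assert that it surfaces in the resolved properties, proving the flag survives each code path. The core of that round-trip, as it would read inside the suite's scope:

val clArgs = Seq(
  "--master", "local",
  "--conf", "spark.shuffle.spill=false",
  "thejar.jar")
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.sparkProperties("spark.shuffle.spill") should be ("false")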
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -475,6 +475,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(
Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
FakeRackUtil.cleanUp()
}

test("test RACK_LOCAL tasks") {
@@ -505,6 +506,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2
// Task 1 can be scheduled with RACK_LOCAL
assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
FakeRackUtil.cleanUp()
}

test("do not emit warning when serialized task is small") {
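FakeRackUtil evidently keeps its host-to-rack mapping in shared mutable state, so each rack-aware test now clears it on the way out; otherwise one test's topology would leak into the next. A generic sketch of that hygiene for a mutable test fake (hypothetical object, not Spark's):

import scala.collection.mutable

object FakeRackStore {
  private val hostToRack = mutable.Map.empty[String, String]
  def assignRack(host: String, rack: String): Unit = hostToRack(host) = rack
  def getRackForHost(host: String): Option[String] = hostToRack.get(host)
  def cleanUp(): Unit = hostToRack.clear()   // call at the end of every test
}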
