Merge branch 'master' into pickle
davies committed Sep 13, 2014
2 parents c77c87b + 885d162 commit 3908f5c
Showing 60 changed files with 1,015 additions and 548 deletions.
2 changes: 0 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.storage.StorageLevel

import scala.language.postfixOps

class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable

1 change: 1 addition & 0 deletions bin/compute-classpath.sh
@@ -43,6 +43,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
"classes ahead of assembly." >&2
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"
27 changes: 27 additions & 0 deletions core/pom.xml
@@ -351,6 +351,33 @@
</execution>
</executions>
</plugin>
<!--
Copy guava to the build directory. This is needed to make the SPARK_PREPEND_CLASSES
option work in compute-classpath.sh, since it would put the non-shaded Spark classes in
the runtime classpath.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>guava</includeArtifactIds>
<silent>true</silent>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<resources>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -162,7 +162,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {

// always add the current user and SPARK_USER to the viewAcls
private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
Option(System.getenv("SPARK_USER")).getOrElse(""))
Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)

setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
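
The added `.filter(!_.isEmpty)` keeps a blank `user.name` property or an unset `SPARK_USER` from putting the empty string into the default ACL set. A minimal sketch of the same idea, using illustrative names rather than Spark's actual internals:

object DefaultAclSketch {
  // Hypothetical stand-ins for System.getProperty("user.name", "") and
  // Option(System.getenv("SPARK_USER")).getOrElse("") from the diff above.
  def defaultAclUsers(currentUser: String, sparkUser: Option[String]): Set[String] =
    Set(currentUser, sparkUser.getOrElse("")).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(defaultAclUsers("alice", None))    // Set(alice): no empty-string entry
    println(defaultAclUsers("", Some("bob")))  // Set(bob)
  }
}
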
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging {
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new SparkUI(this))
} else {
// For tests, do not enable the UI
None
}
ui.foreach(_.bind())

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
@@ -990,7 +996,7 @@ class SparkContext(config: SparkConf) extends Logging {
/** Shut down the SparkContext. */
def stop() {
postApplicationEnd()
ui.stop()
ui.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
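
With `ui` now typed as `Option[SparkUI]`, every caller has to cope with a disabled UI. Below is a minimal sketch of turning the UI off through the `spark.ui.enabled` flag introduced above and then reading the address defensively; the object name is illustrative, and it assumes compilation inside the `org.apache.spark` package because `sc.ui` is `private[spark]`:

package org.apache.spark

object UiOptionSketch {
  def main(args: Array[String]): Unit = {
    // Disabling the UI means no Jetty server is started and sc.ui is None.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("ui-option-sketch")
      .set("spark.ui.enabled", "false")
    val sc = new SparkContext(conf)

    // All UI access goes through Option, mirroring ui.foreach(_.bind())
    // and ui.foreach(_.stop()) in the change above.
    println(sc.ui.map(_.appUIAddress).getOrElse("<ui disabled>"))

    sc.stop()
  }
}
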
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy

import java.io.{File, PrintStream}
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.{Modifier, InvocationTargetException}
import java.net.URL

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -323,7 +323,9 @@ object SparkSubmit {
}

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
try {
mainMethod.invoke(null, childArgs.toArray)
} catch {
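
The new `Modifier.isStatic` guard turns a confusing NullPointerException from `mainMethod.invoke(null, ...)` into a clear error when the supplied main class only has an instance `main` (for example a Scala `class` rather than an `object`). A standalone sketch of the same reflection check, with illustrative names:

import java.lang.reflect.Modifier

object StaticMainCheckSketch {
  // True when cls exposes a static main(Array[String]) -- the shape
  // SparkSubmit now requires before calling mainMethod.invoke(null, args).
  def hasStaticMain(cls: Class[_]): Boolean = {
    val mainMethod = cls.getMethod("main", classOf[Array[String]])
    Modifier.isStatic(mainMethod.getModifiers)
  }

  class NonStaticMain { def main(args: Array[String]): Unit = () }

  def main(args: Array[String]): Unit = {
    // A top-level Scala object compiles with a static forwarder for main,
    // so its class passes the check; a plain class only has an instance main.
    println(hasStaticMain(Class.forName("StaticMainCheckSketch"))) // true
    println(hasStaticMain(classOf[NonStaticMain]))                 // false
  }
}
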
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -360,7 +360,16 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
tasksMetrics += ((taskRunner.taskId, metrics))
if (isLocal) {
// JobProgressListener will hold a reference to it during
// onExecutorMetricsUpdate(); after that, JobProgressListener cannot see
// further changes to the metrics, so make a deep copy of it
val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
tasksMetrics += ((taskRunner.taskId, copiedMetrics))
} else {
// It will be copied by serialization
tasksMetrics += ((taskRunner.taskId, metrics))
}
}
}
}
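
In local mode the executor and the driver-side JobProgressListener share one JVM, so handing the listener the live `TaskMetrics` object would let later in-place updates leak into what it already recorded; a serialize/deserialize round trip yields an isolated snapshot. A generic sketch of that deep-copy-by-serialization trick (the helper and `Metrics` class below are illustrative, not Spark's `Utils` or `TaskMetrics`):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object DeepCopyBySerialization {
  // Round-trip any Serializable value through Java serialization to get a
  // structurally independent copy, the same idea as
  // Utils.deserialize[TaskMetrics](Utils.serialize(metrics)) above.
  def deepCopy[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  case class Metrics(var shuffleReadBytes: Long)

  def main(args: Array[String]): Unit = {
    val live = Metrics(10L)
    val snapshot = deepCopy(live)
    live.shuffleReadBytes = 99L        // later mutation by the running task
    println(snapshot.shuffleReadBytes) // 10: the snapshot is unaffected
  }
}
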
@@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
conf.set("spark.ui.filters", filterName)
conf.set(s"spark.$filterName.params", filterParams)
JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
}
}
}
@@ -17,7 +17,6 @@

package org.apache.spark.scheduler.cluster

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}

import org.apache.spark.{Logging, SparkContext, SparkEnv}
@@ -47,16 +46,17 @@ private[spark] class SimrSchedulerBackend(

val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")

logInfo("Writing to HDFS file: " + driverFilePath)
logInfo("Writing Akka address: " + driverUrl)
logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
logInfo("Writing Spark UI Address: " + appUIAddress)

// Create temporary file to prevent race condition where executors get empty driverUrl file
val temp = fs.create(tmpPath, true)
temp.writeUTF(driverUrl)
temp.writeInt(maxCores)
temp.writeUTF(sc.ui.appUIAddress)
temp.writeUTF(appUIAddress)
temp.close()

// "Atomic" rename
@@ -67,8 +67,10 @@ private[spark] class SparkDeploySchedulerBackend(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val eventLogDir = sc.eventLogger.map(_.logDir)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
appUIAddress, eventLogDir)

client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
@@ -21,7 +21,6 @@ import java.lang.ref.WeakReference

import scala.collection.mutable.{HashSet, SynchronizedSet}
import scala.language.existentials
import scala.language.postfixOps
import scala.util.Random

import org.scalatest.{BeforeAndAfter, FunSuite}
2 changes: 0 additions & 2 deletions core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -26,8 +26,6 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.util.Utils

import scala.language.postfixOps

class DriverSuite extends FunSuite with Timeouts {

test("driver should exit after finishing") {
@@ -22,7 +22,6 @@ import java.util.concurrent.Semaphore
import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.postfixOps

import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.scalatest.concurrent.Timeouts
45 changes: 31 additions & 14 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -21,7 +21,6 @@ import java.net.ServerSocket
import javax.servlet.http.HttpServletRequest

import scala.io.Source
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

import org.eclipse.jetty.server.Server
@@ -36,11 +35,25 @@ import scala.xml.Node

class UISuite extends FunSuite {

/**
* Create a test SparkContext with the SparkUI enabled.
* It is safe to `get` the SparkUI directly from the SparkContext returned here.
*/
private def newSparkContext(): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set("spark.ui.enabled", "true")
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
}

ignore("basic ui visibility") {
withSpark(new SparkContext("local", "test")) { sc =>
withSpark(newSparkContext()) { sc =>
// test if the ui is visible, and all the expected tabs are visible
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress).mkString
val html = Source.fromURL(sc.ui.get.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
assert(html.toLowerCase.contains("stages"))
assert(html.toLowerCase.contains("storage"))
@@ -51,7 +64,7 @@ }
}

ignore("visibility at localhost:4040") {
withSpark(new SparkContext("local", "test")) { sc =>
withSpark(newSparkContext()) { sc =>
// test if visible from http://localhost:4040
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL("http://localhost:4040").mkString
@@ -61,8 +74,8 @@ class UISuite extends FunSuite {
}

ignore("attaching a new tab") {
withSpark(new SparkContext("local", "test")) { sc =>
val sparkUI = sc.ui
withSpark(newSparkContext()) { sc =>
val sparkUI = sc.ui.get

val newTab = new WebUITab(sparkUI, "foo") {
attachPage(new WebUIPage("") {
@@ -73,7 +86,7 @@ }
}
sparkUI.attachTab(newTab)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress).mkString
val html = Source.fromURL(sparkUI.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))

// check whether new page exists
@@ -87,7 +100,7 @@ }
}

eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
// check whether new page exists
assert(html.contains("magic"))
}
@@ -129,16 +142,20 @@ class UISuite extends FunSuite {
}

test("verify appUIAddress contains the scheme") {
withSpark(new SparkContext("local", "test")) { sc =>
val uiAddress = sc.ui.appUIAddress
assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
withSpark(newSparkContext()) { sc =>
val ui = sc.ui.get
val uiAddress = ui.appUIAddress
val uiHostPort = ui.appUIHostPort
assert(uiAddress.equals("http://" + uiHostPort))
}
}

test("verify appUIAddress contains the port") {
withSpark(new SparkContext("local", "test")) { sc =>
val splitUIAddress = sc.ui.appUIAddress.split(':')
assert(splitUIAddress(2).toInt == sc.ui.boundPort)
withSpark(newSparkContext()) { sc =>
val ui = sc.ui.get
val splitUIAddress = ui.appUIAddress.split(':')
val boundPort = ui.boundPort
assert(splitUIAddress(2).toInt == boundPort)
}
}
}
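
The tests above lean on a `withSpark` loan-pattern helper defined elsewhere in Spark's test sources; a minimal illustrative reconstruction (not the exact Spark code) stops the context even when the test body throws, so the next test can rebind the UI port:

import org.apache.spark.SparkContext

object WithSparkSketch {
  def withSpark[T](sc: SparkContext)(body: SparkContext => T): T = {
    try {
      body(sc)
    } finally {
      sc.stop()
    }
  }
  // Usage, mirroring the tests above:
  //   withSpark(newSparkContext()) { sc => assert(sc.ui.isDefined) }
}
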
1 change: 1 addition & 0 deletions docs/running-on-yarn.md
@@ -155,6 +155,7 @@ For example:
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10

45 changes: 29 additions & 16 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -79,30 +79,43 @@ object PageRank extends Logging {
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
{
// Initialize the pagerankGraph with each edge attribute having
// Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[Double, Double] = graph
var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => 1.0 )
.cache()
.mapVertices( (id, attr) => resetProb )

// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = 0.0
var iteration = 0
var prevRankGraph: Graph[Double, Double] = null
while (iteration < numIter) {
rankGraph.cache()

// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out)(
vertexProgram, sendMessage, messageCombiner)
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
val rankUpdates = rankGraph.mapReduceTriplets[Double](
e => Iterator((e.dstId, e.srcAttr * e.attr)), _ + _)

// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
rankGraph = rankGraph.joinVertices(rankUpdates) {
(id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
}.cache()

rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
logInfo(s"PageRank finished iteration $iteration.")
prevRankGraph.vertices.unpersist(false)
prevRankGraph.edges.unpersist(false)

iteration += 1
}

rankGraph
}

/**
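
The rewrite drops the Pregel-based implementation in favour of an explicit loop that caches the new rank graph, materializes it, and unpersists the previous one, so at most two graphs stay cached at a time. A hedged usage sketch of the refactored entry point; the edge-list path is a placeholder and the surrounding object is illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.graphx.lib.PageRank

object PageRankRunSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("pagerank-sketch"))

    // Load a graph from a whitespace-separated edge list; the path is a placeholder.
    val graph = GraphLoader.edgeListFile(sc, "/path/to/edges.txt")

    // Run a fixed number of iterations with the default reset probability.
    val ranks = PageRank.run(graph, numIter = 10, resetProb = 0.15).vertices

    // Print the five highest-ranked vertices.
    ranks.sortBy(_._2, ascending = false).take(5).foreach { case (id, rank) =>
      println(s"vertex $id -> rank $rank")
    }
    sc.stop()
  }
}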