
Commit

Merge branch 'master' of github.com:apache/spark into dag-viz-skipped
Conflicts:
	core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
Andrew Or committed May 15, 2015
2 parents b928cd4 + 8e3822a commit 51c95b9
Showing 48 changed files with 631 additions and 332 deletions.
core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -57,7 +57,6 @@ var VizConstants = {
stageSep: 40,
graphPrefix: "graph_",
nodePrefix: "node_",
stagePrefix: "stage_",
clusterPrefix: "cluster_"
};

@@ -180,7 +179,7 @@ function renderDagVizForJob(svgContainer) {
metadataContainer().selectAll(".stage-metadata").each(function(d, i) {
var metadata = d3.select(this);
var dot = metadata.select(".dot-file").text();
var stageId = metadata.attr("stage-id").replace(VizConstants.stagePrefix, "");
var stageId = metadata.attr("stage-id");
var containerId = VizConstants.graphPrefix + stageId;
var isSkipped = metadata.attr("skipped") == "true";
var container;
@@ -190,10 +189,14 @@ function renderDagVizForJob(svgContainer) {
.attr("id", containerId)
.attr("skipped", "true");
} else {
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
var attemptId = 0
var stageLink = $("#stage-" + stageId + "-" + attemptId)
.find("a")
.attr("href") + "&expandDagViz=true";
container = svgContainer
.append("a")
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
.attr("xlink:href", "/stages/stage/?id=" + stageId + "&attempt=0&expandDagViz=true")
.attr("xlink:href", stageLink)
.append("g")
.attr("id", containerId);
}
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -371,6 +371,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
throw new SparkException("An application name must be set in your configuration")
}

// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
// yarn-standalone is deprecated, but still supported
if ((master == "yarn-cluster" || master == "yarn-standalone") &&
!_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}

if (_conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:\n" + _conf.toDebugString)
}
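The guard added above makes SparkContext fail fast when user code asks for yarn-cluster (or the deprecated yarn-standalone) mode without going through spark-submit, which is when spark.yarn.app.id would have been set. A minimal, hedged sketch of the behaviour this enables; the conf values are illustrative, not from the diff:

    import org.apache.spark.{SparkConf, SparkContext, SparkException}

    val conf = new SparkConf()
      .setMaster("yarn-cluster")            // no spark.yarn.app.id set, since no AM launched us
      .setAppName("direct-yarn-cluster")

    try {
      new SparkContext(conf)                // expected to fail fast with the new check...
    } catch {
      case e: SparkException =>
        println(e.getMessage)               // ...suggesting the user switch to spark-submit
    }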
@@ -324,9 +324,6 @@ private[worker] class Worker(
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)

case Heartbeat =>
logInfo(s"Received heartbeat from driver ${sender.path}")

case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -94,7 +94,7 @@ private[spark] abstract class WebUI(
}

/** Detach a handler from this UI. */
protected def detachHandler(handler: ServletContextHandler) {
def detachHandler(handler: ServletContextHandler) {
handlers -= handler
serverInfo.foreach { info =>
info.rootHandler.removeHandler(handler)
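Dropping the protected modifier means code outside WebUI subclasses can now remove a handler it previously attached. A hedged sketch of the kind of call this permits; since WebUI itself is private[spark], it only applies to callers inside the org.apache.spark package, and ui/handler are assumed to come from the surrounding component:

    import org.apache.spark.ui.WebUI
    import org.eclipse.jetty.servlet.ServletContextHandler

    // Hedged sketch: tear-down hook for a component that earlier attached `handler` to `ui`.
    def removeWhenStopped(ui: WebUI, handler: ServletContextHandler): Unit = {
      ui.detachHandler(handler)   // callable here now that detachHandler is no longer protected
    }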
16 changes: 3 additions & 13 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -239,15 +239,6 @@ private[spark] object ClosureCleaner extends Logging {
logDebug(s" + fields accessed by starting closure: " + accessedFields.size)
accessedFields.foreach { f => logDebug(" " + f) }

val inInterpreter = {
try {
val interpClass = Class.forName("spark.repl.Main")
interpClass.getMethod("interp").invoke(null) != null
} catch {
case _: ClassNotFoundException => true
}
}

// List of outer (class, object) pairs, ordered from outermost to innermost
// Note that all outer objects but the outermost one (first one in this list) must be closures
var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
@@ -274,7 +265,7 @@
// required fields from the original object. We need the parent here because the Java
// language specification requires the first constructor parameter of any closure to be
// its enclosing object.
val clone = instantiateClass(cls, parent, inInterpreter)
val clone = instantiateClass(cls, parent)
for (fieldName <- accessedFields(cls)) {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
@@ -327,9 +318,8 @@ private[spark] object ClosureCleaner extends Logging {

private def instantiateClass(
cls: Class[_],
enclosingObject: AnyRef,
inInterpreter: Boolean): AnyRef = {
if (!inInterpreter) {
enclosingObject: AnyRef): AnyRef = {
if (!Utils.isInInterpreter) {
// This is a bona fide closure class, whose constructor has no effects
// other than to set its fields, so use its constructor
val cons = cls.getConstructors()(0)
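With the interpreter check moved into Utils, instantiateClass only needs the class and its enclosing object. Below is a hedged, simplified sketch (hypothetical name, not the exact Spark implementation) of the non-interpreter branch shown above: the closure's single constructor is invoked directly, passing the enclosing object first, which the comment in the diff notes is required by the Java language specification; the remaining arguments are simply left null here.

    // Hedged sketch: re-instantiate a compiler-generated closure class outside the REPL.
    def instantiateDirectly(cls: Class[_], enclosingObject: AnyRef): AnyRef = {
      val cons = cls.getConstructors()(0)                    // a closure class has one constructor
      val args = new Array[AnyRef](cons.getParameterTypes.length)
      if (args.nonEmpty) args(0) = enclosingObject           // first parameter = enclosing object
      cons.newInstance(args: _*)                             // returns the freshly built closure
    }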
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1795,6 +1795,20 @@ private[spark] object Utils extends Logging {
}
}

lazy val isInInterpreter: Boolean = {
try {
val interpClass = classForName("spark.repl.Main")
interpClass.getMethod("interp").invoke(null) != null
} catch {
// Returning true seems to be a mistake.
// Currently changing it to false causes tests failures in Streaming.
// For a more detailed discussion, please, refer to
// https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments.
// Addressing this changed is tracked as https://issues.apache.org/jira/browse/SPARK-7527
case _: ClassNotFoundException => true
}
}

/**
* Return a well-formed URI for the file described by a user input string.
*
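The lazy val above memoizes a reflective probe for the REPL's spark.repl.Main class. As its in-code comment says, returning true when the class is missing looks like a mistake, tracked by SPARK-7527. A hedged sketch of the same probe pattern with the intuitive fallback; the object and helper names are hypothetical:

    object ReplProbe {                                       // hypothetical stand-in for Utils
      private def classIsLoadable(name: String): Boolean =
        try { Class.forName(name); true }
        catch { case _: ClassNotFoundException => false }

      // Evaluated once on first use; unlike the diff above, a missing REPL class yields false,
      // which the in-code comment identifies as the eventual intent (SPARK-7527).
      lazy val isInInterpreter: Boolean = classIsLoadable("spark.repl.Main")
    }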
@@ -35,6 +35,7 @@
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.xerial.snappy.buffer.CachedBufferAllocator;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
@@ -96,6 +97,13 @@ public OutputStream apply(OutputStream stream) {
@After
public void tearDown() {
Utils.deleteRecursively(tempDir);
// This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
// suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
// preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
// needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
synchronized (CachedBufferAllocator.class) {
CachedBufferAllocator.queueTable.clear();
}
final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
if (leakedMemory != 0) {
fail("Test leaked " + leakedMemory + " bytes of managed memory");
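The tearDown change above clears snappy-java's cached buffer pool to sidestep SPARK-7660 between tests. For reference, a hedged Scala rendering of the same workaround, relying only on the CachedBufferAllocator members already used in the Java hunk:

    import org.xerial.snappy.buffer.CachedBufferAllocator

    // Hedged sketch: the same SPARK-7660 workaround for a Scala test suite sharing the JVM.
    def clearSnappyBufferPool(): Unit =
      classOf[CachedBufferAllocator].synchronized {
        CachedBufferAllocator.queueTable.clear()   // drop pooled reusable buffers between tests
      }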
10 changes: 8 additions & 2 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -165,6 +165,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
}

// Test that GC causes RDD cleanup after dereferencing the RDD
// Note rdd is used after previous GC to avoid early collection by the JVM
val postGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
rdd = null // Make RDD out of scope
runGC()
@@ -181,9 +182,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
}
rdd.count() // Defeat early collection by the JVM

// Test that GC causes shuffle cleanup after dereferencing the RDD
rdd.count() // Defeat any early collection of rdd variable by the JVM
val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
runGC()
@@ -201,6 +202,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
}

// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
// Note broadcast is used after previous GC to avoid early collection by the JVM
val postGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
broadcast = null // Make broadcast variable out of scope
runGC()
@@ -226,7 +228,7 @@

// the checkpoint is not cleaned by default (without the configuration set)
var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
rdd = null // Make RDD out of scope
rdd = null // Make RDD out of scope, ok if collected earlier
runGC()
postGCTester.assertCleanup()
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
@@ -245,6 +247,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
// Confirm the checkpoint directory exists
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))

// Reference rdd to defeat any early collection by the JVM
rdd.count()

// Test that GC causes checkpoint data cleanup after dereferencing the RDD
postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
rdd = null // Make RDD out of scope
@@ -352,6 +357,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
}
rdd.count() // Defeat early collection by the JVM

// Test that GC causes shuffle cleanup after dereferencing the RDD
val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
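The added rdd.count() and broadcast references all serve the same purpose: keep the local variable reachable past the earlier GC so the JVM cannot collect it before the test deliberately nulls it out. A hedged, self-contained sketch of the pattern:

    // Hedged sketch of the "defeat early collection" pattern used in this suite.
    var payload: Array[Byte] = new Array[Byte](1 << 20)

    System.gc()                   // earlier GC in the test; payload must survive this one
    val size = payload.length     // touch the variable so it stays reachable up to here

    payload = null                // now deliberately drop the last reference
    System.gc()                   // this GC is allowed to reclaim the array
    println(s"kept $size bytes reachable across the first GC")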
@@ -345,28 +345,40 @@ private[python] class PythonMLLibAPI extends Serializable {
* Returns a list containing weights, mean and covariance of each mixture component.
*/
def trainGaussianMixture(
data: JavaRDD[Vector],
k: Int,
convergenceTol: Double,
data: JavaRDD[Vector],
k: Int,
convergenceTol: Double,
maxIterations: Int,
seed: java.lang.Long): JList[Object] = {
seed: java.lang.Long,
initialModelWeights: java.util.ArrayList[Double],
initialModelMu: java.util.ArrayList[Vector],
initialModelSigma: java.util.ArrayList[Matrix]): JList[Object] = {
val gmmAlg = new GaussianMixture()
.setK(k)
.setConvergenceTol(convergenceTol)
.setMaxIterations(maxIterations)

if (initialModelWeights != null && initialModelMu != null && initialModelSigma != null) {
val gaussians = initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector], y.asInstanceOf[Matrix])
}
val initialModel = new GaussianMixtureModel(
initialModelWeights.asScala.toArray, gaussians.toArray)
gmmAlg.setInitialModel(initialModel)
}

if (seed != null) gmmAlg.setSeed(seed)

try {
val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
var wt = ArrayBuffer.empty[Double]
var mu = ArrayBuffer.empty[Vector]
var mu = ArrayBuffer.empty[Vector]
var sigma = ArrayBuffer.empty[Matrix]
for (i <- 0 until model.k) {
wt += model.weights(i)
mu += model.gaussians(i).mu
sigma += model.gaussians(i).sigma
}
}
List(Vectors.dense(wt.toArray), mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
} finally {
data.rdd.unpersist(blocking = false)
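The new initialModelWeights/Mu/Sigma parameters let the Python API hand a starting GaussianMixtureModel through to GaussianMixture.setInitialModel. A hedged sketch of the Scala-side behaviour this exposes; the weights, means and covariances are illustrative:

    import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
    import org.apache.spark.mllib.linalg.{Matrices, Vectors}
    import org.apache.spark.mllib.stat.distribution.MultivariateGaussian

    // Hedged sketch: seed EM with a user-supplied model instead of a random initialization.
    val initialModel = new GaussianMixtureModel(
      Array(0.5, 0.5),                                              // component weights, sum to 1
      Array(
        new MultivariateGaussian(Vectors.dense(0.0, 0.0), Matrices.eye(2)),
        new MultivariateGaussian(Vectors.dense(5.0, 5.0), Matrices.eye(2))))

    val gmm = new GaussianMixture()
      .setK(2)
      .setConvergenceTol(0.001)
      .setMaxIterations(100)
      .setInitialModel(initialModel)   // mirrors gmmAlg.setInitialModel(initialModel) in the diff
    // val model = gmm.run(data)       // data: RDD[org.apache.spark.mllib.linalg.Vector]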
@@ -38,11 +38,10 @@ import org.apache.spark.sql.{SQLContext, Row}
* are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are
* the respective mean and covariance for each Gaussian distribution i=1..k.
*
* @param weight Weights for each Gaussian distribution in the mixture, where weight(i) is
* the weight for Gaussian i, and weight.sum == 1
* @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i
* @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the
* covariance matrix for Gaussian i
* @param weights Weights for each Gaussian distribution in the mixture, where weights(i) is
* the weight for Gaussian i, and weights.sum == 1
* @param gaussians Array of MultivariateGaussian where gaussians(i) represents
* the Multivariate Gaussian (Normal) Distribution for Gaussian i
*/
@Experimental
class GaussianMixtureModel(
@@ -273,7 +273,8 @@ class DenseMatrix(

override def copy: DenseMatrix = new DenseMatrix(numRows, numCols, values.clone())

private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f))
private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f),
isTransposed)

private[mllib] def update(f: Double => Double): DenseMatrix = {
val len = values.length
@@ -535,7 +536,7 @@ class SparseMatrix(
}

private[mllib] def map(f: Double => Double) =
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f))
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f), isTransposed)

private[mllib] def update(f: Double => Double): SparseMatrix = {
val len = values.length
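Threading isTransposed through map matters because a transposed matrix reuses the same values array and only changes how it is indexed; dropping the flag would silently put the mapped elements in the wrong positions. A hedged illustration using only the public constructor, transpose and apply (map itself is private[mllib]):

    import org.apache.spark.mllib.linalg.DenseMatrix

    val m  = new DenseMatrix(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))  // column-major storage
    val mt = m.transpose             // 3 x 2 view over the same values array, isTransposed = true

    assert(m(0, 1) == 3.0)           // entry (0, 1) of the 2 x 3 matrix
    assert(mt(1, 0) == 3.0)          // the same entry addressed through the transposed view
    // If map returned a matrix without the flag, the shared array would be reinterpreted as
    // column-major for the result and the mapped elements would land in the wrong positions.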