")
.replace(/"/g, "\"");
var g = graphlibDot.read(escaped_dot);
- g.graph().rankSep = rankSep;
var renderer = new dagreD3.render();
+ preprocessGraphLayout(g, forJob);
renderer(container, g);
}
@@ -251,50 +249,38 @@ function graphContainer() { return d3.select("#dag-viz-graph"); }
function metadataContainer() { return d3.select("#dag-viz-metadata"); }
/*
- * Helper function to create draw a label for each cluster.
- *
- * We need to do this manually because dagre-d3 does not support labeling clusters.
- * In general, the clustering support for dagre-d3 is quite limited at this point.
+ * Helper function to pre-process the graph layout.
+ * This step is necessary for certain styles that affect the positioning
+ * and sizes of graph elements, e.g. padding, font style, shape.
*/
-function drawClusterLabels(svgContainer, forJob) {
- var clusterLabelSize, stageClusterLabelSize;
+function preprocessGraphLayout(g, forJob) {
+ var nodes = g.nodes();
+ for (var i = 0; i < nodes.length; i++) {
+ var isCluster = g.children(nodes[i]).length > 0;
+ if (!isCluster) {
+ var node = g.node(nodes[i]);
+ if (forJob) {
+ // Do not display RDD name on job page
+ node.shape = "circle";
+ node.labelStyle = "font-size: 0px";
+ } else {
+ node.labelStyle = "font-size: 12px";
+ }
+ node.padding = "5";
+ }
+ }
+ // Curve the edges
+ var edges = g.edges();
+ for (var j = 0; j < edges.length; j++) {
+ var edge = g.edge(edges[j]);
+ edge.lineInterpolate = "basis";
+ }
+ // Adjust vertical separation between nodes
if (forJob) {
- clusterLabelSize = JobPageVizConstants.clusterLabelSize;
- stageClusterLabelSize = JobPageVizConstants.stageClusterLabelSize;
+ g.graph().rankSep = JobPageVizConstants.rankSep;
} else {
- clusterLabelSize = StagePageVizConstants.clusterLabelSize;
- stageClusterLabelSize = StagePageVizConstants.stageClusterLabelSize;
+ g.graph().rankSep = StagePageVizConstants.rankSep;
}
- svgContainer.selectAll("g.cluster").each(function() {
- var cluster = d3.select(this);
- var isStage = cluster.attr("id").indexOf(VizConstants.stageClusterPrefix) > -1;
- var labelSize = isStage ? stageClusterLabelSize : clusterLabelSize;
- drawClusterLabel(cluster, labelSize);
- });
-}
-
-/*
- * Helper function to draw a label for the given cluster element based on its name.
- *
- * In the process, we need to expand the bounding box to make room for the label.
- * We need to do this because dagre-d3 did not take this into account when it first
- * rendered the bounding boxes. Note that this means we need to adjust the view box
- * of the SVG afterwards since we shifted a few boxes around.
- */
-function drawClusterLabel(d3cluster, fontSize) {
- var cluster = d3cluster;
- var rect = d3cluster.select("rect");
- rect.attr("y", toFloat(rect.attr("y")) - fontSize);
- rect.attr("height", toFloat(rect.attr("height")) + fontSize);
- var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - fontSize / 2;
- var labelY = toFloat(rect.attr("y")) + fontSize * 1.5;
- var labelText = cluster.attr("name").replace(VizConstants.clusterPrefix, "");
- cluster.append("text")
- .attr("x", labelX)
- .attr("y", labelY)
- .attr("text-anchor", "end")
- .style("font-size", fontSize + "px")
- .text(labelText);
}
/*
@@ -444,7 +430,7 @@ function addTooltipsForRDDs(svgContainer) {
if (tooltipText) {
node.select("circle")
.attr("data-toggle", "tooltip")
- .attr("data-placement", "right")
+ .attr("data-placement", "bottom")
.attr("title", tooltipText)
}
});
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index acbaba6791850..efb6b93cfc35d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -31,7 +31,7 @@ private[spark] object PythonUtils {
def sparkPythonPath: String = {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
- pythonPath += Seq(sparkHome, "python").mkString(File.separator)
+ pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
@@ -53,4 +53,11 @@ private[spark] object PythonUtils {
def toSeq[T](cols: JList[T]): Seq[T] = {
cols.toList.toSeq
}
+
+ /**
+ * Convert a java.util.Map of K, V into a Scala Map of K, V (for calling an API with varargs)
+ */
+ def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = {
+ jm.toMap
+ }
}
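For reference, a minimal standalone sketch (not part of this patch; the object and option names are illustrative only) of how a java.util.Map handed over from Py4J could be converted with a helper like toScalaMap before calling a Map-based Scala API:

import scala.collection.JavaConversions._

object ToScalaMapSketch {
  // Same shape as the helper added above: java.util.Map -> immutable Scala Map.
  def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = jm.toMap

  def main(args: Array[String]): Unit = {
    val options = new java.util.HashMap[String, String]()  // e.g. options built on the Python side
    options.put("header", "true")
    options.put("delimiter", ",")
    val scalaOptions: Map[String, String] = toScalaMap(options)
    println(scalaOptions("header"))  // prints "true"
  }
}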
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index b563034457a91..7fa75ac8c2b54 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,22 +22,22 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}
+import scala.collection.JavaConversions._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
-
-import scala.collection.JavaConversions._
-import scala.concurrent.duration._
-import scala.language.postfixOps
+import org.apache.spark.{Logging, SparkConf, SparkException}
/**
* :: DeveloperApi ::
@@ -199,13 +199,43 @@ class SparkHadoopUtil extends Logging {
* that file.
*/
def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
- def recurse(path: Path): Array[FileStatus] = {
- val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
- leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+ listLeafStatuses(fs, fs.getFileStatus(basePath))
+ }
+
+ /**
+ * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+ * given status points to a file, return a single-element collection containing [[FileStatus]] of
+ * that file.
+ */
+ def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
+ def recurse(status: FileStatus): Seq[FileStatus] = {
+ val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
+ leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}
- val baseStatus = fs.getFileStatus(basePath)
- if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+ if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
+ }
+
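+ /**
+ * Get [[FileStatus]] objects for all leaf directories (directories that contain no
+ * sub-directories) under the given base path. The base path must point to a directory.
+ */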
+ def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+ listLeafDirStatuses(fs, fs.getFileStatus(basePath))
+ }
+
+ def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
+ def recurse(status: FileStatus): Seq[FileStatus] = {
+ val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
+ leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
+ }
+
+ assert(baseStatus.isDir)
+ recurse(baseStatus)
+ }
+
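+ /**
+ * Expand the given glob pattern into all matching paths, qualified against the pattern's
+ * file system URI and working directory. Returns an empty sequence if nothing matches.
+ */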
+ def globPath(pattern: Path): Seq[Path] = {
+ val fs = pattern.getFileSystem(conf)
+ Option(fs.globStatus(pattern)).map { statuses =>
+ statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
+ }.getOrElse(Seq.empty[Path])
}
/**
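For context, a rough standalone sketch (assumptions: a default Hadoop Configuration and a hypothetical /tmp/data layout; object and method names here are not from this patch) of how globPath and listLeafStatuses compose: expand a glob into qualified paths, then recurse into each match and keep only the leaf files.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object LeafListingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val pattern = new Path("/tmp/data/*/part-*")  // hypothetical layout
    val fs = pattern.getFileSystem(conf)

    // Mirrors globPath: a null result from globStatus means "no matches".
    val matched: Seq[Path] = Option(fs.globStatus(pattern))
      .map(_.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq)
      .getOrElse(Seq.empty[Path])

    // Mirrors listLeafStatuses: descend into directories, keep only files.
    def leafFiles(status: FileStatus): Seq[FileStatus] = {
      if (!status.isDir) {
        Seq(status)
      } else {
        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
        files ++ dirs.flatMap(leafFiles)
      }
    }

    matched.map(fs.getFileStatus).flatMap(leafFiles).foreach(s => println(s.getPath))
  }
}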
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 7dad30ecbdd2f..02a94baf372d9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1523,13 +1523,15 @@ abstract class RDD[T: ClassTag](
* has completed (therefore the RDD has been materialized and potentially stored in memory).
* doCheckpoint() is called recursively on the parent RDDs.
*/
- private[spark] def doCheckpoint() {
- if (!doCheckpointCalled) {
- doCheckpointCalled = true
- if (checkpointData.isDefined) {
- checkpointData.get.doCheckpoint()
- } else {
- dependencies.foreach(_.rdd.doCheckpoint())
+ private[spark] def doCheckpoint(): Unit = {
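+ // Run the checkpointing work inside a named scope so that RDDs created in the process
+ // are grouped together under a "checkpoint" scope in the DAG visualization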
+ RDDOperationScope.withScope(sc, "checkpoint", false, true) {
+ if (!doCheckpointCalled) {
+ doCheckpointCalled = true
+ if (checkpointData.isDefined) {
+ checkpointData.get.doCheckpoint()
+ } else {
+ dependencies.foreach(_.rdd.doCheckpoint())
+ }
}
}
}
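The doCheckpoint change above runs the checkpoint work inside a named operation scope so that RDDs created while checkpointing are grouped under their own scope (used by the DAG visualization). A rough standalone sketch of that "named scope around a body" pattern follows; this is not Spark's actual RDDOperationScope implementation, just an illustration of the idea.

object ScopeSketch {
  // Track the scope name for the current thread; None means "no scope".
  private val currentScope = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  // Run `body` with the given scope name installed, restoring the previous scope afterwards.
  def withScope[T](name: String)(body: => T): T = {
    val previous = currentScope.get()
    currentScope.set(Some(name))
    try body finally currentScope.set(previous)
  }

  def main(args: Array[String]): Unit = {
    withScope("checkpoint") {
      println(currentScope.get())  // Some(checkpoint)
    }
    println(currentScope.get())    // None
  }
}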
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index dfd6fdb5e9993..06e616220c706 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -78,6 +78,9 @@ private[spark] object JettyUtils extends Logging {
} catch {
case e: IllegalArgumentException =>
response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
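+ // Log any other failure and re-throw so the error still surfaces in the response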
+ case e: Exception =>
+ logWarning(s"GET ${request.getRequestURI} failed: $e", e)
+ throw e
}
}
// SPARK-5983 ensure TRACE is not supported
@@ -217,6 +220,9 @@ private[spark] object JettyUtils extends Logging {
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
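+ // Show stack traces on Jetty's error pages to make server-side failures easier to debug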
+ val errorHandler = new ErrorHandler()
+ errorHandler.setShowStacks(true)
+ server.addBean(errorHandler)
server.setHandler(collection)
try {
server.start()
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 6a0f5c5d16daa..ad16becde85dd 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -194,11 +194,7 @@ private[spark] object UIUtils extends Logging {
{tab.name}
}
- val helpButton: Seq[Node] = helpText.map { helpText =>
- <sup data-toggle="tooltip" data-placement="bottom" title={helpText}>
- (?)
- </sup>
- }.getOrElse(Seq.empty)
+ val helpButton: Seq[Node] = helpText.map(tooltip(_, "bottom")).getOrElse(Seq.empty)
@@ -360,7 +356,7 @@ private[spark] object UIUtils extends Logging {
{
graphs.map { g =>
}
+ def tooltip(text: String, position: String): Seq[Node] = {
+ <sup data-toggle="tooltip" data-placement={position} title={text}>
+ (?)
+ </sup>
+ }
+
/** Return a script element that automatically expands the DAG visualization on page load. */
def expandDagVizOnLoad(forJob: Boolean): Seq[Node] = {