Skip to content

Commit

Permalink
Show skipped stages differently
Browse files Browse the repository at this point in the history
This requires storing more state in the listener and making sure
that state is cleaned up later. Tests are coming in the next commit.
  • Loading branch information
Andrew Or committed May 15, 2015
1 parent 7cc34ce commit 7c4c364
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,24 @@
* limitations under the License.
*/

#dag-viz-graph svg path {
stroke: #444;
stroke-width: 1.5px;
#dag-viz-graph a, #dag-viz-graph a:hover {
text-decoration: none;
}

#dag-viz-graph svg g.cluster rect {
stroke-width: 1px;
#dag-viz-graph .label {
font-weight: normal;
text-shadow: none;
}

#dag-viz-graph svg g.node circle {
fill: #444;
#dag-viz-graph svg path {
stroke: #444;
stroke-width: 1.5px;
}

#dag-viz-graph svg g.node rect {
fill: #C3EBFF;
stroke: #3EC0FF;
#dag-viz-graph svg g.cluster rect {
stroke-width: 1px;
}

#dag-viz-graph svg g.node.cached circle {
fill: #A3F545;
stroke: #52C366;
stroke-width: 2px;
}

#dag-viz-graph svg g.node.cached rect {
fill: #B3F5C5;
stroke: #52C366;
stroke-width: 2px;
}

#dag-viz-graph div#empty-dag-viz-message {
margin: 15px;
}
Expand All @@ -63,12 +50,24 @@
stroke-width: 1px;
}

#dag-viz-graph svg.job g.cluster[class*="stage"] rect {
/* Job page: clusters belonging to skipped stages are greyed out. */
#dag-viz-graph svg.job g.cluster.skipped rect {
fill: #AAA;
stroke: #888;
stroke-width: 1px;
}

/* Job page: stage clusters use a white fill with a pink border. */
#dag-viz-graph svg.job g.cluster.stage rect {
fill: #FFFFFF;
stroke: #FF99AC;
stroke-width: 1px;
}

/* Job page: skipped stage clusters override the normal stage colors with grey.
   This selector is more specific than .cluster.stage above, so it wins. */
#dag-viz-graph svg.job g.cluster.stage.skipped rect {
fill: #DDD;
stroke: #AAA;
stroke-width: 1px;
}

#dag-viz-graph svg.job g#cross-stage-edges path {
fill: none;
}
Expand All @@ -77,6 +76,20 @@
fill: #333;
}

/* Job page: dim the labels of skipped clusters to match the greyed-out boxes. */
#dag-viz-graph svg.job g.cluster.skipped text {
fill: #666;
}

/* Job page: RDD nodes are rendered as dark circles. */
#dag-viz-graph svg.job g.node circle {
fill: #444;
}

/* Job page: cached RDD nodes are highlighted in green with a thicker outline. */
#dag-viz-graph svg.job g.node.cached circle {
fill: #A3F545;
stroke: #52C366;
stroke-width: 2px;
}

/* Stage page specific styles */

#dag-viz-graph svg.stage g.cluster rect {
Expand All @@ -85,7 +98,7 @@
stroke-width: 1px;
}

#dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
#dag-viz-graph svg.stage g.cluster.stage rect {
fill: #FFFFFF;
stroke: #FFA6B6;
stroke-width: 1px;
Expand All @@ -99,11 +112,14 @@
fill: #333;
}

#dag-viz-graph a, #dag-viz-graph a:hover {
text-decoration: none;
/* Stage page: RDD nodes are rendered as light-blue rectangles. */
#dag-viz-graph svg.stage g.node rect {
fill: #C3EBFF;
stroke: #3EC0FF;
stroke-width: 1px;
}

#dag-viz-graph .label {
font-weight: normal;
text-shadow: none;
/* Stage page: cached RDD nodes are highlighted in green with a thicker outline. */
#dag-viz-graph svg.stage g.node.cached rect {
fill: #B3F5C5;
stroke: #52C366;
stroke-width: 2px;
}
45 changes: 28 additions & 17 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ var VizConstants = {
graphPrefix: "graph_",
nodePrefix: "node_",
stagePrefix: "stage_",
clusterPrefix: "cluster_",
stageClusterPrefix: "cluster_stage_"
clusterPrefix: "cluster_"
};

var JobPageVizConstants = {
Expand Down Expand Up @@ -133,9 +132,7 @@ function renderDagViz(forJob) {
}

// Render
var svg = graphContainer()
.append("svg")
.attr("class", jobOrStage);
var svg = graphContainer().append("svg").attr("class", jobOrStage);
if (forJob) {
renderDagVizForJob(svg);
} else {
Expand Down Expand Up @@ -183,24 +180,29 @@ function renderDagVizForJob(svgContainer) {
metadataContainer().selectAll(".stage-metadata").each(function(d, i) {
var metadata = d3.select(this);
var dot = metadata.select(".dot-file").text();
var stageId = metadata.attr("stage-id");
var stageId = metadata.attr("stage-id").replace(VizConstants.stagePrefix, "");
var containerId = VizConstants.graphPrefix + stageId;
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
var stageLink = "/stages/stage/?id=" +
stageId.replace(VizConstants.stagePrefix, "") + "&attempt=0&expandDagViz=true";
var container = svgContainer
.append("a")
.attr("xlink:href", stageLink)
.append("g")
.attr("id", containerId);
var isSkipped = metadata.attr("skipped") == "true";
var container;
if (isSkipped) {
container = svgContainer
.append("g")
.attr("id", containerId)
.attr("skipped", "true");
} else {
container = svgContainer
.append("a")
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
.attr("xlink:href", "/stages/stage/?id=" + stageId + "&attempt=0&expandDagViz=true")
.append("g")
.attr("id", containerId);
}

// Now we need to shift the container for this stage so it doesn't overlap with
// existing ones, taking into account the position and width of the last stage's
// container. We do not need to do this for the first stage of this job.
if (i > 0) {
var existingStages = svgContainer
.selectAll("g.cluster")
.filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
var existingStages = svgContainer.selectAll("g.cluster.stage")
if (!existingStages.empty()) {
var lastStage = d3.select(existingStages[0].pop());
var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
Expand All @@ -213,6 +215,12 @@ function renderDagVizForJob(svgContainer) {
// Actually render the stage
renderDot(dot, container, true);

// Mark elements as skipped if appropriate. Unfortunately we need to mark all
// elements instead of the parent container because of CSS override rules.
if (isSkipped) {
container.selectAll("g").classed("skipped", true);
}

// Round corners on rectangles
container
.selectAll("rect")
Expand Down Expand Up @@ -242,6 +250,9 @@ function renderDot(dot, container, forJob) {
var renderer = new dagreD3.render();
preprocessGraphLayout(g, forJob);
renderer(container, g);

// Find the stage cluster and mark it for styling and post-processing
container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
}

/* -------------------- *
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,11 @@ private[spark] object UIUtils extends Logging {
</a>
</span>
<div id="dag-viz-graph"></div>
<div id="dag-viz-metadata">
<div id="dag-viz-metadata" style="display:none">
{
graphs.map { g =>
<div class="stage-metadata" stage-id={g.rootCluster.id} style="display:none">
val skipped = g.rootCluster.name.contains("skipped").toString
<div class="stage-metadata" stage-id={g.rootCluster.id} skipped={skipped}>
<div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div>
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
* This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
* stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
*/
private[ui] class RDDOperationCluster(val id: String, val name: String) {
private[ui] class RDDOperationCluster(val id: String, private var _name: String) {
private val _childNodes = new ListBuffer[RDDOperationNode]
private val _childClusters = new ListBuffer[RDDOperationCluster]

def name: String = _name
def setName(n: String): Unit = { _name = n }

def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq
def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq
def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode }
Expand Down Expand Up @@ -88,7 +91,7 @@ private[ui] object RDDOperationGraph extends Logging {
val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID

// Root cluster is the stage cluster
val stageClusterId = s"stage_${stage.stageId}"
val stageClusterId = stage.stageId.toString
val stageClusterName = s"Stage ${stage.stageId}" +
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ import org.apache.spark.ui.SparkUI
* A SparkListener that constructs a DAG of RDD operations.
*/
private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {

// Note: the fate of jobs and stages are tied. This means when we clean up a job,
// we always clean up all of its stages. Similarly, when we clean up a stage, we
// always clean up its job (and, transitively, other stages in the same job).
private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
private[ui] val jobIdToSkippedStageIds = new mutable.HashMap[Int, Seq[Int]]
private[ui] val stageIdToJobId = new mutable.HashMap[Int, Int]
private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
private[ui] val completedStageIds = new mutable.HashSet[Int]

// Keep track of the order in which these are inserted so we can remove old ones
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
Expand All @@ -40,16 +47,21 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
private val retainedStages =
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)

/** Return the graph metadata for the given stage, or None if no such information exists. */
/**
* Return the graph metadata for all stages in the given job.
* An empty list is returned if one or more of its stages has been cleaned up.
*/
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
// If the metadata for some stages have been removed, do not bother rendering this job
if (_stageIds.size != graphs.size) {
Seq.empty
} else {
graphs
}
val skippedStageIds = jobIdToSkippedStageIds.get(jobId).getOrElse(Seq.empty)
val graphs = jobIdToStageIds.get(jobId)
.getOrElse(Seq.empty)
.flatMap { sid => stageIdToGraph.get(sid) }
// Mark any skipped stages as such
graphs
.filter { g => skippedStageIds.contains(g.rootCluster.id.toInt) }
.filter { g => !g.rootCluster.name.contains("skipped") }
.foreach { g => g.rootCluster.setName(g.rootCluster.name + " (skipped)") }
graphs
}

/** Return the graph metadata for the given stage, or None if no such information exists. */
Expand All @@ -66,22 +78,54 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted

stageInfos.foreach { stageInfo =>
stageIds += stageInfo.stageId
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
// Remove state for old stages
val stageId = stageInfo.stageId
stageIds += stageId
stageIdToJobId(stageId) = jobId
stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)

// Remove state for old stages if necessary
if (stageIds.size >= retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
stageIds.take(toRemove).foreach { id => cleanStage(id) }
stageIds.trimStart(toRemove)
}
}

// Remove state for old jobs
// Remove state for old jobs if necessary
if (jobIds.size >= retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
jobIds.take(toRemove).foreach { id => cleanJob(id) }
jobIds.trimStart(toRemove)
}
}

/** Record each stage that finishes, so skipped stages can be identified when its job ends. */
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
  val completedId = stageCompleted.stageInfo.stageId
  completedStageIds += completedId
}

/** When a job finishes, record which of its stages never ran (i.e. were skipped). */
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
  val endedJobId = jobEnd.jobId
  jobIdToStageIds.get(endedJobId).foreach { allStageIds =>
    // Any stage that belongs to this job but never completed must have been skipped.
    jobIdToSkippedStageIds(endedJobId) = allStageIds.filterNot(completedStageIds.contains)
  }
}

/** Clean metadata for the given stage, its job, and all other stages that belong to the job. */
private def cleanStage(stageId: Int): Unit = {
// Drop completion status and the cached operation graph for this stage.
completedStageIds.remove(stageId)
stageIdToGraph.remove(stageId)
// remove() yields the job ID only the first time this stage is visited, so the
// mutual recursion with cleanJob terminates instead of looping forever.
stageIdToJobId.remove(stageId).foreach { jobId => cleanJob(jobId) }
}

/** Clean metadata for the given job and all stages that belong to it. */
private def cleanJob(jobId: Int): Unit = {
jobIdToSkippedStageIds.remove(jobId)
// Removing the job -> stages mapping up front keeps the mutual recursion bounded:
// when cleanStage calls back into cleanJob for the same job, this lookup is empty.
jobIdToStageIds.remove(jobId).foreach { stageIds =>
stageIds.foreach { stageId => cleanStage(stageId) }
}
}

}

0 comments on commit 7c4c364

Please sign in to comment.