[SPARK-7627][SPARK-7472] DAG visualization: style skipped stages #6171

Closed · wants to merge 10 commits
core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css

@@ -15,32 +15,21 @@
  * limitations under the License.
  */
 
-#dag-viz-graph svg path {
-  stroke: #444;
-  stroke-width: 1.5px;
-}
-
-#dag-viz-graph svg g.cluster rect {
-  stroke-width: 1px;
-}
-
-#dag-viz-graph svg g.node circle {
-  fill: #444;
+#dag-viz-graph a, #dag-viz-graph a:hover {
+  text-decoration: none;
 }
 
-#dag-viz-graph svg g.node rect {
-  fill: #C3EBFF;
-  stroke: #3EC0FF;
-  stroke-width: 1px;
+#dag-viz-graph .label {
+  font-weight: normal;
+  text-shadow: none;
 }
 
-#dag-viz-graph svg g.node.cached circle {
-  fill: #444;
+#dag-viz-graph svg path {
+  stroke: #444;
+  stroke-width: 1.5px;
 }
 
-#dag-viz-graph svg g.node.cached rect {
-  fill: #B3F5C5;
-  stroke: #56F578;
+#dag-viz-graph svg g.cluster rect {
   stroke-width: 1px;
 }
 

@@ -61,12 +50,23 @@
   stroke-width: 1px;
 }
 
-#dag-viz-graph svg.job g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.job g.cluster.skipped rect {
+  fill: #D6D6D6;
+  stroke: #B7B7B7;
+  stroke-width: 1px;
+}
+
+#dag-viz-graph svg.job g.cluster.stage rect {
   fill: #FFFFFF;
   stroke: #FF99AC;
   stroke-width: 1px;
 }
 
+#dag-viz-graph svg.job g.cluster.stage.skipped rect {
+  stroke: #ADADAD;
+  stroke-width: 1px;
+}
+
 #dag-viz-graph svg.job g#cross-stage-edges path {
   fill: none;
 }

@@ -75,6 +75,20 @@
   fill: #333;
 }
 
+#dag-viz-graph svg.job g.cluster.skipped text {
+  fill: #666;
+}
+
+#dag-viz-graph svg.job g.node circle {
+  fill: #444;
+}
+
+#dag-viz-graph svg.job g.node.cached circle {
+  fill: #A3F545;
+  stroke: #52C366;
+  stroke-width: 2px;
+}
+
 /* Stage page specific styles */
 
 #dag-viz-graph svg.stage g.cluster rect {

@@ -83,7 +97,7 @@
   stroke-width: 1px;
 }
 
-#dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.stage g.cluster.stage rect {
   fill: #FFFFFF;
   stroke: #FFA6B6;
   stroke-width: 1px;

@@ -97,11 +111,14 @@
   fill: #333;
 }
 
-#dag-viz-graph a, #dag-viz-graph a:hover {
-  text-decoration: none;
+#dag-viz-graph svg.stage g.node rect {
+  fill: #C3EBFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
 }
 
-#dag-viz-graph .label {
-  font-weight: normal;
-  text-shadow: none;
+#dag-viz-graph svg.stage g.node.cached rect {
+  fill: #B3F5C5;
+  stroke: #52C366;
+  stroke-width: 2px;
 }
50 changes: 32 additions & 18 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -57,9 +57,7 @@ var VizConstants = {
   stageSep: 40,
   graphPrefix: "graph_",
   nodePrefix: "node_",
-  stagePrefix: "stage_",
-  clusterPrefix: "cluster_",
-  stageClusterPrefix: "cluster_stage_"
+  clusterPrefix: "cluster_"
 };
 
 var JobPageVizConstants = {

@@ -133,9 +131,7 @@ function renderDagViz(forJob) {
   }
 
   // Render
-  var svg = graphContainer()
-    .append("svg")
-    .attr("class", jobOrStage);
+  var svg = graphContainer().append("svg").attr("class", jobOrStage);
   if (forJob) {
     renderDagVizForJob(svg);
   } else {

@@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) {
     var dot = metadata.select(".dot-file").text();
     var stageId = metadata.attr("stage-id");
     var containerId = VizConstants.graphPrefix + stageId;
-    // Link each graph to the corresponding stage page (TODO: handle stage attempts)
-    var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
-      .find("a")
-      .attr("href") + "&expandDagViz=true";
-    var container = svgContainer
-      .append("a")
-      .attr("xlink:href", stageLink)
-      .append("g")
-      .attr("id", containerId);
+    var isSkipped = metadata.attr("skipped") == "true";
+    var container;
+    if (isSkipped) {
+      container = svgContainer
+        .append("g")
+        .attr("id", containerId)
+        .attr("skipped", "true");
+    } else {
+      // Link each graph to the corresponding stage page (TODO: handle stage attempts)
+      // Use the link from the stage table so it also works for the history server
+      var attemptId = 0
+      var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
+        .select("a")
+        .attr("href") + "&expandDagViz=true";
+      container = svgContainer
+        .append("a")
+        .attr("xlink:href", stageLink)
+        .append("g")
+        .attr("id", containerId);
+    }
 
     // Now we need to shift the container for this stage so it doesn't overlap with
     // existing ones, taking into account the position and width of the last stage's
     // container. We do not need to do this for the first stage of this job.
     if (i > 0) {
-      var existingStages = svgContainer
-        .selectAll("g.cluster")
-        .filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
+      var existingStages = svgContainer.selectAll("g.cluster.stage")
       if (!existingStages.empty()) {
         var lastStage = d3.select(existingStages[0].pop());
         var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));

@@ -214,6 +219,12 @@
     // Actually render the stage
     renderDot(dot, container, true);
 
+    // Mark elements as skipped if appropriate. Unfortunately we need to mark all
+    // elements instead of the parent container because of CSS override rules.
+    if (isSkipped) {
+      container.selectAll("g").classed("skipped", true);
+    }
+
     // Round corners on rectangles
     container
       .selectAll("rect")

@@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) {
   var renderer = new dagreD3.render();
   preprocessGraphLayout(g, forJob);
   renderer(container, g);
+
+  // Find the stage cluster and mark it for styling and post-processing
+  container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
 }
 
 /* -------------------- *
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -352,10 +352,12 @@
         </a>
       </span>
       <div id="dag-viz-graph"></div>
-      <div id="dag-viz-metadata">
+      <div id="dag-viz-metadata" style="display:none">
       {
         graphs.map { g =>
-          <div class="stage-metadata" stage-id={g.rootCluster.id} style="display:none">
+          val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "")
+          val skipped = g.rootCluster.name.contains("skipped").toString
+          <div class="stage-metadata" stage-id={stageId} skipped={skipped}>
             <div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div>
             { g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
             { g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
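The metadata div above is the bridge between the Scala and D3 sides: the stage cluster's `stage_` prefix is stripped so the JS can join against the `#stage-<id>-<attempt>` rows of the stage table, and the `skipped` flag is derived from the cluster name. A minimal, self-contained Scala sketch of that round-trip (object and value names here are illustrative, not part of the PR):

object StageMetadataSketch {
  // Mirrors RDDOperationGraph.STAGE_CLUSTER_PREFIX introduced in this PR
  val STAGE_CLUSTER_PREFIX = "stage_"

  def main(args: Array[String]): Unit = {
    val clusterId = STAGE_CLUSTER_PREFIX + 3    // root cluster ID, e.g. "stage_3"
    val clusterName = "Stage 3 (skipped)"       // name after the listener renames it

    // What UIUtils writes into the stage-metadata div:
    val stageId = clusterId.replaceAll(STAGE_CLUSTER_PREFIX, "")  // "3"
    val skipped = clusterName.contains("skipped").toString        // "true"

    assert(stageId == "3" && skipped == "true")
    println(s"""<div class="stage-metadata" stage-id=$stageId skipped=$skipped>""")
  }
}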
core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

@@ -52,10 +52,13 @@ private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
  * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
  * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
  */
-private[ui] class RDDOperationCluster(val id: String, val name: String) {
+private[ui] class RDDOperationCluster(val id: String, private var _name: String) {
   private val _childNodes = new ListBuffer[RDDOperationNode]
   private val _childClusters = new ListBuffer[RDDOperationCluster]
 
+  def name: String = _name
+  def setName(n: String): Unit = { _name = n }
+
   def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq
   def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq
   def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode }

@@ -71,6 +74,8 @@ private[ui] class RDDOperationCluster(val id: String, val name: String) {
 
 private[ui] object RDDOperationGraph extends Logging {
 
+  val STAGE_CLUSTER_PREFIX = "stage_"
+
   /**
    * Construct a RDDOperationGraph for a given stage.
    *

@@ -88,7 +93,8 @@
     val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
 
     // Root cluster is the stage cluster
-    val stageClusterId = s"stage_${stage.stageId}"
+    // Use a special prefix here to differentiate this cluster from other operation clusters
+    val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
     val stageClusterName = s"Stage ${stage.stageId}" +
       { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
     val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
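The constructor change above makes the cluster name mutable so the listener can rename a stage after the fact, appending " (skipped)" once its job ends. A stripped-down sketch of that pattern, including the idempotence guard used in getOperationGraphForJob (class and method names here are illustrative):

// Stand-in for RDDOperationCluster: only the name plumbing is reproduced
class Cluster(val id: String, private var _name: String) {
  def name: String = _name
  def setName(n: String): Unit = { _name = n }
}

object RenameSketch {
  // Append " (skipped)" at most once, as the listener's guard ensures
  def markSkipped(c: Cluster): Unit =
    if (!c.name.contains("skipped")) c.setName(c.name + " (skipped)")

  def main(args: Array[String]): Unit = {
    val c = new Cluster("stage_3", "Stage 3")
    markSkipped(c)
    markSkipped(c)  // second call is a no-op thanks to the guard
    assert(c.name == "Stage 3 (skipped)")
    println(c.name)
  }
}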
core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala

@@ -27,8 +27,15 @@ import org.apache.spark.ui.SparkUI
  * A SparkListener that constructs a DAG of RDD operations.
  */
 private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
+
+  // Note: the fate of jobs and stages are tied. This means when we clean up a job,
+  // we always clean up all of its stages. Similarly, when we clean up a stage, we
+  // always clean up its job (and, transitively, other stages in the same job).
   private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
+  private[ui] val jobIdToSkippedStageIds = new mutable.HashMap[Int, Seq[Int]]
+  private[ui] val stageIdToJobId = new mutable.HashMap[Int, Int]
   private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
+  private[ui] val completedStageIds = new mutable.HashSet[Int]
 
   // Keep track of the order in which these are inserted so we can remove old ones
   private[ui] val jobIds = new mutable.ArrayBuffer[Int]

@@ -40,16 +47,23 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
   private val retainedStages =
     conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
 
-  /** Return the graph metadata for the given stage, or None if no such information exists. */
+  /**
+   * Return the graph metadata for all stages in the given job.
+   * An empty list is returned if one or more of its stages has been cleaned up.
+   */
   def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
-    val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
-    val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
-    // If the metadata for some stages have been removed, do not bother rendering this job
-    if (_stageIds.size != graphs.size) {
-      Seq.empty
-    } else {
-      graphs
+    val skippedStageIds = jobIdToSkippedStageIds.get(jobId).getOrElse(Seq.empty)
+    val graphs = jobIdToStageIds.get(jobId)
+      .getOrElse(Seq.empty)
+      .flatMap { sid => stageIdToGraph.get(sid) }
+    // Mark any skipped stages as such
+    graphs.foreach { g =>
+      val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt
+      if (skippedStageIds.contains(stageId) && !g.rootCluster.name.contains("skipped")) {
+        g.rootCluster.setName(g.rootCluster.name + " (skipped)")
+      }
     }
+    graphs
   }
 
   /** Return the graph metadata for the given stage, or None if no such information exists. */

@@ -66,22 +80,68 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
     jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
 
     stageInfos.foreach { stageInfo =>
-      stageIds += stageInfo.stageId
-      stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
-      // Remove state for old stages
-      if (stageIds.size >= retainedStages) {
-        val toRemove = math.max(retainedStages / 10, 1)
-        stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
-        stageIds.trimStart(toRemove)
-      }
+      val stageId = stageInfo.stageId
+      stageIds += stageId
+      stageIdToJobId(stageId) = jobId
+      stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+      trimStagesIfNecessary()
     }
 
+    trimJobsIfNecessary()
+  }
+
+  /** Keep track of stages that have completed. */
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
+    val stageId = stageCompleted.stageInfo.stageId
+    if (stageIdToJobId.contains(stageId)) {
+      // Note: Only do this if the stage has not already been cleaned up
+      // Otherwise, we may never clean this stage from `completedStageIds`
+      completedStageIds += stageCompleted.stageInfo.stageId
+    }
+  }
+
+  /** On job end, find all stages in this job that are skipped and mark them as such. */
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
+    val jobId = jobEnd.jobId
+    jobIdToStageIds.get(jobId).foreach { stageIds =>
+      val skippedStageIds = stageIds.filter { sid => !completedStageIds.contains(sid) }
+      // Note: Only do this if the job has not already been cleaned up
+      // Otherwise, we may never clean this job from `jobIdToSkippedStageIds`
+      jobIdToSkippedStageIds(jobId) = skippedStageIds
+    }
+  }
+
+  /** Clean metadata for old stages if we have exceeded the number to retain. */
+  private def trimStagesIfNecessary(): Unit = {
+    if (stageIds.size >= retainedStages) {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stageIds.take(toRemove).foreach { id => cleanStage(id) }
+      stageIds.trimStart(toRemove)
+    }
+  }
+
-    // Remove state for old jobs
+  /** Clean metadata for old jobs if we have exceeded the number to retain. */
+  private def trimJobsIfNecessary(): Unit = {
     if (jobIds.size >= retainedJobs) {
       val toRemove = math.max(retainedJobs / 10, 1)
-      jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
+      jobIds.take(toRemove).foreach { id => cleanJob(id) }
       jobIds.trimStart(toRemove)
     }
   }
+
+  /** Clean metadata for the given stage, its job, and all other stages that belong to the job. */
+  private[ui] def cleanStage(stageId: Int): Unit = {
+    completedStageIds.remove(stageId)
+    stageIdToGraph.remove(stageId)
+    stageIdToJobId.remove(stageId).foreach { jobId => cleanJob(jobId) }
+  }
+
+  /** Clean metadata for the given job and all stages that belong to it. */
+  private[ui] def cleanJob(jobId: Int): Unit = {
+    jobIdToSkippedStageIds.remove(jobId)
+    jobIdToStageIds.remove(jobId).foreach { stageIds =>
+      stageIds.foreach { stageId => cleanStage(stageId) }
+    }
+  }
 
 }
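The listener's definition of "skipped" falls out of two hooks: every stage a job submits is recorded at job start, and any of those stages that never reaches onStageCompleted by the time the job ends is skipped (typically because its shuffle output was reused). A self-contained sketch of just that bookkeeping, with the trimming and cleanup paths omitted (names are illustrative):

import scala.collection.mutable

object SkippedBookkeepingSketch {
  private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
  private val completedStageIds = new mutable.HashSet[Int]
  private val jobIdToSkippedStageIds = new mutable.HashMap[Int, Seq[Int]]

  // Record every stage the job plans to run
  def onJobStart(jobId: Int, stageIds: Seq[Int]): Unit =
    jobIdToStageIds(jobId) = stageIds.sorted

  def onStageCompleted(stageId: Int): Unit =
    completedStageIds += stageId

  // A stage submitted with the job but never completed is skipped
  def onJobEnd(jobId: Int): Unit =
    jobIdToStageIds.get(jobId).foreach { stageIds =>
      jobIdToSkippedStageIds(jobId) = stageIds.filter(sid => !completedStageIds.contains(sid))
    }

  def main(args: Array[String]): Unit = {
    onJobStart(1, Seq(0, 1, 2))
    onStageCompleted(1)
    onStageCompleted(2)  // stage 0 never runs: its output was already computed
    onJobEnd(1)
    assert(jobIdToSkippedStageIds(1) == Seq(0))
    println(s"skipped stages for job 1: ${jobIdToSkippedStageIds(1)}")
  }
}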