From 4255c094a72699bdc08ce2f8ed09ba0401927d5c Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Fri, 17 Jun 2016 00:50:25 +0800 Subject: [PATCH] paginate stage table in stages tab --- .../apache/spark/ui/jobs/AllStagesPage.scala | 21 +- .../org/apache/spark/ui/jobs/JobPage.scala | 18 +- .../org/apache/spark/ui/jobs/PoolPage.scala | 6 +- .../org/apache/spark/ui/jobs/StageTable.scala | 503 ++++++++++++++---- 4 files changed, 409 insertions(+), 139 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index e75f1c57a69d0..c95eedb7831fa 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -38,22 +38,19 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq val numFailedStages = listener.numFailedStages - val now = System.currentTimeMillis val activeStagesTable = - new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + new StageTableBase(request, activeStages, "activeStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, parent.killEnabled, false) val pendingStagesTable = - new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = false) + new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, false, false) val completedStagesTable = - new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + new StageTableBase(request, completedStages, "completedStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, false, false) val failedStagesTable = - new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.progressListener, isFairScheduler = parent.isFairScheduler) + new StageTableBase(request, failedStages, "failedStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, false, true) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) @@ -135,4 +132,4 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { UIUtils.headerSparkPage("Stages for All Jobs", content, parent) } } -} +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 99f2bd8bc1f22..ca38d88e2a954 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -230,19 +230,17 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { } val activeStagesTable = - new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + new StageTableBase(request, activeStages, "activeStage", parent.basePath, + parent.jobProgresslistener, parent.isFairScheduler, parent.killEnabled, false) val pendingOrSkippedStagesTable = - new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse, - parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, - killEnabled = false) + new StageTableBase(request, pendingOrSkippedStages, "pendingStage", parent.basePath, + parent.jobProgresslistener, parent.isFairScheduler, false, false) val completedStagesTable = - new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + new StageTableBase(request, completedStages, "completedStage", parent.basePath, + parent.jobProgresslistener, parent.isFairScheduler, false, false) val failedStagesTable = - new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler) + new StageTableBase(request, failedStages, "failedStage", parent.basePath, + parent.jobProgresslistener, parent.isFairScheduler, false, true) val shouldShowActiveStages = activeStages.nonEmpty val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 6cd25919ca5fd..2ccdf359fedbb 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -42,9 +42,9 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { case Some(s) => s.values.toSeq case None => Seq[StageInfo]() } - val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + val activeStagesTable = + new StageTableBase(request, activeStages, "activeStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, parent.killEnabled, false) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getPoolForName(poolName).getOrElse { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 0e020155a6564..4fcfd460e9ab6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -17,61 +17,311 @@ package org.apache.spark.ui.jobs +import java.net.URLEncoder import java.util.Date +import javax.servlet.http.HttpServletRequest -import scala.xml.{Node, Text} +import scala.xml._ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.scheduler.StageInfo -import org.apache.spark.ui.{ToolTips, UIUtils} +import org.apache.spark.ui._ +import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils -/** Page showing list of all ongoing and recently finished stages */ private[ui] class StageTableBase( + request: HttpServletRequest, stages: Seq[StageInfo], + stageTag: String, + basePath: String, + progressListener: JobProgressListener, + isFairScheduler: Boolean, + killEnabled: Boolean, + isFailedStage: Boolean) { + val parameterStagePage = request.getParameter(stageTag + ".page") + val parameterStageSortColumn = request.getParameter(stageTag + ".sort") + val parameterStageSortDesc = request.getParameter(stageTag + ".desc") + val parameterStagePageSize = request.getParameter(stageTag + ".pageSize") + val parameterStagePrevPageSize = request.getParameter(stageTag + ".prevPageSize") + + val stagePage = Option(parameterStagePage).map(_.toInt).getOrElse(1) + val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse("Stage Id") + val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse( + // New stages should be shown above old jobs by default. + if (stageSortColumn == "Stage Id") true else false + ) + val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100) + val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt) + .getOrElse(stagePageSize) + + val page: Int = { + // If the user has changed to a larger page size, then go to page 1 in order to avoid + // IndexOutOfBoundsException. + if (stagePageSize <= stagePrevPageSize) { + stagePage + } else { + 1 + } + } + val currentTime = System.currentTimeMillis() + + val toNodeSeq = try { + new StagePagedTable( + stages, + stageTag, + basePath, + progressListener, + isFairScheduler, + killEnabled, + currentTime, + stagePageSize, + stageSortColumn, + stageSortDesc, + isFailedStage + ).table(page) + } catch { + case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => +
+

Error while rendering stage table:

+
+          {Utils.exceptionString(e)}
+        
+
+ } +} + +private[ui] class StageTableRowData( + val stageInfo: StageInfo, + val stageData: Option[StageUIData], + val stageId: Int, + val attemptId: Int, + val schedulingPool: String, + val description: String, + val descriptionOption: Option[String], + val submissionTime: Long, + val formattedSubmissionTime: String, + val duration: Long, + val formattedDuration: String, + val inputRead: Long, + val inputReadWithUnit: String, + val outputWrite: Long, + val outputWriteWithUnit: String, + val shuffleRead: Long, + val shuffleReadWithUnit: String, + val shuffleWrite: Long, + val shuffleWriteWithUnit: String) + +private[ui] class MissingStageTableRowData( + stageInfo: StageInfo, + stageId: Int, + attemptId: Int) extends StageTableRowData( + stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "") + +/** Page showing list of all ongoing and recently finished stages */ +private[ui] class StagePagedTable( + stages: Seq[StageInfo], + stageTag: String, basePath: String, listener: JobProgressListener, isFairScheduler: Boolean, - killEnabled: Boolean) { - - protected def columns: Seq[Node] = { - Stage Id ++ - {if (isFairScheduler) {Pool Name} else Seq.empty} ++ - Description - Submitted - Duration - Tasks: Succeeded/Total - Input - Output - Shuffle Read - - - - Shuffle Write - - + killEnabled: Boolean, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean, + isFailedStage: Boolean) extends PagedTable[StageTableRowData] { + + override def tableId: String = stageTag + "-table" + + override def tableCssClass: String = + "table table-bordered table-condensed table-striped table-head-clickable" + + override def pageSizeFormField: String = stageTag + ".pageSize" + + override def prevPageSizeFormField: String = stageTag + ".prevPageSize" + + override def pageNumberFormField: String = stageTag + ".page" + + override val dataSource = new StageDataSource( + stages, + listener, + basePath, + currentTime, + pageSize, + sortColumn, + desc + ) + + override def pageLink(page: Int): String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + basePath + + s"?$pageNumberFormField=$page" + + s"&$stageTag.sort=$encodedSortColumn" + + s"&$stageTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + } + + override def goButtonFormPath: String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + s"$basePath?$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc" + } + + override def headers: Seq[Node] = { + val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], Boolean)] = + Seq(("Stage Id", None, true)) ++ + {if (isFairScheduler) {Seq(("Pool Name", None, true))} else Seq.empty} ++ + Seq( + ("Description", None, true), ("Submitted", None, true), ("Duration", None, true), + ("Tasks: Succeeded/Total", None, false), + ("Input", Some((ToolTips.INPUT, false)), true), + ("Output", Some((ToolTips.OUTPUT, false)), true), + ("Shuffle Read", Some((ToolTips.SHUFFLE_READ, false)), true), + ("Shuffle Write", Some((ToolTips.SHUFFLE_WRITE, true)), true) + ) ++ + {if (isFailedStage) {Seq(("Failure Reason", None, false))} else Seq.empty} + + if (!stageHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) { + throw new IllegalArgumentException(s"Unknown column: $sortColumn") + } + + val headerRow: Seq[Node] = { + stageHeadersAndCssClasses.map { case (header, tooltip, sortable) => + val headerSpan = tooltip.map { case (title, left) => + if (left) { + /* Place the shuffle write tooltip on the left (rather than the default position + of on top) because the shuffle write column is the last column on the right side and + the tooltip is wider than the column, so it doesn't fit on top. */ + + {header} + + } else { + + {header} + + } + }.getOrElse( + {header} + ) + + if (header == sortColumn) { + val headerLink = Unparsed( + basePath + + s"?$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$stageTag.desc=${!desc}" + + s"&$stageTag.pageSize=$pageSize") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + + + {headerSpan} +  {Unparsed(arrow)} + + + + } else { + if (sortable) { + val headerLink = Unparsed( + basePath + + s"?$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$stageTag.pageSize=$pageSize") + + + + {headerSpan} + + + } else { + + {headerSpan} + + } + } + } + } + {headerRow} + } + + override def row(data: StageTableRowData): Seq[Node] = { + + {rowContent(data)} + } - def toNodeSeq: Seq[Node] = { - listener.synchronized { - stageTable(renderStageRow, stages) + private def rowContent(data: StageTableRowData): Seq[Node] = { + data.stageData match { + case None => missingStageRow(data.stageId) + case Some(stageData) => + val info = data.stageInfo + + {if (data.attemptId > 0) { + {data.stageId} (retry {data.attemptId}) + } else { + {data.stageId} + }} ++ + {if (isFairScheduler) { + + + {data.schedulingPool} + + + } else { + Seq.empty + }} ++ + {makeDescription(info, data.descriptionOption)} + + {data.formattedSubmissionTime} + + {data.formattedDuration} + + {UIUtils.makeProgressBar(started = stageData.numActiveTasks, + completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, + skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)} + + {data.inputReadWithUnit} + {data.outputWriteWithUnit} + {data.shuffleReadWithUnit} + {data.shuffleWriteWithUnit} ++ + { + if (isFailedStage) { + failureReasonHtml(info) + } else { + Seq.empty + } + } } } - /** Special table that merges two header cells. */ - protected def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { - - {columns} - - {rows.map(r => makeRow(r))} - -
+ private def failureReasonHtml(s: StageInfo): Seq[Node] = { + val failureReason = s.failureReason.getOrElse("") + val isMultiline = failureReason.indexOf('\n') >= 0 + // Display the first line by default + val failureReasonSummary = StringEscapeUtils.escapeHtml4( + if (isMultiline) { + failureReason.substring(0, failureReason.indexOf('\n')) + } else { + failureReason + }) + val details = if (isMultiline) { + // scalastyle:off + + +details + ++ + + // scalastyle:on + } else { + "" + } + {failureReasonSummary}{details} } - private def makeDescription(s: StageInfo): Seq[Node] = { + private def makeDescription(s: StageInfo, descriptionOption: Option[String]): Seq[Node] = { val basePathUri = UIUtils.prependBaseUri(basePath) val killLink = if (killEnabled) { @@ -111,12 +361,7 @@ private[ui] class StageTableBase( } - val stageDesc = for { - stageData <- listener.stageIdToData.get((s.stageId, s.attemptId)) - desc <- stageData.description - } yield { - UIUtils.makeDescription(desc, basePathUri) - } + val stageDesc = descriptionOption.map(UIUtils.makeDescription(_, basePathUri))
{stageDesc.getOrElse("")} {killLink} {nameLink} {details}
} @@ -132,19 +377,45 @@ private[ui] class StageTableBase( ++ // Shuffle Read // Shuffle Write } +} + +private[ui] class StageDataSource( + stages: Seq[StageInfo], + listener: JobProgressListener, + basePath: String, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedDataSource[StageTableRowData](pageSize) { + // Convert StageInfo to StageTableRowData which contains the final contents to show in the table + // so that we can avoid creating duplicate contents during sorting the data + private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc)) + + private var _slicedStageIds: Set[Int] = null - protected def stageRow(s: StageInfo): Seq[Node] = { + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = { + val r = data.slice(from, to) + _slicedStageIds = r.map(_.stageId).toSet + r + } + + private def stageRow(s: StageInfo): StageTableRowData = { val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId)) + if (stageDataOption.isEmpty) { - return missingStageRow(s.stageId) + return new MissingStageTableRowData(s, s.stageId, s.attemptId) } - val stageData = stageDataOption.get - val submissionTime = s.submissionTime match { + + val description = stageData.description + + val formattedSubmissionTime = s.submissionTime match { case Some(t) => UIUtils.formatDate(new Date(t)) case None => "Unknown" } - val finishTime = s.completionTime.getOrElse(System.currentTimeMillis) + val finishTime = s.completionTime.getOrElse(currentTime) // The submission time for a stage is misleading because it counts the time // the stage waits to be launched. (SPARK-10930) @@ -156,7 +427,7 @@ private[ui] class StageTableBase( if (finishTime > startTime) { Some(finishTime - startTime) } else { - Some(System.currentTimeMillis() - startTime) + Some(currentTime - startTime) } } else { None @@ -172,76 +443,80 @@ private[ui] class StageTableBase( val shuffleWrite = stageData.shuffleWriteBytes val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else "" - {if (s.attemptId > 0) { - {s.stageId} (retry {s.attemptId}) - } else { - {s.stageId} - }} ++ - {if (isFairScheduler) { - - - {stageData.schedulingPool} - - - } else { - Seq.empty - }} ++ - {makeDescription(s)} - - {submissionTime} - - {formattedDuration} - - {UIUtils.makeProgressBar(started = stageData.numActiveTasks, - completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, - skipped = 0, killed = stageData.numKilledTasks, total = s.numTasks)} - - {inputReadWithUnit} - {outputWriteWithUnit} - {shuffleReadWithUnit} - {shuffleWriteWithUnit} - } - /** Render an HTML row that represents a stage */ - private def renderStageRow(s: StageInfo): Seq[Node] = - {stageRow(s)} -} - -private[ui] class FailedStageTable( - stages: Seq[StageInfo], - basePath: String, - listener: JobProgressListener, - isFairScheduler: Boolean) - extends StageTableBase(stages, basePath, listener, isFairScheduler, killEnabled = false) { - - override protected def columns: Seq[Node] = super.columns ++ Failure Reason + new StageTableRowData( + s, + stageDataOption, + s.stageId, + s.attemptId, + stageData.schedulingPool, + description.getOrElse(""), + description, + s.submissionTime.getOrElse(0), + formattedSubmissionTime, + duration.getOrElse(-1), + formattedDuration, + inputRead, + inputReadWithUnit, + outputWrite, + outputWriteWithUnit, + shuffleRead, + shuffleReadWithUnit, + shuffleWrite, + shuffleWriteWithUnit + ) + } - override protected def stageRow(s: StageInfo): Seq[Node] = { - val basicColumns = super.stageRow(s) - val failureReason = s.failureReason.getOrElse("") - val isMultiline = failureReason.indexOf('\n') >= 0 - // Display the first line by default - val failureReasonSummary = StringEscapeUtils.escapeHtml4( - if (isMultiline) { - failureReason.substring(0, failureReason.indexOf('\n')) - } else { - failureReason - }) - val details = if (isMultiline) { - // scalastyle:off - - +details - ++ - - // scalastyle:on + /** + * Return Ordering according to sortColumn and desc + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = { + val ordering = sortColumn match { + case "Stage Id" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Int.compare(x.stageId, y.stageId) + } + case "Pool Name" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.String.compare(x.schedulingPool, y.schedulingPool) + } + case "Description" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.String.compare(x.description, y.description) + } + case "Submitted" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.submissionTime, y.submissionTime) + } + case "Duration" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.duration, y.duration) + } + case "Input" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.inputRead, y.inputRead) + } + case "Output" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.outputWrite, y.outputWrite) + } + case "Shuffle Read" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.shuffleRead, y.shuffleRead) + } + case "Shuffle Write" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite) + } + case "Tasks: Succeeded/Total" => + throw new IllegalArgumentException(s"Unsortable column: $sortColumn") + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse } else { - "" + ordering } - val failureReasonHtml = {failureReasonSummary}{details} - basicColumns ++ failureReasonHtml } } +