diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
index 733676546eab3..b969e41e4e55c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -17,15 +17,19 @@
 package org.apache.spark.sql.streaming.ui
 
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
 import javax.servlet.http.HttpServletRequest
 
+import scala.collection.mutable
 import scala.xml.Node
 
 import org.apache.commons.text.StringEscapeUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.streaming.ui.UIUtils._
-import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils, WebUIPage}
+import org.apache.spark.util.Utils
 
 private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
   extends WebUIPage("") with Logging {
@@ -35,11 +39,147 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
     SparkUIUtils.headerSparkPage(request, "Streaming Query", content, parent)
   }
 
-  def generateDataRow(request: HttpServletRequest, queryActive: Boolean)
-    (query: StreamingQueryUIData): Seq[Node] = {
+  private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
+    val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
+      .partition(_.isActive)
+
+    val content = mutable.ListBuffer[Node]()
+    // show active queries table only if there is at least one active query
+    if (activeQueries.nonEmpty) {
+      // scalastyle:off
+      content ++=
+        <span>
+          <a>Active Streaming Queries ({activeQueries.length})</a>
+        </span> ++
+        <div>
+          {queryTable(activeQueries, request, "active")}
+        </div>
+      // scalastyle:on
+    }
+    // show completed queries table only if there is at least one completed query
+    if (inactiveQueries.nonEmpty) {
+      // scalastyle:off
+      content ++=
+        <span>
+          <a>Completed Streaming Queries ({inactiveQueries.length})</a>
+        </span> ++
+        <div>
+          {queryTable(inactiveQueries, request, "completed")}
+        </div>
+      // scalastyle:on
+    }
+    content
+  }
+
+  private def queryTable(data: Seq[StreamingQueryUIData], request: HttpServletRequest,
+      tableTag: String): Seq[Node] = {
+
+    val isActive = if (tableTag.contains("active")) true else false
+    val page = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
+
+    try {
+      new StreamingQueryPagedTable(
+        request,
+        parent,
+        data,
+        tableTag,
+        isActive,
+        SparkUIUtils.prependBaseUri(request, parent.basePath),
+        "StreamingQuery"
+      ).table(page)
+    } catch {
+      case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+        <div class="alert alert-error">
+          <p>Error while rendering streaming query table:</p>
+          <pre>
+            {Utils.exceptionString(e)}
+          </pre>
+        </div>
+    }
+  }
+}
+
+class StreamingQueryPagedTable(
+    request: HttpServletRequest,
+    parent: StreamingQueryTab,
+    data: Seq[StreamingQueryUIData],
+    tableTag: String,
+    isActive: Boolean,
+    basePath: String,
+    subPath: String) extends PagedTable[StructuredStreamingRow] {
+
+  private val (sortColumn, sortDesc, pageSize) = getTableParameters(request, tableTag, "Start Time")
+  private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
+  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+  override def tableId: String = s"$tableTag-table"
+
+  override def tableCssClass: String =
+    "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
+
+  override def pageSizeFormField: String = s"$tableTag.pageSize"
+
+  override def pageNumberFormField: String = s"$tableTag.page"
+
+  override def pageLink(page: Int): String = {
+    parameterPath +
+      s"&$pageNumberFormField=$page" +
+      s"&$tableTag.sort=$encodedSortColumn" +
+      s"&$tableTag.desc=$sortDesc" +
+      s"&$pageSizeFormField=$pageSize" +
+      s"#$tableTag"
+  }
+
+  override def goButtonFormPath: String =
+    s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$sortDesc#$tableTag"
+
+  override def dataSource: PagedDataSource[StructuredStreamingRow] =
+    new StreamingQueryDataSource(data, sortColumn, sortDesc, pageSize, isActive)
+
+  override def headers: Seq[Node] = {
+    val headerAndCss: Seq[(String, Boolean, Option[String])] = {
+      Seq(
+        ("Name", true, None),
+        ("Status", false, None),
+        ("ID", true, None),
+        ("Run ID", true, None),
+        ("Start Time", true, None),
+        ("Duration", true, None),
+        ("Avg Input /sec", true, None),
+        ("Avg Process /sec", true, None),
+        ("Latest Batch", true, None)) ++ {
+        if (!isActive) {
+          Seq(("Error", false, None))
+        } else {
+          Nil
+        }
+      }
+    }
+    isSortColumnValid(headerAndCss, sortColumn)
+
+    headerRow(headerAndCss, sortDesc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
+  }
+
+  override def row(query: StructuredStreamingRow): Seq[Node] = {
+    val streamingQuery = query.streamingUIData
+    val statisticsLink = "%s/%s/statistics?id=%s"
+      .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix,
+        streamingQuery.runId)
 
     def details(detail: Any): Seq[Node] = {
-      if (queryActive) {
+      if (isActive) {
         return Seq.empty[Node]
       }
       val detailString = detail.asInstanceOf[String]
@@ -51,12 +191,39 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
       <td>{summary}{details}</td>
     }
 
-    val statisticsLink = "%s/%s/statistics?id=%s"
-      .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, query.runId)
+
+    <tr>
+      <td>{UIUtils.getQueryName(streamingQuery)}</td>
+      <td>{UIUtils.getQueryStatus(streamingQuery)}</td>
+      <td>{streamingQuery.id}</td>
+      <td><a href={statisticsLink}>{streamingQuery.runId}</a></td>
+      <td>{SparkUIUtils.formatDate(streamingQuery.startTimestamp)}</td>
+      <td>{query.duration}</td>
+      <td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td>
+      <td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td>
+      <td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td>
+      <td>{details(streamingQuery.exception.getOrElse("-"))}</td>
+    </tr>
+  }
+}
 
-    val name = UIUtils.getQueryName(query)
-    val status = UIUtils.getQueryStatus(query)
-    val duration = if (queryActive) {
+case class StructuredStreamingRow(
+    duration: String,
+    avgInput: Double,
+    avgProcess: Double,
+    streamingUIData: StreamingQueryUIData)
+
+class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: String, desc: Boolean,
+    pageSize: Int, isActive: Boolean) extends PagedDataSource[StructuredStreamingRow](pageSize) {
+
+  // convert StreamingQueryUIData to StructuredStreamingRow to provide required data for sorting and sort it
+  private val data = uiData.map(streamingRow).sorted(ordering(sortColumn, desc))
+
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to)
+
+  private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = {
+    val duration = if (isActive) {
       SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
     } else {
       withNoProgress(query, {
@@ -65,79 +232,31 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
       }, "-")
     }
 
-    <tr>
-      <td> {name} </td>
-      <td> {status} </td>
-      <td> {query.id} </td>
-      <td> <a href={statisticsLink}> {query.runId} </a> </td>
-      <td> {SparkUIUtils.formatDate(query.startTimestamp)} </td>
-      <td> {duration} </td>
-      <td> {withNoProgress(query, {
-        (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
-          query.recentProgress.length).formatted("%.2f") }, "NaN")}
-      </td>
-      <td> {withNoProgress(query, {
-        (query.recentProgress.map(p => withNumberInvalid(p.processedRowsPerSecond)).sum /
-          query.recentProgress.length).formatted("%.2f") }, "NaN")}
-      </td>
-      <td> {withNoProgress(query, { query.lastProgress.batchId }, "NaN")} </td>
-      <td> {details(query.exception.getOrElse("-"))} </td>
-    </tr>
-  }
+    val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
+      query.recentProgress.length)
 
-  private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
-    val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
-      .partition(_.isActive)
-    val activeQueryTables = if (activeQueries.nonEmpty) {
-      val headerRow = Seq(
-        "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
-        "Avg Process /sec", "Lastest Batch")
-
-      Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true),
-        activeQueries, true, Some("activeQueries-table"), Seq(null), false))
-    } else {
-      None
-    }
+    val avgProcess = (query.recentProgress.map(p =>
+      withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length)
 
-    val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
-      val headerRow = Seq(
-        "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
-        "Avg Process /sec", "Lastest Batch", "Error")
+    StructuredStreamingRow(duration, avgInput, avgProcess, query)
+  }
 
-      Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false),
-        inactiveQueries, true, Some("completedQueries-table"), Seq(null), false))
+  private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = {
+    val ordering: Ordering[StructuredStreamingRow] = sortColumn match {
+      case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData))
+      case "ID" => Ordering.by(_.streamingUIData.id)
+      case "Run ID" => Ordering.by(_.streamingUIData.runId)
+      case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp)
+      case "Duration" => Ordering.by(_.duration)
+      case "Avg Input /sec" => Ordering.by(_.avgInput)
+      case "Avg Process /sec" => Ordering.by(_.avgProcess)
+      case "Latest Batch" => Ordering.by(_.streamingUIData.lastProgress.batchId)
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
     } else {
-      None
+      ordering
     }
-
-    // scalastyle:off
-    val content =
-      <span>
-        <a>Active Streaming Queries ({activeQueries.length})</a>
-      </span> ++
-      <div>
-        {activeQueryTables.getOrElse(Seq.empty[Node])}
-      </div> ++
-      <span>
-        <a>Completed Streaming Queries ({inactiveQueries.length})</a>
-      </span> ++
-      <div>
-        {inactiveQueryTables.getOrElse(Seq.empty[Node])}
-      </div>
-    // scalastyle:on
-
-    content
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
index 2a1e18ab66bb7..640c21c52a146 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -43,13 +43,11 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
     var html = renderStreamingQueryPage(request, tab)
       .toString().toLowerCase(Locale.ROOT)
     assert(html.contains("active streaming queries (1)"))
-    assert(html.contains("completed streaming queries (0)"))
 
     when(streamQuery.isActive).thenReturn(false)
     when(streamQuery.exception).thenReturn(None)
     html = renderStreamingQueryPage(request, tab)
       .toString().toLowerCase(Locale.ROOT)
-    assert(html.contains("active streaming queries (0)"))
     assert(html.contains("completed streaming queries (1)"))
     assert(html.contains("finished"))
 
@@ -57,7 +55,6 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
     when(streamQuery.exception).thenReturn(Option("exception in query"))
     html = renderStreamingQueryPage(request, tab)
       .toString().toLowerCase(Locale.ROOT)
-    assert(html.contains("active streaming queries (0)"))
     assert(html.contains("completed streaming queries (1)"))
     assert(html.contains("failed"))
     assert(html.contains("exception in query"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
index fdf4c6634d79f..63b5792ebd515 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala
@@ -91,21 +91,23 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       goToUi(spark, "/StreamingQuery")
 
       findAll(cssSelector("h3")).map(_.text).toSeq should contain("Streaming Query")
-      findAll(cssSelector("""#activeQueries-table th""")).map(_.text).toSeq should be {
-        List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
-          "Avg Process /sec", "Lastest Batch")
+
+      val arrow = 0x25BE.toChar
+      findAll(cssSelector("""#active-table th""")).map(_.text).toList should be {
+        List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration",
+          "Avg Input /sec", "Avg Process /sec", "Latest Batch")
       }
       val activeQueries =
-        findAll(cssSelector("""#activeQueries-table td""")).map(_.text).toSeq
+        findAll(cssSelector("""#active-table td""")).map(_.text).toSeq
       activeQueries should contain(activeQuery.id.toString)
       activeQueries should contain(activeQuery.runId.toString)
-      findAll(cssSelector("""#completedQueries-table th"""))
-        .map(_.text).toSeq should be {
-        List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
-          "Avg Process /sec", "Lastest Batch", "Error")
+      findAll(cssSelector("""#completed-table th"""))
+        .map(_.text).toList should be {
+        List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration",
+          "Avg Input /sec", "Avg Process /sec", "Latest Batch", "Error")
       }
       val completedQueries =
-        findAll(cssSelector("""#completedQueries-table td""")).map(_.text).toSeq
+        findAll(cssSelector("""#completed-table td""")).map(_.text).toSeq
       completedQueries should contain(completedQuery.id.toString)
       completedQueries should contain(completedQuery.runId.toString)
       completedQueries should contain(failedQuery.id.toString)
@@ -113,7 +115,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
 
       // Check the query statistics page
       val activeQueryLink =
-        findAll(cssSelector("""#activeQueries-table a""")).flatMap(_.attribute("href")).next
+        findAll(cssSelector("""#active-table td a""")).flatMap(_.attribute("href")).next
       go to activeQueryLink
 
       findAll(cssSelector("h3"))