[SPARK-31642] Add Pagination Support for Structured Streaming Page
### What changes were proposed in this pull request?
Add pagination support to the Structured Streaming page. Both tables, `Active Queries` and `Completed Queries`, are now paginated.
Pagination is implemented using the pagination framework from #7399.
* Also, each table is shown only if it contains at least one entry.

### Why are the changes needed?
* This will help users analyse their structured streaming queries in a much better way.
* Other Web UI pages already support pagination in their tables, so this makes the Web UI more consistent across pages.
* This can prevent potential OOM errors.

### Does this PR introduce _any_ user-facing change?
Yes. Both tables will support pagination.

### How was this patch tested?
Manually. I will add snapshots soon.

Closes #28485 from iRakson/SPARK-31642.

Authored-by: iRakson <>
Signed-off-by: Kousuke Saruta <>
iRakson authored and sarutak committed May 23, 2020
1 parent 721cba5 commit fbb3144
Showing 3 changed files with 209 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

package org.apache.spark.sql.streaming.ui

import java.nio.charset.StandardCharsets.UTF_8
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable
import scala.xml.Node

import org.apache.commons.text.StringEscapeUtils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.util.Utils

private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
extends WebUIPage("") with Logging {
Expand All @@ -35,11 +39,147 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
SparkUIUtils.headerSparkPage(request, "Streaming Query", content, parent)

def generateDataRow(request: HttpServletRequest, queryActive: Boolean)
(query: StreamingQueryUIData): Seq[Node] = {
private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus

val content = mutable.ListBuffer[Node]()
// show active queries table only if there is at least one active query
if (activeQueries.nonEmpty) {
// scalastyle:off
content ++=
<span id="active" class="collapse-aggregated-activeQueries collapse-table"
<h5 id="activequeries">
<span class="collapse-table-arrow arrow-open"></span>
<a>Active Streaming Queries ({activeQueries.length})</a>
</span> ++
<ul class="aggregated-activeQueries collapsible-table">
{queryTable(activeQueries, request, "active")}
// scalastyle:on
// show completed queries table only if there is at least one completed query
if (inactiveQueries.nonEmpty) {
// scalastyle:off
content ++=
<span id="completed" class="collapse-aggregated-completedQueries collapse-table"
<h5 id="completedqueries">
<span class="collapse-table-arrow arrow-open"></span>
<a>Completed Streaming Queries ({inactiveQueries.length})</a>
</span> ++
<ul class="aggregated-completedQueries collapsible-table">
{queryTable(inactiveQueries, request, "completed")}
// scalastyle:on

private def queryTable(data: Seq[StreamingQueryUIData], request: HttpServletRequest,
tableTag: String): Seq[Node] = {

val isActive = if (tableTag.contains("active")) true else false
val page = Option(request.getParameter(s"$")).map(_.toInt).getOrElse(1)

try {
new StreamingQueryPagedTable(
SparkUIUtils.prependBaseUri(request, parent.basePath),
} catch {
case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
<div class="alert alert-error">
<p>Error while rendering execution table:</p>

class StreamingQueryPagedTable(
request: HttpServletRequest,
parent: StreamingQueryTab,
data: Seq[StreamingQueryUIData],
tableTag: String,
isActive: Boolean,
basePath: String,
subPath: String) extends PagedTable[StructuredStreamingRow] {

private val (sortColumn, sortDesc, pageSize) = getTableParameters(request, tableTag, "Start Time")
private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
private val encodedSortColumn = URLEncoder.encode(sortColumn,

override def tableId: String = s"$tableTag-table"

override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"

override def pageSizeFormField: String = s"$tableTag.pageSize"

override def pageNumberFormField: String = s"$"

override def pageLink(page: Int): String = {
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$tableTag.sort=$encodedSortColumn" +
s"&$tableTag.desc=$sortDesc" +
s"&$pageSizeFormField=$pageSize" +

override def goButtonFormPath: String =

override def dataSource: PagedDataSource[StructuredStreamingRow] =
new StreamingQueryDataSource(data, sortColumn, sortDesc, pageSize, isActive)

override def headers: Seq[Node] = {
val headerAndCss: Seq[(String, Boolean, Option[String])] = {
("Name", true, None),
("Status", false, None),
("ID", true, None),
("Run ID", true, None),
("Start Time", true, None),
("Duration", true, None),
("Avg Input /sec", true, None),
("Avg Process /sec", true, None),
("Latest Batch", true, None)) ++ {
if (!isActive) {
Seq(("Error", false, None))
} else {
isSortColumnValid(headerAndCss, sortColumn)

headerRow(headerAndCss, sortDesc, pageSize, sortColumn, parameterPath, tableTag, tableTag)

override def row(query: StructuredStreamingRow): Seq[Node] = {
val streamingQuery = query.streamingUIData
val statisticsLink = "%s/%s/statistics?id=%s"
.format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix,

def details(detail: Any): Seq[Node] = {
if (queryActive) {
if (isActive) {
return Seq.empty[Node]
val detailString = detail.asInstanceOf[String]
Expand All @@ -51,12 +191,39 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)

val statisticsLink = "%s/%s/statistics?id=%s"
.format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, query.runId)
<td><a href={statisticsLink}>{streamingQuery.runId}</a></td>
<td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td>

val name = UIUtils.getQueryName(query)
val status = UIUtils.getQueryStatus(query)
val duration = if (queryActive) {
/**
 * One row of the paged streaming query table: a [[StreamingQueryUIData]] bundled with the
 * values derived from it, so the table can sort on and render them.
 *
 * @param duration query duration as display text
 * @param avgInput average input rate of the query
 *                 (NOTE(review): appears to be the mean of `inputRowsPerSecond` over
 *                 `recentProgress` -- the computing code is only partly visible, confirm)
 * @param avgProcess average processing rate of the query
 *                   (NOTE(review): appears to be the mean of `processedRowsPerSecond` over
 *                   `recentProgress` -- confirm)
 * @param streamingUIData the underlying query data this row was built from
 */
case class StructuredStreamingRow(
duration: String,
avgInput: Double,
avgProcess: Double,
streamingUIData: StreamingQueryUIData)

class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: String, desc: Boolean,
pageSize: Int, isActive: Boolean) extends PagedDataSource[StructuredStreamingRow](pageSize) {

// convert StreamingQueryUIData to StreamingRow to provide required data for sorting and sort it
private val data =, desc))

override def dataSize: Int = data.size

override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to)

private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = {
val duration = if (isActive) {
SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
} else {
withNoProgress(query, {
Expand All @@ -65,79 +232,31 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
}, "-")

<td> {name} </td>
<td> {status} </td>
<td> {} </td>
<td> <a href={statisticsLink}> {query.runId} </a> </td>
<td> {SparkUIUtils.formatDate(query.startTimestamp)} </td>
<td> {duration} </td>
<td> {withNoProgress(query, {
( => withNumberInvalid(p.inputRowsPerSecond)).sum /
query.recentProgress.length).formatted("%.2f") }, "NaN")}
<td> {withNoProgress(query, {
( => withNumberInvalid(p.processedRowsPerSecond)).sum /
query.recentProgress.length).formatted("%.2f") }, "NaN")}
<td> {withNoProgress(query, { query.lastProgress.batchId }, "NaN")} </td>
val avgInput = ( => withNumberInvalid(p.inputRowsPerSecond)).sum /

private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
val activeQueryTables = if (activeQueries.nonEmpty) {
val headerRow = Seq(
"Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch")

Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true),
activeQueries, true, Some("activeQueries-table"), Seq(null), false))
} else {
val avgProcess = ( =>
withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length)

val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
val headerRow = Seq(
"Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch", "Error")
StructuredStreamingRow(duration, avgInput, avgProcess, query)

Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false),
inactiveQueries, true, Some("completedQueries-table"), Seq(null), false))
private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = {
val ordering: Ordering[StructuredStreamingRow] = sortColumn match {
case "Name" => => UIUtils.getQueryName(q.streamingUIData))
case "ID" =>
case "Run ID" =>
case "Start Time" =>
case "Duration" =>
case "Avg Input /sec" =>
case "Avg Process /sec" =>
case "Latest Batch" =>
case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
if (desc) {
} else {

// scalastyle:off
val content =
<span id="active" class="collapse-aggregated-activeQueries collapse-table"
<h5 id="activequeries">
<span class="collapse-table-arrow arrow-open"></span>
<a>Active Streaming Queries ({activeQueries.length})</a>
</span> ++
<ul class="aggregated-activeQueries collapsible-table">
</div> ++
<span id="completed" class="collapse-aggregated-completedQueries collapse-table"
<h5 id="completedqueries">
<span class="collapse-table-arrow arrow-open"></span>
<a>Completed Streaming Queries ({inactiveQueries.length})</a>
</span> ++
<ul class="aggregated-completedQueries collapsible-table">
// scalastyle:on

Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,18 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
var html = renderStreamingQueryPage(request, tab)
assert(html.contains("active streaming queries (1)"))
assert(html.contains("completed streaming queries (0)"))

html = renderStreamingQueryPage(request, tab)
assert(html.contains("active streaming queries (0)"))
assert(html.contains("completed streaming queries (1)"))

when(streamQuery.exception).thenReturn(Option("exception in query"))
html = renderStreamingQueryPage(request, tab)
assert(html.contains("active streaming queries (0)"))
assert(html.contains("completed streaming queries (1)"))
assert(html.contains("exception in query"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
goToUi(spark, "/StreamingQuery")

findAll(cssSelector("h3")).map(_.text).toSeq should contain("Streaming Query")
findAll(cssSelector("""#activeQueries-table th""")).map(_.text).toSeq should be {
List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch")

val arrow = 0x25BE.toChar
findAll(cssSelector("""#active-table th""")).map(_.text).toList should be {
List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration",
"Avg Input /sec", "Avg Process /sec", "Latest Batch")
val activeQueries =
findAll(cssSelector("""#activeQueries-table td""")).map(_.text).toSeq
findAll(cssSelector("""#active-table td""")).map(_.text).toSeq
activeQueries should contain(
activeQueries should contain(activeQuery.runId.toString)
findAll(cssSelector("""#completedQueries-table th"""))
.map(_.text).toSeq should be {
List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch", "Error")
findAll(cssSelector("""#completed-table th"""))
.map(_.text).toList should be {
List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration",
"Avg Input /sec", "Avg Process /sec", "Latest Batch", "Error")
val completedQueries =
findAll(cssSelector("""#completedQueries-table td""")).map(_.text).toSeq
findAll(cssSelector("""#completed-table td""")).map(_.text).toSeq
completedQueries should contain(
completedQueries should contain(completedQuery.runId.toString)
completedQueries should contain(
completedQueries should contain(failedQuery.runId.toString)

// Check the query statistics page
val activeQueryLink =
findAll(cssSelector("""#activeQueries-table a""")).flatMap(_.attribute("href")).next
findAll(cssSelector("""#active-table td a""")).flatMap(_.attribute("href")).next
go to activeQueryLink

Expand Down

