diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index fcda341ae5941..b210c8d852898 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -18,12 +18,13 @@
package org.apache.spark.ui
import java.text.SimpleDateFormat
-import java.util.Date
+import java.util.{Date, Locale}
import scala.xml.Node
+import org.apache.spark.Logging
/** Utility functions for generating XML pages with spark content. */
-private[spark] object UIUtils {
+private[spark] object UIUtils extends Logging {
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -49,6 +50,80 @@ private[spark] object UIUtils {
"%.1f h".format(hours)
}
+ /** Generate a verbose human-readable string representing a duration such as "5 second 35 ms" */
+ def formatDurationVerbose(ms: Long): String = {
+ try {
+ val second = 1000L
+ val minute = 60 * second
+ val hour = 60 * minute
+ val day = 24 * hour
+ val week = 7 * day
+ val year = 365 * day
+
+ def toString(num: Long, unit: String): String = {
+ if (num == 0) {
+ ""
+ } else if (num == 1) {
+ s"$num $unit"
+ } else {
+ s"$num ${unit}s"
+ }
+ }
+
+ val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
+ val secondString = toString((ms % minute) / second, "second")
+ val minuteString = toString((ms % hour) / minute, "minute")
+ val hourString = toString((ms % day) / hour, "hour")
+ val dayString = toString((ms % week) / day, "day")
+ val weekString = toString((ms % year) / week, "week")
+ val yearString = toString(ms / year, "year")
+
+ Seq(
+ second -> millisecondsString,
+ minute -> s"$secondString $millisecondsString",
+ hour -> s"$minuteString $secondString",
+ day -> s"$hourString $minuteString $secondString",
+ week -> s"$dayString $hourString $minuteString",
+ year -> s"$weekString $dayString $hourString"
+ ).foreach { case (durationLimit, durationString) =>
+ if (ms < durationLimit) {
+ // if time is less than the limit (up to a year)
+ return durationString
+ }
+ }
+ // if time is more than a year
+ return s"$yearString $weekString $dayString"
+ } catch {
+ case e: Exception =>
+ logError("Error converting time to string", e)
+ // if there is some error, return blank string
+ return ""
+ }
+ }
+
+ /** Generate a human-readable string representing a number (e.g. 100 K) */
+ def formatNumber(records: Double): String = {
+ val trillion = 1e12
+ val billion = 1e9
+ val million = 1e6
+ val thousand = 1e3
+
+ val (value, unit) = {
+ if (records >= 2*trillion) {
+ (records / trillion, " T")
+ } else if (records >= 2*billion) {
+ (records / billion, " B")
+ } else if (records >= 2*million) {
+ (records / million, " M")
+ } else if (records >= 2*thousand) {
+ (records / thousand, " K")
+ } else {
+ (records, "")
+ }
+ }
+ "%.1f%s".formatLocal(Locale.US, value, unit)
+ }
+
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
@@ -146,21 +221,36 @@ private[spark] object UIUtils {
/** Returns an HTML table constructed by generating a row for each object in a sequence. */
def listingTable[T](
headers: Seq[String],
- makeRow: T => Seq[Node],
- rows: Seq[T],
+ generateDataRow: T => Seq[Node],
+ data: Seq[T],
fixedWidth: Boolean = false): Seq[Node] = {
- val colWidth = 100.toDouble / headers.size
- val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
var tableClass = "table table-bordered table-striped table-condensed sortable"
if (fixedWidth) {
tableClass += " table-fixed"
}
-
+ val colWidth = 100.toDouble / headers.size
+ val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
+    val headerRow: Seq[Node] = {
+      // if none of the headers have "\n" in them
+      if (headers.forall(!_.contains("\n"))) {
+        // represent header as simple text
+        headers.map(h => <th width={colWidthAttr}>{h}</th>)
+      } else {
+        // represent header text as list while respecting "\n"
+        headers.map { case h =>
+          <th width={colWidthAttr}>
+            <ul class="unstyled">
+              { h.split("\n").map { case t => <li> {t} </li> } }
+            </ul>
+          </th>
+        }
+      }
+    }
     <table class={tableClass}>
-      <thead>{headers.map(h => <th width={colWidthAttr}>{h}</th>)}</thead>
+      <thead>{headerRow}</thead>
       <tbody>
-        {rows.map(r => makeRow(r))}
+        {data.map(r => generateDataRow(r))}
       </tbody>
     </table>
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 906d4067a14eb..ff5d0aaa3d0bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -157,8 +157,7 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter
- private[streaming] val ui = new StreamingTab(this)
- ui.start()
+ private[streaming] val uiTab = new StreamingTab(this)
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
similarity index 98%
rename from streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 32a4644e2a3e9..8921b99f53a23 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -28,7 +28,7 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
-private[ui] class StreamingProgressListener(ssc: StreamingContext) extends StreamingListener {
+private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 5cd900c2f88f0..58960812e1205 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -17,30 +17,24 @@
package org.apache.spark.streaming.ui
-import java.util.{Calendar, Locale}
+import java.util.Calendar
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.Logging
import org.apache.spark.ui._
+import org.apache.spark.ui.UIUtils._
import org.apache.spark.util.Distribution
/** Page for Spark Web UI that shows statistics of a streaming job */
private[ui] class StreamingPage(parent: StreamingTab)
extends UIPage("") with Logging {
- private val ssc = parent.ssc
- private val sc = ssc.sparkContext
- private val sparkUI = sc.ui
- private val listener = new StreamingProgressListener(ssc)
- private val calendar = Calendar.getInstance()
- private val startTime = calendar.getTime()
+ private val listener = parent.streamingListener
+ private val startTime = Calendar.getInstance().getTime()
private val emptyCellTest = "-"
- ssc.addStreamingListener(listener)
- parent.attachPage(this)
-
/** Render the page */
override def render(request: HttpServletRequest): Seq[Node] = {
val content =
@@ -49,7 +43,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
generateNetworkStatsTable() ++
generateBatchStatsTable()
UIUtils.headerSparkPage(
- content, sparkUI.basePath, sc.appName, "Streaming", sparkUI.getTabs, parent, Some(5000))
+ content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000))
}
/** Generate basic stats of the streaming program */
@@ -60,13 +54,13 @@ private[ui] class StreamingPage(parent: StreamingTab)
Started at: {startTime.toString}
- Time since start: {msDurationToString(timeSinceStart)}
+ Time since start: {formatDurationVerbose(timeSinceStart)}
Network receivers: {listener.numNetworkReceivers}
- Batch interval: {msDurationToString(listener.batchDuration)}
+ Batch interval: {formatDurationVerbose(listener.batchDuration)}
Processed batches: {listener.numTotalCompletedBatches}
@@ -85,7 +79,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val headerRow = Seq(
"Receiver",
"Location",
- "Records in last batch",
+ "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
"Minimum rate\n[records/sec]",
"25th percentile rate\n[records/sec]",
"Median rate\n[records/sec]",
@@ -96,15 +90,15 @@ private[ui] class StreamingPage(parent: StreamingTab)
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCellTest)
- val receiverLastBatchRecords = numberToString(lastBatchReceivedRecord(receiverId))
+ val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
- d.getQuantiles().map(r => numberToString(r.toLong))
+ d.getQuantiles().map(r => formatNumber(r.toLong))
}.getOrElse {
Seq(emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest)
}
Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
}
- Some(listingTable(headerRow, dataRows, fixedWidth = true))
+ Some(listingTable(headerRow, dataRows))
} else {
None
}
@@ -124,19 +118,19 @@ private[ui] class StreamingPage(parent: StreamingTab)
val processingDelayQuantilesRow = {
Seq(
"Processing Time",
- msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))
+ formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
) ++ getQuantiles(listener.processingDelayDistribution)
}
val schedulingDelayQuantilesRow = {
Seq(
"Scheduling Delay",
- msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))
+ formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
) ++ getQuantiles(listener.schedulingDelayDistribution)
}
val totalDelayQuantilesRow = {
Seq(
"Total Delay",
- msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))
+ formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
) ++ getQuantiles(listener.totalDelayDistribution)
}
val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
@@ -146,7 +140,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
schedulingDelayQuantilesRow,
totalDelayQuantilesRow
)
- Some(listingTable(headerRow, dataRows, fixedWidth = true))
+ Some(listingTable(headerRow, dataRows))
} else {
None
}
@@ -162,130 +156,25 @@ private[ui] class StreamingPage(parent: StreamingTab)
content
}
- /**
- * Returns a human-readable string representing a number
- */
- private def numberToString(records: Double): String = {
- val trillion = 1e12
- val billion = 1e9
- val million = 1e6
- val thousand = 1e3
-
- val (value, unit) = {
- if (records >= 2*trillion) {
- (records / trillion, " T")
- } else if (records >= 2*billion) {
- (records / billion, " B")
- } else if (records >= 2*million) {
- (records / million, " M")
- } else if (records >= 2*thousand) {
- (records / thousand, " K")
- } else {
- (records, "")
- }
- }
- "%.1f%s".formatLocal(Locale.US, value, unit)
- }
/**
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/
- private def msDurationToString(ms: Long): String = {
- try {
- val second = 1000L
- val minute = 60 * second
- val hour = 60 * minute
- val day = 24 * hour
- val week = 7 * day
- val year = 365 * day
-
- def toString(num: Long, unit: String): String = {
- if (num == 0) {
- ""
- } else if (num == 1) {
- s"$num $unit"
- } else {
- s"$num ${unit}s"
- }
- }
-
- val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
- val secondString = toString((ms % minute) / second, "second")
- val minuteString = toString((ms % hour) / minute, "minute")
- val hourString = toString((ms % day) / hour, "hour")
- val dayString = toString((ms % week) / day, "day")
- val weekString = toString((ms % year) / week, "week")
- val yearString = toString(ms / year, "year")
-
- Seq(
- second -> millisecondsString,
- minute -> s"$secondString $millisecondsString",
- hour -> s"$minuteString $secondString",
- day -> s"$hourString $minuteString $secondString",
- week -> s"$dayString $hourString $minuteString",
- year -> s"$weekString $dayString $hourString"
- ).foreach { case (durationLimit, durationString) =>
- if (ms < durationLimit) {
- // if time is less than the limit (upto year)
- return durationString
- }
- }
- // if time is more than a year
- return s"$yearString $weekString $dayString"
- } catch {
- case e: Exception =>
- logError("Error converting time to string", e)
- // if there is some error, return blank string
- return ""
- }
- }
-
- /**
- * Returns a human-readable string representing a duration such as "5 second 35 ms"
- */
- private def msDurationToString(msOption: Option[Long]): String = {
- msOption.map(msDurationToString).getOrElse(emptyCellTest)
+ private def formatDurationOption(msOption: Option[Long]): String = {
+ msOption.map(formatDurationVerbose).getOrElse(emptyCellTest)
}
/** Get quantiles for any time distribution */
private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
- timeDistributionOption.get.getQuantiles().map { ms => msDurationToString(ms.toLong) }
+ timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
}
- /** Generate an HTML table constructed by generating a row for each object in a sequence. */
- def listingTable[T](
- headerRow: Seq[String],
- dataRows: Seq[Seq[String]],
- fixedWidth: Boolean = false
- ): Seq[Node] = {
-
- val colWidth = 100.toDouble / headerRow.size
- val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
- var tableClass = "table table-bordered table-striped table-condensed sortable"
- if (fixedWidth) {
- tableClass += " table-fixed"
- }
-
-    def generateHeaderRow(header: Seq[String]): Seq[Node] = {
-      headerRow.map { case h =>
-        <th>
-          <ul class="unstyled">
-            { h.split("\n").map { case t => <li> {t} </li> } }
-          </ul>
-        </th>
-      }
-    }
-
+  /** Generate HTML table from string data */
+  private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
     def generateDataRow(data: Seq[String]): Seq[Node] = {
       <tr>{data.map(d => <td>{d}</td>)}</tr>
     }
-
-    <table class={tableClass}>
-      <thead>{generateHeaderRow(headerRow)}</thead>
-      <tbody>
-        {dataRows.map(r => generateDataRow(r))}
-      </tbody>
-    </table>
+    UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 1aaf7764b5ceb..5a817b067e4fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -18,15 +18,34 @@
package org.apache.spark.streaming.ui
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.ui.UITab
+import org.apache.spark.ui.{SparkUI, UITab}
import org.apache.spark.Logging
+import java.util.concurrent.atomic.AtomicInteger
-/** Spark Web UI tab that shows statistics of a streaming job */
-private[spark] class StreamingTab(val ssc: StreamingContext)
- extends UITab("streaming") with Logging {
+/** Streaming tab in the Spark web UI */
+private[spark] class StreamingTab(ssc: StreamingContext)
+ extends UITab(StreamingTab.streamingTabName) with Logging {
- val streamingPage = new StreamingPage(this)
- ssc.sc.ui.attachTab(this)
+ val parent = ssc.sc.ui
+ val streamingListener = new StreamingJobProgressListener(ssc)
+ val basePath = parent.basePath
+ val appName = parent.appName
+
+ ssc.addStreamingListener(streamingListener)
+ attachPage(new StreamingPage(this))
+ parent.attachTab(this)
+
+ def headerTabs = parent.getTabs
def start() { }
}
+
+object StreamingTab {
+ private val atomicInteger = new AtomicInteger(0)
+
+  /** Generate the name of the streaming tab. The first tab is named "streaming"; subsequent tabs are named "streaming-1", "streaming-2", etc. */
+ def streamingTabName: String = {
+ val count = atomicInteger.getAndIncrement
+ if (count == 0) "streaming" else s"streaming-$count"
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
index 8f6e3ea9dce40..5bba5d9a39dd7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -17,15 +17,60 @@
package org.apache.spark.streaming
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
-import org.apache.spark.streaming.dstream.InputDStream
import scala.reflect.ClassTag
-import org.apache.spark.rdd.RDD
import scala.util.Random
+import scala.io.Source
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.InputDStream
+
+class UISuite extends FunSuite with ShouldMatchers with BeforeAndAfterAll with BeforeAndAfter {
+ var sc: SparkContext = null
+ var ssc: StreamingContext = null
+
+ override def beforeAll() {
+ val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
+ conf.set("spark.cleaner.ttl", "1800")
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll() {
+ if (sc != null) sc.stop()
+ }
+
+ before {
+ ssc = new StreamingContext(sc, Seconds(1))
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+ }
-class UISuite extends FunSuite with BeforeAndAfterAll {
+ test("streaming tab in spark UI") {
+ val ssc = new StreamingContext(sc, Seconds(1))
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val uiData = Source.fromURL(
+ ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ assert(uiData.contains("streaming"))
+ }
+ }
+
+ test("multiple streaming tabs") {
+ val ssc1 = new StreamingContext(sc, Seconds(1))
+ val ssc2 = new StreamingContext(sc, Seconds(2))
+ ssc1.uiTab.prefix should not be ssc2.uiTab.prefix
+ }
- test("Testing") {
+ ignore("Testing") {
runStreaming(1000000)
}