diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index c8d51e44a4dff..3a30919a70584 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -35,7 +35,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
- /** Initialize all components of the server. Must be called before bind(). */
+ /** Initialize all components of the server. */
def start() {
attachPage(new ApplicationPage(this))
attachPage(new IndexPage(this))
@@ -59,25 +59,13 @@ class MasterWebUI(val master: Master, requestedPort: Int)
/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
- val rootHandler = serverInfo.get.rootHandler
- for (handler <- ui.getHandlers) {
- rootHandler.addHandler(handler)
- if (!handler.isStarted) {
- handler.start()
- }
- }
+ ui.getHandlers.foreach(attachHandler)
}
/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
def detachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
- val rootHandler = serverInfo.get.rootHandler
- for (handler <- ui.getHandlers) {
- if (handler.isStarted) {
- handler.stop()
- }
- rootHandler.removeHandler(handler)
- }
+ ui.getHandlers.foreach(detachHandler)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index ae1b7ab014e6e..490a383be42e1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -38,7 +38,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
val timeout = AkkaUtils.askTimeout(worker.conf)
- /** Initialize all components of the server. Must be called before bind(). */
+ /** Initialize all components of the server. */
def start() {
val logPage = new LogPage(this)
attachPage(logPage)
diff --git a/core/src/main/scala/org/apache/spark/ui/FooTab.scala b/core/src/main/scala/org/apache/spark/ui/FooTab.scala
new file mode 100644
index 0000000000000..1e30fa75a263d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/FooTab.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable
+import scala.xml.Node
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
+
+/*
+ * This is an example of how to extend the SparkUI by adding new tabs to it. It is intended
+ * only as a demonstration and should be removed before merging into master!
+ *
+ * bin/spark-class org.apache.spark.ui.FooTab
+ */
+
+/** A tab that displays basic information about jobs seen so far. */
+private[spark] class FooTab(parent: SparkUI) extends UITab("foo") {
+ val appName = parent.appName
+ val basePath = parent.basePath
+
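+  /** Create the listener backing this tab and register its page (invoked by WebUI.attachTab). */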
+ def start() {
+ listener = Some(new FooListener)
+ attachPage(new IndexPage(this))
+ }
+
+ def fooListener: FooListener = {
+    assert(listener.isDefined, "FooTab has not started yet!")
+ listener.get.asInstanceOf[FooListener]
+ }
+
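+  /** All tabs attached to the parent SparkUI, rendered in the header of each page. */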
+ def headerTabs: Seq[UITab] = parent.getTabs
+}
+
+/** A foo page. Enough said. */
+private[spark] class IndexPage(parent: FooTab) extends UIPage("") {
+ private val appName = parent.appName
+ private val basePath = parent.basePath
+ private val listener = parent.fooListener
+
+ override def render(request: HttpServletRequest): Seq[Node] = {
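+    // Snapshot the job results recorded so far, sorted by job ID, and render them as a list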
+ val results = listener.jobResultMap.toSeq.sortBy { case (k, _) => k }
+    val content =
+      <div class="row-fluid">
+        <div class="span12">
+          <h4>Foo Jobs:</h4>
+          <ul>
+            {results.map { case (k, v) => <li>Job {k}: {v}</li> }}
+          </ul>
+        </div>
+      </div>
+ UIUtils.headerSparkPage(content, basePath, appName, "Foo", parent.headerTabs, parent)
+ }
+}
+
+/** A listener that maintains a mapping between job IDs and job results. */
+private[spark] class FooListener extends SparkListener {
+ val jobResultMap = mutable.Map[Int, String]()
+
+ override def onJobEnd(end: SparkListenerJobEnd) {
+ jobResultMap(end.jobId) = end.jobResult.toString
+ }
+}
+
+
+/**
+ * Start a SparkContext and a SparkUI with a FooTab attached.
+ */
+private[spark] object FooTab {
+ def main(args: Array[String]) {
+ val sc = new SparkContext("local", "Foo Tab", new SparkConf)
+ val fooTab = new FooTab(sc.ui)
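+    // The tab can be attached even after the UI has bound to its port, since
+    // WebUI.attachHandler now adds handlers to the running server on the fly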
+ sc.ui.attachTab(fooTab)
+
+ // Run a few jobs
+ sc.parallelize(1 to 1000).count()
+ sc.parallelize(1 to 2000).persist().count()
+ sc.parallelize(1 to 3000).map(i => (i/2, i)).groupByKey().count()
+ sc.parallelize(1 to 4000).map(i => (i/2, i)).groupByKey().persist().count()
+ sc.parallelize(1 to 5000).map(i => (i/2, i)).groupByKey().persist().count()
+ sc.parallelize(1 to 6000).map(i => (i/2, i)).groupByKey().persist().count()
+ sc.parallelize(1 to 7000).map(i => (i/2, i)).groupByKey().persist().count()
+
+ readLine("\n> Started SparkUI with a Foo tab...")
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index c333dd3784bb7..ac22189f9f04f 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -52,8 +52,9 @@ private[spark] class SparkUI(
// Maintain executor storage status through Spark events
val storageStatusListener = new StorageStatusListener
+ listenerBus.addListener(storageStatusListener)
- /** Initialize all components of the server. Must be called before bind(). */
+ /** Initialize all components of the server. */
def start() {
attachTab(new JobProgressTab(this))
attachTab(new BlockManagerTab(this))
@@ -64,14 +65,10 @@ private[spark] class SparkUI(
if (live) {
sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
}
- // Storage status listener must receive events first, as other listeners depend on its state
- listenerBus.addListener(storageStatusListener)
- getListeners.foreach(listenerBus.addListener)
}
/** Bind to the HTTP server behind this web interface. */
def bind() {
- assert(!handlers.isEmpty, "SparkUI has not started yet!")
try {
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort))
@@ -82,6 +79,12 @@ private[spark] class SparkUI(
}
}
+ /** Attach a tab to this UI, along with its corresponding listener if it exists. */
+ override def attachTab(tab: UITab) {
+ super.attachTab(tab)
+ tab.listener.foreach(listenerBus.addListener)
+ }
+
/** Stop the server behind this web interface. Only valid after bind(). */
override def stop() {
super.stop()
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 0b847a9a471f0..4392814fd1b39 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -35,7 +35,6 @@ import org.apache.spark.util.Utils
*
* Each WebUI represents a collection of tabs, each of which in turn represents a collection of
* pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
- * All tabs and pages must be attached before bind()'ing the server.
*/
private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") {
protected val tabs = ArrayBuffer[UITab]()
@@ -46,14 +45,14 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
def getListeners: Seq[SparkListener] = tabs.flatMap(_.listener)
- /** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */
+ /** Attach a tab to this UI, along with all of its attached pages. */
def attachTab(tab: UITab) {
tab.start()
tab.pages.foreach(attachPage)
tabs += tab
}
- /** Attach a page to this UI. Only valid before bind(). */
+ /** Attach a page to this UI. */
def attachPage(page: UIPage) {
val pagePath = "/" + page.prefix
attachHandler(createServletHandler(pagePath,
@@ -64,9 +63,26 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
}
}
- /** Attach a handler to this UI. Only valid before bind(). */
+ /** Attach a handler to this UI. */
def attachHandler(handler: ServletContextHandler) {
handlers += handler
+ serverInfo.foreach { info =>
+ info.rootHandler.addHandler(handler)
+ if (!handler.isStarted) {
+ handler.start()
+ }
+ }
+ }
+
+ /** Detach a handler from this UI. */
+ def detachHandler(handler: ServletContextHandler) {
+ handlers -= handler
+ serverInfo.foreach { info =>
+ info.rootHandler.removeHandler(handler)
+ if (handler.isStarted) {
+ handler.stop()
+ }
+ }
}
/** Initialize all components of the server. Must be called before bind(). */
@@ -89,6 +105,7 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
}
}
+
/**
* A tab that represents a collection of pages and a unit of listening for Spark events.
* Associating each tab with a listener is arbitrary and need not be the case.
@@ -108,6 +125,7 @@ private[spark] abstract class UITab(val prefix: String) {
def start()
}
+
/**
* A page that represents the leaf node in the UI hierarchy.
*