Skip to content

Commit

Permalink
[SPARK-12299] Removed history server functionality from Master with r…
Browse files Browse the repository at this point in the history
…elated WebUI and JSON ApiRootResources
  • Loading branch information
BryanCutler committed Jan 28, 2016
1 parent 19fdb21 commit 21cb506
Show file tree
Hide file tree
Showing 9 changed files with 15 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ private[spark] class ApplicationInfo(
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
@transient @volatile var appUIUrlAtHistoryServer: Option[String] = None

// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued by the
Expand All @@ -66,7 +65,6 @@ private[spark] class ApplicationInfo(
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
executorLimit = Integer.MAX_VALUE
appUIUrlAtHistoryServer = None
}

private def newExecutorId(useID: Option[Int] = None): Int = {
Expand Down Expand Up @@ -136,11 +134,4 @@ private[spark] class ApplicationInfo(
System.currentTimeMillis() - startTime
}
}

/**
 * Resolves the URL that should be used to reach this application's UI: the address recorded
 * in `appUIUrlAtHistoryServer` when one is set, otherwise the UI URL carried by the
 * application description.
 */
def curAppUIUrl: String = appUIUrlAtHistoryServer match {
  case Some(historyServerUrl) => historyServerUrl
  case None => desc.appUiUrl
}

}
110 changes: 1 addition & 109 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,24 @@

package org.apache.spark.deploy.master

import java.io.FileNotFoundException
import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import scala.util.Random

import org.apache.hadoop.fs.Path

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, Utils}

private[deploy] class Master(
Expand All @@ -58,10 +48,6 @@ private[deploy] class Master(
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

private val rebuildUIThread =
ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
Expand All @@ -84,8 +70,6 @@ private[deploy] class Master(
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
private var nextAppNumber = 0
// Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
private val appIdToUI = new ConcurrentHashMap[String, SparkUI]

private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
Expand Down Expand Up @@ -198,7 +182,6 @@ private[deploy] class Master(
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
rebuildUIThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
Expand Down Expand Up @@ -375,10 +358,6 @@ private[deploy] class Master(
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}

case AttachCompletedRebuildUI(appId) =>
// An asyncRebuildSparkUI has completed, so need to attach to master webUi
Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
Expand Down Expand Up @@ -821,17 +800,13 @@ private[deploy] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
waitingApps -= app

// If application events are logged, use them to rebuild the UI
asyncRebuildSparkUI(app)

for (exec <- app.executors.values) {
killExecutor(exec)
}
Expand Down Expand Up @@ -930,89 +905,6 @@ private[deploy] class Master(
exec.state = ExecutorState.KILLED
}

/**
 * Rebuild a new SparkUI from the given application's event logs, blocking the calling thread
 * (without any timeout) until the asynchronous rebuild has completed.
 *
 * Returns whatever the asynchronous rebuild produced; if that rebuild failed, the failure is
 * rethrown to the caller by `Await.result`.
 */
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] =
  Await.result(asyncRebuildSparkUI(app), Duration.Inf)

/**
 * Rebuild a new SparkUI asynchronously to not block RPC event loop.
 *
 * The application's event log is replayed on `rebuildUIContext`. As a side effect,
 * `app.appUIUrlAtHistoryServer` is updated: to the rebuilt UI's base path on success, or to
 * the "not-found" page when event logging is disabled, the log file is missing, or the replay
 * fails. In the two failure cases the page URL carries URL-encoded `msg`, `title` and (for
 * replay errors) `exception` query parameters.
 *
 * @param app the application whose UI should be rebuilt
 * @return a future holding the rebuilt UI; it completes with None when event logging is
 *         disabled for the application and fails when the log cannot be opened or replayed
 */
private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
  val appName = app.desc.name
  val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"

  app.desc.eventLogDir match {
    case None =>
      // Event logging is disabled for this application
      app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
      Future.successful(None)

    case Some(eventLogDir) =>
      val futureUI = Future[Option[SparkUI]] {
        val eventLogFilePrefix = EventLoggingListener.getLogPath(
          eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
        val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
        val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
          EventLoggingListener.IN_PROGRESS))

        val eventLogFile = if (inProgressExists) {
          // Event logging is enabled for this application, but the application is still in progress
          logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
          eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
        } else {
          eventLogFilePrefix
        }

        val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
        // Everything after the stream is opened runs inside the try, so the stream is closed
        // even when building the UI (and not only the replay) throws.
        try {
          val replayBus = new ReplayListenerBus()
          val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
            appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
          replayBus.replay(logInput, eventLogFile, inProgressExists)
          Some(ui)
        } finally {
          logInput.close()
        }
      }(rebuildUIContext)

      futureUI.onSuccess { case Some(ui) =>
        appIdToUI.put(app.id, ui)
        // `self` can be null if we are already in the process of shutting down
        // This happens frequently in tests where `local-cluster` is used
        if (self != null) {
          self.send(AttachCompletedRebuildUI(app.id))
        }
        // Application UI is successfully rebuilt, so link the Master UI to it
        // NOTE - app.appUIUrlAtHistoryServer is volatile
        app.appUIUrlAtHistoryServer = Some(ui.basePath)
      }(ThreadUtils.sameThread)

      futureUI.onFailure {
        case fnf: FileNotFoundException =>
          // Event logging is enabled for this application, but no event logs are found
          val logMsg = s"No event logs found for application $appName in $eventLogDir."
          logWarning(logMsg)
          // The title contains spaces and parentheses, so it is encoded like the message
          val title = URLEncoder.encode(s"Application history not found (${app.id})", "UTF-8")
          val msg = URLEncoder.encode(
            logMsg + " Did you specify the correct logging directory?", "UTF-8")
          app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")

        case e: Exception =>
          // Relay exception message to application UI page
          val logMsg = s"Exception in replaying log for application $appName!"
          logError(logMsg, e)
          val title = URLEncoder.encode(s"Application history load error (${app.id})", "UTF-8")
          val exception = URLEncoder.encode(Utils.exceptionString(e), "UTF-8")
          val msg = URLEncoder.encode(logMsg, "UTF-8")
          app.appUIUrlAtHistoryServer =
            Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
      }(ThreadUtils.sameThread)

      futureUI
  }
}

/** Generate a new app ID given a app's submission date */
private def newApplicationId(submitDate: Date): String = {
val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,4 @@ private[master] object MasterMessages {
case object BoundPortsRequest

case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])

// Sent to the Master endpoint once an asynchronous SparkUI rebuild for `appId` has succeeded,
// so that the rebuilt UI can be attached to the Master web UI.
case class AttachCompletedRebuildUI(appId: String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
<li><strong><a href={app.curAppUIUrl}>Application Detail UI</a></strong></li>
{
if (!app.isFinished) {
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
}
}
</ul>
</div>
</div>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
{killLink}
</td>
<td>
<a href={app.curAppUIUrl}>{app.desc.name}</a>
{
if (app.isFinished) {
app.desc.name
} else {
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
}
}
</td>
<td>
{app.coresGranted}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package org.apache.spark.deploy.master.ui

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
UIRoot}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._

Expand All @@ -33,7 +31,7 @@ class MasterWebUI(
requestedPort: Int,
customMasterPage: Option[MasterPage] = None)
extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {
requestedPort, master.conf, name = "MasterUI") with Logging {

val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
Expand All @@ -46,44 +44,13 @@ class MasterWebUI(
def initialize() {
// Registers the pages and request handlers served by the Master web UI.
val masterPage = new MasterPage(this)
// Pages: per-application details, the history-not-found page and the master page itself
attachPage(new ApplicationPage(this))
attachPage(new HistoryNotFoundPage(this))
attachPage(masterPage)
// Static resources are exposed under the "/static" path
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
// Handler obtained from ApiRootResource with this UI as its root
attachHandler(ApiRootResource.getServletHandler(this))
// Kill endpoints only accept POST; NOTE(review): presumably they redirect to "/" after the
// masterPage callback has run -- confirm against createRedirectHandler
attachHandler(createRedirectHandler(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}

/**
 * Registers every handler of a reconstructed SparkUI with this Master UI.
 * Only valid after bind(): the Master UI must already be bound to a server.
 */
def attachSparkUI(ui: SparkUI): Unit = {
  assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
  for (handler <- ui.getHandlers) {
    attachHandler(handler)
  }
}

/**
 * Removes every handler of a reconstructed SparkUI from this Master UI.
 * Only valid after bind(): the Master UI must already be bound to a server.
 */
def detachSparkUI(ui: SparkUI): Unit = {
  assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
  for (handler <- ui.getHandlers) {
    detachHandler(handler)
  }
}

/**
 * Lists the applications reported by the master state: active applications first, ordered by
 * start time descending, followed by completed applications, ordered by end time descending.
 * Each entry is converted through `ApplicationsListResource.convertApplicationInfo`.
 */
def getApplicationInfoList: Iterator[ApplicationInfo] = {
  val masterState = masterPage.getMasterState
  val runningApps = masterState.activeApps.sortBy(_.startTime).reverse
  val finishedApps = masterState.completedApps.sortBy(_.endTime).reverse
  val runningInfos =
    runningApps.iterator.map(app => ApplicationsListResource.convertApplicationInfo(app, false))
  val finishedInfos =
    finishedApps.iterator.map(app => ApplicationsListResource.convertApplicationInfo(app, true))
  runningInfos ++ finishedInfos
}

/**
 * Looks up the application with the given id among the master's active and completed
 * applications (active ones are searched first) and asks the master to rebuild its SparkUI.
 * Yields None when no application has that id or when the rebuild produces no UI.
 */
def getSparkUI(appId: String): Option[SparkUI] = {
  val masterState = masterPage.getMasterState
  val runningApps = masterState.activeApps.sortBy(_.startTime).reverse
  val finishedApps = masterState.completedApps.sortBy(_.endTime).reverse
  val candidates = runningApps ++ finishedApps
  candidates
    .find(app => app.id == appId)
    .flatMap(app => master.rebuildSparkUI(app))
}
}

private[master] object MasterWebUI {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private[spark] object ApiRootResource {

/**
* This trait is shared by all the root containers for application UI information --
* the HistoryServer, the Master UI, and the application UI. This provides the common
* the HistoryServer and the application UI. This provides the common
* interface needed for them all to expose application info as json.
*/
private[spark] trait UIRoot {
Expand Down
Loading

0 comments on commit 21cb506

Please sign in to comment.