Skip to content

Commit

Permalink
Added UIRoot redirection from Master to Application
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed Jan 29, 2016
1 parent 21cb506 commit 6918f0b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@

package org.apache.spark.deploy.master.ui

import java.net.URL
import java.util.regex.Pattern
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._

Expand Down Expand Up @@ -50,6 +57,66 @@ class MasterWebUI(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
attachHandler(createApiRootHandler())
}

def createApiRootHandler(): ServletContextHandler = {

val servlet = new HttpServlet {
private lazy val appIdPattern = Pattern.compile("\\/api\\/v\\d+\\/applications\\/([^\\/]+).*")

override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = {
doRequest(request, response)
}
override def doPost(request: HttpServletRequest, response: HttpServletResponse): Unit = {
doRequest(request, response)
}
private def doRequest(request: HttpServletRequest, response: HttpServletResponse): Unit = {
val requestURI = request.getRequestURI

// requesting an application info list
if (requestURI == "applications") {
// TODO - Should send ApplicationList response ???
} else {
// forward request to app if it is active, otherwise send error
getAppId(request) match {
case Some(appId) =>
val state = masterPage.getMasterState
state.activeApps.find(appInfo => appInfo.id == appId) match {
case Some(appInfo) =>
val prefixedDestPath = appInfo.desc.appUiUrl + requestURI
val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString
response.sendRedirect(newUrl)
request.getPathInfo
case None =>
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE)
}
case None =>
response.sendError(HttpServletResponse.SC_BAD_REQUEST)
}
}
}

private def getAppId(request: HttpServletRequest): Option[String] = {
val m = appIdPattern.matcher(request.getRequestURI)
if (m.matches) Some(m.group(1)) else None
}

// SPARK-5983 ensure TRACE is not supported
protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
}
}

createServletHandler("/api", servlet, "")
}

def getApplicationInfoList: Iterator[ApplicationInfo] = {
val state = masterPage.getMasterState
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val completedApps = state.completedApps.sortBy(_.endTime).reverse
activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++
completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType

import org.apache.spark.deploy.history.ApplicationHistoryInfo
import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class ApplicationListResource(uiRoot: UIRoot) {
Expand Down Expand Up @@ -76,4 +77,26 @@ private[spark] object ApplicationsListResource {
}
)
}

def convertApplicationInfo(
internal: InternalApplicationInfo,
completed: Boolean): ApplicationInfo = {
// standalone application info always has just one attempt
new ApplicationInfo(
id = internal.id,
name = internal.desc.name,
coresGranted = Some(internal.coresGranted),
maxCores = internal.desc.maxCores,
coresPerExecutor = internal.desc.coresPerExecutor,
memoryPerExecutorMB = Some(internal.desc.memoryPerExecutorMB),
attempts = Seq(new ApplicationAttemptInfo(
attemptId = None,
startTime = new Date(internal.startTime),
endTime = new Date(internal.endTime),
sparkUser = internal.desc.user,
completed = completed
))
)
}

}

0 comments on commit 6918f0b

Please sign in to comment.